pikpakfs/REPL.py
2025-08-02 20:01:13 +08:00

92 lines
2.9 KiB
Python

import asyncio, nest_asyncio
import cmd2
from functools import wraps
import logging
import threading
import types
# Event loop that RunSync treats as the application's main loop.
# It is None until REPLApp.__init__ assigns the loop that is running at
# construction time; RunSync.__call__ compares the caller's running loop
# against it to decide how to execute a coroutine.
MainLoop : asyncio.AbstractEventLoop = None
class RunSync:
    """Decorator that exposes a coroutine function as a blocking callable.

    Calling the decorated object runs the wrapped coroutine function to
    completion and returns its result:

    * when the caller is already executing on ``MainLoop``, the coroutine is
      wrapped in a task and driven with ``MainLoop.run_until_complete`` (this
      re-entrant use relies on the loop having been patched, e.g. by
      ``nest_asyncio.apply()``);
    * otherwise (no running loop in this thread, or a different loop) the
      coroutine is submitted to ``MainLoop`` with
      ``asyncio.run_coroutine_threadsafe`` and the caller blocks on the result.

    The class also implements the descriptor protocol so it can decorate
    methods: attribute access through an instance yields a bound method.
    """

    # Task currently being driven on MainLoop by the innermost __call__,
    # or None when no such call is in progress.
    _current_task : asyncio.Task = None

    @staticmethod
    def StopCurrentRunningCoroutine():
        """Request cancellation of the task currently run by ``__call__``.

        Does nothing when no task is being driven on ``MainLoop``.
        """
        task = RunSync._current_task
        if task is not None:
            task.cancel()

    def __init__(self, func):
        """Wrap *func*, copying its metadata and storing it as ``__wrapped__``."""
        wraps(func)(self)

    def __call__(self, *args, **kwargs):
        """Run the wrapped coroutine function synchronously and return its result.

        Raises:
            RuntimeError: if ``MainLoop`` has not been set yet.
            Any exception raised by the coroutine (including
            ``asyncio.CancelledError`` when the task is cancelled).
        """
        if MainLoop is None:
            # Without this guard "no running loop" (None) would compare
            # identical to the unset MainLoop and fail later with an opaque
            # AttributeError on None.
            raise RuntimeError("RunSync called before MainLoop was set")

        try:
            currentLoop = asyncio.get_running_loop()
        except RuntimeError:
            currentLoop = None
            logging.error("Not in an event loop")

        func = self.__wrapped__
        if currentLoop is MainLoop:
            task = asyncio.Task(func(*args, **kwargs))
            # Remember the task of an enclosing (nested) call so it becomes
            # cancellable again once this inner call is finished.
            previous = RunSync._current_task
            RunSync._current_task = task
            try:
                return MainLoop.run_until_complete(task)
            finally:
                # Also runs when the task raised or was cancelled.
                RunSync._current_task = previous

        # Called from another thread or loop: hand the coroutine to MainLoop
        # and block until it completes.
        return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), MainLoop).result()

    def __get__(self, instance, cls):
        """Bind to *instance* like a plain function; return self on class access."""
        if instance is None:
            return self
        return types.MethodType(self, instance)
class REPLApp(cmd2.Cmd):
    """cmd2 application whose console input/output runs on a dedicated thread.

    ``preloop`` starts a background thread running its own event loop
    (``self.ioLoop``); the ``input`` and ``print`` coroutines forward their
    work to that loop so the awaiting coroutine on the main loop is not
    blocked. ``postloop`` undoes everything ``preloop`` set up.

    The instance must be constructed while an event loop is running, because
    ``__init__`` records that loop in the module-level ``MainLoop``.
    """

    def _io_worker(self, loop):
        """Thread target: make *loop* this thread's event loop and run it until stopped."""
        asyncio.set_event_loop(loop)
        loop.run_forever()

    async def input(self, prompt):
        """Read one command line on the IO thread and return it.

        The read is performed by ``self._read_command_line(prompt)`` inside
        the IO loop; the caller only awaits the resulting future.
        """
        async def _input(prompt):
            return self._read_command_line(prompt)

        future = asyncio.run_coroutine_threadsafe(_input(prompt), self.ioLoop)
        return await asyncio.wrap_future(future)

    async def print(self, *args, **kwargs):
        """Call the builtin ``print`` with the given arguments on the IO thread."""
        async def _print(*args, **kwargs):
            # Inside this nested function ``print`` resolves to the builtin,
            # not to this method.
            print(*args, **kwargs)

        future = asyncio.run_coroutine_threadsafe(_print(*args, **kwargs), self.ioLoop)
        await asyncio.wrap_future(future)

    def __init__(self):
        """Patch asyncio for re-entrancy and record the running loop as ``MainLoop``.

        Raises:
            RuntimeError: from ``asyncio.get_running_loop()`` when no event
                loop is running in the calling thread.
        """
        global MainLoop
        nest_asyncio.apply()
        MainLoop = asyncio.get_running_loop()
        super().__init__()

    def preloop(self):
        """Install the SIGINT handler, start the IO thread and set up readline."""
        import signal

        # 1. Ctrl-C cancels the coroutine currently driven by RunSync instead
        #    of raising KeyboardInterrupt. The previous handler is kept so
        #    postloop can put it back.
        def signal_handler(sig, frame):
            RunSync.StopCurrentRunningCoroutine()

        self._saved_sigint_handler = signal.signal(signal.SIGINT, signal_handler)

        # 2. Create the IO thread that handles input and output.
        self.ioLoop = asyncio.new_event_loop()
        self.ioThread = threading.Thread(target=self._io_worker, args=(self.ioLoop,))
        self.ioThread.start()

        # 3. Set up the console (readline) the way cmd2 expects.
        self.saved_readline_settings = None
        with self.sigint_protection:
            self.saved_readline_settings = self._set_up_cmd2_readline()

    def postloop(self):
        """Restore readline, stop and close the IO loop, restore the SIGINT handler."""
        import signal

        try:
            # 1. Restore the console settings saved by preloop.
            with self.sigint_protection:
                if self.saved_readline_settings is not None:
                    self._restore_readline(self.saved_readline_settings)
        finally:
            # 2. Stop the IO thread even if restoring readline failed.
            # https://stackoverflow.com/questions/51642267/asyncio-how-do-you-use-run-forever
            if not self.ioLoop.is_closed():
                self.ioLoop.call_soon_threadsafe(self.ioLoop.stop)
                self.ioThread.join()
                # The loop is no longer running, so it can be closed to
                # release its resources.
                self.ioLoop.close()

            # 3. Put back the SIGINT handler that was active before preloop.
            #    signal.signal() returns None when the previous handler was
            #    not installed from Python; that value cannot be re-installed.
            previous_handler = getattr(self, "_saved_sigint_handler", None)
            if previous_handler is not None:
                signal.signal(signal.SIGINT, previous_handler)
                self._saved_sigint_handler = None