"""Helpers for driving coroutines synchronously from a cmd2-based REPL.

``RunSync`` wraps a coroutine function so it can be called like a normal
function; ``REPLApp`` is a ``cmd2.Cmd`` subclass that performs console
input/output on a dedicated event-loop thread.
"""

import asyncio
from functools import wraps
import logging
import threading
import types
from typing import Optional

import cmd2
import nest_asyncio

# Event loop that RunSync-wrapped coroutines run on. It is assigned by
# REPLApp.__init__ and stays None until an application has been constructed.
MainLoop: Optional[asyncio.AbstractEventLoop] = None


class RunSync:
    """Decorator that lets a coroutine function be called synchronously.

    When the caller is already running on ``MainLoop`` the coroutine is driven
    to completion with ``MainLoop.run_until_complete`` (REPLApp applies
    ``nest_asyncio`` so that a re-entrant call is possible).  Otherwise the
    coroutine is submitted to ``MainLoop`` with
    ``asyncio.run_coroutine_threadsafe`` and the calling thread blocks on the
    result.
    """

    # Task currently being driven on MainLoop by __call__, if any.
    _current_task: Optional[asyncio.Task] = None

    @staticmethod
    def StopCurrentRunningCoroutine():
        """Cancel the task currently being driven on ``MainLoop``, if any."""
        task = RunSync._current_task
        if task is None:
            return
        task.cancel()
        # This is invoked from a signal handler; the loop may be blocked in
        # select() with a long timeout, so wake it up to make it notice the
        # cancellation promptly.
        loop = MainLoop
        if loop is not None and not loop.is_closed():
            loop.call_soon_threadsafe(lambda: None)

    def __init__(self, func):
        """Wrap *func*, a coroutine function, copying its metadata onto self."""
        # wraps() also stores the original callable as self.__wrapped__.
        wraps(func)(self)

    def __call__(self, *args, **kwargs):
        """Run the wrapped coroutine to completion and return its result.

        Raises:
            RuntimeError: if ``MainLoop`` has not been set yet.
            asyncio.CancelledError: if the task is cancelled, e.g. through
                ``StopCurrentRunningCoroutine``.
        """
        if MainLoop is None:
            # Fail before the coroutine object is created so that it is not
            # left un-awaited.
            raise RuntimeError(
                "MainLoop is not set; create a REPLApp inside a running "
                "event loop before calling a RunSync-wrapped function"
            )

        currentLoop = None
        try:
            currentLoop = asyncio.get_running_loop()
        except RuntimeError:
            logging.error("Not in an event loop")

        func = self.__wrapped__
        if currentLoop is not MainLoop:
            # Called from another thread (or another loop): hand the coroutine
            # to MainLoop and block until it finishes.
            future = asyncio.run_coroutine_threadsafe(
                func(*args, **kwargs), MainLoop
            )
            return future.result()

        # Re-entrant call on MainLoop itself.  Remember the task that was
        # being tracked so nested calls restore it, and always clear our own
        # task even when it raises or is cancelled.
        previous_task = RunSync._current_task
        task = MainLoop.create_task(func(*args, **kwargs))
        RunSync._current_task = task
        try:
            return MainLoop.run_until_complete(task)
        finally:
            RunSync._current_task = previous_task

    def __get__(self, instance, cls):
        """Bind to *instance* so RunSync can decorate methods."""
        if instance is None:
            return self
        return types.MethodType(self, instance)


class REPLApp(cmd2.Cmd):
    """cmd2 application whose console I/O runs on a separate event-loop thread."""

    def _io_worker(self, loop):
        """Thread target: make *loop* current for this thread and run it."""
        asyncio.set_event_loop(loop)
        loop.run_forever()

    async def input(self, prompt):
        """Read one command line on the IO thread and return it."""

        async def _input(prompt):
            # NOTE: this call blocks the IO loop until a line has been read.
            return self._read_command_line(prompt)

        future = asyncio.run_coroutine_threadsafe(_input(prompt), self.ioLoop)
        return await asyncio.wrap_future(future)

    async def print(self, *args, **kwargs):
        """Call the builtin ``print`` with the given arguments on the IO thread."""

        async def _print(*args, **kwargs):
            print(*args, **kwargs)

        future = asyncio.run_coroutine_threadsafe(
            _print(*args, **kwargs), self.ioLoop
        )
        await asyncio.wrap_future(future)

    def __init__(self, *args, **kwargs):
        """Record the running loop as ``MainLoop`` and initialise cmd2.

        Must be called from inside a running event loop, otherwise
        ``asyncio.get_running_loop`` raises ``RuntimeError``.  Any arguments
        are forwarded unchanged to ``cmd2.Cmd.__init__``.
        """
        nest_asyncio.apply()
        global MainLoop
        MainLoop = asyncio.get_running_loop()
        super().__init__(*args, **kwargs)

    def preloop(self):
        """Install the SIGINT handler, start the IO thread, set up readline."""
        # 1. Intercept SIGINT: cancel the coroutine RunSync is currently
        #    running.
        import signal

        def signal_handler(sig, frame):
            RunSync.StopCurrentRunningCoroutine()

        signal.signal(signal.SIGINT, signal_handler)

        # 2. Create the IO thread that handles input and output.
        self.ioLoop = asyncio.new_event_loop()
        self.ioThread = threading.Thread(
            target=self._io_worker, args=(self.ioLoop,)
        )
        self.ioThread.start()

        # 3. Set up the console.
        self.saved_readline_settings = None
        with self.sigint_protection:
            self.saved_readline_settings = self._set_up_cmd2_readline()

    def postloop(self):
        """Restore readline settings and shut down the IO thread and its loop."""
        # 1. Restore the console settings.
        with self.sigint_protection:
            if self.saved_readline_settings is not None:
                self._restore_readline(self.saved_readline_settings)

        # 2. Stop the IO thread.
        # https://stackoverflow.com/questions/51642267/asyncio-how-do-you-use-run-forever
        self.ioLoop.call_soon_threadsafe(self.ioLoop.stop)
        self.ioThread.join()
        # The loop is no longer running once its thread has exited; close it
        # so its selector and self-pipe are released.
        self.ioLoop.close()