92 lines
2.9 KiB
Python
92 lines
2.9 KiB
Python
import asyncio, nest_asyncio
|
|
import cmd2
|
|
from functools import wraps
|
|
import logging
|
|
import threading
|
|
import types
|
|
|
|
# Event loop on which RunSync executes wrapped coroutines. It stays None until
# REPLApp.__init__ assigns the loop that is running when the app is constructed.
MainLoop : asyncio.AbstractEventLoop = None
|
|
|
|
class RunSync:
    """Decorator that lets synchronous code call a coroutine function.

    Calling the decorated object runs the wrapped coroutine function on the
    module-level ``MainLoop`` and returns its result:

    * When the caller is already executing inside ``MainLoop``, the coroutine
      is scheduled as a task and driven with ``run_until_complete``.  Stock
      asyncio rejects ``run_until_complete`` on a loop that is already
      running, so this path relies on the loop having been made re-entrant
      (``REPLApp.__init__`` calls ``nest_asyncio.apply()``).
    * Otherwise (another thread, or no running loop) the coroutine is
      submitted with ``asyncio.run_coroutine_threadsafe`` and the caller
      blocks until the result is available.
    """

    # Task currently being driven on MainLoop by a RunSync call, or None.
    # Kept on the class so that a signal handler can cancel it without
    # holding a reference to any particular instance.
    _current_task: asyncio.Task = None

    @staticmethod
    def StopCurrentRunningCoroutine():
        """Cancel the task a RunSync call is currently driving, if any."""
        task = RunSync._current_task
        if task is not None:
            task.cancel()

    def __init__(self, func):
        """Wrap *func*, copying its metadata and storing it as ``__wrapped__``."""
        wraps(func)(self)

    def __call__(self, *args, **kwargs):
        """Run the wrapped coroutine function to completion and return its result.

        Raises:
            RuntimeError: if ``MainLoop`` has not been set yet.
            Whatever the wrapped coroutine raises, including
            ``asyncio.CancelledError`` when the task is cancelled through
            ``StopCurrentRunningCoroutine``.
        """
        # Fail early with a clear message, before a coroutine object is
        # created that would otherwise never be awaited.
        if MainLoop is None:
            raise RuntimeError("RunSync called before MainLoop was set")

        try:
            current_loop = asyncio.get_running_loop()
        except RuntimeError:
            logging.error("Not in an event loop")
            current_loop = None

        func = self.__wrapped__
        if current_loop is not MainLoop:
            # Foreign thread or no running loop: hand the coroutine to the
            # main loop and block until it finishes.
            future = asyncio.run_coroutine_threadsafe(func(*args, **kwargs), MainLoop)
            return future.result()

        task = MainLoop.create_task(func(*args, **kwargs))
        # Remember the enclosing task so that nested RunSync calls restore it
        # instead of clearing the marker while the outer call is still active.
        previous_task = RunSync._current_task
        RunSync._current_task = task
        try:
            return MainLoop.run_until_complete(task)
        finally:
            # Also runs on cancellation or error, so a finished task is never
            # left registered as "current".
            RunSync._current_task = previous_task

    def __get__(self, instance, cls):
        """Descriptor hook so a decorated method is bound to its instance."""
        if instance is None:
            return self
        return types.MethodType(self, instance)
|
|
|
|
class REPLApp(cmd2.Cmd):
    """cmd2 application that performs console input/output on a dedicated thread.

    ``preloop`` starts a worker thread running its own event loop
    (``self.ioLoop``); the ``input`` and ``print`` coroutines submit their
    console work to that loop and await the result from the calling loop.
    ``postloop`` stops the worker and releases its loop.
    """

    def __init__(self, *args, **kwargs):
        """Enable nested event loops and record the running loop as ``MainLoop``.

        Must be called while an asyncio event loop is running in the current
        thread; otherwise ``asyncio.get_running_loop()`` raises RuntimeError.
        Any positional/keyword arguments are forwarded to ``cmd2.Cmd``.
        """
        nest_asyncio.apply()
        global MainLoop
        MainLoop = asyncio.get_running_loop()
        super().__init__(*args, **kwargs)

    def _io_worker(self, loop):
        """Thread target: make *loop* this thread's event loop and run it until stopped."""
        asyncio.set_event_loop(loop)
        loop.run_forever()

    async def input(self, prompt):
        """Read one command line on the I/O thread and return it.

        The read is done by ``self._read_command_line(prompt)`` inside a
        coroutine scheduled on ``self.ioLoop``.
        """
        async def _input(prompt):
            return self._read_command_line(prompt)

        future = asyncio.run_coroutine_threadsafe(_input(prompt), self.ioLoop)
        # Bridge the concurrent.futures.Future so the calling loop can await it.
        return await asyncio.wrap_future(future)

    async def print(self, *args, **kwargs):
        """Call the builtin ``print`` with the given arguments on the I/O thread."""
        async def _print(*args, **kwargs):
            # Inside this nested function ``print`` is the builtin, not this method.
            print(*args, **kwargs)

        future = asyncio.run_coroutine_threadsafe(_print(*args, **kwargs), self.ioLoop)
        await asyncio.wrap_future(future)

    def preloop(self):
        """Install the SIGINT handler, start the I/O thread and set up readline."""
        import signal

        # 1. Handle SIGINT by cancelling the coroutine RunSync is currently
        #    driving instead of using the previously installed handler.
        def signal_handler(sig, frame):
            RunSync.StopCurrentRunningCoroutine()

        signal.signal(signal.SIGINT, signal_handler)

        # 2. Create the I/O thread that handles console input and output.
        self.ioLoop = asyncio.new_event_loop()
        self.ioThread = threading.Thread(target=self._io_worker, args=(self.ioLoop,))
        self.ioThread.start()

        # 3. Set up the console, keeping the previous readline settings so
        #    postloop can restore them.
        self.saved_readline_settings = None
        with self.sigint_protection:
            self.saved_readline_settings = self._set_up_cmd2_readline()

    def postloop(self):
        """Restore the readline settings and shut down the I/O thread."""
        # 1. Restore the console settings saved by preloop.
        with self.sigint_protection:
            if self.saved_readline_settings is not None:
                self._restore_readline(self.saved_readline_settings)

        # 2. Stop the I/O thread.  ``stop`` must be scheduled from this thread
        #    with call_soon_threadsafe because the loop runs in another thread.
        # https://stackoverflow.com/questions/51642267/asyncio-how-do-you-use-run-forever
        self.ioLoop.call_soon_threadsafe(self.ioLoop.stop)
        self.ioThread.join()
        # The loop is no longer running once its thread has exited; close it
        # so its selector and self-pipe file descriptors are released.
        self.ioLoop.close()