pikpakfs/main.py
2024-10-29 01:20:57 +08:00

299 lines
10 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import asyncio, nest_asyncio
import cmd2
from functools import wraps
import logging
import threading
import colorlog
from pikpakFs import VirtFsNode, DirNode, FileNode, PKVirtFs, IsDir, IsFile
import os
def RunSyncInLoop(loop):
def decorator(func):
@wraps(func)
def decorated(*args, **kwargs):
currentLoop = None
try:
currentLoop = asyncio.get_running_loop()
except RuntimeError:
pass
if currentLoop is loop:
return loop.run_until_complete(func(*args, **kwargs))
else:
return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), loop).result()
return decorated
return decorator
def ProvideDecoratorSelfArgs(decorator, argsProvider):
def wrapper(func):
@wraps(func)
def decorated(*args, **kwargs):
namespace = args[0]
return decorator(argsProvider(namespace))(func)(*args, **kwargs)
return decorated
return wrapper
class PikpakConsole(cmd2.Cmd):
def LoopProvider(self):
return self.loop
RunSync = ProvideDecoratorSelfArgs(RunSyncInLoop, LoopProvider)
def _setup_logging(self):
formatter = colorlog.ColoredFormatter(
"%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt='%Y-%m-%d %H:%M:%S',
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
def IOWorker(self, loop):
self.terminal_lock.acquire() # 我看cmdloop是这么做的所以我也这么做
asyncio.set_event_loop(loop)
try:
loop.run_forever()
finally:
self.terminal_lock.release()
async def ainput(self, prompt):
async def ReadInput(prompt):
return self._read_command_line(prompt)
future = asyncio.run_coroutine_threadsafe(ReadInput(prompt), self.ioLoop)
return await asyncio.wrap_future(future)
async def aoutput(self, output):
async def PrintOuput(output):
print(output)
future = asyncio.run_coroutine_threadsafe(PrintOuput(output), self.ioLoop)
await asyncio.wrap_future(future)
def ParserProvider(self):
return cmd2.Cmd2ArgumentParser()
def AddPathParser(parserProvider):
def PathParserProvider(self):
parser = parserProvider(self)
parser.add_argument("path", help="path", default="", nargs="?", type=RunSyncInLoop(self.loop)(self.client.PathToNode))
return parser
return PathParserProvider
def AddFatherAndSonParser(parserProvider):
def PathParserProvider(self):
parser = parserProvider(self)
parser.add_argument("path", help="path", default="", nargs="?", type=RunSyncInLoop(self.loop)(self.client.PathToFatherNodeAndNodeName))
return parser
return PathParserProvider
def AddUrlParser(parserProvider):
def PathParserProvider(self):
parser = parserProvider(self)
parser.add_argument("url", help="url")
return parser
return PathParserProvider
def AddUsernamePasswordParser(parserProvider):
def PathParserProvider(self):
parser = parserProvider(self)
parser.add_argument("username", help="username", nargs="?")
parser.add_argument("password", help="password", nargs="?")
return parser
return PathParserProvider
async def PathCompleter(self, text, line, begidx, endidx, filterFiles):
father, sonName = await self.client.PathToFatherNodeAndNodeName(text)
if not IsDir(father):
return []
matches = []
matchesNode = []
for childId in father.childrenId:
child = self.client.GetNodeById(childId)
if filterFiles and IsFile(child):
continue
if child.name.startswith(sonName):
self.display_matches.append(child.name)
if sonName == "":
matches.append(text + child.name)
elif text.endswith(sonName):
matches.append(text[:text.rfind(sonName)] + child.name)
matchesNode.append(child)
if len(matchesNode) == 1 and IsDir(matchesNode[0]):
matches[0] += "/"
self.allow_appended_space = False
self.allow_closing_quote = False
return matches
def __init__(self):
super().__init__()
self._setup_logging()
self.client = PKVirtFs("token.json", proxy="http://127.0.0.1:7897")
async def Run(self):
# 1. 设置忽略SIGINT
import signal
def signal_handler(sig, frame):
pass
signal.signal(signal.SIGINT, signal_handler)
# 2. 创建一个新的事件循环
self.loop = asyncio.get_running_loop()
self.ioLoop = asyncio.new_event_loop()
thread = threading.Thread(target=self.IOWorker, args=(self.ioLoop,))
thread.start()
# 3. 启动cmd2
saved_readline_settings = None
try:
# Get sigint protection while we set up readline for cmd2
with self.sigint_protection:
saved_readline_settings = self._set_up_cmd2_readline()
stop = False
while not stop:
# Get sigint protection while we read the command line
line = await self.ainput(self.prompt)
# Run the command along with all associated pre and post hooks
stop = self.onecmd_plus_hooks(line)
finally:
# Get sigint protection while we restore readline settings
with self.sigint_protection:
if saved_readline_settings is not None:
self._restore_readline(saved_readline_settings)
# https://stackoverflow.com/questions/51642267/asyncio-how-do-you-use-run-forever
self.ioLoop.call_soon_threadsafe(self.ioLoop.stop)
thread.join()
# commands #
def do_debug(self, args):
"""
Enable debug mode
"""
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("Debug mode enabled")
def do_debugoff(self, args):
"""
Disable debug mode
"""
logging.getLogger().setLevel(logging.INFO)
logging.info("Debug mode disabled")
@RunSync
@ProvideDecoratorSelfArgs(cmd2.with_argparser, AddUsernamePasswordParser(ParserProvider))
async def do_login(self, args):
"""
Login to pikpak
"""
await self.client.Login(args.username, args.password)
await self.aoutput("Logged in successfully")
@RunSync
async def complete_ls(self, text, line, begidx, endidx):
return await self.PathCompleter(text, line, begidx, endidx, filterFiles = False)
@RunSync
@ProvideDecoratorSelfArgs(cmd2.with_argparser, AddPathParser(ParserProvider))
async def do_ls(self, args):
"""
List files in a directory
"""
node = args.path
if IsDir(node):
for childId in node.childrenId:
child = self.client.GetNodeById(childId)
await self.aoutput(child.name)
elif IsFile(node):
await self.client.Refresh(node)
await self.aoutput(f"{node.name}: {node.url}")
else:
await self.aoutput("Invalid path")
@RunSync
async def complete_cd(self, text, line, begidx, endidx):
return await self.PathCompleter(text, line, begidx, endidx, filterFiles = True)
@RunSync
@ProvideDecoratorSelfArgs(cmd2.with_argparser, AddPathParser(ParserProvider))
async def do_cd(self, args):
"""
Change directory
"""
node = args.path
if not IsDir(node):
await self.aoutput("Invalid directory")
return
self.client.currentLocation = node
@RunSync
async def do_cwd(self, args):
"""
Print current working directory
"""
await self.aoutput(self.client.NodeToPath(self.client.currentLocation))
def do_clear(self, args):
"""
Clear the terminal screen
"""
os.system('cls' if os.name == 'nt' else 'clear')
@RunSync
async def complete_rm(self, text, line, begidx, endidx):
return await self.PathCompleter(text, line, begidx, endidx, filterFiles = False)
@RunSync
@ProvideDecoratorSelfArgs(cmd2.with_argparser, AddPathParser(ParserProvider))
async def do_rm(self, args):
"""
Remove a file or directory
"""
await self.client.Delete(args.path)
@RunSync
async def complete_mkdir(self, text, line, begidx, endidx):
return await self.PathCompleter(text, line, begidx, endidx, filterFiles = True)
@RunSync
@ProvideDecoratorSelfArgs(cmd2.with_argparser, AddFatherAndSonParser(ParserProvider))
async def do_mkdir(self, args):
"""
Create a directory
"""
father, sonName = args.path
if not IsDir(father) or sonName == "" or sonName == None:
await self.aoutput("Invalid path")
return
child = self.client.FindChildInDirByName(father, sonName)
if child is not None:
await self.aoutput("Path already exists")
return
await self.client.MakeDir(father, sonName)
@RunSync
@ProvideDecoratorSelfArgs(cmd2.with_argparser, AddPathParser(AddUrlParser(ParserProvider)))
async def do_download(self, args):
"""
Download a file
"""
node = args.path
if not IsDir(node):
await self.aoutput("Invalid directory")
return
await self.client.Download(args.url, node)
if __name__ == "__main__":
nest_asyncio.apply()
prog = PikpakConsole()
asyncio.run(prog.Run())