diff --git a/main.py b/main.py index d1d145f..bede3ee 100644 --- a/main.py +++ b/main.py @@ -1,21 +1,191 @@ +import asyncio, nest_asyncio import cmd2 -import sys -from pikpakFs import PKVirtFs, VirtFsNode, DirNode, FileNode +from functools import wraps +from aioconsole import ainput, aprint +import logging +import colorlog +from pikpakFs import VirtFsNode, DirNode, FileNode, PKVirtFs +import os +def RunSync(func): + @wraps(func) + def decorated(*args, **kwargs): + return asyncio.get_event_loop().run_until_complete(func(*args, **kwargs)) + return decorated + +def ProvideDecoratorSelfArgs(decorator, argsProvider): + def wrapper(func): + @wraps(func) + def decorated(*args, **kwargs): + namespace = args[0] + return decorator(argsProvider(namespace))(func)(*args, **kwargs) + return decorated + + return wrapper + +class PikpakConsole(cmd2.Cmd): + def _SetupLogging(self): + formatter = colorlog.ColoredFormatter( + "%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s", + datefmt='%Y-%m-%d %H:%M:%S', + reset=True, + log_colors={ + 'DEBUG': 'cyan', + 'INFO': 'green', + 'WARNING': 'yellow', + 'ERROR': 'red', + 'CRITICAL': 'red,bg_white', + } + ) + + handler = logging.StreamHandler() + handler.setFormatter(formatter) + + logger = logging.getLogger() + logger.addHandler(handler) + + logger.setLevel(logging.INFO) -class PKApp(cmd2.Cmd): def __init__(self): super().__init__() - self.fs = PKVirtFs(loginCachePath = "token.json", proxy = "http://127.0.0.1:10808") + self._SetupLogging() + self.client = PKVirtFs("token.json", proxy="http://127.0.0.1:7897") + + async def Run(self): + saved_readline_settings = None + try: + # Get sigint protection while we set up readline for cmd2 + with self.sigint_protection: + saved_readline_settings = self._set_up_cmd2_readline() + stop = False + while not stop: + # Get sigint protection while we read the command line + line = await asyncio.to_thread(self._read_command_line, self.prompt) + # Run the command along with all associated pre and post hooks + stop = self.onecmd_plus_hooks(line) + finally: + # Get sigint protection while we restore readline settings + with self.sigint_protection: + if saved_readline_settings is not None: + self._restore_readline(saved_readline_settings) + + def do_debug(self, args): + """ + Enable debug mode + """ + logging.getLogger().setLevel(logging.DEBUG) + logging.debug("Debug mode enabled") + + def do_debugoff(self, args): + """ + Disable debug mode + """ + logging.getLogger().setLevel(logging.INFO) + logging.info("Debug mode disabled") + + login_parser = cmd2.Cmd2ArgumentParser() + login_parser.add_argument("username", help="username", nargs="?") + login_parser.add_argument("password", help="password", nargs="?") + + @RunSync + @cmd2.with_argparser(login_parser) async def do_login(self, args): - if len(args) < 2: - await self.fs.Login() - else: - await self.fs.Login(args[0], args[1]) + """ + Login to pikpak + """ + await self.client.Login(args.username, args.password) + await aprint("Logged in successfully") + + def PathParserProvider(self): + @RunSync + async def PathToNode(path): + path = await self.client.PathToNode(path) + if path is None: + raise ValueError("Invalid path") + return path + + path_parser = cmd2.Cmd2ArgumentParser() + path_parser.add_argument("path", help="path", default="", nargs="?", type=PathToNode) + return path_parser + + WithPathParser = ProvideDecoratorSelfArgs(cmd2.with_argparser, PathParserProvider) + + @RunSync + async def PathCompleter(self, text, line, begidx, endidx, filterFiles): + father, sonName = await self.client.PathToFatherNodeAndNodeName(text) + fatherDir = self.client.ToDir(father) + if fatherDir is None: + return [] + + matches = [] + matchesNode = [] + for childId in fatherDir.childrenId: + node = self.client.nodes[childId] + if filterFiles and isinstance(node, FileNode): + continue + if node.name.startswith(sonName): + self.display_matches.append(node.name) + if sonName == "": + matches.append(text + node.name) + elif text.endswith(sonName): + matches.append(text[:text.rfind(sonName)] + node.name) + matchesNode.append(node) + if len(matchesNode) == 1 and self.client.ToDir(matchesNode[0]) is not None: + matches[0] += "/" + self.allow_appended_space = False + self.allow_closing_quote = False + + return matches + + def complete_ls(self, text, line, begidx, endidx): + return self.PathCompleter(text, line, begidx, endidx, filterFiles = False) + + @RunSync + @WithPathParser + async def do_ls(self, args): + """ + List files in a directory + """ + if isinstance(args.path, DirNode): + for childId in args.path.childrenId: + node = self.client.nodes[childId] + await aprint(node.name) + elif isinstance(args.path, FileNode): + await aprint(args.path.name) + else: + await aprint("Invalid path") + + def complete_cd(self, text, line, begidx, endidx): + return self.PathCompleter(text, line, begidx, endidx, filterFiles = True) + + @RunSync + @WithPathParser + async def do_cd(self, args): + """ + Change directory + """ + if self.client.ToDir(args.path) is None: + await aprint("Invalid directory") + return + self.client.currentLocation = args.path + + @RunSync + async def do_cwd(self, args): + """ + Print current working directory + """ + await aprint(self.client.NodeToPath(self.client.currentLocation)) + + def do_clear(self, args): + """ + Clear the terminal screen + """ + os.system('cls' if os.name == 'nt' else 'clear') -if __name__ == '__main__': - app = PKApp() - sys.exit(app.cmdloop()) \ No newline at end of file +if __name__ == "__main__": + nest_asyncio.apply() + prog = PikpakConsole() + asyncio.run(prog.Run()) diff --git a/pikpakFs.py b/pikpakFs.py index 87a9871..677f803 100644 --- a/pikpakFs.py +++ b/pikpakFs.py @@ -11,10 +11,9 @@ import logging class PathWalker(): def __init__(self, pathStr : str, sep : str = "/"): self.__pathSpots : list[str] = [] - pathStr = pathStr.strip() if not pathStr.startswith(sep): self.__pathSpots.append(".") - pathSpots = [spot.strip() for spot in pathStr.split(sep) if spot.strip() != ""] + pathSpots = pathStr.split(sep) self.__pathSpots.extend(pathSpots) def IsAbsolute(self) -> bool: @@ -91,7 +90,7 @@ class PKVirtFs: httpx_client_args=httpx_client_args) def __TryLoginFromCache(self): - if self.loginCachePath == None: + if self.loginCachePath is None: return if not os.path.exists(self.loginCachePath): return @@ -102,7 +101,7 @@ class PKVirtFs: logging.info("successfully load login info from cache") def __DumpLoginInfo(self): - if self.loginCachePath == None: + if self.loginCachePath is None: return with open(self.loginCachePath, 'w', encoding='utf-8') as file: token = PikpakToken(self.client.username, self.client.password, self.client.access_token, self.client.refresh_token, self.client.user_id) @@ -131,7 +130,7 @@ class PKVirtFs: return None def GetFatherNode(self, node : VirtFsNode) -> VirtFsNode: - if node.fatherId == None: + if node.fatherId is None: return self.root return self.nodes[node.fatherId] @@ -166,11 +165,13 @@ class PKVirtFs: async def PathToNode(self, pathStr : str) -> VirtFsNode: father, sonName = await self.PathToFatherNodeAndNodeName(pathStr) + if sonName == "": + return father fatherDir = self.ToDir(father) - if fatherDir == None: + if fatherDir is None: return None return self.FindChildInDirByName(father, sonName) - + async def PathToFatherNodeAndNodeName(self, pathStr : str) -> tuple[VirtFsNode, str]: pathWalker = PathWalker(pathStr) father : VirtFsNode = None @@ -178,7 +179,7 @@ class PKVirtFs: current = self.root if pathWalker.IsAbsolute() else self.currentLocation for spot in pathWalker.Walk(): - if current == None: + if current is None: father = None break if spot == "..": @@ -186,10 +187,10 @@ class PKVirtFs: continue father = current currentDir = self.ToDir(current) - if currentDir == None: + if currentDir is None: current = None continue - if currentDir.lastUpdate == None: + if currentDir.lastUpdate is None: await self.RefreshDirectory(currentDir) if spot == ".": continue @@ -198,19 +199,21 @@ class PKVirtFs: if current != None: currentDir = self.ToDir(current) - if currentDir != None and currentDir.lastUpdate == None: + if currentDir != None and currentDir.lastUpdate is None: await self.RefreshDirectory(currentDir) father = self.GetFatherNode(current) sonName = current.name return father, sonName - async def NodeToPath(self, node : VirtFsNode) -> str: - spots : list[str] = [""] + def NodeToPath(self, node : VirtFsNode) -> str: + if node is self.root: + return "/" + spots : list[str] = [] current = node while current.id != None: spots.append(current.name) - if current.fatherId == None: + if current.fatherId is None: break current = self.nodes[current.fatherId] spots.append("") @@ -222,18 +225,12 @@ class PKVirtFs: return self.ToDir(self.FindChildInDirByName(node, name)) async def Login(self, username : str = None, password : str = None) -> None: - if self.client != None and username == None and password == None: + if self.client != None and username is None and password is None: username = self.client.username password = self.client.password - if self.client != None and self.client.username == username and self.client.password == password: - logging.info("Already login, try refresh token") - try: - await self.client.refresh_access_token() - self.__DumpLoginInfo() - return - except Exception: - logging.info("Refresh access token failed! Try relogin") + if username == None and password == None: + raise Exception("Username and password are required") self.__InitClientByUsernamePassword(username, password) await self.client.login() @@ -246,14 +243,14 @@ class PKVirtFs: async def Download(self, url : str, dirNode : DirNode = None) -> None : # 默认创建在当前目录下 # todo: 完善离线下载task相关 - if dirNode == None: + if dirNode is None: dirNode = self.currentLocation await self.client.offline_download(url, dirNode.id) async def Delete(self, node : VirtFsNode) -> None: father = self.GetFatherNode(node) fatherDir = self.ToDir(father) - if fatherDir == None: + if fatherDir is None: raise Exception('Failed to locate') if self.currentLocation is node or self.__IsAncestorsOf(node, self.currentLocation): raise Exception('Delete self or ancestor is not allowed') diff --git a/readme.md b/readme.md index 67d0b76..bf5552d 100644 --- a/readme.md +++ b/readme.md @@ -1,3 +1,7 @@ 在Pikpak Api基础上套了一层文件系统,更好自动化离线下载 -运行: python main.py \ No newline at end of file +运行: python main.py + +Todo: + +- [ ] 实现自定义根路径 \ No newline at end of file