From 115bf2a3e26219fad3d46793a38594c42a4505cc Mon Sep 17 00:00:00 2001 From: limil Date: Tue, 29 Oct 2024 01:20:57 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BC=98=E5=8C=96=E5=8F=98=E9=87=8F?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 168 ++++++++++++++++++++++++++-------------------------- pikpakFs.py | 149 +++++++++++++++++++++++----------------------- 2 files changed, 156 insertions(+), 161 deletions(-) diff --git a/main.py b/main.py index 02ae5b1..b3bd96c 100644 --- a/main.py +++ b/main.py @@ -4,7 +4,7 @@ from functools import wraps import logging import threading import colorlog -from pikpakFs import VirtFsNode, DirNode, FileNode, PKVirtFs +from pikpakFs import VirtFsNode, DirNode, FileNode, PKVirtFs, IsDir, IsFile import os def RunSyncInLoop(loop): @@ -30,7 +30,6 @@ def ProvideDecoratorSelfArgs(decorator, argsProvider): namespace = args[0] return decorator(argsProvider(namespace))(func)(*args, **kwargs) return decorated - return wrapper class PikpakConsole(cmd2.Cmd): @@ -39,7 +38,7 @@ class PikpakConsole(cmd2.Cmd): RunSync = ProvideDecoratorSelfArgs(RunSyncInLoop, LoopProvider) - def _SetupLogging(self): + def _setup_logging(self): formatter = colorlog.ColoredFormatter( "%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s", datefmt='%Y-%m-%d %H:%M:%S', @@ -52,20 +51,12 @@ class PikpakConsole(cmd2.Cmd): 'CRITICAL': 'red,bg_white', } ) - handler = logging.StreamHandler() handler.setFormatter(formatter) - logger = logging.getLogger() logger.addHandler(handler) - logger.setLevel(logging.INFO) - def __init__(self): - super().__init__() - self._SetupLogging() - self.client = PKVirtFs("token.json", proxy="http://127.0.0.1:7897") - def IOWorker(self, loop): self.terminal_lock.acquire() # 我看cmdloop是这么做的,所以我也这么做 asyncio.set_event_loop(loop) @@ -86,6 +77,66 @@ class PikpakConsole(cmd2.Cmd): future = asyncio.run_coroutine_threadsafe(PrintOuput(output), self.ioLoop) await asyncio.wrap_future(future) + def ParserProvider(self): + return cmd2.Cmd2ArgumentParser() + + def AddPathParser(parserProvider): + def PathParserProvider(self): + parser = parserProvider(self) + parser.add_argument("path", help="path", default="", nargs="?", type=RunSyncInLoop(self.loop)(self.client.PathToNode)) + return parser + return PathParserProvider + + def AddFatherAndSonParser(parserProvider): + def PathParserProvider(self): + parser = parserProvider(self) + parser.add_argument("path", help="path", default="", nargs="?", type=RunSyncInLoop(self.loop)(self.client.PathToFatherNodeAndNodeName)) + return parser + return PathParserProvider + + def AddUrlParser(parserProvider): + def PathParserProvider(self): + parser = parserProvider(self) + parser.add_argument("url", help="url") + return parser + return PathParserProvider + + def AddUsernamePasswordParser(parserProvider): + def PathParserProvider(self): + parser = parserProvider(self) + parser.add_argument("username", help="username", nargs="?") + parser.add_argument("password", help="password", nargs="?") + return parser + return PathParserProvider + + async def PathCompleter(self, text, line, begidx, endidx, filterFiles): + father, sonName = await self.client.PathToFatherNodeAndNodeName(text) + if not IsDir(father): + return [] + matches = [] + matchesNode = [] + for childId in father.childrenId: + child = self.client.GetNodeById(childId) + if filterFiles and IsFile(child): + continue + if child.name.startswith(sonName): + self.display_matches.append(child.name) + if sonName == "": + matches.append(text + child.name) + elif text.endswith(sonName): + matches.append(text[:text.rfind(sonName)] + child.name) + matchesNode.append(child) + if len(matchesNode) == 1 and IsDir(matchesNode[0]): + matches[0] += "/" + self.allow_appended_space = False + self.allow_closing_quote = False + return matches + + def __init__(self): + super().__init__() + self._setup_logging() + self.client = PKVirtFs("token.json", proxy="http://127.0.0.1:7897") + async def Run(self): # 1. 设置忽略SIGINT import signal @@ -121,6 +172,8 @@ class PikpakConsole(cmd2.Cmd): self.ioLoop.call_soon_threadsafe(self.ioLoop.stop) thread.join() + # commands # + def do_debug(self, args): """ Enable debug mode @@ -134,57 +187,15 @@ class PikpakConsole(cmd2.Cmd): """ logging.getLogger().setLevel(logging.INFO) logging.info("Debug mode disabled") - - login_parser = cmd2.Cmd2ArgumentParser() - login_parser.add_argument("username", help="username", nargs="?") - login_parser.add_argument("password", help="password", nargs="?") @RunSync - @cmd2.with_argparser(login_parser) + @ProvideDecoratorSelfArgs(cmd2.with_argparser, AddUsernamePasswordParser(ParserProvider)) async def do_login(self, args): """ Login to pikpak """ await self.client.Login(args.username, args.password) await self.aoutput("Logged in successfully") - - def ParserProvider(self): - return cmd2.Cmd2ArgumentParser() - - def AddPathParser(parserProvider): - def PathParserProvider(self): - parser = parserProvider(self) - parser.add_argument("path", help="path", default="", nargs="?", type=RunSyncInLoop(self.loop)(self.client.PathToNode)) - return parser - return PathParserProvider - - - async def PathCompleter(self, text, line, begidx, endidx, filterFiles): - father, sonName = await self.client.PathToFatherNodeAndNodeName(text) - fatherDir = self.client.ToDir(father) - if fatherDir is None: - return [] - - matches = [] - matchesNode = [] - for childId in fatherDir.childrenId: - node = self.client.nodes[childId] - if filterFiles and isinstance(node, FileNode): - continue - if node.name.startswith(sonName): - self.display_matches.append(node.name) - if sonName == "": - matches.append(text + node.name) - elif text.endswith(sonName): - matches.append(text[:text.rfind(sonName)] + node.name) - matchesNode.append(node) - - if len(matchesNode) == 1 and self.client.ToDir(matchesNode[0]) is not None: - matches[0] += "/" - self.allow_appended_space = False - self.allow_closing_quote = False - - return matches @RunSync async def complete_ls(self, text, line, begidx, endidx): @@ -196,13 +207,14 @@ class PikpakConsole(cmd2.Cmd): """ List files in a directory """ - if isinstance(args.path, DirNode): - for childId in args.path.childrenId: - node = self.client.nodes[childId] - await self.aoutput(node.name) - elif isinstance(args.path, FileNode): - await self.client.UpdateDownloadUrl(args.path) - await self.aoutput(f"{args.path.name}: {args.path.url}") + node = args.path + if IsDir(node): + for childId in node.childrenId: + child = self.client.GetNodeById(childId) + await self.aoutput(child.name) + elif IsFile(node): + await self.client.Refresh(node) + await self.aoutput(f"{node.name}: {node.url}") else: await self.aoutput("Invalid path") @@ -216,10 +228,11 @@ class PikpakConsole(cmd2.Cmd): """ Change directory """ - if self.client.ToDir(args.path) is None: + node = args.path + if not IsDir(node): await self.aoutput("Invalid directory") return - self.client.currentLocation = args.path + self.client.currentLocation = node @RunSync async def do_cwd(self, args): @@ -250,13 +263,6 @@ class PikpakConsole(cmd2.Cmd): async def complete_mkdir(self, text, line, begidx, endidx): return await self.PathCompleter(text, line, begidx, endidx, filterFiles = True) - def AddFatherAndSonParser(parserProvider): - def PathParserProvider(self): - parser = parserProvider(self) - parser.add_argument("path", help="path", default="", nargs="?", type=RunSyncInLoop(self.loop)(self.client.PathToFatherNodeAndNodeName)) - return parser - return PathParserProvider - @RunSync @ProvideDecoratorSelfArgs(cmd2.with_argparser, AddFatherAndSonParser(ParserProvider)) async def do_mkdir(self, args): @@ -264,23 +270,14 @@ class PikpakConsole(cmd2.Cmd): Create a directory """ father, sonName = args.path - fatherDir = self.client.ToDir(father) - if fatherDir == None or sonName == "" or sonName == None: + if not IsDir(father) or sonName == "" or sonName == None: await self.aoutput("Invalid path") return - childNode = self.client.FindChildInDirByName(fatherDir, sonName) - if childNode is not None: + child = self.client.FindChildInDirByName(father, sonName) + if child is not None: await self.aoutput("Path already exists") return - for i in range(1, 10): - await self.client.MakeDir(fatherDir, sonName + str(i)) - - def AddUrlParser(parserProvider): - def PathParserProvider(self): - parser = parserProvider(self) - parser.add_argument("url", help="url") - return parser - return PathParserProvider + await self.client.MakeDir(father, sonName) @RunSync @ProvideDecoratorSelfArgs(cmd2.with_argparser, AddPathParser(AddUrlParser(ParserProvider))) @@ -288,10 +285,11 @@ class PikpakConsole(cmd2.Cmd): """ Download a file """ - if self.client.ToDir(args.path) is None: + node = args.path + if not IsDir(node): await self.aoutput("Invalid directory") return - await self.client.Download(args.url, args.path) + await self.client.Download(args.url, node) if __name__ == "__main__": diff --git a/pikpakFs.py b/pikpakFs.py index 360f243..1c959fd 100644 --- a/pikpakFs.py +++ b/pikpakFs.py @@ -42,6 +42,12 @@ class FileNode(VirtFsNode): super().__init__(id, name, fatherId) self.url : str = None +def IsDir(node : VirtFsNode) -> bool: + return isinstance(node, DirNode) + +def IsFile(node : VirtFsNode) -> bool: + return isinstance(node, FileNode) + class PikpakToken: def __init__(self, username, password, access_token, refresh_token, user_id): self.username = username @@ -67,16 +73,16 @@ class PKVirtFs: self.loginCachePath = loginCachePath self.proxyConfig = proxy self.client : PikPakApi = None - self.__TryLoginFromCache() + self._try_login_from_cache() - def __InitClientByToken(self, token : PikpakToken): - self.__InitClientByUsernamePassword(token.username, token.password) + def _init_client_by_token(self, token : PikpakToken): + self._init_client_by_username_and_password(token.username, token.password) self.client.access_token = token.access_token self.client.refresh_token = token.refresh_token self.client.user_id = token.user_id self.client.encode_token() - def __InitClientByUsernamePassword(self, username : str, password : str): + def _init_client_by_username_and_password(self, username : str, password : str): httpx_client_args = None if self.proxyConfig != None: httpx_client_args = { @@ -89,7 +95,7 @@ class PKVirtFs: password = password, httpx_client_args=httpx_client_args) - def __TryLoginFromCache(self): + def _try_login_from_cache(self): if self.loginCachePath is None: return if not os.path.exists(self.loginCachePath): @@ -97,10 +103,10 @@ class PKVirtFs: with open(self.loginCachePath, 'r', encoding='utf-8') as file: content = file.read() token = PikpakToken.from_json(content) - self.__InitClientByToken(token) + self._init_client_by_token(token) logging.info("successfully load login info from cache") - def __DumpLoginInfo(self): + def _dump_login_info(self): if self.loginCachePath is None: return with open(self.loginCachePath, 'w', encoding='utf-8') as file: @@ -108,31 +114,26 @@ class PKVirtFs: file.write(token.to_json()) logging.info("successfully dump login info to cache") - def __IsAncestorsOf(self, nodeA : VirtFsNode, nodeB : VirtFsNode) -> bool: + def _is_ancestors_of(self, nodeA : VirtFsNode, nodeB : VirtFsNode) -> bool: if nodeB is nodeA: return False if nodeA is self.root: return True - while nodeB.fatherId != None: + while nodeB.fatherId != self.root.id: nodeB = self.nodes[nodeB.fatherId] if nodeB is nodeA: return True return False - def ToDir(self, node : VirtFsNode) -> DirNode: - if isinstance(node, DirNode): - return node - return None - - def ToFile(self, node : VirtFsNode) -> FileNode: - if isinstance(node, FileNode): - return node - return None + def GetNodeById(self, id : str) -> VirtFsNode: + if id == self.root.id: + return self.root + return self.nodes[id] def GetFatherNode(self, node : VirtFsNode) -> VirtFsNode: - if node is self.root or node.fatherId == self.root.id: + if node is self.root: return self.root - return self.nodes[node.fatherId] + return self.GetNodeById(node.fatherId) def FindChildInDirByName(self, dir : DirNode, name : str): if dir is self.root and name == "": @@ -143,39 +144,46 @@ class PKVirtFs: return node return None - async def RefreshDirectory(self, dirNode : DirNode): - next_page_token = None - nodes = [] - while True: - dirInfo = await self.client.file_list(parent_id = dirNode.id, next_page_token=next_page_token) - next_page_token = dirInfo["next_page_token"] - currentPageNodes = dirInfo["files"] - nodes.extend(currentPageNodes) - if next_page_token is None or next_page_token == "": - break - dirNode.childrenId.clear() + async def Refresh(self, node : VirtFsNode): + if node.lastUpdate != None: + return - for node in nodes: - child : VirtFsNode = None - id = node["id"] - name = node["name"] - fatherId = dirNode.id - if id in self.nodes: - child = self.nodes[id] - else: - child = DirNode(id, name, fatherId) if node["kind"].endswith("folder") else FileNode(id, name, fatherId) - self.nodes[id] = child - child.name = name - child.fatherId = fatherId - dirNode.childrenId.append(id) - dirNode.lastUpdate = datetime.now() + if IsDir(node): + next_page_token = None + childrenInfo = [] + while True: + dirInfo = await self.client.file_list(parent_id = node.id, next_page_token=next_page_token) + next_page_token = dirInfo["next_page_token"] + currentPageNodes = dirInfo["files"] + childrenInfo.extend(currentPageNodes) + if next_page_token is None or next_page_token == "": + break + node.childrenId.clear() + + for childInfo in childrenInfo: + child : VirtFsNode = None + id = childInfo["id"] + name = childInfo["name"] + fatherId = node.id + if id in self.nodes: + child = self.nodes[id] + else: + child = DirNode(id, name, fatherId) if childInfo["kind"].endswith("folder") else FileNode(id, name, fatherId) + self.nodes[id] = child + child.name = name + child.fatherId = fatherId + node.childrenId.append(id) + elif IsFile(node): + result = await self.client.get_download_url(node.id) + node.url = result["web_content_link"] + + node.lastUpdate = datetime.now() async def PathToNode(self, pathStr : str) -> VirtFsNode: father, sonName = await self.PathToFatherNodeAndNodeName(pathStr) if sonName == "": return father - fatherDir = self.ToDir(father) - if fatherDir is None: + if not IsDir(father): return None return self.FindChildInDirByName(father, sonName) @@ -193,21 +201,18 @@ class PKVirtFs: current = self.GetFatherNode(current) continue father = current - currentDir = self.ToDir(current) - if currentDir is None: + if not IsDir(current): current = None continue - if currentDir.lastUpdate is None: - await self.RefreshDirectory(currentDir) + await self.Refresh(current) if spot == ".": continue sonName = spot - current = self.FindChildInDirByName(currentDir, spot) + current = self.FindChildInDirByName(current, spot) if current != None: - currentDir = self.ToDir(current) - if currentDir != None and currentDir.lastUpdate is None: - await self.RefreshDirectory(currentDir) + if IsDir(current): + await self.Refresh(current) father = self.GetFatherNode(current) sonName = current.name @@ -218,19 +223,13 @@ class PKVirtFs: return "/" spots : list[str] = [] current = node - while current.id != None: + while current is not self.root: spots.append(current.name) - if current.fatherId is None: - break - current = self.nodes[current.fatherId] + current = self.GetFatherNode(current) spots.append("") return "/".join(reversed(spots)) - async def MakeDir(self, node : DirNode, name : str) -> DirNode: - await self.client.create_folder(name, node.id) - await self.RefreshDirectory(node) - return self.ToDir(self.FindChildInDirByName(node, name)) - + # commands # async def Login(self, username : str = None, password : str = None) -> None: if self.client != None and username is None and password is None: username = self.client.username @@ -239,28 +238,26 @@ class PKVirtFs: if username == None and password == None: raise Exception("Username and password are required") - self.__InitClientByUsernamePassword(username, password) + self._init_client_by_username_and_password(username, password) await self.client.login() - self.__DumpLoginInfo() - - async def UpdateDownloadUrl(self, file : FileNode) -> None: - result = await self.client.get_download_url(file.id) - file.url = result["web_content_link"] + self._dump_login_info() + + async def MakeDir(self, node : DirNode, name : str) -> DirNode: + await self.client.create_folder(name, node.id) + await self.Refresh(node) + return self.FindChildInDirByName(node, name) async def Download(self, url : str, dirNode : DirNode) -> None : # 默认创建在当前目录下 # todo: 完善离线下载task相关 - if dirNode is None: - dirNode = self.currentLocation await self.client.offline_download(url, dirNode.id) async def Delete(self, node : VirtFsNode) -> None: father = self.GetFatherNode(node) - fatherDir = self.ToDir(father) - if fatherDir is None: + if not IsDir(father): raise Exception('Failed to locate') - if self.currentLocation is node or self.__IsAncestorsOf(node, self.currentLocation): + if self.currentLocation is node or self._is_ancestors_of(node, self.currentLocation): raise Exception('Delete self or ancestor is not allowed') await self.client.delete_to_trash([node.id]) - await self.RefreshDirectory(fatherDir) \ No newline at end of file + await self.Refresh(father) \ No newline at end of file