diff --git a/main.py b/main.py index 5406ea6..e18d895 100644 --- a/main.py +++ b/main.py @@ -159,7 +159,7 @@ class InputLoggerApp(App): def on_mount(self) -> None: self.setup_logger() - self.fs = pikpakFs.VirtFs("", "", "", loginCachePath = "token.json") + self.fs = pikpakFs.VirtFs(loginCachePath = "token.json", proxy = "http://127.0.0.1:7897") async def handle_command(self, command) -> None: try: diff --git a/pikpakFs.py b/pikpakFs.py index 32a7834..3eaaca1 100644 --- a/pikpakFs.py +++ b/pikpakFs.py @@ -9,18 +9,22 @@ import os import logging class PathWalker(): - def __init__(self, pathStr : str, subDir : str = None, sep : str = "/"): - self.pathSpots : list[str] = [] + def __init__(self, pathStr : str, sep : str = "/"): + self.__pathSpots : list[str] = [] pathStr = pathStr.strip() if not pathStr.startswith(sep): - self.pathSpots.append(".") + self.__pathSpots.append(".") pathSpots = [spot.strip() for spot in pathStr.split(sep) if spot.strip() != ""] - self.pathSpots.extend(pathSpots) - if subDir != None: - self.pathSpots.append(subDir) + self.__pathSpots.extend(pathSpots) def IsAbsolute(self) -> bool: - return len(self.pathSpots) == 0 or self.pathSpots[0] != "." + return len(self.__pathSpots) == 0 or self.__pathSpots[0] != "." + + def AppendSpot(self, spot): + self.__pathSpots.append(spot) + + def Walk(self) -> list[str]: + return self.__pathSpots class VirtFsNode: def __init__(self, id : str, name : str, fatherId : str): @@ -29,12 +33,11 @@ class VirtFsNode: self.fatherId = fatherId class DirNode(VirtFsNode): - def __init__(self, id : str, name : str, fatherId : str, childrenId : list[str]): + def __init__(self, id : str, name : str, fatherId : str): super().__init__(id, name, fatherId) - self.childrenId = childrenId + self.childrenId : list[str] = [] self.lastUpdate : datetime = None - class FileNode(VirtFsNode): def __init__(self, id : str, name : str, fatherId : str): super().__init__(id, name, fatherId) @@ -60,11 +63,38 @@ class VirtFs: def __CalcMd5(self, text : str): return md5(text.encode()).hexdigest() - def __init__(self, username : str, password : str, proxy : str = None, loginCachePath : str = None): + def __ToDir(self, node : VirtFsNode) -> DirNode: + if isinstance(node, DirNode): + return node + return None + + def __ToFile(self, node : VirtFsNode) -> FileNode: + if isinstance(node, FileNode): + return node + return None + + def __init__(self, loginCachePath : str = None, proxy : str = None): + self.nodes : Dict[str, VirtFsNode] = {} + self.root = DirNode(None, "", None) + self.currentLocation = self.root + + self.loginCachePath = loginCachePath + self.proxyConfig = proxy + self.client : PikPakApi = None + self.__TryLoginFromCache() + + def __InitClientByToken(self, token : PikpakToken): + self.__InitClientByUsernamePassword(token.username, token.password) + self.client.access_token = token.access_token + self.client.refresh_token = token.refresh_token + self.client.user_id = token.user_id + self.client.encode_token() + + def __InitClientByUsernamePassword(self, username : str, password : str): httpx_client_args = None - if proxy != None: + if self.proxyConfig != None: httpx_client_args = { - "proxy": proxy, + "proxy": self.proxyConfig, "transport": httpx.AsyncHTTPTransport(retries=1), } @@ -72,14 +102,8 @@ class VirtFs: username = username, password = password, httpx_client_args=httpx_client_args) - - self.nodes : Dict[str, VirtFsNode] = {} - self.loginCachePath = loginCachePath - self.root = DirNode(None, "", None, []) - self.currentLocation = self.root - self.__LoginFromCache() - def __LoginFromCache(self): + def __TryLoginFromCache(self): if self.loginCachePath == None: return if not os.path.exists(self.loginCachePath): @@ -87,13 +111,7 @@ class VirtFs: with open(self.loginCachePath, 'r', encoding='utf-8') as file: content = file.read() token = PikpakToken.from_json(content) - if self.client.username != token.username or self.client.password != token.password: - logging.error("failed to load login info from cache, not match") - return - self.client.access_token = token.access_token - self.client.refresh_token = token.refresh_token - self.client.user_id = token.user_id - self.client.encode_token() + self.__InitClientByToken(token) logging.info("successfully load login info from cache") def __DumpLoginInfo(self): @@ -103,10 +121,17 @@ class VirtFs: token = PikpakToken(self.client.username, self.client.password, self.client.access_token, self.client.refresh_token, self.client.user_id) file.write(token.to_json()) logging.info("successfully dump login info to cache") - - async def __RefreshAccessToken(self): - result = await self.client.refresh_access_token() - return json.dumps(result, indent=4) + + def __IsAncestorsOf(self, nodeA : VirtFsNode, nodeB : VirtFsNode) -> bool: + if nodeB is nodeA: + return False + if nodeA is self.root: + return True + while nodeB.fatherId != None: + nodeB = self.nodes[nodeB.fatherId] + if nodeB is nodeA: + return True + return False async def __RefreshDirectory(self, dirNode : DirNode): dirInfo = await self.client.file_list(parent_id = dirNode.id) @@ -117,57 +142,76 @@ class VirtFs: child : VirtFsNode = None id = node["id"] name = node["name"] - + fatherId = dirNode.id if id in self.nodes: child = self.nodes[id] else: - if node["kind"].endswith("folder"): - child = DirNode(id, name, dirNode.id, []) - else: - child = FileNode(id, name, dirNode.id) + child = DirNode(id, name, fatherId) if node["kind"].endswith("folder") else FileNode(id, name, fatherId) self.nodes[id] = child - child.name = name + child.fatherId = fatherId dirNode.childrenId.append(id) - dirNode.lastUpdate = datetime.now() - async def __PathToNode(self, pathStr : str, subDir : str = None) -> VirtFsNode: - pathWalker = PathWalker(pathStr, subDir) - current : VirtFsNode = None - if pathWalker.IsAbsolute(): - current = self.root - else: - current = self.currentLocation + def __FindChildInDirByName(self, dir : DirNode, name : str): + if dir is self.root and name == "": + return self.root + for childId in dir.childrenId: + node = self.nodes[childId] + if name == node.name: + return node + return None + + async def __PathToNode(self, pathStr : str) -> VirtFsNode: + father, sonName = await self.__PathToFatherNodeAndNodeName(pathStr) + fatherDir = self.__ToDir(father) + if fatherDir == None: + return None + return self.__FindChildInDirByName(father, sonName) + + async def __PathToFatherNodeAndNodeName(self, pathStr : str) -> tuple[VirtFsNode, str]: + pathWalker = PathWalker(pathStr) + father : VirtFsNode = None + sonName : str = None + current = self.root if pathWalker.IsAbsolute() else self.currentLocation - for spot in pathWalker.pathSpots: + for spot in pathWalker.Walk(): if current == None: + father = None break if spot == "..": - if current.fatherId == None: - current = self.root - else: - current = self.nodes[current.fatherId] + current = self.root if current.fatherId == None else self.nodes[current.fatherId] continue - - if not isinstance(current, DirNode): - return None - currentDir : DirNode = current + father = current + + currentDir = self.__ToDir(current) + if currentDir == None: + current = None + continue + if currentDir.lastUpdate == None: await self.__RefreshDirectory(currentDir) if spot == ".": continue - else: - current = None - for childId in currentDir.childrenId: - node = self.nodes[childId] - if spot == node.name: - current = node - break + + sonName = spot + current = self.__FindChildInDirByName(currentDir, spot) + + if current != None: + currentDir = self.__ToDir(current) + if currentDir != None: + await self.__RefreshDirectory(currentDir) + father = self.root if current.fatherId == None else self.nodes[current.fatherId] + sonName = current.name + + return father, sonName - return current + async def __MakeDir(self, node : DirNode, name : str) -> DirNode: + await self.client.create_folder(name, node.id) + await self.__RefreshDirectory(node) + return self.__ToDir(self.__FindChildInDirByName(node, name)) async def __NodeToPath(self, node : VirtFsNode) -> str: spots : list[str] = [""] @@ -180,87 +224,110 @@ class VirtFs: spots.append("") return "/".join(reversed(spots)) - async def login(self): - result = await self.client.login() + async def login(self, username : str = None, password : str = None) -> str: + if self.client != None and username == None and password == None: + username = self.client.username + password = self.client.password + + if self.client != None and self.client.username == username and self.client.password == password: + logging.info("already login, try refresh token") + try: + await self.client.refresh_access_token() + self.__DumpLoginInfo() + return "success" + except Exception: + logging.info("Refresh access token failed! Try relogin") + + self.__InitClientByUsernamePassword(username, password) + await self.client.login() self.__DumpLoginInfo() - logging.debug(json.dumps(result, indent=4)) - return "Login Success" + return "success" async def ls(self, pathStr : str = "") -> str: - node = await self.__PathToNode(pathStr) - if node == None: - return f"path not found: {pathStr}" - if not isinstance(node, DirNode): - return f"path is not directory" - dirNode : DirNode = node - result = ["==== ls ===="] + dirNode = self.__ToDir(await self.__PathToNode(pathStr)) + if dirNode == None: + return f"path not found or is file: {pathStr}" + result = [] for childId in dirNode.childrenId: node = self.nodes[childId] result.append(node.name) return "\n".join(result) async def cd(self, pathStr : str = "") -> str: - node = await self.__PathToNode(pathStr) - if node == None: - return f"path not found: {pathStr}" - if not isinstance(node, DirNode): - return f"path is not directory" - dirNode : DirNode = node + dirNode = self.__ToDir(await self.__PathToNode(pathStr)) + if dirNode == None: + return f"path not found or is file: {pathStr}" self.currentLocation = dirNode - return "" + return "success" async def cwd(self) -> str: path = await self.__NodeToPath(self.currentLocation) - if path == None: - return f"cwd failed" - return path + return path if path != None else "cwd failed" async def geturl(self, pathStr : str) -> str: - node = await self.__PathToNode(pathStr) - if node == None: - return f"path not found: {pathStr}" - if not isinstance(node, FileNode): - return f"path is not file" - result = await self.client.get_download_url(node.id) - logging.debug(json.dumps(result, indent=4)) + fileNode = self.__ToFile(await self.__PathToNode(pathStr)) + if fileNode == None: + return f"path not found or is not file: {pathStr}" + + result = await self.client.get_download_url(fileNode.id) return result["web_content_link"] - async def offdown(self, url : str, pathStr : str = "") -> str : - node = await self.__PathToNode(pathStr) - if node == None: - return f"path not found: {pathStr}" - elif not isinstance(node, DirNode): - return f"path is not directory" + async def mkdir(self, pathStr : str) -> str: + father, target = await self.__PathToFatherNodeAndNodeName(pathStr) + fatherDir = self.__ToDir(father) + if fatherDir == None: + return "Failed to locate" + if self.__FindChildInDirByName(fatherDir, target) != None: + return f"Path {pathStr} already existed" + await self.__MakeDir(fatherDir, target) + return "success" + + async def download(self, url : str, pathStr : str = "") -> str : + # todo: 完善离线下载task相关 + dirNode = self.__ToDir(await self.__PathToNode(pathStr)) + if dirNode == None: + return f"path not found or is file: {pathStr}" subFolderName = self.__CalcMd5(url) - subNode = await self.__PathToNode(pathStr, subFolderName) - if subNode == None: - result = await self.client.create_folder(subFolderName, node.id) - logging.debug(json.dumps(result, indent=4)) - await self.__RefreshDirectory(node) - subNode = await self.__PathToNode(pathStr, subFolderName) - elif not isinstance(subNode, DirNode): - return f"path is not directory" - - if subNode == None: - return f"path not found: {pathStr}" - elif not isinstance(subNode, DirNode): - return f"path is not directory" - - result = await self.client.offline_download(url, subNode.id) - logging.debug(json.dumps(result, indent=4)) + newDirNode = await self.__MakeDir(dirNode, subFolderName) + if newDirNode == None: + return f"falied to create sub folder {subFolderName}" + await self.client.offline_download(url, newDirNode.id) return subFolderName + async def update(self, pathStr : str = ""): + dirNode = self.__ToDir(await self.__PathToNode(pathStr)) + if dirNode == None: + return f"path not found or is file: {pathStr}" + await self.__RefreshDirectory(dirNode) + return "success" + + async def delete(self, pathStr : str): + father, name = await self.__PathToFatherNodeAndNodeName(pathStr) + fatherDir = self.__ToDir(father) + if fatherDir == None: + return "Failed to locate" + node = self.__FindChildInDirByName(fatherDir, name) + if node == None: + return f"path {pathStr} not existed" + + if self.currentLocation is node or self.__IsAncestorsOf(node, self.currentLocation): + return f"delete self or ancestor is not allowed" + await self.client.delete_to_trash([node.id]) + await self.__RefreshDirectory(fatherDir) + return "success" async def HandlerCommand(self, command): result = re.findall(r'"(.*?)"|(\S+)', command) filtered_result = [item for sublist in result for item in sublist if item] - command = filtered_result[0] + cmd = filtered_result[0] args = filtered_result[1:] - method = getattr(self, command) + method = getattr(self, cmd) if method == None: - return f"Unknown command: {command}" - return await method(*args) \ No newline at end of file + return f"Unknown command: {cmd}" + output = await method(*args) + logging.info(f"{command} : {repr(output)}") + return output \ No newline at end of file