From ae6c4cc0640c83c726648cc971d0f598ae9f83cf Mon Sep 17 00:00:00 2001 From: limil Date: Wed, 23 Oct 2024 09:14:06 +0800 Subject: [PATCH] =?UTF-8?q?=E7=A7=BB=E9=99=A4tui=EF=BC=8C=E8=AE=A1?= =?UTF-8?q?=E5=88=92=E4=BD=BF=E7=94=A8cmd2=E6=9D=A5=E5=AE=9E=E7=8E=B0?= =?UTF-8?q?=E4=BA=A4=E4=BA=92?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 194 ++++----------------------------------------- pikpakFs.py | 199 +++++++++++++++-------------------------------- readme.md | 2 +- requirements.txt | 25 ------ 4 files changed, 78 insertions(+), 342 deletions(-) delete mode 100644 requirements.txt diff --git a/main.py b/main.py index e18d895..d1d145f 100644 --- a/main.py +++ b/main.py @@ -1,189 +1,21 @@ -from textual import events -from textual.app import App, ComposeResult -from textual.widgets import Input, Log -from textual.containers import Horizontal, Vertical, Widget -from collections import deque +import cmd2 import sys -import asyncio -import argparse -import pikpakFs -import logging -import functools +from pikpakFs import PKVirtFs, VirtFsNode, DirNode, FileNode -class TextualLogHandler(logging.Handler): - def __init__(self, log_widget: Log): + +class PKApp(cmd2.Cmd): + def __init__(self): super().__init__() - self.log_widget = log_widget + self.fs = PKVirtFs(loginCachePath = "token.json", proxy = "http://127.0.0.1:10808") - def emit(self, record): - message = self.format(record) - self.log_widget.write_line(message) - -class HistoryInput(Input): - def __init__(self, placeholder: str = "", max_history: int = 20, *args, **kwargs): - super().__init__(placeholder=placeholder, *args, **kwargs) - self.block_input = False - self.history = deque(maxlen=max_history) # 历史记录列表 - self.history_view = list() - self.history_index = -1 # 当前历史索引,初始为 -1 - self.history_log = Log(auto_scroll=False) # 用于显示历史记录的日志小部件 - - def widget(self) -> Widget: - return Vertical(self, self.history_log) - - def reverseIdx(self, idx) -> int: - return len(self.history) - 1 - idx - - async def on_key(self, event: events.Key) -> None: - if self.block_input: - return - if event.key == "up": - if self.history_index == -1: - self.cursor_position = len(self.value) - await self.update_history_view() - return - self.history_index = max(0, self.history_index - 1) - elif event.key == "down": - self.history_index = min(len(self.history) - 1, self.history_index + 1) + async def do_login(self, args): + if len(args) < 2: + await self.fs.Login() else: - self.history_index = -1 - await self.update_history_view() - return + await self.fs.Login(args[0], args[1]) - if len(self.history) > 0 and self.history_index != -1: - self.value = self.history[self.reverseIdx(self.history_index)] - self.cursor_position = len(self.value) - await self.update_history_view() - - async def on_input_submitted(self, event: Input.Submitted) -> None: - user_input = event.value.strip() - if user_input: - self.history.append(user_input) - self.history_index = -1 - self.value = "" - await self.update_history_view() - - async def update_history_view(self): - self.history_log.clear() - self.history_view.clear() - - if self.history: - for idx, item in enumerate(self.history): - prefix = "> " if self.reverseIdx(idx) == self.history_index else " " - self.history_view.append(f"{prefix}{item}") - - self.history_log.write_lines(reversed(self.history_view)) - - scroll_height = self.history_log.scrollable_size.height - scroll_start = self.history_log.scroll_offset.y - current = self.history_index - - if current < scroll_start: - scroll_idx = min(max(0, current), len(self.history) - 1) - self.history_log.scroll_to(y = scroll_idx) - elif current >= scroll_start + scroll_height - 1: - self.history_log.scroll_to(y = current - scroll_height + 1) - - self.refresh() - - async def animate_ellipsis(self): - ellipsis = "" - try: - while True: - # 循环添加省略号(最多3个点) - if len(ellipsis) < 3: - ellipsis += "." - else: - ellipsis = "" - self.value = f"Waiting{ellipsis}" - await asyncio.sleep(0.5) - finally: - self.value = "" - pass - - async def wait_for(self, operation): - self.disabled = True - self.block_input = True - animation_task = asyncio.create_task(self.animate_ellipsis()) - await operation() - animation_task.cancel() - self.disabled = False - self.block_input = False - self.focus() -class InputLoggerApp(App): - CSS = """ - .divider { - width: 0.5%; - height: 100%; - background: #444444; - } - .log { - width: 80%; - height: 100%; - } - """ - - def setup_logger(self) -> None: - formatStr = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' - - logging.basicConfig( - filename='app.log', - filemode='a', - format=formatStr - ) - - logHandler = TextualLogHandler(self.log_widget) - - # 设置日志格式 - logHandler.setFormatter(logging.Formatter(formatStr)) - - # 获取根日志记录器,并添加自定义处理器 - root_logger = logging.getLogger() - root_logger.setLevel(logging.INFO) - root_logger.addHandler(logHandler) - - def write_to_console(self, content) -> None: - self.log_widget.write_line(content) - - def compose(self) -> ComposeResult: - self.input_widget = HistoryInput(placeholder="Input Command...") - self.log_widget = Log(classes="log", highlight=True) - - left_panel = self.input_widget.widget() - right_panel = self.log_widget - divider = Vertical(classes="divider") - - yield Horizontal(left_panel, divider, right_panel) - - def on_mount(self) -> None: - self.setup_logger() - self.fs = pikpakFs.VirtFs(loginCachePath = "token.json", proxy = "http://127.0.0.1:7897") - - async def handle_command(self, command) -> None: - try: - if command == "clear": - self.log_widget.clear() - elif command == "exit": - sys.exit(0) - elif command == "debug": - logger = logging.getLogger() - logger.setLevel(logging.DEBUG) - self.write_to_console("Done") - else: - self.write_to_console(await self.fs.HandlerCommand(command)) - except Exception as e: - logging.exception(e) - - async def on_input_submitted(self, event: Input.Submitted) -> None: - if event.input is not self.input_widget: - return - - user_input = event.value.strip() - self.write_to_console(f"> {user_input}") - await self.input_widget.wait_for(functools.partial(self.handle_command, user_input)) - -if __name__ == "__main__": - app = InputLoggerApp() - app.run() +if __name__ == '__main__': + app = PKApp() + sys.exit(app.cmdloop()) \ No newline at end of file diff --git a/pikpakFs.py b/pikpakFs.py index 3eaaca1..87a9871 100644 --- a/pikpakFs.py +++ b/pikpakFs.py @@ -31,17 +31,17 @@ class VirtFsNode: self.id = id self.name = name self.fatherId = fatherId + self.lastUpdate : datetime = None class DirNode(VirtFsNode): def __init__(self, id : str, name : str, fatherId : str): super().__init__(id, name, fatherId) self.childrenId : list[str] = [] - self.lastUpdate : datetime = None class FileNode(VirtFsNode): def __init__(self, id : str, name : str, fatherId : str): super().__init__(id, name, fatherId) - self.lastUpdate : datetime = None + self.url : str = None class PikpakToken: def __init__(self, username, password, access_token, refresh_token, user_id): @@ -59,20 +59,7 @@ class PikpakToken: data = json.loads(json_str) return cls(**data) -class VirtFs: - def __CalcMd5(self, text : str): - return md5(text.encode()).hexdigest() - - def __ToDir(self, node : VirtFsNode) -> DirNode: - if isinstance(node, DirNode): - return node - return None - - def __ToFile(self, node : VirtFsNode) -> FileNode: - if isinstance(node, FileNode): - return node - return None - +class PKVirtFs: def __init__(self, loginCachePath : str = None, proxy : str = None): self.nodes : Dict[str, VirtFsNode] = {} self.root = DirNode(None, "", None) @@ -133,7 +120,31 @@ class VirtFs: return True return False - async def __RefreshDirectory(self, dirNode : DirNode): + def ToDir(self, node : VirtFsNode) -> DirNode: + if isinstance(node, DirNode): + return node + return None + + def ToFile(self, node : VirtFsNode) -> FileNode: + if isinstance(node, FileNode): + return node + return None + + def GetFatherNode(self, node : VirtFsNode) -> VirtFsNode: + if node.fatherId == None: + return self.root + return self.nodes[node.fatherId] + + def FindChildInDirByName(self, dir : DirNode, name : str): + if dir is self.root and name == "": + return self.root + for childId in dir.childrenId: + node = self.nodes[childId] + if name == node.name: + return node + return None + + async def RefreshDirectory(self, dirNode : DirNode): dirInfo = await self.client.file_list(parent_id = dirNode.id) nodes = dirInfo["files"] dirNode.childrenId.clear() @@ -153,23 +164,14 @@ class VirtFs: dirNode.childrenId.append(id) dirNode.lastUpdate = datetime.now() - def __FindChildInDirByName(self, dir : DirNode, name : str): - if dir is self.root and name == "": - return self.root - for childId in dir.childrenId: - node = self.nodes[childId] - if name == node.name: - return node - return None - - async def __PathToNode(self, pathStr : str) -> VirtFsNode: - father, sonName = await self.__PathToFatherNodeAndNodeName(pathStr) - fatherDir = self.__ToDir(father) + async def PathToNode(self, pathStr : str) -> VirtFsNode: + father, sonName = await self.PathToFatherNodeAndNodeName(pathStr) + fatherDir = self.ToDir(father) if fatherDir == None: return None - return self.__FindChildInDirByName(father, sonName) + return self.FindChildInDirByName(father, sonName) - async def __PathToFatherNodeAndNodeName(self, pathStr : str) -> tuple[VirtFsNode, str]: + async def PathToFatherNodeAndNodeName(self, pathStr : str) -> tuple[VirtFsNode, str]: pathWalker = PathWalker(pathStr) father : VirtFsNode = None sonName : str = None @@ -180,40 +182,30 @@ class VirtFs: father = None break if spot == "..": - current = self.root if current.fatherId == None else self.nodes[current.fatherId] + current = self.GetFatherNode(current) continue - father = current - - currentDir = self.__ToDir(current) + currentDir = self.ToDir(current) if currentDir == None: current = None continue - if currentDir.lastUpdate == None: - await self.__RefreshDirectory(currentDir) - + await self.RefreshDirectory(currentDir) if spot == ".": continue - sonName = spot - current = self.__FindChildInDirByName(currentDir, spot) + current = self.FindChildInDirByName(currentDir, spot) if current != None: - currentDir = self.__ToDir(current) - if currentDir != None: - await self.__RefreshDirectory(currentDir) - father = self.root if current.fatherId == None else self.nodes[current.fatherId] + currentDir = self.ToDir(current) + if currentDir != None and currentDir.lastUpdate == None: + await self.RefreshDirectory(currentDir) + father = self.GetFatherNode(current) sonName = current.name return father, sonName - async def __MakeDir(self, node : DirNode, name : str) -> DirNode: - await self.client.create_folder(name, node.id) - await self.__RefreshDirectory(node) - return self.__ToDir(self.__FindChildInDirByName(node, name)) - - async def __NodeToPath(self, node : VirtFsNode) -> str: + async def NodeToPath(self, node : VirtFsNode) -> str: spots : list[str] = [""] current = node while current.id != None: @@ -224,110 +216,47 @@ class VirtFs: spots.append("") return "/".join(reversed(spots)) - async def login(self, username : str = None, password : str = None) -> str: + async def MakeDir(self, node : DirNode, name : str) -> DirNode: + await self.client.create_folder(name, node.id) + await self.RefreshDirectory(node) + return self.ToDir(self.FindChildInDirByName(node, name)) + + async def Login(self, username : str = None, password : str = None) -> None: if self.client != None and username == None and password == None: username = self.client.username password = self.client.password if self.client != None and self.client.username == username and self.client.password == password: - logging.info("already login, try refresh token") + logging.info("Already login, try refresh token") try: await self.client.refresh_access_token() self.__DumpLoginInfo() - return "success" + return except Exception: logging.info("Refresh access token failed! Try relogin") self.__InitClientByUsernamePassword(username, password) await self.client.login() self.__DumpLoginInfo() - return "success" - - async def ls(self, pathStr : str = "") -> str: - dirNode = self.__ToDir(await self.__PathToNode(pathStr)) - if dirNode == None: - return f"path not found or is file: {pathStr}" - result = [] - for childId in dirNode.childrenId: - node = self.nodes[childId] - result.append(node.name) - return "\n".join(result) - async def cd(self, pathStr : str = "") -> str: - dirNode = self.__ToDir(await self.__PathToNode(pathStr)) - if dirNode == None: - return f"path not found or is file: {pathStr}" - self.currentLocation = dirNode - return "success" + async def UpdateDownloadUrl(self, file : FileNode) -> None: + result = await self.client.get_download_url(file.id) + file.url = result["web_content_link"] - async def cwd(self) -> str: - path = await self.__NodeToPath(self.currentLocation) - return path if path != None else "cwd failed" - - async def geturl(self, pathStr : str) -> str: - fileNode = self.__ToFile(await self.__PathToNode(pathStr)) - if fileNode == None: - return f"path not found or is not file: {pathStr}" - - result = await self.client.get_download_url(fileNode.id) - return result["web_content_link"] - - async def mkdir(self, pathStr : str) -> str: - father, target = await self.__PathToFatherNodeAndNodeName(pathStr) - fatherDir = self.__ToDir(father) - if fatherDir == None: - return "Failed to locate" - if self.__FindChildInDirByName(fatherDir, target) != None: - return f"Path {pathStr} already existed" - await self.__MakeDir(fatherDir, target) - return "success" - - async def download(self, url : str, pathStr : str = "") -> str : + async def Download(self, url : str, dirNode : DirNode = None) -> None : + # 默认创建在当前目录下 # todo: 完善离线下载task相关 - dirNode = self.__ToDir(await self.__PathToNode(pathStr)) if dirNode == None: - return f"path not found or is file: {pathStr}" + dirNode = self.currentLocation + await self.client.offline_download(url, dirNode.id) - subFolderName = self.__CalcMd5(url) - newDirNode = await self.__MakeDir(dirNode, subFolderName) - if newDirNode == None: - return f"falied to create sub folder {subFolderName}" - - await self.client.offline_download(url, newDirNode.id) - return subFolderName - - async def update(self, pathStr : str = ""): - dirNode = self.__ToDir(await self.__PathToNode(pathStr)) - if dirNode == None: - return f"path not found or is file: {pathStr}" - await self.__RefreshDirectory(dirNode) - return "success" - - async def delete(self, pathStr : str): - father, name = await self.__PathToFatherNodeAndNodeName(pathStr) - fatherDir = self.__ToDir(father) + async def Delete(self, node : VirtFsNode) -> None: + father = self.GetFatherNode(node) + fatherDir = self.ToDir(father) if fatherDir == None: - return "Failed to locate" - node = self.__FindChildInDirByName(fatherDir, name) - if node == None: - return f"path {pathStr} not existed" - + raise Exception('Failed to locate') if self.currentLocation is node or self.__IsAncestorsOf(node, self.currentLocation): - return f"delete self or ancestor is not allowed" + raise Exception('Delete self or ancestor is not allowed') + await self.client.delete_to_trash([node.id]) - await self.__RefreshDirectory(fatherDir) - return "success" - - async def HandlerCommand(self, command): - result = re.findall(r'"(.*?)"|(\S+)', command) - filtered_result = [item for sublist in result for item in sublist if item] - - cmd = filtered_result[0] - args = filtered_result[1:] - - method = getattr(self, cmd) - if method == None: - return f"Unknown command: {cmd}" - output = await method(*args) - logging.info(f"{command} : {repr(output)}") - return output \ No newline at end of file + await self.RefreshDirectory(fatherDir) \ No newline at end of file diff --git a/readme.md b/readme.md index 1bd52e5..67d0b76 100644 --- a/readme.md +++ b/readme.md @@ -1,3 +1,3 @@ 在Pikpak Api基础上套了一层文件系统,更好自动化离线下载 -python main.py 运行 \ No newline at end of file +运行: python main.py \ No newline at end of file diff --git a/requirements.txt b/requirements.txt deleted file mode 100644 index ccc046d..0000000 --- a/requirements.txt +++ /dev/null @@ -1,25 +0,0 @@ -anyio==4.6.2.post1 -certifi==2024.8.30 -charset-normalizer==3.4.0 -DataRecorder==3.6.2 -DownloadKit==2.0.5 -et-xmlfile==1.1.0 -h11==0.14.0 -httpcore==1.0.6 -httpx==0.27.2 -idna==3.10 -linkify-it-py==2.0.3 -markdown-it-py==3.0.0 -mdit-py-plugins==0.4.2 -mdurl==0.1.2 -openpyxl==3.1.5 -PikPakAPI==0.1.10 -platformdirs==4.3.6 -Pygments==2.18.0 -requests==2.32.3 -rich==13.9.2 -sniffio==1.3.1 -textual==0.83.0 -typing_extensions==4.12.2 -uc-micro-py==1.0.3 -urllib3==2.2.3