From b9bc4223968460d15fd11f9c6d03ed2132f2e0ed Mon Sep 17 00:00:00 2001 From: limil Date: Sat, 2 Aug 2025 20:01:13 +0800 Subject: [PATCH] =?UTF-8?q?=E6=8F=90=E4=BA=A4REPL=E4=BB=A3=E7=A0=81?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- PikPakFileSystem.py | 363 ------------------------------------------- REPL.py | 92 +++++++++++ TaskManager.py | 366 -------------------------------------------- app.py | 43 ++++++ aria2helper.py | 72 --------- main.py | 352 ------------------------------------------ readme.md | 169 +------------------- 7 files changed, 140 insertions(+), 1317 deletions(-) delete mode 100644 PikPakFileSystem.py create mode 100644 REPL.py delete mode 100644 TaskManager.py create mode 100644 app.py delete mode 100644 aria2helper.py delete mode 100644 main.py diff --git a/PikPakFileSystem.py b/PikPakFileSystem.py deleted file mode 100644 index fa58f2b..0000000 --- a/PikPakFileSystem.py +++ /dev/null @@ -1,363 +0,0 @@ -import httpx -from pikpakapi import PikPakApi, DownloadStatus -from typing import Dict -from datetime import datetime -import json -import os -import logging -from typing import Any - -class NodeBase: - def __init__(self, id : str, name : str, fatherId : str): - self.id = id - self.name = name - self._father_id = fatherId - self.lastUpdate : datetime = None - -class DirNode(NodeBase): - def __init__(self, id : str, name : str, fatherId : str): - super().__init__(id, name, fatherId) - self.children_id : list[str] = [] - -class FileNode(NodeBase): - def __init__(self, id : str, name : str, fatherId : str): - super().__init__(id, name, fatherId) - self.url : str = None - -class PikPakFileSystem: - #region 内部接口 - def __init__(self, auth_cache_path : str = None, proxy_address : str = None, root_id : str = None): - # 初始化虚拟文件节点 - self._nodes : Dict[str, NodeBase] = {} - self._root : DirNode = DirNode(root_id, "", None) - self._cwd : DirNode = self._root - - # 初始化鉴权和代理信息 - self._auth_cache_path : str = auth_cache_path - self.proxy_address : str = proxy_address - self._pikpak_client : PikPakApi = None - self._try_login_from_cache() - - - #region 鉴权信息相关 - class PikPakToken: - def __init__(self, username : str, password : str, access_token : str, refresh_token : str, user_id : str): - self.username : str = username - self.password : str = password - self.access_token : str = access_token - self.refresh_token : str = refresh_token - self.user_id : str = user_id - - def to_json(self): - return json.dumps(self.__dict__) - - @classmethod - def from_json(cls, json_str): - data = json.loads(json_str) - return cls(**data) - - def _init_client_by_token(self, token : PikPakToken) -> None: - self._init_client_by_username_and_password(token.username, token.password) - self._pikpak_client.access_token = token.access_token - self._pikpak_client.refresh_token = token.refresh_token - self._pikpak_client.user_id = token.user_id - self._pikpak_client.encode_token() - - def _init_client_by_username_and_password(self, username : str, password : str) -> None: - httpx_client_args : Dict[str, Any] = None - if self.proxy_address != None: - httpx_client_args = { - "proxy": self.proxy_address, - "transport": httpx.AsyncHTTPTransport() - } - - self._pikpak_client = PikPakApi( - username = username, - password = password, - httpx_client_args=httpx_client_args) - - def _try_login_from_cache(self) -> None: - if self._auth_cache_path is None: - return - if not os.path.exists(self._auth_cache_path): - return - try: - with open(self._auth_cache_path, 'r', 
encoding='utf-8') as file: - content : str = file.read() - token : PikPakFileSystem.PikPakToken = PikPakFileSystem.PikPakToken.from_json(content) - self._init_client_by_token(token) - logging.info("successfully load login info from cache") - except Exception as e: - logging.error(f"failed to load login info from cache, exception occurred: {e}") - - def _dump_login_info(self) -> None: - if self._auth_cache_path is None: - return - with open(self._auth_cache_path, 'w', encoding='utf-8') as file: - token : PikPakFileSystem.PikPakToken = PikPakFileSystem.PikPakToken(self._pikpak_client.username, self._pikpak_client.password, self._pikpak_client.access_token, self._pikpak_client.refresh_token, self._pikpak_client.user_id) - file.write(token.to_json()) - logging.info("successfully dump login info to cache") - - #endregion - - #region 文件系统相关 - class PathWalker(): - def __init__(self, path : str, sep : str = "/"): - self._path_spots : list[str] = [] - if not path.startswith(sep): - self._path_spots.append(".") - path_spots : list[str] = path.split(sep) - self._path_spots.extend(path_spots) - - def IsAbsolute(self) -> bool: - return len(self._path_spots) == 0 or self._path_spots[0] != "." - - def AppendSpot(self, spot) -> None: - self._path_spots.append(spot) - - def Walk(self) -> list[str]: - return self._path_spots - - async def _get_node_by_id(self, id : str) -> NodeBase: - if id == self._root.id: - return self._root - if id not in self._nodes: - return None - return self._nodes[id] - - async def _get_father_node(self, node : NodeBase) -> NodeBase: - if node is self._root: - return self._root - return await self._get_node_by_id(node._father_id) - - async def _add_node(self, node : NodeBase) -> None: - self._nodes[node.id] = node - father = await self._get_father_node(node) - if father is not None and isinstance(father, DirNode): - father.children_id.append(node.id) - - async def _remove_node(self, node : NodeBase) -> None: - father = await self._get_father_node(node) - if father is not None and isinstance(father, DirNode): - father.children_id.remove(node.id) - self._nodes.pop(node.id) - - async def _find_child_in_dir_by_name(self, dir : DirNode, name : str) -> NodeBase: - if dir is self._root and name == "": - return self._root - for child_id in dir.children_id: - node = await self._get_node_by_id(child_id) - if node.name == name: - return node - return None - - async def _refresh(self, node : NodeBase): - if isinstance(node, DirNode): - if node.lastUpdate != None: - return - next_page_token : str = None - children_info : list[Dict[str, Any]] = [] - while True: - dir_info : Dict[str, Any] = await self._pikpak_client.file_list(parent_id = node.id, next_page_token=next_page_token) - next_page_token = dir_info["next_page_token"] - children_info.extend(dir_info["files"]) - if next_page_token is None or next_page_token == "": - break - - node.children_id.clear() - for child_info in children_info: - id : str = child_info["id"] - name : str = child_info["name"] - - child : NodeBase = await self._get_node_by_id(id) - if child is None: - if child_info["kind"].endswith("folder"): - child = DirNode(id, name, node.id) - else: - child = FileNode(id, name, node.id) - child.name = name - child._father_id = node.id - await self._add_node(child) - elif isinstance(node, FileNode): - result = await self._pikpak_client.get_download_url(node.id) - node.url = result["web_content_link"] - - node.lastUpdate = datetime.now() - - async def _path_to_node(self, path : str) -> NodeBase: - father, son_name = await 
self._path_to_father_node_and_son_name(path) - if son_name == "": - return father - if isinstance(father, DirNode): - return await self._find_child_in_dir_by_name(father, son_name) - return None - - async def _path_to_father_node_and_son_name(self, path : str) -> tuple[NodeBase, str]: - path_walker : PikPakFileSystem.PathWalker = PikPakFileSystem.PathWalker(path) - father : NodeBase = None - son_name : str = None - current : NodeBase = self._root if path_walker.IsAbsolute() else self._cwd - - for spot in path_walker.Walk(): - if current is None: - father = None - break - if spot == "..": - current = await self._get_father_node(current) - continue - father = current - if not isinstance(current, DirNode): - current = None - continue - await self._refresh(current) - if spot == ".": - continue - sonName = spot - current = await self._find_child_in_dir_by_name(current, spot) - - if current != None: - father = await self._get_father_node(current) - sonName = current.name - - return father, sonName - - async def _node_to_path(self, node : NodeBase, root : NodeBase = None) -> str: - if root is None: - root = self._root - if node is root: - return "/" - spots : list[str] = [] - current = node - while current is not root: - spots.append(current.name) - current = await self._get_father_node(current) - spots.append("") - return "/".join(reversed(spots)) - - async def _is_ancestors_of(self, node_a : NodeBase, node_b : NodeBase) -> bool: - if node_b is node_a: - return False - if node_a is self._root: - return True - while node_b._father_id != self._root.id: - node_b = await self._get_father_node(node_b) - if node_b is node_a: - return True - return False - #endregion - - #endregion - - #region 对外接口 - async def Login(self, username : str = None, password : str = None) -> None: - if self._pikpak_client != None and username is None and password is None: - username = self._pikpak_client.username - password = self._pikpak_client.password - - if username == None and password == None: - raise Exception("Username and password are required") - - self._init_client_by_username_and_password(username, password) - await self._pikpak_client.login() - self._dump_login_info() - - async def IsDir(self, path : str) -> bool: - node = await self._path_to_node(path) - return isinstance(node, DirNode) - - async def SplitPath(self, path : str) -> tuple[str, str]: - father, son_name = await self._path_to_father_node_and_son_name(path) - return await self._node_to_path(father), son_name - - async def GetFileUrlByNodeId(self, node_id : str) -> str: - node = await self._get_node_by_id(node_id) - if not isinstance(node, FileNode): - return None - await self._refresh(node) - return node.url - - async def GetFileUrlByPath(self, path : str) -> str: - node = await self._path_to_node(path) - if not isinstance(node, FileNode): - return None - await self._refresh(node) - return node.url - - async def GetChildrenNames(self, path : str, ignore_files : bool) -> list[str]: - node = await self._path_to_node(path) - if not isinstance(node, DirNode): - return [] - await self._refresh(node) - children_names : list[str] = [] - for child_id in node.children_id: - child = await self._get_node_by_id(child_id) - if ignore_files and isinstance(child, FileNode): - continue - children_names.append(child.name) - return children_names - - async def Delete(self, paths : list[str]) -> None: - nodes = [await self._path_to_node(path) for path in paths] - for node in nodes: - if await self._is_ancestors_of(node, self._cwd): - raise Exception("Cannot delete 
ancestors") - await self._pikpak_client.delete_to_trash([node.id for node in nodes]) - for node in nodes: - await self._remove_node(node) - - async def MakeDir(self, path : str) -> None: - father, son_name = await self._path_to_father_node_and_son_name(path) - result = await self._pikpak_client.create_folder(son_name, father.id) - id = result["file"]["id"] - name = result["file"]["name"] - son = DirNode(id, name, father.id) - await self._add_node(son) - - async def SetCwd(self, path : str) -> None: - node = await self._path_to_node(path) - if not isinstance(node, DirNode): - raise Exception("Not a directory") - self._cwd = node - - async def GetCwd(self) -> str: - return await self._node_to_path(self._cwd) - - async def GetChildren(self, node : NodeBase) -> list[NodeBase]: - if not isinstance(node, DirNode): - return [] - await self._refresh(node) - return [await self._get_node_by_id(child_id) for child_id in node.children_id] - - async def PathToNode(self, path : str) -> NodeBase: - node = await self._path_to_node(path) - if node is None: - return None - return node - - async def NodeToPath(self, from_node : NodeBase, to_node : NodeBase) -> str: - return await self._node_to_path(to_node, from_node) - - async def RemoteDownload(self, torrent : str, remote_base_path : str) -> tuple[str, str]: - node = await self._path_to_node(remote_base_path) - info = await self._pikpak_client.offline_download(torrent, node.id) - return info["task"]["file_id"], info["task"]["id"] - - async def QueryTaskStatus(self, task_id : str, node_id : str) -> DownloadStatus: - return await self._pikpak_client.get_task_status(task_id, node_id) - - async def UpdateNode(self, node_id : str) -> NodeBase: - node : NodeBase = await self._get_node_by_id(node_id) - if node is None: - info = await self._pikpak_client.offline_file_info(node_id) - kind = info["kind"] - parent_id = info["parent_id"] - name = info["name"] - if kind.endswith("folder"): - node = DirNode(node_id, name, parent_id) - else: - node = FileNode(node_id, name, parent_id) - await self._add_node(node) - node.lastUpdate = None - return node - - #endregion \ No newline at end of file diff --git a/REPL.py b/REPL.py new file mode 100644 index 0000000..00a6518 --- /dev/null +++ b/REPL.py @@ -0,0 +1,92 @@ +import asyncio, nest_asyncio +import cmd2 +from functools import wraps +import logging +import threading +import types + +MainLoop : asyncio.AbstractEventLoop = None + +class RunSync: + _current_task : asyncio.Task = None + + def StopCurrentRunningCoroutine(): + if RunSync._current_task is not None: + RunSync._current_task.cancel() + + def __init__(self, func): + wraps(func)(self) + + def __call__(self, *args, **kwargs): + currentLoop = None + try: + currentLoop = asyncio.get_running_loop() + except RuntimeError: + logging.error("Not in an event loop") + pass + func = self.__wrapped__ + if currentLoop is MainLoop: + task = asyncio.Task(func(*args, **kwargs)) + RunSync._current_task = task + result = MainLoop.run_until_complete(task) + RunSync._current_task = None + return result + else: + return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), MainLoop).result() + + def __get__(self, instance, cls): + if instance is None: + return self + else: + return types.MethodType(self, instance) + +class REPLApp(cmd2.Cmd): + def _io_worker(self, loop): + asyncio.set_event_loop(loop) + loop.run_forever() + + async def input(self, prompt): + async def _input(prompt): + return self._read_command_line(prompt) + future = asyncio.run_coroutine_threadsafe(_input(prompt), 
self.ioLoop) + return await asyncio.wrap_future(future) + + async def print(self, *args, **kwargs): + async def _print(*args, **kwargs): + print(*args, **kwargs) + future = asyncio.run_coroutine_threadsafe(_print(*args, **kwargs), self.ioLoop) + await asyncio.wrap_future(future) + + def __init__(self): + nest_asyncio.apply() + global MainLoop + MainLoop = asyncio.get_running_loop() + super().__init__() + + def preloop(self): + # 1. 设置忽略SIGINT + import signal + def signal_handler(sig, frame): + RunSync.StopCurrentRunningCoroutine() + signal.signal(signal.SIGINT, signal_handler) + + # 2. 创建IO线程处理输入输出 + self.ioLoop = asyncio.new_event_loop() + self.ioThread = threading.Thread(target=self._io_worker, args=(self.ioLoop,)) + self.ioThread.start() + + # 3. 设置console + self.saved_readline_settings = None + with self.sigint_protection: + self.saved_readline_settings = self._set_up_cmd2_readline() + + def postloop(self): + # 1. 还原console设置 + with self.sigint_protection: + if self.saved_readline_settings is not None: + self._restore_readline(self.saved_readline_settings) + + # 2. 停止IO线程 + # https://stackoverflow.com/questions/51642267/asyncio-how-do-you-use-run-forever + self.ioLoop.call_soon_threadsafe(self.ioLoop.stop) + self.ioThread.join() \ No newline at end of file diff --git a/TaskManager.py b/TaskManager.py deleted file mode 100644 index ba28da6..0000000 --- a/TaskManager.py +++ /dev/null @@ -1,366 +0,0 @@ -from enum import Enum -from typing import Awaitable, Callable, Dict -import asyncio -import logging -import shortuuid -from PikPakFileSystem import PikPakFileSystem, FileNode, DirNode -from aria2helper import Aria2Status, addUri, tellStatus, pause, unpause -from pikpakapi import DownloadStatus -import random -import pickle - -DB_PATH = "task.db" - -class TaskStatus(Enum): - PENDING = "pending" - RUNNING = "running" - DONE = "done" - ERROR = "error" - PAUSED = "paused" - -class TorrentTaskStatus(Enum): - PENDING = "pending" - REMOTE_DOWNLOADING = "remote" - LOCAL_DOWNLOADING = "local" - DONE = "done" - -class FileDownloadTaskStatus(Enum): - PENDING = "pending" - DOWNLOADING = "downloading" - DONE = "done" - -class TaskBase: - TAG = "" - MAX_CONCURRENT_NUMBER = 5 - - def __init__(self): - self.id : str = shortuuid.uuid() - self.status : TaskStatus = TaskStatus.PENDING - self.worker : asyncio.Task = None - self.handler : Callable[..., Awaitable] = None - - def Resume(self): - if self.status in {TaskStatus.PAUSED, TaskStatus.ERROR}: - self.status = TaskStatus.PENDING - - def __getstate__(self): - state = self.__dict__.copy() - if 'handler' in state: - del state['handler'] - if 'worker' in state: - del state['worker'] - return state - - def __setstate__(self, state): - self.__dict__.update(state) - self.worker = None - self.handler = None - - -class TorrentTask(TaskBase): - TAG = "TorrentTask" - MAX_CONCURRENT_NUMBER = 5 - - def __init__(self, torrent : str): - super().__init__() - self.torrent_status : TorrentTaskStatus = TorrentTaskStatus.PENDING - self.torrent : str = torrent - self.info : str = "" - self.name : str = "" - - # 和PikPak交互需要的信息 - self.remote_base_path : str = None - self.node_id : str = None - self.task_id : str = None - -class FileDownloadTask(TaskBase): - TAG = "FileDownloadTask" - MAX_CONCURRENT_NUMBER = 5 - - def __init__(self, node_id : str, remote_path : str, owner_id : str): - super().__init__() - self.file_download_status : FileDownloadTaskStatus = FileDownloadTaskStatus.PENDING - self.node_id : str = node_id - self.remote_path : str = remote_path - self.owner_id : str 
= owner_id - self.gid : str = None - self.url : str = None - -async def TaskWorker(task : TaskBase): - try: - if task.status != TaskStatus.PENDING: - return - task.status = TaskStatus.RUNNING - await task.handler(task) - task.status = TaskStatus.DONE - except asyncio.CancelledError: - task.status = TaskStatus.PAUSED - except Exception as e: - logging.error(f"task failed, exception occurred: {e}") - task.status = TaskStatus.ERROR - -class TaskManager: - #region 内部实现 - def __init__(self, client : PikPakFileSystem): - self.taskQueues : Dict[str, list[TaskBase]] = {} - self.loop : asyncio.Task = None - self.client = client - - async def _loop(self): - while True: - try: - await asyncio.sleep(0.5) - for taskQueue in self.taskQueues.values(): - notRunningTasks = [task for task in taskQueue if task.worker is None or task.worker.done()] - runningTasksNumber = len(taskQueue) - len(notRunningTasks) - for task in [task for task in notRunningTasks if task.status == TaskStatus.PENDING]: - if runningTasksNumber >= task.MAX_CONCURRENT_NUMBER: - break - task.worker = asyncio.create_task(TaskWorker(task)) - runningTasksNumber += 1 - except Exception as e: - logging.error(f"task loop failed, exception occurred: {e}") - - async def _get_task_by_id(self, task_id : str) -> TaskBase: - for queue in self.taskQueues.values(): - for task in queue: - if task.id == task_id: - return task - return None - - #region 远程下载部分 - - async def _append_task(self, task : TaskBase): - queue = self.taskQueues.get(task.TAG, []) - queue.append(task) - self.taskQueues[task.TAG] = queue - - async def _get_torrent_queue(self): - if TorrentTask.TAG not in self.taskQueues: - self.taskQueues[TorrentTask.TAG] = [] - return self.taskQueues[TorrentTask.TAG] - - async def _get_file_download_queue(self, owner_id : str): - if FileDownloadTask.TAG not in self.taskQueues: - self.taskQueues[FileDownloadTask.TAG] = [] - queue = self.taskQueues[FileDownloadTask.TAG] - return [task for task in queue if task.owner_id == owner_id] - - async def _on_torrent_task_pending(self, task : TorrentTask): - task.node_id, task.task_id = await self.client.RemoteDownload(task.torrent, task.remote_base_path) - task.torrent_status = TorrentTaskStatus.REMOTE_DOWNLOADING - - async def _on_torrent_task_offline_downloading(self, task : TorrentTask): - wait_seconds = 3 - while True: - status = await self.client.QueryTaskStatus(task.task_id, task.node_id) - if status in {DownloadStatus.not_found, DownloadStatus.not_downloading, DownloadStatus.error}: - task.torrent_status = TorrentTaskStatus.PENDING - raise Exception(f"remote download failed, status: {status}") - elif status == DownloadStatus.done: - break - await asyncio.sleep(wait_seconds) - wait_seconds = wait_seconds * 1.5 - - task.torrent_status = TorrentTaskStatus.LOCAL_DOWNLOADING - - async def _on_torrent_local_downloading(self, task : TorrentTask): - node = await self.client.UpdateNode(task.node_id) - task.name = node.name - task.node_id = node.id - - if isinstance(node, FileNode): - await self._init_file_download_task(task.node_id, task.name, task.id) - elif isinstance(node, DirNode): - # 使用广度优先遍历 - queue : list[str] = [node] - while len(queue) > 0: - current = queue.pop(0) - for child in await self.client.GetChildren(current): - if isinstance(child, DirNode): - queue.append(child) - if isinstance(child, FileNode): - child_path = task.name + await self.client.NodeToPath(node, child) - await self._init_file_download_task(child.id, child_path, task.id) - else: - raise Exception("unknown node type") - - # 
开始等待下载任务完成 - while True: - file_download_tasks = await self._get_file_download_queue(task.id) - all_number = len(file_download_tasks) - not_completed_number = 0 - paused_number = 0 - error_number = 0 - for file_download_task in file_download_tasks: - if file_download_task.status == TaskStatus.PAUSED: - paused_number += 1 - if file_download_task.status == TaskStatus.ERROR: - error_number += 1 - if file_download_task.status in {TaskStatus.PENDING, TaskStatus.RUNNING}: - not_completed_number += 1 - - running_number = all_number - not_completed_number - paused_number - error_number - task.info = f"{running_number}/{all_number} ({paused_number}|{error_number})" - - if not_completed_number > 0: - await asyncio.sleep(0.5) - continue - if error_number > 0: - raise Exception("file download failed") - if paused_number > 0: - raise asyncio.CancelledError() - break - - task.torrent_status = TorrentTaskStatus.DONE - - async def _on_torrent_task_cancelled(self, task : TorrentTask): - file_download_tasks = await self._get_file_download_queue(task.id) - for file_download_task in file_download_tasks: - if file_download_task.worker is not None: - file_download_task.worker.cancel() - - async def _torrent_task_handler(self, task : TorrentTask): - try: - while True: - if task.torrent_status == TorrentTaskStatus.PENDING: - await self._on_torrent_task_pending(task) - elif task.torrent_status == TorrentTaskStatus.REMOTE_DOWNLOADING: - await self._on_torrent_task_offline_downloading(task) - elif task.torrent_status == TorrentTaskStatus.LOCAL_DOWNLOADING: - await self._on_torrent_local_downloading(task) - else: - break - except asyncio.CancelledError: - await self._on_torrent_task_cancelled(task) - raise - #endregion - - - #region 文件下载部分 - async def _init_file_download_task(self, node_id : str, remote_path : str, owner_id : str) -> str: - queue = await self._get_file_download_queue(owner_id) - for task in queue: - if not isinstance(task, FileDownloadTask): - continue - if task.node_id == node_id: - if task.status in {TaskStatus.PAUSED, TaskStatus.ERROR}: - task.status = TaskStatus.PENDING - return task.id - task = FileDownloadTask(node_id, remote_path, owner_id) - task.handler = self._file_download_task_handler - await self._append_task(task) - return task.id - - async def _on_file_download_task_pending(self, task : FileDownloadTask): - task.url = await self.client.GetFileUrlByNodeId(task.node_id) - task.gid = await addUri(task.url, task.remote_path) - task.file_download_status = FileDownloadTaskStatus.DOWNLOADING - - async def _on_file_download_task_downloading(self, task : FileDownloadTask): - wait_seconds = 3 - while True: - status = await tellStatus(task.gid) - if status in {Aria2Status.REMOVED, Aria2Status.ERROR}: - task.file_download_status = FileDownloadTaskStatus.PENDING - raise Exception("failed to query status") - elif status == Aria2Status.PAUSED: - await unpause(task.gid) - elif status == Aria2Status.COMPLETE: - break - await asyncio.sleep(wait_seconds) - task.file_download_status = FileDownloadTaskStatus.DONE - - async def _file_download_task_handler(self, task : FileDownloadTask): - try: - while True: - if task.file_download_status == FileDownloadTaskStatus.PENDING: - await self._on_file_download_task_pending(task) - elif task.file_download_status == FileDownloadTaskStatus.DOWNLOADING: - await self._on_file_download_task_downloading(task) - else: - break - except asyncio.CancelledError: - gid = task.gid - if gid is not None: - await pause(gid) - raise - - #endregion - - def _load_tasks_from_db(self): 
- try: - self.taskQueues = pickle.load(open(DB_PATH, "rb")) - for queue in self.taskQueues.values(): - for task in queue: - if task.status == TaskStatus.RUNNING: - task.status = TaskStatus.PENDING - if isinstance(task, TorrentTask): - task.handler = self._torrent_task_handler - task.info = "" - if isinstance(task, FileDownloadTask): - task.handler = self._file_download_task_handler - except: - pass - - def _dump_tasks_to_db(self): - pickle.dump(self.taskQueues, open(DB_PATH, "wb")) - - #endregion - - #region 对外接口 - - def Start(self): - self._load_tasks_from_db() - if self.loop is None: - self.loop = asyncio.create_task(self._loop()) - - def Stop(self): - if self.loop is not None: - self.loop.cancel() - self.loop = None - self._dump_tasks_to_db() - - - async def CreateTorrentTask(self, torrent : str, remote_base_path : str) -> str: - task = TorrentTask(torrent) - task.remote_base_path = remote_base_path - task.handler = self._torrent_task_handler - await self._append_task(task) - return task.id - - async def PullRemote(self, path : str) -> str: - target = await self.client.PathToNode(path) - if target is None: - raise Exception("target not found") - queue = await self._get_torrent_queue() - for task in queue: - if not isinstance(task, TorrentTask): - continue - if task.node_id == target.id: - return task.id - task = TorrentTask(None) - task.name = target.name - task.node_id = target.id - task.handler = self._torrent_task_handler - task.torrent_status = TorrentTaskStatus.LOCAL_DOWNLOADING - await self._append_task(task) - return task.id - - async def QueryTasks(self, tag : str, filter_status : TaskStatus = None): - queue = self.taskQueues.get(tag, []) - if filter_status is None: - return queue - return [task for task in queue if task.status == filter_status] - - async def StopTask(self, task_id : str): - task = await self._get_task_by_id(task_id) - if task is not None and task.worker is not None: - task.worker.cancel() - - async def ResumeTask(self, task_id : str): - task = await self._get_task_by_id(task_id) - if task is not None: - task.Resume() - - #endregion diff --git a/app.py b/app.py new file mode 100644 index 0000000..9fd7a22 --- /dev/null +++ b/app.py @@ -0,0 +1,43 @@ +import asyncio +from REPL import REPLApp, RunSync, cmd2 + +class MyApp(REPLApp): + @RunSync + async def do_guess_number(self, args): + """ + play guess number game + """ + import random + number = random.randint(1, 100) + await self.print("Guess a number between 1 and 100") + while True: + guess = await self.input("Your guess: ") + if not guess.isdigit(): + await self.print("Please enter a valid number.") + continue + guess = int(guess) + if guess < number: + await self.print("Too low!") + elif guess > number: + await self.print("Too high!") + else: + await self.print("Congratulations! 
You guessed the number.") + break + + +async def mainLoop(): + app = MyApp() + app.preloop() + try: + stop = False + while not stop: + line = await app.input(app.prompt) + try: + stop = app.onecmd_plus_hooks(line) + except asyncio.CancelledError: + await app.print("^C: Task cancelled") + finally: + app.postloop() + +if __name__ == "__main__": + asyncio.run(mainLoop()) \ No newline at end of file diff --git a/aria2helper.py b/aria2helper.py deleted file mode 100644 index 27b036f..0000000 --- a/aria2helper.py +++ /dev/null @@ -1,72 +0,0 @@ -import httpx, json -from enum import Enum - -class Aria2Status(Enum): - ACTIVE = "active" - WAITING = "waiting" - PAUSED = "paused" - ERROR = "error" - COMPLETE = "complete" - REMOVED = "removed" - -ARIA_ADDRESS = "http://100.96.0.2:6800/jsonrpc" -ARIA_SECRET = "jfaieofjosiefjoiaesjfoiasejf" -BASE_PATH = "/downloads" - -client = httpx.AsyncClient() - -async def addUri(uri, path): - jsonreq = json.dumps({ - "jsonrpc" : "2.0", - "id" : "pikpak", - "method" : "aria2.addUri", - "params" : [ f"token:{ARIA_SECRET}", [uri], - { - "dir" : BASE_PATH, - "out" : path - }] - }) - response = await client.post(ARIA_ADDRESS, data=jsonreq) - result = json.loads(response.text) - return result["result"] - - -async def tellStatus(gid) -> Aria2Status: - jsonreq = json.dumps({ - "jsonrpc" : "2.0", - "id" : "pikpak", - "method" : "aria2.tellStatus", - "params" : [ f"token:{ARIA_SECRET}", gid] - }) - response = await client.post(ARIA_ADDRESS, data=jsonreq) - result = json.loads(response.text) - if "error" in result: - return Aria2Status.REMOVED - return Aria2Status(result["result"]["status"]) - -async def pause(gid): - jsonreq = json.dumps({ - "jsonrpc" : "2.0", - "id" : "pikpak", - "method" : "aria2.pause", - "params" : [ f"token:{ARIA_SECRET}", gid] - }) - await client.post(ARIA_ADDRESS, data=jsonreq) - -async def unpause(gid): - jsonreq = json.dumps({ - "jsonrpc" : "2.0", - "id" : "pikpak", - "method" : "aria2.unpause", - "params" : [ f"token:{ARIA_SECRET}", gid] - }) - await client.post(ARIA_ADDRESS, data=jsonreq) - -async def remove(gid): - jsonreq = json.dumps({ - "jsonrpc" : "2.0", - "id" : "pikpak", - "method" : "aria2.remove", - "params" : [ f"token:{ARIA_SECRET}", gid] - }) - await client.post(ARIA_ADDRESS, data=jsonreq) \ No newline at end of file diff --git a/main.py b/main.py deleted file mode 100644 index 9b24205..0000000 --- a/main.py +++ /dev/null @@ -1,352 +0,0 @@ -import asyncio, nest_asyncio -import cmd2 -from functools import wraps -import logging -import threading -import colorlog -from PikPakFileSystem import PikPakFileSystem -import os -from tabulate import tabulate -import types -from TaskManager import TaskManager, TaskStatus, TorrentTask, FileDownloadTask - -LogFormatter = colorlog.ColoredFormatter( - "%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s", - datefmt='%Y-%m-%d %H:%M:%S', - reset=True, - log_colors={ - 'DEBUG': 'cyan', - 'INFO': 'green', - 'WARNING': 'yellow', - 'ERROR': 'red', - 'CRITICAL': 'red,bg_white', - } - ) - -def setup_logging(): - file_handler = logging.FileHandler('app.log') - file_handler.setFormatter(LogFormatter) - file_handler.setLevel(logging.DEBUG) - - logger = logging.getLogger() - logger.addHandler(file_handler) - logger.setLevel(logging.DEBUG) - -setup_logging() -MainLoop : asyncio.AbstractEventLoop = None -Client = PikPakFileSystem(auth_cache_path = "token.json", proxy_address="http://127.0.0.1:7897") - -class RunSync: - _current_task : asyncio.Task = None - - def StopCurrentRunningCoroutine(): - if 
RunSync._current_task is not None: - RunSync._current_task.cancel() - - def __init__(self, func): - wraps(func)(self) - - def __call__(self, *args, **kwargs): - currentLoop = None - try: - currentLoop = asyncio.get_running_loop() - except RuntimeError: - logging.error("Not in an event loop") - pass - func = self.__wrapped__ - if currentLoop is MainLoop: - task = asyncio.Task(func(*args, **kwargs)) - RunSync._current_task = task - result = MainLoop.run_until_complete(task) - RunSync._current_task = None - return result - else: - return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), MainLoop).result() - - def __get__(self, instance, cls): - if instance is None: - return self - else: - return types.MethodType(self, instance) - -class App(cmd2.Cmd): - #region Console设置 - def _io_worker(self, loop): - asyncio.set_event_loop(loop) - loop.run_forever() - - async def input(self, prompt): - async def _input(prompt): - return self._read_command_line(prompt) - future = asyncio.run_coroutine_threadsafe(_input(prompt), self.ioLoop) - return await asyncio.wrap_future(future) - - async def print(self, *args, **kwargs): - async def _print(*args, **kwargs): - print(*args, **kwargs) - future = asyncio.run_coroutine_threadsafe(_print(*args, **kwargs), self.ioLoop) - await asyncio.wrap_future(future) - - def __init__(self): - super().__init__() - self.log_handler = logging.StreamHandler() - self.log_handler.setFormatter(LogFormatter) - self.log_handler.setLevel(logging.CRITICAL) - logging.getLogger().addHandler(self.log_handler) - - self.task_manager = TaskManager(Client) - - def preloop(self): - # 1. 设置忽略SIGINT - import signal - def signal_handler(sig, frame): - RunSync.StopCurrentRunningCoroutine() - signal.signal(signal.SIGINT, signal_handler) - - # 2. 创建IO线程处理输入输出 - self.ioLoop = asyncio.new_event_loop() - self.ioThread = threading.Thread(target=self._io_worker, args=(self.ioLoop,)) - self.ioThread.start() - - # 3. 设置console - self.saved_readline_settings = None - with self.sigint_protection: - self.saved_readline_settings = self._set_up_cmd2_readline() - - # 4. 启动任务管理器 - self.task_manager.Start() - - def postloop(self): - # 1. 还原console设置 - with self.sigint_protection: - if self.saved_readline_settings is not None: - self._restore_readline(self.saved_readline_settings) - - # 2. 停止IO线程 - # https://stackoverflow.com/questions/51642267/asyncio-how-do-you-use-run-forever - self.ioLoop.call_soon_threadsafe(self.ioLoop.stop) - self.ioThread.join() - - # 3. 
停止任务管理器 - self.task_manager.Stop() - - #endregion - - #region 所有命令 - def do_logging_off(self, args): - """ - Disable logging - """ - self.log_handler.setLevel(logging.CRITICAL) - logging.critical("Logging disabled") - - def do_logging_debug(self, args): - """ - Enable debug mode - """ - self.log_handler.setLevel(logging.DEBUG) - logging.debug("Debug mode enabled") - - def do_logging_info(self, args): - """ - Enable info mode - """ - self.log_handler.setLevel(logging.INFO) - logging.info("Info mode enabled") - - login_parser = cmd2.Cmd2ArgumentParser() - login_parser.add_argument("username", help="username", nargs="?") - login_parser.add_argument("password", help="password", nargs="?") - @cmd2.with_argparser(login_parser) - @RunSync - async def do_login(self, args): - """ - Login to pikpak - """ - await Client.Login(args.username, args.password) - await self.print("Logged in successfully") - - async def _path_completer(self, text, line, begidx, endidx, ignore_files): - father_path, son_name = await Client.SplitPath(text) - children_names = await Client.GetChildrenNames(father_path, ignore_files) - matches : list[str] = [] - for child_name in children_names: - if child_name.startswith(son_name): - self.display_matches.append(child_name) - if son_name == "": - matches.append(text + child_name) - elif text.endswith(son_name): - matches.append(text[:text.rfind(son_name)] + child_name) - if len(matches) == 1 and await Client.IsDir(matches[0]): - if matches[0].endswith(son_name): - matches[0] += "/" - self.allow_appended_space = False - self.allow_closing_quote = False - return matches - - @RunSync - async def complete_ls(self, text, line, begidx, endidx): - return await self._path_completer(text, line, begidx, endidx, False) - - ls_parser = cmd2.Cmd2ArgumentParser() - ls_parser.add_argument("path", help="path", default="", nargs="?") - @cmd2.with_argparser(ls_parser) - @RunSync - async def do_ls(self, args): - """ - List files in a directory - """ - if await Client.IsDir(args.path): - for child_name in await Client.GetChildrenNames(args.path, False): - await self.print(child_name) - else: - await self.print(await Client.GetFileUrlByPath(args.path)) - - @RunSync - async def complete_cd(self, text, line, begidx, endidx): - return await self._path_completer(text, line, begidx, endidx, True) - - cd_parser = cmd2.Cmd2ArgumentParser() - cd_parser.add_argument("path", help="path", default="", nargs="?") - @cmd2.with_argparser(cd_parser) - @RunSync - async def do_cd(self, args): - """ - Change directory - """ - await Client.SetCwd(args.path) - - @RunSync - async def do_cwd(self, args): - """ - Print current working directory - """ - await self.print(await Client.GetCwd()) - - def do_clear(self, args): - """ - Clear the terminal screen - """ - os.system('cls' if os.name == 'nt' else 'clear') - - @RunSync - async def complete_rm(self, text, line, begidx, endidx): - return await self._path_completer(text, line, begidx, endidx, False) - - rm_parser = cmd2.Cmd2ArgumentParser() - rm_parser.add_argument("paths", help="paths", default="", nargs="+") - @cmd2.with_argparser(rm_parser) - @RunSync - async def do_rm(self, args): - """ - Remove a file or directory - """ - await Client.Delete(args.paths) - - @RunSync - async def complete_mkdir(self, text, line, begidx, endidx): - return await self._path_completer(text, line, begidx, endidx, True) - - mkdir_parser = cmd2.Cmd2ArgumentParser() - mkdir_parser.add_argument("path", help="new directory path") - @cmd2.with_argparser(mkdir_parser) - @RunSync - async def 
do_mkdir(self, args): - """ - Create a directory - """ - await Client.MakeDir(args.path) - - download_parser = cmd2.Cmd2ArgumentParser() - download_parser.add_argument("torrent", help="torrent") - @cmd2.with_argparser(download_parser) - @RunSync - async def do_download(self, args): - """ - Download a file or directory - """ - task_id = await self.task_manager.CreateTorrentTask(args.torrent, await Client.GetCwd()) - await self.print(f"Task {task_id} created") - - @RunSync - async def complete_pull(self, text, line, begidx, endidx): - return await self._path_completer(text, line, begidx, endidx, False) - - pull_parser = cmd2.Cmd2ArgumentParser() - pull_parser.add_argument("target", help="pull target") - @cmd2.with_argparser(pull_parser) - @RunSync - async def do_pull(self, args): - """ - Pull a file or directory - """ - task_id = await self.task_manager.PullRemote(args.target) - await self.print(f"Task {task_id} created") - - - query_parser = cmd2.Cmd2ArgumentParser() - query_parser.add_argument("-t", "--type", help="type", nargs="?", choices=["torrent", "file"], default="torrent") - query_parser.add_argument("-f", "--filter", help="filter", nargs="?", choices=[member.value for member in TaskStatus]) - @cmd2.with_argparser(query_parser) - @RunSync - async def do_query(self, args): - """ - Query All Tasks - """ - filter_status = TaskStatus(args.filter) if args.filter is not None else None - if args.type == "torrent": - tasks = await self.task_manager.QueryTasks(TorrentTask.TAG, filter_status) - # 格式化输出所有task信息id,status,lastStatus的信息,输出表格 - table = [[task.id, task.status.value, task.torrent_status.value, task.info] for task in tasks if isinstance(task, TorrentTask)] - headers = ["id", "status", "details", "progress"] - await self.print(tabulate(table, headers, tablefmt="grid")) - elif args.type == "file": - tasks = await self.task_manager.QueryTasks(FileDownloadTask.TAG, filter_status) - table = [[task.id, task.status.value, task.file_download_status, task.remote_path] for task in tasks if isinstance(task, FileDownloadTask)] - headers = ["id", "status", "details", "remote_path"] - await self.print(tabulate(table, headers, tablefmt="grid")) - - taskid_parser = cmd2.Cmd2ArgumentParser() - taskid_parser.add_argument("task_id", help="task id") - - @cmd2.with_argparser(taskid_parser) - @RunSync - async def do_pause(self, args): - """ - Stop a task - """ - await self.task_manager.StopTask(args.task_id) - - @cmd2.with_argparser(taskid_parser) - @RunSync - async def do_resume(self, args): - """ - Resume a task - """ - await self.task_manager.ResumeTask(args.task_id) - - #endregion - - -#region APP入口 -async def mainLoop(): - global MainLoop - MainLoop = asyncio.get_running_loop() - app = App() - - app.preloop() - try: - stop = False - while not stop: - line = await app.input(app.prompt) - try: - stop = app.onecmd_plus_hooks(line) - except asyncio.CancelledError: - await app.print("^C: Task cancelled") - finally: - app.postloop() - -if __name__ == "__main__": - nest_asyncio.apply() - asyncio.run(mainLoop()) -#endregion \ No newline at end of file diff --git a/readme.md b/readme.md index 6234ad4..49916af 100644 --- a/readme.md +++ b/readme.md @@ -1,167 +1,8 @@ -在Pikpak Api基础上套了一层文件系统,更好自动化离线下载 +在CMD2的基础上实现对异步函数的支持 -运行: python main.py +### 用法 -Todo: +继承REPLApp,然后对于移除函数添加@RunSync即可 -- [x] 实现自定义根路径 -- [x] 异步输出频率过高会导致卡死,似乎会多创建一个线程 -- [x] 实现Task队列管理 -- [x] 自动刷新文件系统缓存 -- [x] 分析以下方法的返回值:offline_file_info、offline_list -- [x] 持久化数据 -- [x] 实现本地下载队列(多文件,文件夹) -- [x] 实现任务暂停、继续、恢复 -- [ ] 接口分离,前后端分离 -- [ ] 添加测试用例 -- [ 
] 完全类型化 - - -### 协议结构 -1. offline_download -```json -{ - "upload_type": "UPLOAD_TYPE_URL", - "url": { - "kind": "upload#url" - }, - "task": { - "kind": "drive#task", - "id": "VOAMocJorcA09bRr-3bEDUbYo1", - "name": "[FLsnow][Genshiken-2daime][BDRip]", - "type": "offline", - "user_id": "ZEBRT8Wc1IzU1rfZ", - "statuses": [], - "status_size": 56, - "params": { - "predict_speed": "73300775185", - "predict_type": "3" - }, - "file_id": "VOAMocKArcA09bRr-3bEDUbZo1", - "file_name": "[FLsnow][Genshiken-2daime][BDRip]", - "file_size": "29071069771", - "message": "Saving", - "created_time": "2024-10-29T18:29:11.092+08:00", - "updated_time": "2024-10-29T18:29:11.092+08:00", - "third_task_id": "", - "phase": "PHASE_TYPE_RUNNING", - "progress": 0, - "icon_link": "", - "callback": "", - "space": "" - } -} -``` - -2. offline_file_info -```json -{ - "kind": "drive#folder", - "id": "VOAMocKArcA09bRr-3bEDUbZo1", - "parent_id": "VNTQEPvYTRlbqP1pB2YGZorwo1", - "name": "[FLsnow][Genshiken-2daime][BDRip](1)", - "user_id": "ZEBRT8Wc1IzU1rfZ", - "size": "0", - "revision": "0", - "file_extension": "", - "mime_type": "", - "starred": false, - "web_content_link": "", - "created_time": "2024-10-29T18:29:13.251+08:00", - "modified_time": "2024-10-29T18:29:13.251+08:00", - "icon_link": "https://static.mypikpak.com/7d6933d5cde34f200366685cba0cbc4592cfd363", - "thumbnail_link": "https://sg-thumbnail-drive.mypikpak.com/v0/screenshot-thumbnails/788AB60820B162FD988606CE988FBC40B8C6EA8D/720/2048", - "md5_checksum": "", - "hash": "", - "links": {}, - "phase": "PHASE_TYPE_COMPLETE", - "audit": { - "status": "STATUS_OK", - "message": "Normal resource", - "title": "" - }, - "medias": [], - "trashed": false, - "delete_time": "", - "original_url": "", - "params": { - "platform_icon": "https://static.mypikpak.com/21ecdc2c6b2372cdee91b193df9a6248b885a1b0", - "small_thumbnail": "https://sg-thumbnail-drive.mypikpak.com/v0/screenshot-thumbnails/788AB60820B162FD988606CE988FBC40B8C6EA8D/240/720", - "url": "magnet:?xt=urn:btih:7c0e7e3e3828c22b49e903beefcee69ec2a4986e" - }, - "original_file_index": 0, - "space": "", - "apps": [], - "writable": true, - "folder_type": "NORMAL", - "sort_name": "", - "user_modified_time": "2024-10-29T18:29:13.251+08:00", - "spell_name": [], - "file_category": "OTHER", - "tags": [], - "reference_events": [] -} -``` - -3. 
offline_list -```json - -{ - "tasks": [ - { - "kind": "drive#task", - "id": "VOASrVEVIQmaCBjEu8Y1VDb7o1", - "name": "[LoliHouse] Mahoutsukai ni Narenakatta Onnanoko no Hanashi - 04 [WebRip 1080p HEVC-10bit AAC SRTx2].mkv", - "type": "offline", - "user_id": "ZEBRT8Wc1IzU1rfZ", - "statuses": [], - "status_size": 1, - "params": { - "age": "0", - "mime_type": "video/x-matroska", - "predict_speed": "73300775185", - "predict_type": "3", - "url": "magnet:?xt=urn:btih:02816d3bd51f9e3ac72c986cc65f3f7a2b201b5b" - }, - "file_id": "VOASrVFTIQmaCBjEu8Y1VDbAo1", - "file_name": "[LoliHouse] Mahoutsukai ni Narenakatta Onnanoko no Hanashi - 04 [WebRip 1080p HEVC-10bit AAC SRTx2].mkv", - "file_size": "726857457", - "message": "Saving", - "created_time": "2024-10-30T22:39:27.712+08:00", - "updated_time": "2024-10-30T22:39:27.712+08:00", - "third_task_id": "", - "phase": "PHASE_TYPE_RUNNING", - "progress": 90, - "icon_link": "https://static.mypikpak.com/39998a187e280e2ee9ceb5f58315a1bcc744fa64", - "callback": "", - "reference_resource": { - "@type": "type.googleapis.com/drive.ReferenceFile", - "kind": "drive#file", - "id": "VOASrVFTIQmaCBjEu8Y1VDbAo1", - "parent_id": "VNTQEPvYTRlbqP1pB2YGZorwo1", - "name": "[LoliHouse] Mahoutsukai ni Narenakatta Onnanoko no Hanashi - 04 [WebRip 1080p HEVC-10bit AAC SRTx2].mkv", - "size": "726857457", - "mime_type": "video/x-matroska", - "icon_link": "https://static.mypikpak.com/39998a187e280e2ee9ceb5f58315a1bcc744fa64", - "hash": "", - "phase": "PHASE_TYPE_RUNNING", - "thumbnail_link": "", - "params": {}, - "space": "", - "medias": [], - "starred": false, - "tags": [] - }, - "space": "" - } - ], - "next_page_token": "", - "expires_in": 3 -} - -{ - "tasks": [], - "next_page_token": "", - "expires_in": 10 -} -``` \ No newline at end of file +### 示例 +运行: python app.py
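
For reference, the usage pattern that this patch's `app.py` demonstrates reduces to the minimal sketch below: subclass `REPLApp`, write commands as `async def do_*` methods decorated with `@RunSync`, and drive the loop from an async entry point. The `do_hello` command and its `asyncio.sleep` are illustrative placeholders; everything else (`REPLApp`, `RunSync`, `preloop`/`postloop`, `input`/`print`, the `onecmd_plus_hooks` driver loop) follows `REPL.py` and `app.py` from this patch.

```python
import asyncio
from REPL import REPLApp, RunSync


class HelloApp(REPLApp):
    @RunSync
    async def do_hello(self, args):
        """Placeholder command: greet after a short asynchronous wait."""
        await asyncio.sleep(1)                      # any awaitable work goes here
        await self.print("hello from an async command")


async def mainLoop():
    app = HelloApp()                                # must be constructed inside a running event loop
    app.preloop()                                   # installs the SIGINT handler and starts the IO thread
    try:
        stop = False
        while not stop:
            line = await app.input(app.prompt)      # read on the IO thread, awaited on the main loop
            try:
                stop = app.onecmd_plus_hooks(line)  # cmd2 dispatch; @RunSync commands run to completion here
            except asyncio.CancelledError:
                await app.print("^C: Task cancelled")
    finally:
        app.postloop()                              # restores readline settings and stops the IO thread

if __name__ == "__main__":
    asyncio.run(mainLoop())
```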
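Two details are worth keeping in mind when extending this: Ctrl-C does not kill the process, because `preloop` installs a SIGINT handler that calls `RunSync.StopCurrentRunningCoroutine()` and therefore only cancels the command coroutine currently running on the main loop; and console I/O inside a command should go through `await self.input(...)` / `await self.print(...)`, which hand the work to the dedicated IO thread instead of blocking the event loop.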