diff --git a/main.py b/main.py index c3fcbd1..ca3a963 100644 --- a/main.py +++ b/main.py @@ -4,12 +4,11 @@ from functools import wraps import logging import threading import colorlog -from pikpakFs import PKFs, IsDir, IsFile, PKTaskStatus +from PikPakFs import PikPakFs, IsDir, IsFile, TaskStatus import os -import json +import keyboard -def setup_logging(): - formatter = colorlog.ColoredFormatter( +LogFormatter = colorlog.ColoredFormatter( "%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s", datefmt='%Y-%m-%d %H:%M:%S', reset=True, @@ -21,22 +20,19 @@ def setup_logging(): 'CRITICAL': 'red,bg_white', } ) - handler = logging.StreamHandler() - handler.setFormatter(formatter) - handler.setLevel(logging.INFO) +def setup_logging(): file_handler = logging.FileHandler('app.log') - file_handler.setFormatter(formatter) + file_handler.setFormatter(LogFormatter) file_handler.setLevel(logging.DEBUG) logger = logging.getLogger() - logger.addHandler(handler) logger.addHandler(file_handler) logger.setLevel(logging.DEBUG) setup_logging() MainLoop : asyncio.AbstractEventLoop = None -Client = PKFs("token.json", proxy="http://127.0.0.1:7897") +Client = PikPakFs("token.json", proxy="http://127.0.0.1:7897") def RunSync(func): @wraps(func) @@ -57,20 +53,24 @@ class Console(cmd2.Cmd): asyncio.set_event_loop(loop) loop.run_forever() - async def AsyncInput(self, prompt): + async def Input(self, prompt): async def _input(prompt): return self._read_command_line(prompt) - future = asyncio.run_coroutine_threadsafe(_input(prompt), self.inputLoop) + future = asyncio.run_coroutine_threadsafe(_input(prompt), self.ioLoop) return await asyncio.wrap_future(future) - async def AsyncPrint(self, *args, **kwargs): + async def Print(self, *args, **kwargs): async def _print(*args, **kwargs): print(*args, **kwargs) - future = asyncio.run_coroutine_threadsafe(_print(*args, **kwargs), self.outputLoop) + future = asyncio.run_coroutine_threadsafe(_print(*args, **kwargs), self.ioLoop) await asyncio.wrap_future(future) def __init__(self): super().__init__() + self.log_handler = logging.StreamHandler() + self.log_handler.setFormatter(LogFormatter) + self.log_handler.setLevel(logging.CRITICAL) + logging.getLogger().addHandler(self.log_handler) def preloop(self): # 1. 设置忽略SIGINT @@ -80,13 +80,9 @@ class Console(cmd2.Cmd): signal.signal(signal.SIGINT, signal_handler) # 2. 创建IO线程处理输入输出 - self.inputLoop = asyncio.new_event_loop() - self.inputThread = threading.Thread(target=self._io_worker, args=(self.inputLoop,)) - self.inputThread.start() - - self.outputLoop = asyncio.new_event_loop() - self.outputThread = threading.Thread(target=self._io_worker, args=(self.outputLoop,)) - self.outputThread.start() + self.ioLoop = asyncio.new_event_loop() + self.ioThread = threading.Thread(target=self._io_worker, args=(self.ioLoop,)) + self.ioThread.start() # 3. 设置console self.saved_readline_settings = None @@ -101,32 +97,29 @@ class Console(cmd2.Cmd): # 2. 停止IO线程 # https://stackoverflow.com/questions/51642267/asyncio-how-do-you-use-run-forever - self.inputLoop.call_soon_threadsafe(self.inputLoop.stop) - self.inputThread.join() - - self.outputLoop.call_soon_threadsafe(self.outputLoop.stop) - self.outputThread.join() + self.ioLoop.call_soon_threadsafe(self.ioLoop.stop) + self.ioThread.join() # commands # def do_logging_off(self, args): """ Disable logging """ - logging.getLogger().setLevel(logging.CRITICAL) + self.log_handler.setLevel(logging.CRITICAL) logging.critical("Logging disabled") def do_logging_debug(self, args): """ Enable debug mode """ - logging.getLogger().setLevel(logging.DEBUG) + self.log_handler.setLevel(logging.DEBUG) logging.debug("Debug mode enabled") def do_logging_info(self, args): """ Enable info mode """ - logging.getLogger().setLevel(logging.INFO) + self.log_handler.setLevel(logging.INFO) logging.info("Info mode enabled") login_parser = cmd2.Cmd2ArgumentParser() @@ -139,7 +132,7 @@ class Console(cmd2.Cmd): Login to pikpak """ await Client.Login(args.username, args.password) - await self.AsyncPrint("Logged in successfully") + await self.Print("Logged in successfully") async def _path_completer(self, text, line, begidx, endidx, filterfiles): father, sonName = await Client.PathToFatherNodeAndNodeName(text) @@ -178,15 +171,15 @@ class Console(cmd2.Cmd): """ node = args.path if node is None: - await self.AsyncPrint("Invalid path") + await self.Print("Invalid path") return await Client.Refresh(node) if IsDir(node): for childId in node.childrenId: child = Client.GetNodeById(childId) - await self.AsyncPrint(child.name) + await self.Print(child.name) elif IsFile(node): - await self.AsyncPrint(f"{node.name}: {node.url}") + await self.Print(f"{node.name}: {node.url}") @RunSync async def complete_cd(self, text, line, begidx, endidx): @@ -202,7 +195,7 @@ class Console(cmd2.Cmd): """ node = args.path if not IsDir(node): - await self.AsyncPrint("Invalid directory") + await self.Print("Invalid directory") return Client.currentLocation = node @@ -211,7 +204,7 @@ class Console(cmd2.Cmd): """ Print current working directory """ - await self.AsyncPrint(Client.NodeToPath(Client.currentLocation)) + await self.Print(Client.NodeToPath(Client.currentLocation)) def do_clear(self, args): """ @@ -247,11 +240,11 @@ class Console(cmd2.Cmd): """ father, sonName = args.path_and_son if not IsDir(father) or sonName == "" or sonName == None: - await self.AsyncPrint("Invalid path") + await self.Print("Invalid path") return child = Client.FindChildInDirByName(father, sonName) if child is not None: - await self.AsyncPrint("Path already exists") + await self.Print("Path already exists") return await Client.MakeDir(father, sonName) @@ -266,28 +259,27 @@ class Console(cmd2.Cmd): """ node = args.path if not IsDir(node): - await self.AsyncPrint("Invalid directory") + await self.Print("Invalid directory") return task = await Client.Download(args.url, node) - await self.AsyncPrint(f"Task {task.id} created") + await self.Print(f"Task {task.id} created") query_parser = cmd2.Cmd2ArgumentParser() - query_parser.add_argument("-f", "--filter", help="filter", nargs="?", choices=[member.value for member in PKTaskStatus]) + query_parser.add_argument("-f", "--filter", help="filter", nargs="?", choices=[member.value for member in TaskStatus]) @RunSync @cmd2.with_argparser(query_parser) async def do_query(self, args): """ Query All Tasks """ - tasks = await Client.QueryTasks(PKTaskStatus(args.filter) if args.filter is not None else None) + tasks = await Client.QueryPikPakTasks(TaskStatus(args.filter) if args.filter is not None else None) # 格式化输出所有task信息id,status,lastStatus的信息,输出表格 - await self.AsyncPrint("id\tstatus\tlastStatus") + await self.Print("tstatus\tdetails\tid") for task in tasks: - await self.AsyncPrint(f"{task.id}\t{task.status.value}\t{task.recoverStatus.value}") + await self.Print(f"{task._status.value}\t{task.status.value}\t{task.id}") retry_parser = cmd2.Cmd2ArgumentParser() - retry_parser.add_argument("taskId", help="taskId", type=int) - + retry_parser.add_argument("taskId", help="taskId") @RunSync @cmd2.with_argparser(retry_parser) async def do_retry(self, args): @@ -299,17 +291,18 @@ class Console(cmd2.Cmd): async def mainLoop(): global MainLoop, Client MainLoop = asyncio.get_running_loop() - Client.Start() + clientWorker = Client.Start() console = Console() console.preloop() try: stop = False while not stop: - line = await console.AsyncInput(console.prompt) + line = await console.Input(console.prompt) stop = console.onecmd_plus_hooks(line) finally: console.postloop() + clientWorker.cancel() if __name__ == "__main__": nest_asyncio.apply() diff --git a/pikpakFs.py b/pikpakFs.py index 21a44b3..70236e4 100644 --- a/pikpakFs.py +++ b/pikpakFs.py @@ -7,63 +7,90 @@ import os import logging from enum import Enum import asyncio +import uuid +from utils import PathWalker +from typing import Callable, Awaitable -class DownloadTaskStatus(Enum): - pending = "pending" - downloading = "downloading" - done = "done" - error = "error" - stopped = "stopped" +class TaskStatus(Enum): + PENDING = "pending" + RUNNING = "running" + DONE = "done" + ERROR = "error" + PAUSED = "paused" -class PKTaskStatus(Enum): - pending = "pending" - remote_downloading = "remote_downloading" - downloading = "downloading" - done = "done" - error = "error" - stopped = "stopped" +class PikPakTaskStatus(Enum): + PENDING = "pending" + REMOTE_DOWNLOADING = "remote downloading" + LOCAL_DOWNLOADING = "local downloading" -class PkTask: - _id = 0 +class FileDownloadTaskStatus(Enum): + PENDING = "pending" + DOWNLOADING = "downloading" - def __init__(self, torrent : str, toDirId : str, status : PKTaskStatus = PKTaskStatus.pending): - PkTask._id += 1 - self.id = PkTask._id +class UnRecoverableError(Exception): + def __init__(self, message): + super().__init__(message) - self.status = PKTaskStatus.pending - self.recoverStatus = status +class TaskBase: + def __init__(self, id : str, tag : str = "", maxConcurrentNumber = -1): + self.id : str = uuid.uuid4() if id is None else id + self.tag : str = tag + self.maxConcurrentNumber : int = maxConcurrentNumber + + self._status : TaskStatus = TaskStatus.PENDING - self.runningTask : asyncio.Task = None - self.name = "" - self.toDirId = toDirId + self.worker : asyncio.Task = None + self.handler : Callable[..., Awaitable] = None + +class PikPakTask(TaskBase): + TAG = "PikPakTask" + MAX_CONCURRENT_NUMBER = 5 + + def __init__(self, torrent : str, toDirId : str, id : str = None, status : PikPakTaskStatus = PikPakTaskStatus.PENDING): + super().__init__(id, PikPakTask.TAG, PikPakTask.MAX_CONCURRENT_NUMBER) + self.status : PikPakTaskStatus = status + self.toDirId : str = toDirId self.nodeId : str = None - self.torrent = torrent # todo: 将torrent的附加参数去掉再加入 + self.name : str = "" + self.torrent : str = torrent # todo: 将torrent的附加参数去掉再加入 self.remoteTaskId : str = None -class DownloadTask: - def __init__(self, nodeId : str, pkTaskId : str, status : DownloadTaskStatus = DownloadTaskStatus.pending): - self.status = DownloadStatus.pending - self.recoverStatus = status - self.pkTaskId = pkTaskId - self.nodeId = nodeId - self.runningTask : asyncio.Task = None +class FileDownloadTask(TaskBase): + TAG = "FileDownloadTask" + MAX_CONCURRENT_NUMBER = 5 -class PathWalker(): - def __init__(self, pathStr : str, sep : str = "/"): - self.__pathSpots : list[str] = [] - if not pathStr.startswith(sep): - self.__pathSpots.append(".") - pathSpots = pathStr.split(sep) - self.__pathSpots.extend(pathSpots) - - def IsAbsolute(self) -> bool: - return len(self.__pathSpots) == 0 or self.__pathSpots[0] != "." + def __init__(self, nodeId : str, PikPakTaskId : str, id : str = None, status : FileDownloadTaskStatus = FileDownloadTaskStatus.PENDING): + super().__init__(id, FileDownloadTask.TAG, FileDownloadTask.MAX_CONCURRENT_NUMBER) + self.status : FileDownloadTaskStatus = status + self.PikPakTaskId : str = PikPakTaskId + self.nodeId : str = nodeId + +async def TaskWorker(task : TaskBase): + try: + if task._status != TaskStatus.PENDING: + return + task._status = TaskStatus.RUNNING + await task.handler(task) + task._status = TaskStatus.DONE + except asyncio.CancelledError: + task._status = TaskStatus.PAUSED + except Exception as e: + logging.error(f"task failed, exception occurred: {e}") + task._status = TaskStatus.ERROR + +async def TaskManager(taskQueues : Dict[str, list[TaskBase]]): + # todo: 处理取消的情况 + while True: + await asyncio.sleep(1) + for taskQueue in taskQueues.values(): + notRunningTasks = [task for task in taskQueue if task.worker is None or task.worker.done()] + runningTasksNumber = len(taskQueue) - len(notRunningTasks) + for task in [task for task in notRunningTasks if task._status == TaskStatus.PENDING]: + if runningTasksNumber >= task.maxConcurrentNumber: + break + task.worker = asyncio.create_task(TaskWorker(task)) + runningTasksNumber += 1 - def AppendSpot(self, spot): - self.__pathSpots.append(spot) - - def Walk(self) -> list[str]: - return self.__pathSpots class FsNode: def __init__(self, id : str, name : str, fatherId : str): @@ -88,7 +115,7 @@ def IsDir(node : FsNode) -> bool: def IsFile(node : FsNode) -> bool: return isinstance(node, FileNode) -class PkToken: +class PikPakToken: def __init__(self, username, password, access_token, refresh_token, user_id): self.username = username self.password = password @@ -104,96 +131,88 @@ class PkToken: data = json.loads(json_str) return cls(**data) -class PKFs: - MAX_PIKPAK_TASKS = 5 - MAX_DOWNLOAD_TASKS = 5 +class PikPakFs: - async def _pktask_pending(self, task : PkTask): - if task.recoverStatus != PKTaskStatus.pending: - task.status = task.recoverStatus - return - pkTask = await self.client.offline_download(task.torrent, task.toDirId) - task.remoteTaskId = pkTask["task"]["id"] - task.nodeId = pkTask["task"]["file_id"] - task.name = pkTask["task"]["name"] - task.status = PKTaskStatus.remote_downloading + async def _pikpak_task_pending(self, task : PikPakTask): + pikPakTaskInfo = await self.client.offline_download(task.torrent, task.toDirId) + task.remoteTaskId = pikPakTaskInfo["task"]["id"] + task.nodeId = pikPakTaskInfo["task"]["file_id"] + task.status = PikPakTaskStatus.REMOTE_DOWNLOADING - async def _pktask_offline_downloading(self, task : PkTask): + async def _pikpak_offline_downloading(self, task : PikPakTask): waitTime = 3 while True: await asyncio.sleep(waitTime) status = await self.client.get_task_status(task.remoteTaskId, task.nodeId) if status in {DownloadStatus.not_found, DownloadStatus.not_downloading, DownloadStatus.error}: - task.recoverStatus = PKTaskStatus.pending - task.status = PKTaskStatus.error - break + self.status = PikPakTaskStatus.PENDING + raise Exception(f"remote download failed, status: {status}") elif status == DownloadStatus.done: - fileInfo = await self.client.offline_file_info(file_id=task.nodeId) - node = self.GetNodeById(task.nodeId) - if node is not None: - oldFather = self.GetFatherNode(node) - if oldFather is not None: - oldFather.childrenId.remove(node.id) - - task.toDirId = fileInfo["parent_id"] - task.name = fileInfo["name"] - type = fileInfo["kind"] - if type.endswith("folder"): - self.nodes[task.nodeId] = DirNode(task.nodeId, task.name, task.toDirId) - else: - self.nodes[task.nodeId] = FileNode(task.nodeId, task.name, task.toDirId) - father = self.GetNodeById(task.toDirId) - if father.id is not None: - father.childrenId.append(task.nodeId) - task.status = PKTaskStatus.downloading break waitTime = waitTime * 1.5 + + fileInfo = await self.client.offline_file_info(file_id=task.nodeId) + task.toDirId = fileInfo["parent_id"] + task.name = fileInfo["name"] + if fileInfo["kind"].endswith("folder"): + self.nodes[task.nodeId] = DirNode(task.nodeId, task.name, task.toDirId) + else: + self.nodes[task.nodeId] = FileNode(task.nodeId, task.name, task.toDirId) + father = self.GetNodeById(task.toDirId) + if father.id is not None and task.nodeId not in father.childrenId: + father.childrenId.append(task.nodeId) + task.status = PikPakTaskStatus.LOCAL_DOWNLOADING - async def _pktask_worker(self, task : PkTask): - while task.status not in {PKTaskStatus.done, PKTaskStatus.error, PKTaskStatus.stopped}: - try: - if task.status == PKTaskStatus.pending: - await self._pktask_pending(task) - elif task.status == PKTaskStatus.remote_downloading: - await self._pktask_offline_downloading(task) - elif task.status == PKTaskStatus.downloading: - task.status = PKTaskStatus.done - else: - break - except asyncio.CancelledError: - task.recoverStatus = task.status - task.status = PKTaskStatus.stopped - except Exception as e: - logging.error(f"task failed, exception occurred: {e}") - task.recoverStatus = task.status - task.status = PKTaskStatus.error - - async def _pktask_manager(self): + async def _pikpak_task_handler(self, task : PikPakTask): while True: - await asyncio.sleep(1) - runningTasksNum = 0 - notRunningTasks = [task for task in self.tasks if task.runningTask is None or task.runningTask.done()] - if len(self.tasks) - len(notRunningTasks) >= PKFs.MAX_PIKPAK_TASKS: - continue - for task in [task for task in notRunningTasks if task.status == PKTaskStatus.pending]: - task.runningTask = asyncio.create_task(self._pktask_worker(task)) - runningTasksNum += 1 - if runningTasksNum >= PKFs.MAX_PIKPAK_TASKS: - break - + if task.status == PikPakTaskStatus.PENDING: + await self._pikpak_task_pending(task) + elif task.status == PikPakTaskStatus.REMOTE_DOWNLOADING: + await self._pikpak_offline_downloading(task) + elif task.status == PikPakTaskStatus.LOCAL_DOWNLOADING: + break + else: + break + + async def _file_download_task_handler(self, task : FileDownloadTask): + pass + + def _add_task(self, task : TaskBase): + if self.taskQueues.get(task.tag) is None: + self.taskQueues[task.tag] = [] + self.taskQueues[task.tag].append(task) + + async def StopTask(self, task : TaskBase): + pass + + async def ResumeTask(self, task : TaskBase): + pass + + async def RetryTask(self, taskId : str): + if PikPakTask.TAG not in self.taskQueues: + return + for task in self.taskQueues[PikPakTask.TAG]: + if task.id == taskId and task._status == TaskStatus.ERROR: + task._status = TaskStatus.PENDING + break + elif task.id == taskId: + break + + def Start(self): + return asyncio.create_task(TaskManager(self.taskQueues)) def __init__(self, loginCachePath : str = None, proxy : str = None, rootId = None): self.nodes : Dict[str, FsNode] = {} self.root = DirNode(rootId, "", None) self.currentLocation = self.root - self.tasks : list[PkTask] = [] + self.taskQueues : Dict[str, list[TaskBase]] = {} self.loginCachePath = loginCachePath self.proxyConfig = proxy self.client : PikPakApi = None self._try_login_from_cache() - def _init_client_by_token(self, token : PkToken): + def _init_client_by_token(self, token : PikPakToken): self._init_client_by_username_and_password(token.username, token.password) self.client.access_token = token.access_token self.client.refresh_token = token.refresh_token @@ -220,7 +239,7 @@ class PKFs: return with open(self.loginCachePath, 'r', encoding='utf-8') as file: content = file.read() - token = PkToken.from_json(content) + token = PikPakToken.from_json(content) self._init_client_by_token(token) logging.info("successfully load login info from cache") @@ -228,7 +247,7 @@ class PKFs: if self.loginCachePath is None: return with open(self.loginCachePath, 'w', encoding='utf-8') as file: - token = PkToken(self.client.username, self.client.password, self.client.access_token, self.client.refresh_token, self.client.user_id) + token = PikPakToken(self.client.username, self.client.password, self.client.access_token, self.client.refresh_token, self.client.user_id) file.write(token.to_json()) logging.info("successfully dump login info to cache") @@ -243,20 +262,6 @@ class PKFs: return True return False - async def StopTask(self, taskId : int): - pass - - async def ResumeTask(self, taskId : int): - pass - - async def RetryTask(self, taskId : int): - task = next((t for t in self.tasks if t.id == taskId), None) - if task and task.status == PKTaskStatus.error: - task.status = PKTaskStatus.pending - - def Start(self): - asyncio.create_task(self._pktask_manager()) - def GetNodeById(self, id : str) -> FsNode: if id == self.root.id: return self.root @@ -382,15 +387,19 @@ class PKFs: node.childrenId.append(id) return newDir - async def Download(self, url : str, dirNode : DirNode) -> PkTask : - task = PkTask(url, dirNode.id) - self.tasks.append(task) + async def Download(self, url : str, dirNode : DirNode) -> PikPakTask : + task = PikPakTask(url, dirNode.id) + task.handler = self._pikpak_task_handler + self._add_task(task) return task - async def QueryTasks(self, filterByStatus : PKTaskStatus = None) -> list[PkTask]: - if filterByStatus is None: - return self.tasks - return [task for task in self.tasks if task.status == filterByStatus] + async def QueryPikPakTasks(self, filterStatus : TaskStatus = None) -> list[PikPakTask]: + if PikPakTask.TAG not in self.taskQueues: + return [] + taskQueue = self.taskQueues[PikPakTask.TAG] + if filterStatus is None: + return taskQueue + return [task for task in taskQueue if task._status == filterStatus] async def Delete(self, nodes : list[FsNode]) -> None: nodeIds = [node.id for node in nodes] diff --git a/readme.md b/readme.md index 841322f..b80b180 100644 --- a/readme.md +++ b/readme.md @@ -12,9 +12,11 @@ Todo: - [ ] 实现本地下载队列(多文件,文件夹) - [ ] 实现任务暂停、继续、恢复 - [ ] 持久化数据 +- [ ] 添加测试用例 +- [ ] 完全类型化 -### 记录 +### 协议结构 1. offline_download ```json { diff --git a/utils.py b/utils.py new file mode 100644 index 0000000..b6b11a0 --- /dev/null +++ b/utils.py @@ -0,0 +1,16 @@ +class PathWalker(): + def __init__(self, path : str, sep : str = "/"): + self._path_spots : list[str] = [] + if not path.startswith(sep): + self._path_spots.append(".") + path_spots : list[str] = path.split(sep) + self._path_spots.extend(path_spots) + + def IsAbsolute(self) -> bool: + return len(self._path_spots) == 0 or self._path_spots[0] != "." + + def AppendSpot(self, spot): + self._path_spots.append(spot) + + def Walk(self) -> list[str]: + return self._path_spots \ No newline at end of file