diff --git a/PikPakFs.py b/PikPakFs.py index b575f4b..c6b4228 100644 --- a/PikPakFs.py +++ b/PikPakFs.py @@ -10,6 +10,7 @@ import asyncio import shortuuid from utils import PathWalker from typing import Callable, Awaitable +import random class TaskStatus(Enum): PENDING = "pending" @@ -22,11 +23,13 @@ class PikPakTaskStatus(Enum): PENDING = "pending" REMOTE_DOWNLOADING = "remote" LOCAL_DOWNLOADING = "local" + DONE = "done" class FileDownloadTaskStatus(Enum): PENDING = "pending" DOWNLOADING = "downloading" - + DONE = "done" + class UnRecoverableError(Exception): def __init__(self, message): super().__init__(message) @@ -36,6 +39,7 @@ class TaskBase: self.id : str = shortuuid.uuid() if id is None else id self.tag : str = tag self.maxConcurrentNumber : int = maxConcurrentNumber + self.name : str = "" self._status : TaskStatus = TaskStatus.PENDING @@ -46,25 +50,36 @@ class PikPakTask(TaskBase): TAG = "PikPakTask" MAX_CONCURRENT_NUMBER = 5 - def __init__(self, torrent : str, toDirId : str, id : str = None, status : PikPakTaskStatus = PikPakTaskStatus.PENDING): + def __init__(self, torrent : str, toDirId : str, nodeId : str = None, status : PikPakTaskStatus = PikPakTaskStatus.PENDING, id : str = None): super().__init__(id, PikPakTask.TAG, PikPakTask.MAX_CONCURRENT_NUMBER) self.status : PikPakTaskStatus = status self.toDirId : str = toDirId - self.nodeId : str = None - self.name : str = "" + self.nodeId : str = nodeId self.torrent : str = torrent # todo: 将torrent的附加参数去掉再加入 self.remoteTaskId : str = None + self.progress : str = "" + + def __eq__(self, other): + if isinstance(other, PikPakTask): + return self is other or self.nodeId == other.nodeId + return False + class FileDownloadTask(TaskBase): TAG = "FileDownloadTask" MAX_CONCURRENT_NUMBER = 5 - def __init__(self, nodeId : str, PikPakTaskId : str, relativePath : str, status : FileDownloadTaskStatus = FileDownloadTaskStatus.PENDING): - super().__init__(nodeId, FileDownloadTask.TAG, FileDownloadTask.MAX_CONCURRENT_NUMBER) + def __init__(self, nodeId : str, PikPakTaskId : str, relativePath : str, status : FileDownloadTaskStatus = FileDownloadTaskStatus.PENDING, id : str = None): + super().__init__(id, FileDownloadTask.TAG, FileDownloadTask.MAX_CONCURRENT_NUMBER) self.status : FileDownloadTaskStatus = status self.PikPakTaskId : str = PikPakTaskId self.nodeId : str = nodeId self.relativePath : str = relativePath + + def __eq__(self, other): + if isinstance(other, FileDownloadTask): + return self is other or (self.nodeId == other.nodeId and self.PikPakTaskId == other.PikPakTaskId) + return False async def TaskWorker(task : TaskBase): try: @@ -82,7 +97,7 @@ async def TaskWorker(task : TaskBase): async def TaskManager(taskQueues : Dict[str, list[TaskBase]]): # todo: 处理取消的情况 while True: - await asyncio.sleep(1) + await asyncio.sleep(0.5) for taskQueue in taskQueues.values(): notRunningTasks = [task for task in taskQueue if task.worker is None or task.worker.done()] runningTasksNumber = len(taskQueue) - len(notRunningTasks) @@ -170,6 +185,7 @@ class PikPakFs: if IsFile(node): fileDownloadTask = FileDownloadTask(task.nodeId, task.id, self.NodeToPath(node, node)) fileDownloadTask.handler = self._file_download_task_handler + fileDownloadTask.name = task.name self._add_task(fileDownloadTask) elif IsDir(node): # 使用广度优先遍历 @@ -184,7 +200,39 @@ class PikPakFs: elif IsFile(child): fileDownloadTask = FileDownloadTask(childId, task.id, self.NodeToPath(child, node)) fileDownloadTask.handler = self._file_download_task_handler + fileDownloadTask.name = task.name self._add_task(fileDownloadTask) + + # 开始等待下载任务完成 + while True: + fileDownloadTasks = self.taskQueues.get(FileDownloadTask.TAG) + myTasks = [myTask for myTask in fileDownloadTasks if myTask.PikPakTaskId == task.id] + allNumber = len(myTasks) + notCompletedNumber = 0 + pausedNumber = 0 + errorNumber = 0 + for myTask in myTasks: + if myTask._status == TaskStatus.PAUSED: + pausedNumber += 1 + if myTask._status == TaskStatus.ERROR: + errorNumber += 1 + if myTask._status in {TaskStatus.PENDING, TaskStatus.RUNNING}: + notCompletedNumber += 1 + + runningNumber = allNumber - notCompletedNumber - pausedNumber - errorNumber + task.progress = f"{runningNumber}/{allNumber} ({pausedNumber}|{errorNumber})" + + if notCompletedNumber > 0: + await asyncio.sleep(0.5) + continue + if errorNumber > 0: + raise Exception("file download failed") + if pausedNumber > 0: + raise asyncio.CancelledError() + break + + task.status = PikPakTaskStatus.DONE + async def _pikpak_task_handler(self, task : PikPakTask): while True: @@ -198,32 +246,39 @@ class PikPakFs: break async def _file_download_task_handler(self, task : FileDownloadTask): + if random.randint(1, 5) == 2: + raise asyncio.CancelledError() pass def _add_task(self, task : TaskBase): if self.taskQueues.get(task.tag) is None: self.taskQueues[task.tag] = [] taskQueue = self.taskQueues[task.tag] - for t in taskQueue: - if t.id == task.id: + for other in taskQueue: + if other == task: + if other._status != TaskStatus.DONE: + other._status = TaskStatus.PENDING return taskQueue.append(task) - async def StopTask(self, task : TaskBase): - pass + async def GetTaskById(self, taskId : str) -> TaskBase: + for taskQueue in self.taskQueues.values(): + for task in taskQueue: + if task.id == taskId: + return task + return None - async def ResumeTask(self, task : TaskBase): - pass + async def StopTask(self, taskId : str): + task = await self.GetTaskById(taskId) + if task is not None and task._status in {TaskStatus.PENDING, TaskStatus.RUNNING}: + if task.worker is not None: + task.worker.cancel() + task.worker = None - async def RetryTask(self, taskId : str): - if PikPakTask.TAG not in self.taskQueues: - return - for task in self.taskQueues[PikPakTask.TAG]: - if task.id == taskId and task._status == TaskStatus.ERROR: - task._status = TaskStatus.PENDING - break - elif task.id == taskId: - break + async def ResumeTask(self, taskId : str): + task = await self.GetTaskById(taskId) + if task is not None and task._status in {TaskStatus.PAUSED, TaskStatus.ERROR}: + task._status = TaskStatus.PENDING def Start(self): return asyncio.create_task(TaskManager(self.taskQueues)) @@ -424,7 +479,8 @@ class PikPakFs: async def Pull(self, node : FsNode) -> PikPakTask: task = PikPakTask("", node.fatherId, node.id, PikPakTaskStatus.LOCAL_DOWNLOADING) - task.handler = self._pikpak_local_downloading + task.name = node.name + task.handler = self._pikpak_task_handler self._add_task(task) return task diff --git a/main.py b/main.py index c7c0ed4..7cef92b 100644 --- a/main.py +++ b/main.py @@ -7,6 +7,8 @@ import colorlog from PikPakFs import PikPakFs, IsDir, IsFile, TaskStatus import os from tabulate import tabulate +import wcwidth +import types LogFormatter = colorlog.ColoredFormatter( "%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s", @@ -61,6 +63,12 @@ class RunSync: else: return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), MainLoop).result() + def __get__(self, instance, cls): + if instance is None: + return self + else: + return types.MethodType(self, instance) + class Console(cmd2.Cmd): def _io_worker(self, loop): asyncio.set_event_loop(loop) @@ -165,7 +173,8 @@ class Console(cmd2.Cmd): matches.append(text[:text.rfind(sonName)] + child.name) matchesNode.append(child) if len(matchesNode) == 1 and IsDir(matchesNode[0]): - matches[0] += "/" + if matches[0] == sonName: + matches[0] += "/" self.allow_appended_space = False self.allow_closing_quote = False return matches @@ -304,8 +313,8 @@ class Console(cmd2.Cmd): if args.type == "pikpak": tasks = await Client.QueryPikPakTasks(TaskStatus(args.filter) if args.filter is not None else None) # 格式化输出所有task信息id,status,lastStatus的信息,输出表格 - table = [[task.id, task._status.value, task.status.value] for task in tasks] - headers = ["id", "status", "details"] + table = [[task.id, task._status.value, task.status.value, task.progress] for task in tasks] + headers = ["id", "status", "details", "progress"] await self.Print(tabulate(table, headers, tablefmt="grid")) elif args.type == "filedownload": tasks = await Client.QueryFileDownloadTasks(TaskStatus(args.filter) if args.filter is not None else None) @@ -314,15 +323,24 @@ class Console(cmd2.Cmd): headers = ["id", "status", "details", "path"] await self.Print(tabulate(table, headers, tablefmt="grid")) - retry_parser = cmd2.Cmd2ArgumentParser() - retry_parser.add_argument("taskId", help="taskId") - @cmd2.with_argparser(retry_parser) + taskid_parser = cmd2.Cmd2ArgumentParser() + taskid_parser.add_argument("taskId", help="taskId") + + @cmd2.with_argparser(taskid_parser) @RunSync - async def do_retry(self, args): + async def do_pause(self, args): """ - Retry a task + Stop a task """ - await Client.RetryTask(args.taskId) + await Client.StopTask(args.taskId) + + @cmd2.with_argparser(taskid_parser) + @RunSync + async def do_resume(self, args): + """ + Resume a task + """ + await Client.ResumeTask(args.taskId) async def mainLoop(): global MainLoop, Client