From 0762ec99524dc8a0484906c6575626d5a385abc1 Mon Sep 17 00:00:00 2001 From: limil Date: Fri, 1 Nov 2024 09:26:40 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4task=E9=98=9F=E5=88=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- main.py | 3 +- pikpakFs.py | 111 ++++++++++++++++++++++++++++++++-------------------- 2 files changed, 70 insertions(+), 44 deletions(-) diff --git a/main.py b/main.py index 1778cd9..c3fcbd1 100644 --- a/main.py +++ b/main.py @@ -299,7 +299,8 @@ class Console(cmd2.Cmd): async def mainLoop(): global MainLoop, Client MainLoop = asyncio.get_running_loop() - + Client.Start() + console = Console() console.preloop() try: diff --git a/pikpakFs.py b/pikpakFs.py index 2c43e85..21a44b3 100644 --- a/pikpakFs.py +++ b/pikpakFs.py @@ -16,36 +16,37 @@ class DownloadTaskStatus(Enum): stopped = "stopped" class PKTaskStatus(Enum): - pending_offline_download = "pending" - offline_downloading = "remote_downloading" - pending_download = "pending_for_download" + pending = "pending" + remote_downloading = "remote_downloading" downloading = "downloading" done = "done" error = "error" stopped = "stopped" class PkTask: - id = 0 - def __init__(self, torrent : str, toDirId : str, status : PKTaskStatus = PKTaskStatus.pending_offline_download): - PkTask.id += 1 - self.taskId = PkTask.id - self.status = status + _id = 0 + + def __init__(self, torrent : str, toDirId : str, status : PKTaskStatus = PKTaskStatus.pending): + PkTask._id += 1 + self.id = PkTask._id + + self.status = PKTaskStatus.pending self.recoverStatus = status - self.name : str = "" self.runningTask : asyncio.Task = None + self.name = "" self.toDirId = toDirId self.nodeId : str = None - self.torrent = torrent - self.url = None - self.pkTaskId = None + self.torrent = torrent # todo: 将torrent的附加参数去掉再加入 + self.remoteTaskId : str = None class DownloadTask: def __init__(self, nodeId : str, pkTaskId : str, status : DownloadTaskStatus = DownloadTaskStatus.pending): - self.status = status + self.status = DownloadStatus.pending self.recoverStatus = status self.pkTaskId = pkTaskId self.nodeId = nodeId + self.runningTask : asyncio.Task = None class PathWalker(): def __init__(self, pathStr : str, sep : str = "/"): @@ -104,35 +105,35 @@ class PkToken: return cls(**data) class PKFs: - async def RetryTask(self, taskId : int): - task = self.tasks[taskId] - if task == None or task.status != PKTaskStatus.error: - return - task.status = task.recoverStatus - self.RunTask(task) + MAX_PIKPAK_TASKS = 5 + MAX_DOWNLOAD_TASKS = 5 - async def _task_pending(self, task : PkTask): + async def _pktask_pending(self, task : PkTask): + if task.recoverStatus != PKTaskStatus.pending: + task.status = task.recoverStatus + return pkTask = await self.client.offline_download(task.torrent, task.toDirId) - task.pkTaskId = pkTask["task"]["id"] + task.remoteTaskId = pkTask["task"]["id"] task.nodeId = pkTask["task"]["file_id"] task.name = pkTask["task"]["name"] - task.status = PKTaskStatus.offline_downloading + task.status = PKTaskStatus.remote_downloading - async def _task_offline_downloading(self, task : PkTask): + async def _pktask_offline_downloading(self, task : PkTask): waitTime = 3 while True: await asyncio.sleep(waitTime) - status = await self.client.get_task_status(task.pkTaskId, task.nodeId) - if status == DownloadStatus.not_found or status == DownloadStatus.not_found or status == DownloadStatus.error: - task.recoverStatus = PKTaskStatus.pending_offline_download + status = await self.client.get_task_status(task.remoteTaskId, task.nodeId) + if status in {DownloadStatus.not_found, DownloadStatus.not_downloading, DownloadStatus.error}: + task.recoverStatus = PKTaskStatus.pending task.status = PKTaskStatus.error break elif status == DownloadStatus.done: fileInfo = await self.client.offline_file_info(file_id=task.nodeId) - if self.GetNodeById(task.nodeId) is not None: - oldFather = self.GetFatherNode(task.nodeId) + node = self.GetNodeById(task.nodeId) + if node is not None: + oldFather = self.GetFatherNode(node) if oldFather is not None: - oldFather.childrenId.remove(task.nodeId) + oldFather.childrenId.remove(node.id) task.toDirId = fileInfo["parent_id"] task.name = fileInfo["name"] @@ -144,32 +145,42 @@ class PKFs: father = self.GetNodeById(task.toDirId) if father.id is not None: father.childrenId.append(task.nodeId) - task.status = PKTaskStatus.pending_download + task.status = PKTaskStatus.downloading break waitTime = waitTime * 1.5 - async def _task_worker(self, task : PkTask): + async def _pktask_worker(self, task : PkTask): while task.status not in {PKTaskStatus.done, PKTaskStatus.error, PKTaskStatus.stopped}: try: - if task.status == PKTaskStatus.pending_offline_download: - await self._task_pending(task) - elif task.status == PKTaskStatus.offline_downloading: - await self._task_offline_downloading(task) - elif task.status == PKTaskStatus.pending_download: + if task.status == PKTaskStatus.pending: + await self._pktask_pending(task) + elif task.status == PKTaskStatus.remote_downloading: + await self._pktask_offline_downloading(task) + elif task.status == PKTaskStatus.downloading: task.status = PKTaskStatus.done else: break + except asyncio.CancelledError: + task.recoverStatus = task.status + task.status = PKTaskStatus.stopped except Exception as e: logging.error(f"task failed, exception occurred: {e}") task.recoverStatus = task.status task.status = PKTaskStatus.error - - - def RunTask(self, task : PkTask): - self.tasks.append(task) - if task.runningTask is None or task.runningTask.done(): - task.runningTask = asyncio.create_task(self._task_worker(task)) + async def _pktask_manager(self): + while True: + await asyncio.sleep(1) + runningTasksNum = 0 + notRunningTasks = [task for task in self.tasks if task.runningTask is None or task.runningTask.done()] + if len(self.tasks) - len(notRunningTasks) >= PKFs.MAX_PIKPAK_TASKS: + continue + for task in [task for task in notRunningTasks if task.status == PKTaskStatus.pending]: + task.runningTask = asyncio.create_task(self._pktask_worker(task)) + runningTasksNum += 1 + if runningTasksNum >= PKFs.MAX_PIKPAK_TASKS: + break + def __init__(self, loginCachePath : str = None, proxy : str = None, rootId = None): self.nodes : Dict[str, FsNode] = {} @@ -232,6 +243,20 @@ class PKFs: return True return False + async def StopTask(self, taskId : int): + pass + + async def ResumeTask(self, taskId : int): + pass + + async def RetryTask(self, taskId : int): + task = next((t for t in self.tasks if t.id == taskId), None) + if task and task.status == PKTaskStatus.error: + task.status = PKTaskStatus.pending + + def Start(self): + asyncio.create_task(self._pktask_manager()) + def GetNodeById(self, id : str) -> FsNode: if id == self.root.id: return self.root @@ -359,7 +384,7 @@ class PKFs: async def Download(self, url : str, dirNode : DirNode) -> PkTask : task = PkTask(url, dirNode.id) - self.RunTask(task) + self.tasks.append(task) return task async def QueryTasks(self, filterByStatus : PKTaskStatus = None) -> list[PkTask]: