diff --git a/PikPakFileSystem.py b/PikPakFileSystem.py index 792ce55..765ac2e 100644 --- a/PikPakFileSystem.py +++ b/PikPakFileSystem.py @@ -269,7 +269,14 @@ class PikPakFileSystem: father, son_name = await self._path_to_father_node_and_son_name(path) return await self._node_to_path(father), son_name - async def GetFileUrl(self, path : str) -> str: + async def GetFileUrlByNodeId(self, node_id : str) -> str: + node = await self._get_node_by_id(node_id) + if not isinstance(node, FileNode): + return None + await self._refresh(node) + return node.url + + async def GetFileUrlByPath(self, path : str) -> str: node = await self._path_to_node(path) if not isinstance(node, FileNode): return None diff --git a/TaskManager.py b/TaskManager.py index b890a7c..510013e 100644 --- a/TaskManager.py +++ b/TaskManager.py @@ -4,6 +4,7 @@ import asyncio import logging import shortuuid from PikPakFileSystem import PikPakFileSystem, FileNode, DirNode +from aria2helper import Aria2Status, addUri, tellStatus, pause, unpause from pikpakapi import DownloadStatus import random import pickle @@ -82,6 +83,8 @@ class FileDownloadTask(TaskBase): self.node_id : str = node_id self.remote_path : str = remote_path self.owner_id : str = owner_id + self.gid : str = None + self.url : str = None async def TaskWorker(task : TaskBase): try: @@ -249,12 +252,39 @@ class TaskManager: await self._append_task(task) return task.id + async def _on_file_download_task_pending(self, task : FileDownloadTask): + task.url = await self.client.GetFileUrlByNodeId(task.node_id) + task.gid = await addUri(task.url, task.remote_path) + task.file_download_status = FileDownloadTaskStatus.DOWNLOADING + + async def _on_file_download_task_downloading(self, task : FileDownloadTask): + wait_seconds = 3 + while True: + status = await tellStatus(task.gid) + if status in {Aria2Status.REMOVED, Aria2Status.ERROR}: + self.file_download_status = FileDownloadTaskStatus.PENDING + raise Exception("failed to query status") + elif status == Aria2Status.PAUSED: + await unpause(task.gid) + elif status == Aria2Status.COMPLETE: + break + await asyncio.sleep(wait_seconds) + task.file_download_status = FileDownloadTaskStatus.DONE + async def _file_download_task_handler(self, task : FileDownloadTask): - if random.randint(1, 5) == 2: - raise asyncio.CancelledError() - if random.randint(1, 5) == 3: - raise Exception("random error") - pass + try: + while True: + if task.file_download_status == FileDownloadTaskStatus.PENDING: + await self._on_file_download_task_pending(task) + elif task.file_download_status == FileDownloadTaskStatus.DOWNLOADING: + await self._on_file_download_task_downloading(task) + else: + break + except asyncio.CancelledError: + gid = task.gid + if gid is not None: + await pause(gid) + raise #endregion diff --git a/aria2helper.py b/aria2helper.py new file mode 100644 index 0000000..27b036f --- /dev/null +++ b/aria2helper.py @@ -0,0 +1,72 @@ +import httpx, json +from enum import Enum + +class Aria2Status(Enum): + ACTIVE = "active" + WAITING = "waiting" + PAUSED = "paused" + ERROR = "error" + COMPLETE = "complete" + REMOVED = "removed" + +ARIA_ADDRESS = "http://100.96.0.2:6800/jsonrpc" +ARIA_SECRET = "jfaieofjosiefjoiaesjfoiasejf" +BASE_PATH = "/downloads" + +client = httpx.AsyncClient() + +async def addUri(uri, path): + jsonreq = json.dumps({ + "jsonrpc" : "2.0", + "id" : "pikpak", + "method" : "aria2.addUri", + "params" : [ f"token:{ARIA_SECRET}", [uri], + { + "dir" : BASE_PATH, + "out" : path + }] + }) + response = await client.post(ARIA_ADDRESS, data=jsonreq) + result = json.loads(response.text) + return result["result"] + + +async def tellStatus(gid) -> Aria2Status: + jsonreq = json.dumps({ + "jsonrpc" : "2.0", + "id" : "pikpak", + "method" : "aria2.tellStatus", + "params" : [ f"token:{ARIA_SECRET}", gid] + }) + response = await client.post(ARIA_ADDRESS, data=jsonreq) + result = json.loads(response.text) + if "error" in result: + return Aria2Status.REMOVED + return Aria2Status(result["result"]["status"]) + +async def pause(gid): + jsonreq = json.dumps({ + "jsonrpc" : "2.0", + "id" : "pikpak", + "method" : "aria2.pause", + "params" : [ f"token:{ARIA_SECRET}", gid] + }) + await client.post(ARIA_ADDRESS, data=jsonreq) + +async def unpause(gid): + jsonreq = json.dumps({ + "jsonrpc" : "2.0", + "id" : "pikpak", + "method" : "aria2.unpause", + "params" : [ f"token:{ARIA_SECRET}", gid] + }) + await client.post(ARIA_ADDRESS, data=jsonreq) + +async def remove(gid): + jsonreq = json.dumps({ + "jsonrpc" : "2.0", + "id" : "pikpak", + "method" : "aria2.remove", + "params" : [ f"token:{ARIA_SECRET}", gid] + }) + await client.post(ARIA_ADDRESS, data=jsonreq) \ No newline at end of file diff --git a/main.py b/main.py index 68e61b0..9b24205 100644 --- a/main.py +++ b/main.py @@ -200,7 +200,7 @@ class App(cmd2.Cmd): for child_name in await Client.GetChildrenNames(args.path, False): await self.print(child_name) else: - await self.print(await Client.GetFileUrl(args.path)) + await self.print(await Client.GetFileUrlByPath(args.path)) @RunSync async def complete_cd(self, text, line, begidx, endidx): diff --git a/readme.md b/readme.md index 37a52d2..a854c41 100644 --- a/readme.md +++ b/readme.md @@ -12,6 +12,7 @@ Todo: - [x] 持久化数据 - [ ] 实现本地下载队列(多文件,文件夹) - [x] 实现任务暂停、继续、恢复 +- [ ] 接口分离,前后端分离 - [ ] 添加测试用例 - [ ] 完全类型化