添加任务

This commit is contained in:
limil 2024-11-02 23:25:43 +08:00
parent fb08711287
commit b84358f228
2 changed files with 106 additions and 32 deletions

View File

@ -10,6 +10,7 @@ import asyncio
import shortuuid import shortuuid
from utils import PathWalker from utils import PathWalker
from typing import Callable, Awaitable from typing import Callable, Awaitable
import random
class TaskStatus(Enum): class TaskStatus(Enum):
PENDING = "pending" PENDING = "pending"
@ -22,11 +23,13 @@ class PikPakTaskStatus(Enum):
PENDING = "pending" PENDING = "pending"
REMOTE_DOWNLOADING = "remote" REMOTE_DOWNLOADING = "remote"
LOCAL_DOWNLOADING = "local" LOCAL_DOWNLOADING = "local"
DONE = "done"
class FileDownloadTaskStatus(Enum): class FileDownloadTaskStatus(Enum):
PENDING = "pending" PENDING = "pending"
DOWNLOADING = "downloading" DOWNLOADING = "downloading"
DONE = "done"
class UnRecoverableError(Exception): class UnRecoverableError(Exception):
def __init__(self, message): def __init__(self, message):
super().__init__(message) super().__init__(message)
@ -36,6 +39,7 @@ class TaskBase:
self.id : str = shortuuid.uuid() if id is None else id self.id : str = shortuuid.uuid() if id is None else id
self.tag : str = tag self.tag : str = tag
self.maxConcurrentNumber : int = maxConcurrentNumber self.maxConcurrentNumber : int = maxConcurrentNumber
self.name : str = ""
self._status : TaskStatus = TaskStatus.PENDING self._status : TaskStatus = TaskStatus.PENDING
@ -46,25 +50,36 @@ class PikPakTask(TaskBase):
TAG = "PikPakTask" TAG = "PikPakTask"
MAX_CONCURRENT_NUMBER = 5 MAX_CONCURRENT_NUMBER = 5
def __init__(self, torrent : str, toDirId : str, id : str = None, status : PikPakTaskStatus = PikPakTaskStatus.PENDING): def __init__(self, torrent : str, toDirId : str, nodeId : str = None, status : PikPakTaskStatus = PikPakTaskStatus.PENDING, id : str = None):
super().__init__(id, PikPakTask.TAG, PikPakTask.MAX_CONCURRENT_NUMBER) super().__init__(id, PikPakTask.TAG, PikPakTask.MAX_CONCURRENT_NUMBER)
self.status : PikPakTaskStatus = status self.status : PikPakTaskStatus = status
self.toDirId : str = toDirId self.toDirId : str = toDirId
self.nodeId : str = None self.nodeId : str = nodeId
self.name : str = ""
self.torrent : str = torrent # todo: 将torrent的附加参数去掉再加入 self.torrent : str = torrent # todo: 将torrent的附加参数去掉再加入
self.remoteTaskId : str = None self.remoteTaskId : str = None
self.progress : str = ""
def __eq__(self, other):
if isinstance(other, PikPakTask):
return self is other or self.nodeId == other.nodeId
return False
class FileDownloadTask(TaskBase): class FileDownloadTask(TaskBase):
TAG = "FileDownloadTask" TAG = "FileDownloadTask"
MAX_CONCURRENT_NUMBER = 5 MAX_CONCURRENT_NUMBER = 5
def __init__(self, nodeId : str, PikPakTaskId : str, relativePath : str, status : FileDownloadTaskStatus = FileDownloadTaskStatus.PENDING): def __init__(self, nodeId : str, PikPakTaskId : str, relativePath : str, status : FileDownloadTaskStatus = FileDownloadTaskStatus.PENDING, id : str = None):
super().__init__(nodeId, FileDownloadTask.TAG, FileDownloadTask.MAX_CONCURRENT_NUMBER) super().__init__(id, FileDownloadTask.TAG, FileDownloadTask.MAX_CONCURRENT_NUMBER)
self.status : FileDownloadTaskStatus = status self.status : FileDownloadTaskStatus = status
self.PikPakTaskId : str = PikPakTaskId self.PikPakTaskId : str = PikPakTaskId
self.nodeId : str = nodeId self.nodeId : str = nodeId
self.relativePath : str = relativePath self.relativePath : str = relativePath
def __eq__(self, other):
if isinstance(other, FileDownloadTask):
return self is other or (self.nodeId == other.nodeId and self.PikPakTaskId == other.PikPakTaskId)
return False
async def TaskWorker(task : TaskBase): async def TaskWorker(task : TaskBase):
try: try:
@ -82,7 +97,7 @@ async def TaskWorker(task : TaskBase):
async def TaskManager(taskQueues : Dict[str, list[TaskBase]]): async def TaskManager(taskQueues : Dict[str, list[TaskBase]]):
# todo: 处理取消的情况 # todo: 处理取消的情况
while True: while True:
await asyncio.sleep(1) await asyncio.sleep(0.5)
for taskQueue in taskQueues.values(): for taskQueue in taskQueues.values():
notRunningTasks = [task for task in taskQueue if task.worker is None or task.worker.done()] notRunningTasks = [task for task in taskQueue if task.worker is None or task.worker.done()]
runningTasksNumber = len(taskQueue) - len(notRunningTasks) runningTasksNumber = len(taskQueue) - len(notRunningTasks)
@ -170,6 +185,7 @@ class PikPakFs:
if IsFile(node): if IsFile(node):
fileDownloadTask = FileDownloadTask(task.nodeId, task.id, self.NodeToPath(node, node)) fileDownloadTask = FileDownloadTask(task.nodeId, task.id, self.NodeToPath(node, node))
fileDownloadTask.handler = self._file_download_task_handler fileDownloadTask.handler = self._file_download_task_handler
fileDownloadTask.name = task.name
self._add_task(fileDownloadTask) self._add_task(fileDownloadTask)
elif IsDir(node): elif IsDir(node):
# 使用广度优先遍历 # 使用广度优先遍历
@ -184,7 +200,39 @@ class PikPakFs:
elif IsFile(child): elif IsFile(child):
fileDownloadTask = FileDownloadTask(childId, task.id, self.NodeToPath(child, node)) fileDownloadTask = FileDownloadTask(childId, task.id, self.NodeToPath(child, node))
fileDownloadTask.handler = self._file_download_task_handler fileDownloadTask.handler = self._file_download_task_handler
fileDownloadTask.name = task.name
self._add_task(fileDownloadTask) self._add_task(fileDownloadTask)
# 开始等待下载任务完成
while True:
fileDownloadTasks = self.taskQueues.get(FileDownloadTask.TAG)
myTasks = [myTask for myTask in fileDownloadTasks if myTask.PikPakTaskId == task.id]
allNumber = len(myTasks)
notCompletedNumber = 0
pausedNumber = 0
errorNumber = 0
for myTask in myTasks:
if myTask._status == TaskStatus.PAUSED:
pausedNumber += 1
if myTask._status == TaskStatus.ERROR:
errorNumber += 1
if myTask._status in {TaskStatus.PENDING, TaskStatus.RUNNING}:
notCompletedNumber += 1
runningNumber = allNumber - notCompletedNumber - pausedNumber - errorNumber
task.progress = f"{runningNumber}/{allNumber} ({pausedNumber}|{errorNumber})"
if notCompletedNumber > 0:
await asyncio.sleep(0.5)
continue
if errorNumber > 0:
raise Exception("file download failed")
if pausedNumber > 0:
raise asyncio.CancelledError()
break
task.status = PikPakTaskStatus.DONE
async def _pikpak_task_handler(self, task : PikPakTask): async def _pikpak_task_handler(self, task : PikPakTask):
while True: while True:
@ -198,32 +246,39 @@ class PikPakFs:
break break
async def _file_download_task_handler(self, task : FileDownloadTask): async def _file_download_task_handler(self, task : FileDownloadTask):
if random.randint(1, 5) == 2:
raise asyncio.CancelledError()
pass pass
def _add_task(self, task : TaskBase): def _add_task(self, task : TaskBase):
if self.taskQueues.get(task.tag) is None: if self.taskQueues.get(task.tag) is None:
self.taskQueues[task.tag] = [] self.taskQueues[task.tag] = []
taskQueue = self.taskQueues[task.tag] taskQueue = self.taskQueues[task.tag]
for t in taskQueue: for other in taskQueue:
if t.id == task.id: if other == task:
if other._status != TaskStatus.DONE:
other._status = TaskStatus.PENDING
return return
taskQueue.append(task) taskQueue.append(task)
async def StopTask(self, task : TaskBase): async def GetTaskById(self, taskId : str) -> TaskBase:
pass for taskQueue in self.taskQueues.values():
for task in taskQueue:
if task.id == taskId:
return task
return None
async def ResumeTask(self, task : TaskBase): async def StopTask(self, taskId : str):
pass task = await self.GetTaskById(taskId)
if task is not None and task._status in {TaskStatus.PENDING, TaskStatus.RUNNING}:
if task.worker is not None:
task.worker.cancel()
task.worker = None
async def RetryTask(self, taskId : str): async def ResumeTask(self, taskId : str):
if PikPakTask.TAG not in self.taskQueues: task = await self.GetTaskById(taskId)
return if task is not None and task._status in {TaskStatus.PAUSED, TaskStatus.ERROR}:
for task in self.taskQueues[PikPakTask.TAG]: task._status = TaskStatus.PENDING
if task.id == taskId and task._status == TaskStatus.ERROR:
task._status = TaskStatus.PENDING
break
elif task.id == taskId:
break
def Start(self): def Start(self):
return asyncio.create_task(TaskManager(self.taskQueues)) return asyncio.create_task(TaskManager(self.taskQueues))
@ -424,7 +479,8 @@ class PikPakFs:
async def Pull(self, node : FsNode) -> PikPakTask: async def Pull(self, node : FsNode) -> PikPakTask:
task = PikPakTask("", node.fatherId, node.id, PikPakTaskStatus.LOCAL_DOWNLOADING) task = PikPakTask("", node.fatherId, node.id, PikPakTaskStatus.LOCAL_DOWNLOADING)
task.handler = self._pikpak_local_downloading task.name = node.name
task.handler = self._pikpak_task_handler
self._add_task(task) self._add_task(task)
return task return task

36
main.py
View File

@ -7,6 +7,8 @@ import colorlog
from PikPakFs import PikPakFs, IsDir, IsFile, TaskStatus from PikPakFs import PikPakFs, IsDir, IsFile, TaskStatus
import os import os
from tabulate import tabulate from tabulate import tabulate
import wcwidth
import types
LogFormatter = colorlog.ColoredFormatter( LogFormatter = colorlog.ColoredFormatter(
"%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s", "%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s",
@ -61,6 +63,12 @@ class RunSync:
else: else:
return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), MainLoop).result() return asyncio.run_coroutine_threadsafe(func(*args, **kwargs), MainLoop).result()
def __get__(self, instance, cls):
if instance is None:
return self
else:
return types.MethodType(self, instance)
class Console(cmd2.Cmd): class Console(cmd2.Cmd):
def _io_worker(self, loop): def _io_worker(self, loop):
asyncio.set_event_loop(loop) asyncio.set_event_loop(loop)
@ -165,7 +173,8 @@ class Console(cmd2.Cmd):
matches.append(text[:text.rfind(sonName)] + child.name) matches.append(text[:text.rfind(sonName)] + child.name)
matchesNode.append(child) matchesNode.append(child)
if len(matchesNode) == 1 and IsDir(matchesNode[0]): if len(matchesNode) == 1 and IsDir(matchesNode[0]):
matches[0] += "/" if matches[0] == sonName:
matches[0] += "/"
self.allow_appended_space = False self.allow_appended_space = False
self.allow_closing_quote = False self.allow_closing_quote = False
return matches return matches
@ -304,8 +313,8 @@ class Console(cmd2.Cmd):
if args.type == "pikpak": if args.type == "pikpak":
tasks = await Client.QueryPikPakTasks(TaskStatus(args.filter) if args.filter is not None else None) tasks = await Client.QueryPikPakTasks(TaskStatus(args.filter) if args.filter is not None else None)
# 格式化输出所有task信息idstatuslastStatus的信息输出表格 # 格式化输出所有task信息idstatuslastStatus的信息输出表格
table = [[task.id, task._status.value, task.status.value] for task in tasks] table = [[task.id, task._status.value, task.status.value, task.progress] for task in tasks]
headers = ["id", "status", "details"] headers = ["id", "status", "details", "progress"]
await self.Print(tabulate(table, headers, tablefmt="grid")) await self.Print(tabulate(table, headers, tablefmt="grid"))
elif args.type == "filedownload": elif args.type == "filedownload":
tasks = await Client.QueryFileDownloadTasks(TaskStatus(args.filter) if args.filter is not None else None) tasks = await Client.QueryFileDownloadTasks(TaskStatus(args.filter) if args.filter is not None else None)
@ -314,15 +323,24 @@ class Console(cmd2.Cmd):
headers = ["id", "status", "details", "path"] headers = ["id", "status", "details", "path"]
await self.Print(tabulate(table, headers, tablefmt="grid")) await self.Print(tabulate(table, headers, tablefmt="grid"))
retry_parser = cmd2.Cmd2ArgumentParser() taskid_parser = cmd2.Cmd2ArgumentParser()
retry_parser.add_argument("taskId", help="taskId") taskid_parser.add_argument("taskId", help="taskId")
@cmd2.with_argparser(retry_parser)
@cmd2.with_argparser(taskid_parser)
@RunSync @RunSync
async def do_retry(self, args): async def do_pause(self, args):
""" """
Retry a task Stop a task
""" """
await Client.RetryTask(args.taskId) await Client.StopTask(args.taskId)
@cmd2.with_argparser(taskid_parser)
@RunSync
async def do_resume(self, args):
"""
Resume a task
"""
await Client.ResumeTask(args.taskId)
async def mainLoop(): async def mainLoop():
global MainLoop, Client global MainLoop, Client