调整task队列

This commit is contained in:
limil 2024-11-01 09:26:40 +08:00
parent 2865964468
commit 0762ec9952
2 changed files with 70 additions and 44 deletions

View File

@ -299,6 +299,7 @@ class Console(cmd2.Cmd):
async def mainLoop(): async def mainLoop():
global MainLoop, Client global MainLoop, Client
MainLoop = asyncio.get_running_loop() MainLoop = asyncio.get_running_loop()
Client.Start()
console = Console() console = Console()
console.preloop() console.preloop()

View File

@ -16,36 +16,37 @@ class DownloadTaskStatus(Enum):
stopped = "stopped" stopped = "stopped"
class PKTaskStatus(Enum): class PKTaskStatus(Enum):
pending_offline_download = "pending" pending = "pending"
offline_downloading = "remote_downloading" remote_downloading = "remote_downloading"
pending_download = "pending_for_download"
downloading = "downloading" downloading = "downloading"
done = "done" done = "done"
error = "error" error = "error"
stopped = "stopped" stopped = "stopped"
class PkTask: class PkTask:
id = 0 _id = 0
def __init__(self, torrent : str, toDirId : str, status : PKTaskStatus = PKTaskStatus.pending_offline_download):
PkTask.id += 1 def __init__(self, torrent : str, toDirId : str, status : PKTaskStatus = PKTaskStatus.pending):
self.taskId = PkTask.id PkTask._id += 1
self.status = status self.id = PkTask._id
self.status = PKTaskStatus.pending
self.recoverStatus = status self.recoverStatus = status
self.name : str = ""
self.runningTask : asyncio.Task = None self.runningTask : asyncio.Task = None
self.name = ""
self.toDirId = toDirId self.toDirId = toDirId
self.nodeId : str = None self.nodeId : str = None
self.torrent = torrent self.torrent = torrent # todo: 将torrent的附加参数去掉再加入
self.url = None self.remoteTaskId : str = None
self.pkTaskId = None
class DownloadTask: class DownloadTask:
def __init__(self, nodeId : str, pkTaskId : str, status : DownloadTaskStatus = DownloadTaskStatus.pending): def __init__(self, nodeId : str, pkTaskId : str, status : DownloadTaskStatus = DownloadTaskStatus.pending):
self.status = status self.status = DownloadStatus.pending
self.recoverStatus = status self.recoverStatus = status
self.pkTaskId = pkTaskId self.pkTaskId = pkTaskId
self.nodeId = nodeId self.nodeId = nodeId
self.runningTask : asyncio.Task = None
class PathWalker(): class PathWalker():
def __init__(self, pathStr : str, sep : str = "/"): def __init__(self, pathStr : str, sep : str = "/"):
@ -104,35 +105,35 @@ class PkToken:
return cls(**data) return cls(**data)
class PKFs: class PKFs:
async def RetryTask(self, taskId : int): MAX_PIKPAK_TASKS = 5
task = self.tasks[taskId] MAX_DOWNLOAD_TASKS = 5
if task == None or task.status != PKTaskStatus.error:
return
task.status = task.recoverStatus
self.RunTask(task)
async def _task_pending(self, task : PkTask): async def _pktask_pending(self, task : PkTask):
if task.recoverStatus != PKTaskStatus.pending:
task.status = task.recoverStatus
return
pkTask = await self.client.offline_download(task.torrent, task.toDirId) pkTask = await self.client.offline_download(task.torrent, task.toDirId)
task.pkTaskId = pkTask["task"]["id"] task.remoteTaskId = pkTask["task"]["id"]
task.nodeId = pkTask["task"]["file_id"] task.nodeId = pkTask["task"]["file_id"]
task.name = pkTask["task"]["name"] task.name = pkTask["task"]["name"]
task.status = PKTaskStatus.offline_downloading task.status = PKTaskStatus.remote_downloading
async def _task_offline_downloading(self, task : PkTask): async def _pktask_offline_downloading(self, task : PkTask):
waitTime = 3 waitTime = 3
while True: while True:
await asyncio.sleep(waitTime) await asyncio.sleep(waitTime)
status = await self.client.get_task_status(task.pkTaskId, task.nodeId) status = await self.client.get_task_status(task.remoteTaskId, task.nodeId)
if status == DownloadStatus.not_found or status == DownloadStatus.not_found or status == DownloadStatus.error: if status in {DownloadStatus.not_found, DownloadStatus.not_downloading, DownloadStatus.error}:
task.recoverStatus = PKTaskStatus.pending_offline_download task.recoverStatus = PKTaskStatus.pending
task.status = PKTaskStatus.error task.status = PKTaskStatus.error
break break
elif status == DownloadStatus.done: elif status == DownloadStatus.done:
fileInfo = await self.client.offline_file_info(file_id=task.nodeId) fileInfo = await self.client.offline_file_info(file_id=task.nodeId)
if self.GetNodeById(task.nodeId) is not None: node = self.GetNodeById(task.nodeId)
oldFather = self.GetFatherNode(task.nodeId) if node is not None:
oldFather = self.GetFatherNode(node)
if oldFather is not None: if oldFather is not None:
oldFather.childrenId.remove(task.nodeId) oldFather.childrenId.remove(node.id)
task.toDirId = fileInfo["parent_id"] task.toDirId = fileInfo["parent_id"]
task.name = fileInfo["name"] task.name = fileInfo["name"]
@ -144,33 +145,43 @@ class PKFs:
father = self.GetNodeById(task.toDirId) father = self.GetNodeById(task.toDirId)
if father.id is not None: if father.id is not None:
father.childrenId.append(task.nodeId) father.childrenId.append(task.nodeId)
task.status = PKTaskStatus.pending_download task.status = PKTaskStatus.downloading
break break
waitTime = waitTime * 1.5 waitTime = waitTime * 1.5
async def _task_worker(self, task : PkTask): async def _pktask_worker(self, task : PkTask):
while task.status not in {PKTaskStatus.done, PKTaskStatus.error, PKTaskStatus.stopped}: while task.status not in {PKTaskStatus.done, PKTaskStatus.error, PKTaskStatus.stopped}:
try: try:
if task.status == PKTaskStatus.pending_offline_download: if task.status == PKTaskStatus.pending:
await self._task_pending(task) await self._pktask_pending(task)
elif task.status == PKTaskStatus.offline_downloading: elif task.status == PKTaskStatus.remote_downloading:
await self._task_offline_downloading(task) await self._pktask_offline_downloading(task)
elif task.status == PKTaskStatus.pending_download: elif task.status == PKTaskStatus.downloading:
task.status = PKTaskStatus.done task.status = PKTaskStatus.done
else: else:
break break
except asyncio.CancelledError:
task.recoverStatus = task.status
task.status = PKTaskStatus.stopped
except Exception as e: except Exception as e:
logging.error(f"task failed, exception occurred: {e}") logging.error(f"task failed, exception occurred: {e}")
task.recoverStatus = task.status task.recoverStatus = task.status
task.status = PKTaskStatus.error task.status = PKTaskStatus.error
async def _pktask_manager(self):
while True:
await asyncio.sleep(1)
runningTasksNum = 0
notRunningTasks = [task for task in self.tasks if task.runningTask is None or task.runningTask.done()]
if len(self.tasks) - len(notRunningTasks) >= PKFs.MAX_PIKPAK_TASKS:
continue
for task in [task for task in notRunningTasks if task.status == PKTaskStatus.pending]:
task.runningTask = asyncio.create_task(self._pktask_worker(task))
runningTasksNum += 1
if runningTasksNum >= PKFs.MAX_PIKPAK_TASKS:
break
def RunTask(self, task : PkTask):
self.tasks.append(task)
if task.runningTask is None or task.runningTask.done():
task.runningTask = asyncio.create_task(self._task_worker(task))
def __init__(self, loginCachePath : str = None, proxy : str = None, rootId = None): def __init__(self, loginCachePath : str = None, proxy : str = None, rootId = None):
self.nodes : Dict[str, FsNode] = {} self.nodes : Dict[str, FsNode] = {}
self.root = DirNode(rootId, "", None) self.root = DirNode(rootId, "", None)
@ -232,6 +243,20 @@ class PKFs:
return True return True
return False return False
async def StopTask(self, taskId : int):
pass
async def ResumeTask(self, taskId : int):
pass
async def RetryTask(self, taskId : int):
task = next((t for t in self.tasks if t.id == taskId), None)
if task and task.status == PKTaskStatus.error:
task.status = PKTaskStatus.pending
def Start(self):
asyncio.create_task(self._pktask_manager())
def GetNodeById(self, id : str) -> FsNode: def GetNodeById(self, id : str) -> FsNode:
if id == self.root.id: if id == self.root.id:
return self.root return self.root
@ -359,7 +384,7 @@ class PKFs:
async def Download(self, url : str, dirNode : DirNode) -> PkTask : async def Download(self, url : str, dirNode : DirNode) -> PkTask :
task = PkTask(url, dirNode.id) task = PkTask(url, dirNode.id)
self.RunTask(task) self.tasks.append(task)
return task return task
async def QueryTasks(self, filterByStatus : PKTaskStatus = None) -> list[PkTask]: async def QueryTasks(self, filterByStatus : PKTaskStatus = None) -> list[PkTask]: