pikpakfs/PikPakFileSystem.py
2024-11-03 21:31:06 +08:00

363 lines
14 KiB
Python

import httpx
from pikpakapi import PikPakApi, DownloadStatus
from typing import Dict
from datetime import datetime
import json
import os
import logging
from typing import Any
class NodeBase:
def __init__(self, id : str, name : str, fatherId : str):
self.id = id
self.name = name
self._father_id = fatherId
self.lastUpdate : datetime = None
class DirNode(NodeBase):
def __init__(self, id : str, name : str, fatherId : str):
super().__init__(id, name, fatherId)
self.children_id : list[str] = []
class FileNode(NodeBase):
def __init__(self, id : str, name : str, fatherId : str):
super().__init__(id, name, fatherId)
self.url : str = None
class PikPakFileSystem:
#region 内部接口
def __init__(self, auth_cache_path : str = None, proxy_address : str = None, root_id : str = None):
# 初始化虚拟文件节点
self._nodes : Dict[str, NodeBase] = {}
self._root : DirNode = DirNode(root_id, "", None)
self._cwd : DirNode = self._root
# 初始化鉴权和代理信息
self._auth_cache_path : str = auth_cache_path
self.proxy_address : str = proxy_address
self._pikpak_client : PikPakApi = None
self._try_login_from_cache()
#region 鉴权信息相关
class PikPakToken:
def __init__(self, username : str, password : str, access_token : str, refresh_token : str, user_id : str):
self.username : str = username
self.password : str = password
self.access_token : str = access_token
self.refresh_token : str = refresh_token
self.user_id : str = user_id
def to_json(self):
return json.dumps(self.__dict__)
@classmethod
def from_json(cls, json_str):
data = json.loads(json_str)
return cls(**data)
def _init_client_by_token(self, token : PikPakToken) -> None:
self._init_client_by_username_and_password(token.username, token.password)
self._pikpak_client.access_token = token.access_token
self._pikpak_client.refresh_token = token.refresh_token
self._pikpak_client.user_id = token.user_id
self._pikpak_client.encode_token()
def _init_client_by_username_and_password(self, username : str, password : str) -> None:
httpx_client_args : Dict[str, Any] = None
if self.proxy_address != None:
httpx_client_args = {
"proxy": self.proxy_address,
"transport": httpx.AsyncHTTPTransport()
}
self._pikpak_client = PikPakApi(
username = username,
password = password,
httpx_client_args=httpx_client_args)
def _try_login_from_cache(self) -> None:
if self._auth_cache_path is None:
return
if not os.path.exists(self._auth_cache_path):
return
try:
with open(self._auth_cache_path, 'r', encoding='utf-8') as file:
content : str = file.read()
token : PikPakFileSystem.PikPakToken = PikPakFileSystem.PikPakToken.from_json(content)
self._init_client_by_token(token)
logging.info("successfully load login info from cache")
except Exception as e:
logging.error(f"failed to load login info from cache, exception occurred: {e}")
def _dump_login_info(self) -> None:
if self._auth_cache_path is None:
return
with open(self._auth_cache_path, 'w', encoding='utf-8') as file:
token : PikPakFileSystem.PikPakToken = PikPakFileSystem.PikPakToken(self._pikpak_client.username, self._pikpak_client.password, self._pikpak_client.access_token, self._pikpak_client.refresh_token, self._pikpak_client.user_id)
file.write(token.to_json())
logging.info("successfully dump login info to cache")
#endregion
#region 文件系统相关
class PathWalker():
def __init__(self, path : str, sep : str = "/"):
self._path_spots : list[str] = []
if not path.startswith(sep):
self._path_spots.append(".")
path_spots : list[str] = path.split(sep)
self._path_spots.extend(path_spots)
def IsAbsolute(self) -> bool:
return len(self._path_spots) == 0 or self._path_spots[0] != "."
def AppendSpot(self, spot) -> None:
self._path_spots.append(spot)
def Walk(self) -> list[str]:
return self._path_spots
async def _get_node_by_id(self, id : str) -> NodeBase:
if id == self._root.id:
return self._root
if id not in self._nodes:
return None
return self._nodes[id]
async def _get_father_node(self, node : NodeBase) -> NodeBase:
if node is self._root:
return self._root
return await self._get_node_by_id(node._father_id)
async def _add_node(self, node : NodeBase) -> None:
self._nodes[node.id] = node
father = await self._get_father_node(node)
if father is not None and isinstance(father, DirNode):
father.children_id.append(node.id)
async def _remove_node(self, node : NodeBase) -> None:
father = await self._get_father_node(node)
if father is not None and isinstance(father, DirNode):
father.children_id.remove(node.id)
self._nodes.pop(node.id)
async def _find_child_in_dir_by_name(self, dir : DirNode, name : str) -> NodeBase:
if dir is self._root and name == "":
return self._root
for child_id in dir.children_id:
node = await self._get_node_by_id(child_id)
if node.name == name:
return node
return None
async def _refresh(self, node : NodeBase):
if isinstance(node, DirNode):
if node.lastUpdate != None:
return
next_page_token : str = None
children_info : list[Dict[str, Any]] = []
while True:
dir_info : Dict[str, Any] = await self._pikpak_client.file_list(parent_id = node.id, next_page_token=next_page_token)
next_page_token = dir_info["next_page_token"]
children_info.extend(dir_info["files"])
if next_page_token is None or next_page_token == "":
break
node.children_id.clear()
for child_info in children_info:
id : str = child_info["id"]
name : str = child_info["name"]
child : NodeBase = await self._get_node_by_id(id)
if child is None:
if child_info["kind"].endswith("folder"):
child = DirNode(id, name, node.id)
else:
child = FileNode(id, name, node.id)
child.name = name
child._father_id = node.id
await self._add_node(child)
elif isinstance(node, FileNode):
result = await self._pikpak_client.get_download_url(node.id)
node.url = result["web_content_link"]
node.lastUpdate = datetime.now()
async def _path_to_node(self, path : str) -> NodeBase:
father, son_name = await self._path_to_father_node_and_son_name(path)
if son_name == "":
return father
if isinstance(father, DirNode):
return await self._find_child_in_dir_by_name(father, son_name)
return None
async def _path_to_father_node_and_son_name(self, path : str) -> tuple[NodeBase, str]:
path_walker : PikPakFileSystem.PathWalker = PikPakFileSystem.PathWalker(path)
father : NodeBase = None
son_name : str = None
current : NodeBase = self._root if path_walker.IsAbsolute() else self._cwd
for spot in path_walker.Walk():
if current is None:
father = None
break
if spot == "..":
current = await self._get_father_node(current)
continue
father = current
if not isinstance(current, DirNode):
current = None
continue
await self._refresh(current)
if spot == ".":
continue
sonName = spot
current = await self._find_child_in_dir_by_name(current, spot)
if current != None:
father = await self._get_father_node(current)
sonName = current.name
return father, sonName
async def _node_to_path(self, node : NodeBase, root : NodeBase = None) -> str:
if root is None:
root = self._root
if node is root:
return "/"
spots : list[str] = []
current = node
while current is not root:
spots.append(current.name)
current = await self._get_father_node(current)
spots.append("")
return "/".join(reversed(spots))
async def _is_ancestors_of(self, node_a : NodeBase, node_b : NodeBase) -> bool:
if node_b is node_a:
return False
if node_a is self._root:
return True
while node_b._father_id != self._root.id:
node_b = await self._get_father_node(node_b)
if node_b is node_a:
return True
return False
#endregion
#endregion
#region 对外接口
async def Login(self, username : str = None, password : str = None) -> None:
if self._pikpak_client != None and username is None and password is None:
username = self._pikpak_client.username
password = self._pikpak_client.password
if username == None and password == None:
raise Exception("Username and password are required")
self._init_client_by_username_and_password(username, password)
await self._pikpak_client.login()
self._dump_login_info()
async def IsDir(self, path : str) -> bool:
node = await self._path_to_node(path)
return isinstance(node, DirNode)
async def SplitPath(self, path : str) -> tuple[str, str]:
father, son_name = await self._path_to_father_node_and_son_name(path)
return await self._node_to_path(father), son_name
async def GetFileUrlByNodeId(self, node_id : str) -> str:
node = await self._get_node_by_id(node_id)
if not isinstance(node, FileNode):
return None
await self._refresh(node)
return node.url
async def GetFileUrlByPath(self, path : str) -> str:
node = await self._path_to_node(path)
if not isinstance(node, FileNode):
return None
await self._refresh(node)
return node.url
async def GetChildrenNames(self, path : str, ignore_files : bool) -> list[str]:
node = await self._path_to_node(path)
if not isinstance(node, DirNode):
return []
await self._refresh(node)
children_names : list[str] = []
for child_id in node.children_id:
child = await self._get_node_by_id(child_id)
if ignore_files and isinstance(child, FileNode):
continue
children_names.append(child.name)
return children_names
async def Delete(self, paths : list[str]) -> None:
nodes = [await self._path_to_node(path) for path in paths]
for node in nodes:
if await self._is_ancestors_of(node, self._cwd):
raise Exception("Cannot delete ancestors")
await self._pikpak_client.delete_to_trash([node.id for node in nodes])
for node in nodes:
await self._remove_node(node)
async def MakeDir(self, path : str) -> None:
father, son_name = await self._path_to_father_node_and_son_name(path)
result = await self._pikpak_client.create_folder(son_name, father.id)
id = result["file"]["id"]
name = result["file"]["name"]
son = DirNode(id, name, father.id)
await self._add_node(son)
async def SetCwd(self, path : str) -> None:
node = await self._path_to_node(path)
if not isinstance(node, DirNode):
raise Exception("Not a directory")
self._cwd = node
async def GetCwd(self) -> str:
return await self._node_to_path(self._cwd)
async def GetChildren(self, node : NodeBase) -> list[NodeBase]:
if not isinstance(node, DirNode):
return []
await self._refresh(node)
return [await self._get_node_by_id(child_id) for child_id in node.children_id]
async def PathToNode(self, path : str) -> NodeBase:
node = await self._path_to_node(path)
if node is None:
return None
return node
async def NodeToPath(self, from_node : NodeBase, to_node : NodeBase) -> str:
return await self._node_to_path(to_node, from_node)
async def RemoteDownload(self, torrent : str, remote_base_path : str) -> tuple[str, str]:
node = await self._path_to_node(remote_base_path)
info = await self._pikpak_client.offline_download(torrent, node.id)
return info["task"]["file_id"], info["task"]["id"]
async def QueryTaskStatus(self, task_id : str, node_id : str) -> DownloadStatus:
return await self._pikpak_client.get_task_status(task_id, node_id)
async def UpdateNode(self, node_id : str) -> NodeBase:
node : NodeBase = await self._get_node_by_id(node_id)
if node is None:
info = await self._pikpak_client.offline_file_info(node_id)
kind = info["kind"]
parent_id = info["parent_id"]
name = info["name"]
if kind.endswith("folder"):
node = DirNode(node_id, name, parent_id)
else:
node = FileNode(node_id, name, parent_id)
await self._add_node(node)
node.lastUpdate = None
return node
#endregion