更新cmd2

This commit is contained in:
limil 2024-10-27 19:41:53 +08:00
parent ae6c4cc064
commit 3f79bff3ea
3 changed files with 208 additions and 37 deletions

192
main.py
View File

@ -1,21 +1,191 @@
import asyncio, nest_asyncio
import cmd2
import sys
from pikpakFs import PKVirtFs, VirtFsNode, DirNode, FileNode
from functools import wraps
from aioconsole import ainput, aprint
import logging
import colorlog
from pikpakFs import VirtFsNode, DirNode, FileNode, PKVirtFs
import os
def RunSync(func):
@wraps(func)
def decorated(*args, **kwargs):
return asyncio.get_event_loop().run_until_complete(func(*args, **kwargs))
return decorated
def ProvideDecoratorSelfArgs(decorator, argsProvider):
def wrapper(func):
@wraps(func)
def decorated(*args, **kwargs):
namespace = args[0]
return decorator(argsProvider(namespace))(func)(*args, **kwargs)
return decorated
return wrapper
class PikpakConsole(cmd2.Cmd):
def _SetupLogging(self):
formatter = colorlog.ColoredFormatter(
"%(log_color)s%(asctime)s - %(levelname)s - %(name)s - %(message)s",
datefmt='%Y-%m-%d %H:%M:%S',
reset=True,
log_colors={
'DEBUG': 'cyan',
'INFO': 'green',
'WARNING': 'yellow',
'ERROR': 'red',
'CRITICAL': 'red,bg_white',
}
)
handler = logging.StreamHandler()
handler.setFormatter(formatter)
logger = logging.getLogger()
logger.addHandler(handler)
logger.setLevel(logging.INFO)
class PKApp(cmd2.Cmd):
def __init__(self):
super().__init__()
self.fs = PKVirtFs(loginCachePath = "token.json", proxy = "http://127.0.0.1:10808")
self._SetupLogging()
self.client = PKVirtFs("token.json", proxy="http://127.0.0.1:7897")
async def Run(self):
saved_readline_settings = None
try:
# Get sigint protection while we set up readline for cmd2
with self.sigint_protection:
saved_readline_settings = self._set_up_cmd2_readline()
stop = False
while not stop:
# Get sigint protection while we read the command line
line = await asyncio.to_thread(self._read_command_line, self.prompt)
# Run the command along with all associated pre and post hooks
stop = self.onecmd_plus_hooks(line)
finally:
# Get sigint protection while we restore readline settings
with self.sigint_protection:
if saved_readline_settings is not None:
self._restore_readline(saved_readline_settings)
def do_debug(self, args):
"""
Enable debug mode
"""
logging.getLogger().setLevel(logging.DEBUG)
logging.debug("Debug mode enabled")
def do_debugoff(self, args):
"""
Disable debug mode
"""
logging.getLogger().setLevel(logging.INFO)
logging.info("Debug mode disabled")
login_parser = cmd2.Cmd2ArgumentParser()
login_parser.add_argument("username", help="username", nargs="?")
login_parser.add_argument("password", help="password", nargs="?")
@RunSync
@cmd2.with_argparser(login_parser)
async def do_login(self, args):
if len(args) < 2:
await self.fs.Login()
else:
await self.fs.Login(args[0], args[1])
"""
Login to pikpak
"""
await self.client.Login(args.username, args.password)
await aprint("Logged in successfully")
def PathParserProvider(self):
@RunSync
async def PathToNode(path):
path = await self.client.PathToNode(path)
if path is None:
raise ValueError("Invalid path")
return path
path_parser = cmd2.Cmd2ArgumentParser()
path_parser.add_argument("path", help="path", default="", nargs="?", type=PathToNode)
return path_parser
WithPathParser = ProvideDecoratorSelfArgs(cmd2.with_argparser, PathParserProvider)
@RunSync
async def PathCompleter(self, text, line, begidx, endidx, filterFiles):
father, sonName = await self.client.PathToFatherNodeAndNodeName(text)
fatherDir = self.client.ToDir(father)
if fatherDir is None:
return []
matches = []
matchesNode = []
for childId in fatherDir.childrenId:
node = self.client.nodes[childId]
if filterFiles and isinstance(node, FileNode):
continue
if node.name.startswith(sonName):
self.display_matches.append(node.name)
if sonName == "":
matches.append(text + node.name)
elif text.endswith(sonName):
matches.append(text[:text.rfind(sonName)] + node.name)
matchesNode.append(node)
if len(matchesNode) == 1 and self.client.ToDir(matchesNode[0]) is not None:
matches[0] += "/"
self.allow_appended_space = False
self.allow_closing_quote = False
return matches
def complete_ls(self, text, line, begidx, endidx):
return self.PathCompleter(text, line, begidx, endidx, filterFiles = False)
@RunSync
@WithPathParser
async def do_ls(self, args):
"""
List files in a directory
"""
if isinstance(args.path, DirNode):
for childId in args.path.childrenId:
node = self.client.nodes[childId]
await aprint(node.name)
elif isinstance(args.path, FileNode):
await aprint(args.path.name)
else:
await aprint("Invalid path")
def complete_cd(self, text, line, begidx, endidx):
return self.PathCompleter(text, line, begidx, endidx, filterFiles = True)
@RunSync
@WithPathParser
async def do_cd(self, args):
"""
Change directory
"""
if self.client.ToDir(args.path) is None:
await aprint("Invalid directory")
return
self.client.currentLocation = args.path
@RunSync
async def do_cwd(self, args):
"""
Print current working directory
"""
await aprint(self.client.NodeToPath(self.client.currentLocation))
def do_clear(self, args):
"""
Clear the terminal screen
"""
os.system('cls' if os.name == 'nt' else 'clear')
if __name__ == '__main__':
app = PKApp()
sys.exit(app.cmdloop())
if __name__ == "__main__":
nest_asyncio.apply()
prog = PikpakConsole()
asyncio.run(prog.Run())

View File

@ -11,10 +11,9 @@ import logging
class PathWalker():
def __init__(self, pathStr : str, sep : str = "/"):
self.__pathSpots : list[str] = []
pathStr = pathStr.strip()
if not pathStr.startswith(sep):
self.__pathSpots.append(".")
pathSpots = [spot.strip() for spot in pathStr.split(sep) if spot.strip() != ""]
pathSpots = pathStr.split(sep)
self.__pathSpots.extend(pathSpots)
def IsAbsolute(self) -> bool:
@ -91,7 +90,7 @@ class PKVirtFs:
httpx_client_args=httpx_client_args)
def __TryLoginFromCache(self):
if self.loginCachePath == None:
if self.loginCachePath is None:
return
if not os.path.exists(self.loginCachePath):
return
@ -102,7 +101,7 @@ class PKVirtFs:
logging.info("successfully load login info from cache")
def __DumpLoginInfo(self):
if self.loginCachePath == None:
if self.loginCachePath is None:
return
with open(self.loginCachePath, 'w', encoding='utf-8') as file:
token = PikpakToken(self.client.username, self.client.password, self.client.access_token, self.client.refresh_token, self.client.user_id)
@ -131,7 +130,7 @@ class PKVirtFs:
return None
def GetFatherNode(self, node : VirtFsNode) -> VirtFsNode:
if node.fatherId == None:
if node.fatherId is None:
return self.root
return self.nodes[node.fatherId]
@ -166,11 +165,13 @@ class PKVirtFs:
async def PathToNode(self, pathStr : str) -> VirtFsNode:
father, sonName = await self.PathToFatherNodeAndNodeName(pathStr)
if sonName == "":
return father
fatherDir = self.ToDir(father)
if fatherDir == None:
if fatherDir is None:
return None
return self.FindChildInDirByName(father, sonName)
async def PathToFatherNodeAndNodeName(self, pathStr : str) -> tuple[VirtFsNode, str]:
pathWalker = PathWalker(pathStr)
father : VirtFsNode = None
@ -178,7 +179,7 @@ class PKVirtFs:
current = self.root if pathWalker.IsAbsolute() else self.currentLocation
for spot in pathWalker.Walk():
if current == None:
if current is None:
father = None
break
if spot == "..":
@ -186,10 +187,10 @@ class PKVirtFs:
continue
father = current
currentDir = self.ToDir(current)
if currentDir == None:
if currentDir is None:
current = None
continue
if currentDir.lastUpdate == None:
if currentDir.lastUpdate is None:
await self.RefreshDirectory(currentDir)
if spot == ".":
continue
@ -198,19 +199,21 @@ class PKVirtFs:
if current != None:
currentDir = self.ToDir(current)
if currentDir != None and currentDir.lastUpdate == None:
if currentDir != None and currentDir.lastUpdate is None:
await self.RefreshDirectory(currentDir)
father = self.GetFatherNode(current)
sonName = current.name
return father, sonName
async def NodeToPath(self, node : VirtFsNode) -> str:
spots : list[str] = [""]
def NodeToPath(self, node : VirtFsNode) -> str:
if node is self.root:
return "/"
spots : list[str] = []
current = node
while current.id != None:
spots.append(current.name)
if current.fatherId == None:
if current.fatherId is None:
break
current = self.nodes[current.fatherId]
spots.append("")
@ -222,18 +225,12 @@ class PKVirtFs:
return self.ToDir(self.FindChildInDirByName(node, name))
async def Login(self, username : str = None, password : str = None) -> None:
if self.client != None and username == None and password == None:
if self.client != None and username is None and password is None:
username = self.client.username
password = self.client.password
if self.client != None and self.client.username == username and self.client.password == password:
logging.info("Already login, try refresh token")
try:
await self.client.refresh_access_token()
self.__DumpLoginInfo()
return
except Exception:
logging.info("Refresh access token failed! Try relogin")
if username == None and password == None:
raise Exception("Username and password are required")
self.__InitClientByUsernamePassword(username, password)
await self.client.login()
@ -246,14 +243,14 @@ class PKVirtFs:
async def Download(self, url : str, dirNode : DirNode = None) -> None :
# 默认创建在当前目录下
# todo: 完善离线下载task相关
if dirNode == None:
if dirNode is None:
dirNode = self.currentLocation
await self.client.offline_download(url, dirNode.id)
async def Delete(self, node : VirtFsNode) -> None:
father = self.GetFatherNode(node)
fatherDir = self.ToDir(father)
if fatherDir == None:
if fatherDir is None:
raise Exception('Failed to locate')
if self.currentLocation is node or self.__IsAncestorsOf(node, self.currentLocation):
raise Exception('Delete self or ancestor is not allowed')

View File

@ -1,3 +1,7 @@
在Pikpak Api基础上套了一层文件系统更好自动化离线下载
运行: python main.py
运行: python main.py
Todo:
- [ ] 实现自定义根路径