移除tui,计划使用cmd2来实现交互

This commit is contained in:
limil 2024-10-23 09:14:06 +08:00
parent bfde0982bd
commit ae6c4cc064
4 changed files with 78 additions and 342 deletions

194
main.py
View File

@ -1,189 +1,21 @@
from textual import events import cmd2
from textual.app import App, ComposeResult
from textual.widgets import Input, Log
from textual.containers import Horizontal, Vertical, Widget
from collections import deque
import sys import sys
import asyncio from pikpakFs import PKVirtFs, VirtFsNode, DirNode, FileNode
import argparse
import pikpakFs
import logging
import functools
class TextualLogHandler(logging.Handler):
def __init__(self, log_widget: Log): class PKApp(cmd2.Cmd):
def __init__(self):
super().__init__() super().__init__()
self.log_widget = log_widget self.fs = PKVirtFs(loginCachePath = "token.json", proxy = "http://127.0.0.1:10808")
def emit(self, record): async def do_login(self, args):
message = self.format(record) if len(args) < 2:
self.log_widget.write_line(message) await self.fs.Login()
class HistoryInput(Input):
def __init__(self, placeholder: str = "", max_history: int = 20, *args, **kwargs):
super().__init__(placeholder=placeholder, *args, **kwargs)
self.block_input = False
self.history = deque(maxlen=max_history) # 历史记录列表
self.history_view = list()
self.history_index = -1 # 当前历史索引,初始为 -1
self.history_log = Log(auto_scroll=False) # 用于显示历史记录的日志小部件
def widget(self) -> Widget:
return Vertical(self, self.history_log)
def reverseIdx(self, idx) -> int:
return len(self.history) - 1 - idx
async def on_key(self, event: events.Key) -> None:
if self.block_input:
return
if event.key == "up":
if self.history_index == -1:
self.cursor_position = len(self.value)
await self.update_history_view()
return
self.history_index = max(0, self.history_index - 1)
elif event.key == "down":
self.history_index = min(len(self.history) - 1, self.history_index + 1)
else: else:
self.history_index = -1 await self.fs.Login(args[0], args[1])
await self.update_history_view()
return
if len(self.history) > 0 and self.history_index != -1:
self.value = self.history[self.reverseIdx(self.history_index)]
self.cursor_position = len(self.value)
await self.update_history_view()
async def on_input_submitted(self, event: Input.Submitted) -> None:
user_input = event.value.strip()
if user_input:
self.history.append(user_input)
self.history_index = -1
self.value = ""
await self.update_history_view()
async def update_history_view(self):
self.history_log.clear()
self.history_view.clear()
if self.history:
for idx, item in enumerate(self.history):
prefix = "> " if self.reverseIdx(idx) == self.history_index else " "
self.history_view.append(f"{prefix}{item}")
self.history_log.write_lines(reversed(self.history_view))
scroll_height = self.history_log.scrollable_size.height
scroll_start = self.history_log.scroll_offset.y
current = self.history_index
if current < scroll_start:
scroll_idx = min(max(0, current), len(self.history) - 1)
self.history_log.scroll_to(y = scroll_idx)
elif current >= scroll_start + scroll_height - 1:
self.history_log.scroll_to(y = current - scroll_height + 1)
self.refresh()
async def animate_ellipsis(self):
ellipsis = ""
try:
while True:
# 循环添加省略号最多3个点
if len(ellipsis) < 3:
ellipsis += "."
else:
ellipsis = ""
self.value = f"Waiting{ellipsis}"
await asyncio.sleep(0.5)
finally:
self.value = ""
pass
async def wait_for(self, operation):
self.disabled = True
self.block_input = True
animation_task = asyncio.create_task(self.animate_ellipsis())
await operation()
animation_task.cancel()
self.disabled = False
self.block_input = False
self.focus()
class InputLoggerApp(App):
CSS = """
.divider {
width: 0.5%;
height: 100%;
background: #444444;
}
.log {
width: 80%;
height: 100%;
}
"""
def setup_logger(self) -> None: if __name__ == '__main__':
formatStr = '%(asctime)s - %(name)s - %(levelname)s - %(message)s' app = PKApp()
sys.exit(app.cmdloop())
logging.basicConfig(
filename='app.log',
filemode='a',
format=formatStr
)
logHandler = TextualLogHandler(self.log_widget)
# 设置日志格式
logHandler.setFormatter(logging.Formatter(formatStr))
# 获取根日志记录器,并添加自定义处理器
root_logger = logging.getLogger()
root_logger.setLevel(logging.INFO)
root_logger.addHandler(logHandler)
def write_to_console(self, content) -> None:
self.log_widget.write_line(content)
def compose(self) -> ComposeResult:
self.input_widget = HistoryInput(placeholder="Input Command...")
self.log_widget = Log(classes="log", highlight=True)
left_panel = self.input_widget.widget()
right_panel = self.log_widget
divider = Vertical(classes="divider")
yield Horizontal(left_panel, divider, right_panel)
def on_mount(self) -> None:
self.setup_logger()
self.fs = pikpakFs.VirtFs(loginCachePath = "token.json", proxy = "http://127.0.0.1:7897")
async def handle_command(self, command) -> None:
try:
if command == "clear":
self.log_widget.clear()
elif command == "exit":
sys.exit(0)
elif command == "debug":
logger = logging.getLogger()
logger.setLevel(logging.DEBUG)
self.write_to_console("Done")
else:
self.write_to_console(await self.fs.HandlerCommand(command))
except Exception as e:
logging.exception(e)
async def on_input_submitted(self, event: Input.Submitted) -> None:
if event.input is not self.input_widget:
return
user_input = event.value.strip()
self.write_to_console(f"> {user_input}")
await self.input_widget.wait_for(functools.partial(self.handle_command, user_input))
if __name__ == "__main__":
app = InputLoggerApp()
app.run()

View File

@ -31,17 +31,17 @@ class VirtFsNode:
self.id = id self.id = id
self.name = name self.name = name
self.fatherId = fatherId self.fatherId = fatherId
self.lastUpdate : datetime = None
class DirNode(VirtFsNode): class DirNode(VirtFsNode):
def __init__(self, id : str, name : str, fatherId : str): def __init__(self, id : str, name : str, fatherId : str):
super().__init__(id, name, fatherId) super().__init__(id, name, fatherId)
self.childrenId : list[str] = [] self.childrenId : list[str] = []
self.lastUpdate : datetime = None
class FileNode(VirtFsNode): class FileNode(VirtFsNode):
def __init__(self, id : str, name : str, fatherId : str): def __init__(self, id : str, name : str, fatherId : str):
super().__init__(id, name, fatherId) super().__init__(id, name, fatherId)
self.lastUpdate : datetime = None self.url : str = None
class PikpakToken: class PikpakToken:
def __init__(self, username, password, access_token, refresh_token, user_id): def __init__(self, username, password, access_token, refresh_token, user_id):
@ -59,20 +59,7 @@ class PikpakToken:
data = json.loads(json_str) data = json.loads(json_str)
return cls(**data) return cls(**data)
class VirtFs: class PKVirtFs:
def __CalcMd5(self, text : str):
return md5(text.encode()).hexdigest()
def __ToDir(self, node : VirtFsNode) -> DirNode:
if isinstance(node, DirNode):
return node
return None
def __ToFile(self, node : VirtFsNode) -> FileNode:
if isinstance(node, FileNode):
return node
return None
def __init__(self, loginCachePath : str = None, proxy : str = None): def __init__(self, loginCachePath : str = None, proxy : str = None):
self.nodes : Dict[str, VirtFsNode] = {} self.nodes : Dict[str, VirtFsNode] = {}
self.root = DirNode(None, "", None) self.root = DirNode(None, "", None)
@ -133,7 +120,31 @@ class VirtFs:
return True return True
return False return False
async def __RefreshDirectory(self, dirNode : DirNode): def ToDir(self, node : VirtFsNode) -> DirNode:
if isinstance(node, DirNode):
return node
return None
def ToFile(self, node : VirtFsNode) -> FileNode:
if isinstance(node, FileNode):
return node
return None
def GetFatherNode(self, node : VirtFsNode) -> VirtFsNode:
if node.fatherId == None:
return self.root
return self.nodes[node.fatherId]
def FindChildInDirByName(self, dir : DirNode, name : str):
if dir is self.root and name == "":
return self.root
for childId in dir.childrenId:
node = self.nodes[childId]
if name == node.name:
return node
return None
async def RefreshDirectory(self, dirNode : DirNode):
dirInfo = await self.client.file_list(parent_id = dirNode.id) dirInfo = await self.client.file_list(parent_id = dirNode.id)
nodes = dirInfo["files"] nodes = dirInfo["files"]
dirNode.childrenId.clear() dirNode.childrenId.clear()
@ -153,23 +164,14 @@ class VirtFs:
dirNode.childrenId.append(id) dirNode.childrenId.append(id)
dirNode.lastUpdate = datetime.now() dirNode.lastUpdate = datetime.now()
def __FindChildInDirByName(self, dir : DirNode, name : str): async def PathToNode(self, pathStr : str) -> VirtFsNode:
if dir is self.root and name == "": father, sonName = await self.PathToFatherNodeAndNodeName(pathStr)
return self.root fatherDir = self.ToDir(father)
for childId in dir.childrenId:
node = self.nodes[childId]
if name == node.name:
return node
return None
async def __PathToNode(self, pathStr : str) -> VirtFsNode:
father, sonName = await self.__PathToFatherNodeAndNodeName(pathStr)
fatherDir = self.__ToDir(father)
if fatherDir == None: if fatherDir == None:
return None return None
return self.__FindChildInDirByName(father, sonName) return self.FindChildInDirByName(father, sonName)
async def __PathToFatherNodeAndNodeName(self, pathStr : str) -> tuple[VirtFsNode, str]: async def PathToFatherNodeAndNodeName(self, pathStr : str) -> tuple[VirtFsNode, str]:
pathWalker = PathWalker(pathStr) pathWalker = PathWalker(pathStr)
father : VirtFsNode = None father : VirtFsNode = None
sonName : str = None sonName : str = None
@ -180,40 +182,30 @@ class VirtFs:
father = None father = None
break break
if spot == "..": if spot == "..":
current = self.root if current.fatherId == None else self.nodes[current.fatherId] current = self.GetFatherNode(current)
continue continue
father = current father = current
currentDir = self.ToDir(current)
currentDir = self.__ToDir(current)
if currentDir == None: if currentDir == None:
current = None current = None
continue continue
if currentDir.lastUpdate == None: if currentDir.lastUpdate == None:
await self.__RefreshDirectory(currentDir) await self.RefreshDirectory(currentDir)
if spot == ".": if spot == ".":
continue continue
sonName = spot sonName = spot
current = self.__FindChildInDirByName(currentDir, spot) current = self.FindChildInDirByName(currentDir, spot)
if current != None: if current != None:
currentDir = self.__ToDir(current) currentDir = self.ToDir(current)
if currentDir != None: if currentDir != None and currentDir.lastUpdate == None:
await self.__RefreshDirectory(currentDir) await self.RefreshDirectory(currentDir)
father = self.root if current.fatherId == None else self.nodes[current.fatherId] father = self.GetFatherNode(current)
sonName = current.name sonName = current.name
return father, sonName return father, sonName
async def __MakeDir(self, node : DirNode, name : str) -> DirNode: async def NodeToPath(self, node : VirtFsNode) -> str:
await self.client.create_folder(name, node.id)
await self.__RefreshDirectory(node)
return self.__ToDir(self.__FindChildInDirByName(node, name))
async def __NodeToPath(self, node : VirtFsNode) -> str:
spots : list[str] = [""] spots : list[str] = [""]
current = node current = node
while current.id != None: while current.id != None:
@ -224,110 +216,47 @@ class VirtFs:
spots.append("") spots.append("")
return "/".join(reversed(spots)) return "/".join(reversed(spots))
async def login(self, username : str = None, password : str = None) -> str: async def MakeDir(self, node : DirNode, name : str) -> DirNode:
await self.client.create_folder(name, node.id)
await self.RefreshDirectory(node)
return self.ToDir(self.FindChildInDirByName(node, name))
async def Login(self, username : str = None, password : str = None) -> None:
if self.client != None and username == None and password == None: if self.client != None and username == None and password == None:
username = self.client.username username = self.client.username
password = self.client.password password = self.client.password
if self.client != None and self.client.username == username and self.client.password == password: if self.client != None and self.client.username == username and self.client.password == password:
logging.info("already login, try refresh token") logging.info("Already login, try refresh token")
try: try:
await self.client.refresh_access_token() await self.client.refresh_access_token()
self.__DumpLoginInfo() self.__DumpLoginInfo()
return "success" return
except Exception: except Exception:
logging.info("Refresh access token failed! Try relogin") logging.info("Refresh access token failed! Try relogin")
self.__InitClientByUsernamePassword(username, password) self.__InitClientByUsernamePassword(username, password)
await self.client.login() await self.client.login()
self.__DumpLoginInfo() self.__DumpLoginInfo()
return "success"
async def ls(self, pathStr : str = "") -> str: async def UpdateDownloadUrl(self, file : FileNode) -> None:
dirNode = self.__ToDir(await self.__PathToNode(pathStr)) result = await self.client.get_download_url(file.id)
if dirNode == None: file.url = result["web_content_link"]
return f"path not found or is file: {pathStr}"
result = []
for childId in dirNode.childrenId:
node = self.nodes[childId]
result.append(node.name)
return "\n".join(result)
async def cd(self, pathStr : str = "") -> str: async def Download(self, url : str, dirNode : DirNode = None) -> None :
dirNode = self.__ToDir(await self.__PathToNode(pathStr)) # 默认创建在当前目录下
if dirNode == None:
return f"path not found or is file: {pathStr}"
self.currentLocation = dirNode
return "success"
async def cwd(self) -> str:
path = await self.__NodeToPath(self.currentLocation)
return path if path != None else "cwd failed"
async def geturl(self, pathStr : str) -> str:
fileNode = self.__ToFile(await self.__PathToNode(pathStr))
if fileNode == None:
return f"path not found or is not file: {pathStr}"
result = await self.client.get_download_url(fileNode.id)
return result["web_content_link"]
async def mkdir(self, pathStr : str) -> str:
father, target = await self.__PathToFatherNodeAndNodeName(pathStr)
fatherDir = self.__ToDir(father)
if fatherDir == None:
return "Failed to locate"
if self.__FindChildInDirByName(fatherDir, target) != None:
return f"Path {pathStr} already existed"
await self.__MakeDir(fatherDir, target)
return "success"
async def download(self, url : str, pathStr : str = "") -> str :
# todo: 完善离线下载task相关 # todo: 完善离线下载task相关
dirNode = self.__ToDir(await self.__PathToNode(pathStr))
if dirNode == None: if dirNode == None:
return f"path not found or is file: {pathStr}" dirNode = self.currentLocation
await self.client.offline_download(url, dirNode.id)
subFolderName = self.__CalcMd5(url) async def Delete(self, node : VirtFsNode) -> None:
newDirNode = await self.__MakeDir(dirNode, subFolderName) father = self.GetFatherNode(node)
if newDirNode == None: fatherDir = self.ToDir(father)
return f"falied to create sub folder {subFolderName}"
await self.client.offline_download(url, newDirNode.id)
return subFolderName
async def update(self, pathStr : str = ""):
dirNode = self.__ToDir(await self.__PathToNode(pathStr))
if dirNode == None:
return f"path not found or is file: {pathStr}"
await self.__RefreshDirectory(dirNode)
return "success"
async def delete(self, pathStr : str):
father, name = await self.__PathToFatherNodeAndNodeName(pathStr)
fatherDir = self.__ToDir(father)
if fatherDir == None: if fatherDir == None:
return "Failed to locate" raise Exception('Failed to locate')
node = self.__FindChildInDirByName(fatherDir, name)
if node == None:
return f"path {pathStr} not existed"
if self.currentLocation is node or self.__IsAncestorsOf(node, self.currentLocation): if self.currentLocation is node or self.__IsAncestorsOf(node, self.currentLocation):
return f"delete self or ancestor is not allowed" raise Exception('Delete self or ancestor is not allowed')
await self.client.delete_to_trash([node.id]) await self.client.delete_to_trash([node.id])
await self.__RefreshDirectory(fatherDir) await self.RefreshDirectory(fatherDir)
return "success"
async def HandlerCommand(self, command):
result = re.findall(r'"(.*?)"|(\S+)', command)
filtered_result = [item for sublist in result for item in sublist if item]
cmd = filtered_result[0]
args = filtered_result[1:]
method = getattr(self, cmd)
if method == None:
return f"Unknown command: {cmd}"
output = await method(*args)
logging.info(f"{command} : {repr(output)}")
return output

View File

@ -1,3 +1,3 @@
在Pikpak Api基础上套了一层文件系统更好自动化离线下载 在Pikpak Api基础上套了一层文件系统更好自动化离线下载
python main.py 运行 运行: python main.py

View File

@ -1,25 +0,0 @@
anyio==4.6.2.post1
certifi==2024.8.30
charset-normalizer==3.4.0
DataRecorder==3.6.2
DownloadKit==2.0.5
et-xmlfile==1.1.0
h11==0.14.0
httpcore==1.0.6
httpx==0.27.2
idna==3.10
linkify-it-py==2.0.3
markdown-it-py==3.0.0
mdit-py-plugins==0.4.2
mdurl==0.1.2
openpyxl==3.1.5
PikPakAPI==0.1.10
platformdirs==4.3.6
Pygments==2.18.0
requests==2.32.3
rich==13.9.2
sniffio==1.3.1
textual==0.83.0
typing_extensions==4.12.2
uc-micro-py==1.0.3
urllib3==2.2.3