diff --git a/build.py b/build.py new file mode 100644 index 0000000..2b92822 --- /dev/null +++ b/build.py @@ -0,0 +1,308 @@ +#!/usr/bin/env python3 +""" +Linux 存储管理器 - PyInstaller 打包脚本 +自动安装依赖并打包应用程序 +""" + +import subprocess +import sys +import os +import shutil +from pathlib import Path + + +def run_command(cmd, description, check=True): + """运行命令并输出结果""" + print(f"\n{'='*60}") + print(f" {description}") + print(f" 命令: {' '.join(cmd)}") + print(f"{'='*60}") + + try: + result = subprocess.run(cmd, check=check, capture_output=False, text=True) + print(f"✓ {description} 完成") + return result.returncode == 0 + except subprocess.CalledProcessError as e: + print(f"✗ {description} 失败") + print(f" 错误: {e}") + return False + + +def check_python(): + """检查 Python 版本""" + print(f"\n{'='*60}") + print(" 检查 Python 版本") + print(f"{'='*60}") + + version = sys.version_info + print(f" Python 版本: {version.major}.{version.minor}.{version.micro}") + + if version.major < 3 or (version.major == 3 and version.minor < 6): + print("✗ Python 版本过低,需要 Python 3.6+") + return False + + print("✓ Python 版本符合要求") + return True + + +def is_externally_managed(): + """检测是否为 PEP 668 外部管理环境""" + # 检查是否在虚拟环境中 + if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix): + return False + if os.environ.get('VIRTUAL_ENV'): + return False + + # 检查 EXTERNALLY-MANAGED 文件 + try: + import sysconfig + stdlib_path = sysconfig.get_path('stdlib') + if os.path.exists(os.path.join(stdlib_path, 'EXTERNALLY-MANAGED')): + return True + except: + pass + + return False + + +def install_dependencies(): + """安装依赖包""" + print(f"\n{'='*60}") + print(" 安装依赖包") + print(f"{'='*60}") + + dependencies = [ + "pyinstaller", + "pexpect", + ] + + # 尝试使用 pip3 或 pip + pip_cmd = None + for pip in ["pip3", "pip"]: + if shutil.which(pip): + pip_cmd = pip + break + + if not pip_cmd: + print("✗ 未找到 pip,请手动安装 pip") + return False + + # 检测外部管理环境 + is_managed = is_externally_managed() + if is_managed: + print(" 检测到外部管理环境 (PEP 668),使用 --break-system-packages 选项") + + print(f" 使用 {pip_cmd} 安装依赖...") + + # 构建安装参数 + extra_args = ["--break-system-packages"] if is_managed else [] + + # 升级 pip 本身 + run_command([pip_cmd, "install", "--upgrade", "pip"] + extra_args, "升级 pip", check=False) + + # 安装依赖 + all_success = True + for dep in dependencies: + if not run_command([pip_cmd, "install", dep] + extra_args, f"安装 {dep}"): + print(f" 警告: {dep} 安装失败,尝试使用 sudo...") + if not run_command(["sudo", pip_cmd, "install", dep] + extra_args, f"使用 sudo 安装 {dep}", check=False): + print(f" ✗ {dep} 安装失败") + all_success = False + + if not all_success: + print(f"\n{'='*60}") + print(" 依赖安装失败") + print(f"{'='*60}") + print(" 您可以选择以下方式安装:") + print("") + print(" 方式 1 - 创建虚拟环境 (推荐):") + print(" python3 -m venv venv") + print(" source venv/bin/activate") + print(" pip install pyinstaller pexpect") + print(" python3 build.py") + print("") + print(" 方式 2 - 使用系统包管理器 (Arch):") + print(" sudo pacman -S python-pyinstaller python-pexpect") + print(" python3 build.py") + print("") + print(" 方式 3 - 强制安装 (有风险):") + print(" pip3 install pyinstaller pexpect --break-system-packages") + print(" python3 build.py") + print("") + return False + + return True + + +def clean_build(): + """清理之前的构建文件""" + print(f"\n{'='*60}") + print(" 清理构建文件") + print(f"{'='*60}") + + dirs_to_remove = ["build", "dist", "__pycache__"] + files_to_remove = ["*.spec"] + + removed = [] + for d in dirs_to_remove: + if os.path.exists(d): + try: + shutil.rmtree(d) + removed.append(d) + except Exception as e: + print(f" 警告: 无法删除 {d}: {e}") + + # 保留用户自定义的 spec 文件,只删除自动生成的 + for f in os.listdir("."): + if f.startswith("mainwindow") and f.endswith(".spec") and f != "disk-manager.spec": + try: + os.remove(f) + removed.append(f) + except Exception as e: + print(f" 警告: 无法删除 {f}: {e}") + + if removed: + print(f" 已清理: {', '.join(removed)}") + else: + print(" 没有需要清理的文件") + + return True + + +def build_app(): + """使用 PyInstaller 打包应用程序""" + print(f"\n{'='*60}") + print(" 开始打包应用程序") + print(f"{'='*60}") + + # 确定要打包的主文件 + # 默认使用 mainwindow_tkinter.py (Tkinter 版本) + main_script = "mainwindow_tkinter.py" + + if not os.path.exists(main_script): + print(f"✗ 未找到 {main_script}") + main_script = "mainwindow.py" + if not os.path.exists(main_script): + print(f"✗ 也未找到 {main_script}") + return False + print(f" 使用替代文件: {main_script}") + + # 构建命令 + output_name = "linux-storage-manager" + + cmd = [ + sys.executable, "-m", "PyInstaller", + "--onefile", # 单文件模式 + "--name", output_name, # 输出文件名 + "--clean", # 清理临时文件 + "--noconfirm", # 不询问确认 + "--console", # 显示控制台 (用于调试) + main_script + ] + + if not run_command(cmd, "PyInstaller 打包"): + return False + + # 检查输出 + output_path = Path("dist") / output_name + if output_path.exists(): + size = output_path.stat().st_size / (1024 * 1024) # MB + print(f"\n{'='*60}") + print(" 打包成功!") + print(f"{'='*60}") + print(f" 输出文件: {output_path}") + print(f" 文件大小: {size:.2f} MB") + print(f"\n 使用方法:") + print(f" sudo ./{output_path}") + return True + else: + print(f"✗ 未找到输出文件: {output_path}") + return False + + +def create_desktop_entry(): + """创建桌面快捷方式文件""" + print(f"\n{'='*60}") + print(" 创建桌面快捷方式文件") + print(f"{'='*60}") + + desktop_content = """[Desktop Entry] +Name=Linux 存储管理器 +Comment=Linux 存储管理工具 +Exec=/usr/local/bin/linux-storage-manager +Icon=drive-harddisk +Terminal=false +Type=Application +Categories=System;Utility; +Keywords=storage;disk;partition;raid;lvm; +""" + + desktop_file = "linux-storage-manager.desktop" + try: + with open(desktop_file, "w") as f: + f.write(desktop_content) + print(f" 已创建: {desktop_file}") + print(f" 安装方法: sudo cp {desktop_file} /usr/share/applications/") + return True + except Exception as e: + print(f" 警告: 无法创建桌面文件: {e}") + return False + + +def main(): + """主函数""" + print(""" +╔══════════════════════════════════════════════════════════════╗ +║ Linux 存储管理器 - PyInstaller 打包工具 ║ +╚══════════════════════════════════════════════════════════════╝ +""") + + # 检查是否在正确的目录 + if not os.path.exists("mainwindow_tkinter.py") and not os.path.exists("mainwindow.py"): + print("✗ 错误: 未找到主程序文件") + print(" 请确保在包含 mainwindow_tkinter.py 或 mainwindow.py 的目录中运行此脚本") + sys.exit(1) + + # 执行步骤 + steps = [ + ("检查 Python 版本", check_python), + ("安装依赖包", install_dependencies), + ("清理构建文件", clean_build), + ("打包应用程序", build_app), + ("创建桌面文件", create_desktop_entry), + ] + + results = [] + for name, func in steps: + try: + result = func() + results.append((name, result)) + except Exception as e: + print(f"\n✗ {name} 出错: {e}") + results.append((name, False)) + + # 打印总结 + print(f"\n{'='*60}") + print(" 打包总结") + print(f"{'='*60}") + for name, result in results: + status = "✓ 成功" if result else "✗ 失败" + print(f" {status} - {name}") + + # 检查最终结果 + if all(r[1] for r in results): + print(f"\n{'='*60}") + print(" 所有步骤完成!") + print(f"{'='*60}") + print(" 可执行文件位置: dist/linux-storage-manager") + print(" 运行命令: sudo dist/linux-storage-manager") + sys.exit(0) + else: + print(f"\n{'='*60}") + print(" 打包过程中有步骤失败,请检查错误信息") + print(f"{'='*60}") + sys.exit(1) + + +if __name__ == "__main__": + main() diff --git a/build.sh b/build.sh new file mode 100644 index 0000000..ef791df --- /dev/null +++ b/build.sh @@ -0,0 +1,164 @@ +#!/bin/bash +# +# Linux 存储管理器 - PyInstaller 打包脚本 +# 自动安装依赖并打包应用程序 +# + +set -e + +echo "" +echo "╔══════════════════════════════════════════════════════════════╗" +echo "║ Linux 存储管理器 - PyInstaller 打包工具 ║" +echo "╚══════════════════════════════════════════════════════════════╝" +echo "" + +# 检查 Python3 +if ! command -v python3 &> /dev/null; then + echo "✗ 错误: 未找到 python3" + echo " 请先安装 Python 3.6+" + exit 1 +fi + +PYTHON_VERSION=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")') +echo " Python 版本: $PYTHON_VERSION" + +# 检查主程序文件 +if [ ! -f "mainwindow_tkinter.py" ] && [ ! -f "mainwindow.py" ]; then + echo "✗ 错误: 未找到主程序文件" + echo " 请确保在包含 mainwindow_tkinter.py 或 mainwindow.py 的目录中运行此脚本" + exit 1 +fi + +echo "" +echo "═══════════════════════════════════════════════════════════════" +echo " 步骤 1/4: 安装依赖包" +echo "═══════════════════════════════════════════════════════════════" + +# 检测是否为外部管理环境 (PEP 668) +IS_EXTERNALLY_MANAGED=0 +if python3 -c "import sys; sys.exit(0 if (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix) or 'VIRTUAL_ENV' in __import__('os').environ else 1)" 2>/dev/null; then + echo " 检测到在虚拟环境中" + IS_EXTERNALLY_MANAGED=0 +elif python3 -c "import sysconfig; import os; print(os.path.exists(os.path.join(sysconfig.get_path('stdlib'), 'EXTERNALLY-MANAGED')))" | grep -q "True"; then + echo " 检测到外部管理环境 (PEP 668)" + IS_EXTERNALLY_MANAGED=1 +fi + +# 安装依赖 +install_deps() { + local EXTRA_ARGS="" + if [ "$IS_EXTERNALLY_MANAGED" -eq 1 ]; then + EXTRA_ARGS="--break-system-packages" + echo " 使用 --break-system-packages 选项" + fi + + # 安装/升级 pip + python3 -m pip install --upgrade pip $EXTRA_ARGS 2>/dev/null || { + echo " pip 升级失败,继续安装..." + } + + # 安装 pyinstaller 和 pexpect + pip3 install pyinstaller pexpect $EXTRA_ARGS || { + return 1 + } + return 0 +} + +if ! install_deps; then + echo " 尝试使用 sudo 安装依赖..." + sudo pip3 install pyinstaller pexpect --break-system-packages 2>/dev/null || { + echo "" + echo "✗ 依赖安装失败" + echo "" + echo " 您可以选择以下方式安装:" + echo "" + echo " 方式 1 - 创建虚拟环境 (推荐):" + echo " python3 -m venv venv" + echo " source venv/bin/activate" + echo " pip install pyinstaller pexpect" + echo " ./build.sh" + echo "" + echo " 方式 2 - 使用系统包管理器 (Arch):" + echo " sudo pacman -S python-pyinstaller python-pexpect" + echo " ./build.sh" + echo "" + echo " 方式 3 - 强制安装 (有风险):" + echo " pip3 install pyinstaller pexpect --break-system-packages" + echo " ./build.sh" + echo "" + exit 1 + } +fi + +echo "" +echo "═══════════════════════════════════════════════════════════════" +echo " 步骤 2/4: 清理构建文件" +echo "═══════════════════════════════════════════════════════════════" + +# 清理旧文件 +rm -rf build dist __pycache__ +rm -f mainwindow*.spec + +echo " 已清理构建文件" + +echo "" +echo "═══════════════════════════════════════════════════════════════" +echo " 步骤 3/4: PyInstaller 打包" +echo "═══════════════════════════════════════════════════════════════" + +# 确定主文件 +MAIN_SCRIPT="mainwindow_tkinter.py" +if [ ! -f "$MAIN_SCRIPT" ]; then + MAIN_SCRIPT="mainwindow.py" +fi + +echo " 打包文件: $MAIN_SCRIPT" + +# 执行打包 +python3 -m PyInstaller \ + --onefile \ + --name "linux-storage-manager" \ + --clean \ + --noconfirm \ + --console \ + "$MAIN_SCRIPT" + +echo "" +echo "═══════════════════════════════════════════════════════════════" +echo " 步骤 4/4: 创建桌面快捷方式" +echo "═══════════════════════════════════════════════════════════════" + +cat > linux-storage-manager.desktop << 'EOF' +[Desktop Entry] +Name=Linux 存储管理器 +Comment=Linux 存储管理工具 +Exec=/usr/local/bin/linux-storage-manager +Icon=drive-harddisk +Terminal=false +Type=Application +Categories=System;Utility; +Keywords=storage;disk;partition;raid;lvm; +EOF + +echo " 已创建桌面文件: linux-storage-manager.desktop" +echo " 安装命令: sudo cp linux-storage-manager.desktop /usr/share/applications/" + +# 检查输出 +if [ -f "dist/linux-storage-manager" ]; then + SIZE=$(du -h dist/linux-storage-manager | cut -f1) + echo "" + echo "╔══════════════════════════════════════════════════════════════╗" + echo "║ 打包成功! ║" + echo "╚══════════════════════════════════════════════════════════════╝" + echo "" + echo " 输出文件: dist/linux-storage-manager" + echo " 文件大小: $SIZE" + echo "" + echo " 使用方法:" + echo " 直接运行: sudo dist/linux-storage-manager" + echo " 安装到系统: sudo cp dist/linux-storage-manager /usr/local/bin/" + echo "" +else + echo "✗ 打包失败: 未找到输出文件" + exit 1 +fi diff --git a/dialogs_enhanced.py b/dialogs_enhanced.py new file mode 100644 index 0000000..e3e9211 --- /dev/null +++ b/dialogs_enhanced.py @@ -0,0 +1,521 @@ +# dialogs_enhanced.py +"""增强型对话框 - 智能提示与确认""" + +import tkinter as tk +from tkinter import ttk, messagebox +import logging + +logger = logging.getLogger(__name__) + + +class EnhancedFormatDialog: + """增强格式化对话框 - 显示详细警告信息""" + + def __init__(self, parent, device_path, device_info): + self.parent = parent + self.device_path = device_path + self.device_info = device_info + self.result = None + + self.dialog = tk.Toplevel(parent) + self.dialog.title(f"格式化确认 - {device_path}") + self.dialog.geometry("500x575") + self.dialog.minsize(480, 450) + self.dialog.transient(parent) + self.dialog.grab_set() + + self._center_window() + self._create_widgets() + + def _center_window(self): + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + main_frame = ttk.Frame(self.dialog, padding="15") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 警告标题 + warning_frame = ttk.Frame(main_frame) + warning_frame.pack(fill=tk.X, pady=(0, 15)) + + warning_label = ttk.Label( + warning_frame, + text="⚠ 警告:此操作将永久删除所有数据!", + font=("Arial", 12, "bold"), + foreground="red" + ) + warning_label.pack() + + # 设备信息区域 + info_frame = ttk.LabelFrame(main_frame, text="设备信息", padding="10") + info_frame.pack(fill=tk.X, pady=(0, 15)) + + # 显示详细信息 + infos = [ + ("设备路径", self.device_path), + ("设备名称", self.device_info.get('name', 'N/A')), + ("文件系统", self.device_info.get('fstype', 'N/A')), + ("大小", self.device_info.get('size', 'N/A')), + ("UUID", self.device_info.get('uuid', 'N/A')), + ("挂载点", self.device_info.get('mountpoint', '未挂载')), + ] + + for i, (label, value) in enumerate(infos): + ttk.Label(info_frame, text=f"{label}:", font=("Arial", 10, "bold")).grid( + row=i, column=0, sticky=tk.W, padx=5, pady=3 + ) + val_label = ttk.Label(info_frame, text=str(value)) + val_label.grid(row=i, column=1, sticky=tk.W, padx=5, pady=3) + + # 如果有挂载点,高亮显示 + if label == "挂载点" and value and value != '未挂载': + val_label.configure(foreground="orange", font=("Arial", 9, "bold")) + + # 数据风险提示 + risk_frame = ttk.LabelFrame(main_frame, text="数据风险提示", padding="10") + risk_frame.pack(fill=tk.X, pady=(0, 15)) + + risks = [] + if self.device_info.get('fstype'): + risks.append(f"• 此分区包含 {self.device_info['fstype']} 文件系统") + if self.device_info.get('uuid'): + risks.append(f"• 分区有 UUID: {self.device_info['uuid'][:20]}...") + if self.device_info.get('mountpoint'): + risks.append(f"• 分区当前挂载在: {self.device_info['mountpoint']}") + + if not risks: + risks.append("• 无法获取分区详细信息") + + for risk in risks: + ttk.Label(risk_frame, text=risk, foreground="red").pack( + anchor=tk.W, pady=2 + ) + + # 文件系统选择 + fs_frame = ttk.LabelFrame(main_frame, text="选择新文件系统", padding="10") + fs_frame.pack(fill=tk.X, pady=(0, 15)) + + self.fs_var = tk.StringVar(value="ext4") + fs_options = ["ext4", "xfs", "fat32", "ntfs"] + + for fs in fs_options: + ttk.Radiobutton(fs_frame, text=fs, variable=self.fs_var, + value=fs).pack(side=tk.LEFT, padx=10) + + # 确认输入 + confirm_frame = ttk.Frame(main_frame) + confirm_frame.pack(fill=tk.X, pady=(0, 15)) + + ttk.Label(confirm_frame, + text=f"请输入设备名 '{self.device_info.get('name', '确认')}' 以继续:", + foreground="red").pack(anchor=tk.W, pady=5) + + self.confirm_var = tk.StringVar() + self.confirm_entry = ttk.Entry(confirm_frame, textvariable=self.confirm_var, width=20) + self.confirm_entry.pack(anchor=tk.W, pady=5) + self.confirm_entry.focus() + + # 按钮 + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=tk.X, pady=10) + + ttk.Button(btn_frame, text="取消", + command=self._on_cancel).pack(side=tk.RIGHT, padx=5) + ttk.Button(btn_frame, text="确认格式化", + command=self._on_confirm).pack(side=tk.RIGHT, padx=5) + + def _on_confirm(self): + expected = self.device_info.get('name', '确认') + if self.confirm_var.get() != expected: + messagebox.showerror("错误", + f"输入错误!请输入 '{expected}' 以确认格式化操作。") + return + + self.result = self.fs_var.get() + self.dialog.destroy() + + def _on_cancel(self): + self.dialog.destroy() + + def wait_for_result(self): + self.dialog.wait_window() + return self.result + + +class EnhancedRaidDeleteDialog: + """增强 RAID 删除对话框 - 显示阵列详情""" + + def __init__(self, parent, array_path, array_data, member_devices): + self.parent = parent + self.array_path = array_path + self.array_data = array_data + self.member_devices = member_devices + self.result = False + + self.dialog = tk.Toplevel(parent) + self.dialog.title(f"删除 RAID 阵列确认 - {array_path}") + self.dialog.geometry("550x750") + self.dialog.minsize(520, 750) + self.dialog.transient(parent) + self.dialog.grab_set() + + self._center_window() + self._create_widgets() + + def _center_window(self): + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + main_frame = ttk.Frame(self.dialog, padding="15") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 危险警告 + warning_frame = ttk.Frame(main_frame) + warning_frame.pack(fill=tk.X, pady=(0, 15)) + + warning_label = ttk.Label( + warning_frame, + text="⚠ 危险操作:此操作将永久删除 RAID 阵列!", + font=("Arial", 13, "bold"), + foreground="red" + ) + warning_label.pack() + + # 阵列详情 + array_frame = ttk.LabelFrame(main_frame, text="阵列详情", padding="10") + array_frame.pack(fill=tk.X, pady=(0, 15)) + + details = [ + ("阵列设备", self.array_path), + ("RAID 级别", self.array_data.get('level', 'N/A')), + ("阵列状态", self.array_data.get('state', 'N/A')), + ("阵列大小", self.array_data.get('array_size', 'N/A')), + ("UUID", self.array_data.get('uuid', 'N/A')), + ("Chunk 大小", self.array_data.get('chunk_size', 'N/A')), + ] + + for i, (label, value) in enumerate(details): + ttk.Label(array_frame, text=f"{label}:", + font=("Arial", 10, "bold")).grid( + row=i, column=0, sticky=tk.W, padx=5, pady=3 + ) + ttk.Label(array_frame, text=str(value)).grid( + row=i, column=1, sticky=tk.W, padx=5, pady=3 + ) + + # 成员设备列表 + member_frame = ttk.LabelFrame(main_frame, text="成员设备(将被清除 RAID 超级块)", + padding="10") + member_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 15)) + + if self.member_devices: + for i, dev in enumerate(self.member_devices): + if dev: + ttk.Label(member_frame, text=f" • {dev}", + font=("Arial", 10), foreground="red").pack( + anchor=tk.W, pady=2 + ) + else: + ttk.Label(member_frame, text=" 无法获取成员设备列表", + foreground="orange").pack(anchor=tk.W, pady=5) + + # 后果警告 + consequence_frame = ttk.LabelFrame(main_frame, text="操作后果", padding="10") + consequence_frame.pack(fill=tk.X, pady=(0, 15)) + + consequences = [ + "• 阵列将被停止并从系统中移除", + "• 所有成员设备的 RAID 超级块将被清除", + "• 阵列上的所有数据将永久丢失", + "• /etc/mdadm.conf 中的配置将被删除", + "• 此操作无法撤销!" + ] + + for con in consequences: + ttk.Label(consequence_frame, text=con, + foreground="red").pack(anchor=tk.W, pady=2) + + # 二次确认 + confirm_frame = ttk.Frame(main_frame) + confirm_frame.pack(fill=tk.X, pady=(0, 10)) + + ttk.Label(confirm_frame, + text="请输入 'DELETE' 以确认删除操作:", + foreground="red", font=("Arial", 10, "bold")).pack( + anchor=tk.W, pady=5) + + self.confirm_var = tk.StringVar() + self.confirm_entry = ttk.Entry(confirm_frame, textvariable=self.confirm_var, width=15) + self.confirm_entry.pack(anchor=tk.W, pady=5) + self.confirm_entry.focus() + + # 按钮 + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=tk.X, pady=10) + + ttk.Button(btn_frame, text="取消", + command=self._on_cancel).pack(side=tk.RIGHT, padx=5) + ttk.Button(btn_frame, text="确认删除", + command=self._on_confirm, + style="Danger.TButton").pack(side=tk.RIGHT, padx=5) + + # 配置危险按钮样式 + style = ttk.Style() + style.configure("Danger.TButton", foreground="red") + + def _on_confirm(self): + if self.confirm_var.get() != "DELETE": + messagebox.showerror("错误", + "输入错误!请输入 'DELETE'(全大写)以确认删除操作。") + return + + self.result = True + self.dialog.destroy() + + def _on_cancel(self): + self.dialog.destroy() + + def wait_for_result(self): + self.dialog.wait_window() + return self.result + + +class EnhancedPartitionDialog: + """增强分区创建对话框 - 可视化滑块选择大小""" + + def __init__(self, parent, disk_path, total_disk_mib, max_available_mib): + self.parent = parent + self.disk_path = disk_path + self.total_disk_mib = total_disk_mib + self.max_available_mib = max_available_mib + self.result = None + + self.dialog = tk.Toplevel(parent) + self.dialog.title(f"创建分区 - {disk_path}") + self.dialog.geometry("550x575") + self.dialog.minsize(500, 575) + self.dialog.transient(parent) + self.dialog.grab_set() + + self._center_window() + self._create_widgets() + + def _center_window(self): + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + main_frame = ttk.Frame(self.dialog, padding="15") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 磁盘信息 + info_frame = ttk.LabelFrame(main_frame, text="磁盘信息", padding="10") + info_frame.pack(fill=tk.X, pady=(0, 15)) + + total_gb = self.total_disk_mib / 1024 + max_gb = self.max_available_mib / 1024 + + ttk.Label(info_frame, text=f"磁盘: {self.disk_path}").pack(anchor=tk.W, pady=2) + ttk.Label(info_frame, text=f"总容量: {total_gb:.2f} GB").pack(anchor=tk.W, pady=2) + ttk.Label(info_frame, text=f"可用空间: {max_gb:.2f} GB", + foreground="green").pack(anchor=tk.W, pady=2) + + # 分区表类型 + pt_frame = ttk.LabelFrame(main_frame, text="分区表类型", padding="10") + pt_frame.pack(fill=tk.X, pady=(0, 15)) + + self.part_type_var = tk.StringVar(value="gpt") + ttk.Radiobutton(pt_frame, text="GPT (推荐,支持大容量磁盘)", + variable=self.part_type_var, value="gpt").pack(anchor=tk.W, pady=2) + ttk.Radiobutton(pt_frame, text="MBR/MSDOS (兼容旧系统)", + variable=self.part_type_var, value="msdos").pack(anchor=tk.W, pady=2) + + # 大小选择 + size_frame = ttk.LabelFrame(main_frame, text="分区大小", padding="10") + size_frame.pack(fill=tk.X, pady=(0, 15)) + + # 使用最大空间复选框 + self.use_max_var = tk.BooleanVar(value=True) + ttk.Checkbutton(size_frame, text="使用最大可用空间", + variable=self.use_max_var, + command=self._toggle_size_input).pack(anchor=tk.W, pady=5) + + # 滑块和输入框容器 + self.size_control_frame = ttk.Frame(size_frame) + self.size_control_frame.pack(fill=tk.X, pady=5) + + # 滑块 + self.size_var = tk.DoubleVar(value=max_gb) + self.size_slider = ttk.Scale( + self.size_control_frame, + from_=0.1, + to=max_gb, + orient=tk.HORIZONTAL, + variable=self.size_var, + command=self._on_slider_change + ) + self.size_slider.pack(fill=tk.X, pady=5) + + # 输入框和标签 + input_frame = ttk.Frame(self.size_control_frame) + input_frame.pack(fill=tk.X, pady=5) + + ttk.Label(input_frame, text="大小:").pack(side=tk.LEFT, padx=5) + + self.size_spin = tk.Spinbox( + input_frame, + from_=0.1, + to=max_gb, + increment=0.1, + textvariable=self.size_var, + width=10, + command=self._on_spin_change + ) + self.size_spin.pack(side=tk.LEFT, padx=5) + + ttk.Label(input_frame, text="GB").pack(side=tk.LEFT, padx=5) + + # 显示百分比 + self.percent_label = ttk.Label(input_frame, text="(100%)") + self.percent_label.pack(side=tk.LEFT, padx=10) + + # 可视化空间显示 + self._create_space_visualizer(main_frame, max_gb) + + # 初始状态 + self._toggle_size_input() + + # 按钮 + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=tk.X, pady=10) + + ttk.Button(btn_frame, text="取消", + command=self._on_cancel).pack(side=tk.RIGHT, padx=5) + ttk.Button(btn_frame, text="创建分区", + command=self._on_confirm).pack(side=tk.RIGHT, padx=5) + + def _create_space_visualizer(self, parent, max_gb): + """创建空间可视化条""" + viz_frame = ttk.LabelFrame(parent, text="空间使用预览", padding="10") + viz_frame.pack(fill=tk.X, pady=(0, 15)) + + # Canvas 用于绘制 + self.viz_canvas = tk.Canvas(viz_frame, height=40, bg="white") + self.viz_canvas.pack(fill=tk.X, pady=5) + + # 等待 Canvas 渲染完成后再更新 + self.viz_canvas.update_idletasks() + + # 绘制初始状态 + self._update_visualizer(max_gb, max_gb) + + def _update_visualizer(self, current_gb, max_gb): + """更新空间可视化""" + self.viz_canvas.delete("all") + + width = self.viz_canvas.winfo_width() or 400 + height = self.viz_canvas.winfo_height() or 40 + + # 计算比例 + if max_gb > 0: + ratio = current_gb / max_gb + else: + ratio = 0 + + fill_width = int(width * ratio) + + # 绘制背景 + self.viz_canvas.create_rectangle(0, 0, width, height, fill="lightgray", outline="") + + # 绘制已用空间 + if fill_width > 0: + color = "green" if ratio < 0.8 else "orange" if ratio < 0.95 else "red" + self.viz_canvas.create_rectangle(0, 0, fill_width, height, fill=color, outline="") + + # 绘制文字 + self.viz_canvas.create_text( + width // 2, height // 2, + text=f"{current_gb:.2f} GB / {max_gb:.2f} GB", + font=("Arial", 10, "bold") + ) + + def _on_slider_change(self, value): + """滑块变化回调""" + try: + gb = float(value) + self._update_percent(gb) + self._update_visualizer(gb, self.max_available_mib / 1024) + except: + pass + + def _on_spin_change(self): + """输入框变化回调""" + try: + gb = self.size_var.get() + self._update_percent(gb) + self._update_visualizer(gb, self.max_available_mib / 1024) + except: + pass + + def _update_percent(self, gb): + """更新百分比显示""" + max_gb = self.max_available_mib / 1024 + if max_gb > 0: + percent = (gb / max_gb) * 100 + self.percent_label.configure(text=f"({percent:.1f}%)") + + def _toggle_size_input(self): + """切换大小输入状态""" + if self.use_max_var.get(): + self.size_slider.configure(state=tk.DISABLED) + self.size_spin.configure(state=tk.DISABLED) + max_gb = self.max_available_mib / 1024 + self.size_var.set(max_gb) + self._update_percent(max_gb) + self._update_visualizer(max_gb, max_gb) + else: + self.size_slider.configure(state=tk.NORMAL) + self.size_spin.configure(state=tk.NORMAL) + + def _on_confirm(self): + size_gb = self.size_var.get() + use_max_space = self.use_max_var.get() + + if not use_max_space and size_gb <= 0: + messagebox.showwarning("输入错误", "分区大小必须大于0。") + return + + max_gb = self.max_available_mib / 1024 + if not use_max_space and size_gb > max_gb: + messagebox.showwarning("输入错误", "分区大小不能超过最大可用空间。") + return + + self.result = { + 'disk_path': self.disk_path, + 'partition_table_type': self.part_type_var.get(), + 'size_gb': size_gb, + 'total_disk_mib': self.total_disk_mib, + 'use_max_space': use_max_space + } + self.dialog.destroy() + + def _on_cancel(self): + self.dialog.destroy() + + def wait_for_result(self): + self.dialog.wait_window() + return self.result diff --git a/dialogs_history.py b/dialogs_history.py new file mode 100644 index 0000000..36c4e75 --- /dev/null +++ b/dialogs_history.py @@ -0,0 +1,531 @@ +# dialogs_history.py +"""操作历史对话框""" + +import tkinter as tk +from tkinter import ttk, messagebox, filedialog +from datetime import datetime +from operation_history import OperationHistory, PartitionTableBackup, OperationStatus + + +class OperationHistoryDialog: + """操作历史对话框""" + + def __init__(self, parent): + self.parent = parent + self.history = OperationHistory() + + self.dialog = tk.Toplevel(parent) + self.dialog.title("操作历史") + self.dialog.geometry("900x600") + self.dialog.transient(parent) + + self._center_window() + self._create_widgets() + self._load_history() + + def _center_window(self): + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + main_frame = ttk.Frame(self.dialog, padding="10") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 标题 + title_label = ttk.Label( + main_frame, + text="操作历史记录", + font=("Arial", 14, "bold") + ) + title_label.pack(pady=(0, 10)) + + # 工具栏 + toolbar = ttk.Frame(main_frame) + toolbar.pack(fill=tk.X, pady=(0, 10)) + + ttk.Button(toolbar, text="刷新", command=self._load_history).pack(side=tk.LEFT, padx=5) + ttk.Button(toolbar, text="导出", command=self._export_history).pack(side=tk.LEFT, padx=5) + ttk.Button(toolbar, text="清理旧记录", command=self._clear_old).pack(side=tk.LEFT, padx=5) + ttk.Button(toolbar, text="清空所有", command=self._clear_all).pack(side=tk.LEFT, padx=5) + + # 历史记录列表 + list_frame = ttk.Frame(main_frame) + list_frame.pack(fill=tk.BOTH, expand=True) + + columns = ("时间", "操作类型", "设备", "状态", "详情", "耗时") + self.tree = ttk.Treeview(list_frame, columns=columns, show="headings") + + for col in columns: + self.tree.heading(col, text=col) + + self.tree.column("时间", width=150) + self.tree.column("操作类型", width=120) + self.tree.column("设备", width=100) + self.tree.column("状态", width=80) + self.tree.column("详情", width=300) + self.tree.column("耗时", width=80) + + # 滚动条 + scrollbar_y = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview) + scrollbar_x = ttk.Scrollbar(list_frame, orient=tk.HORIZONTAL, command=self.tree.xview) + self.tree.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set) + + self.tree.grid(row=0, column=0, sticky="nsew") + scrollbar_y.grid(row=0, column=1, sticky="ns") + scrollbar_x.grid(row=1, column=0, sticky="ew") + + list_frame.grid_rowconfigure(0, weight=1) + list_frame.grid_columnconfigure(0, weight=1) + + # 颜色标签 + self.tree.tag_configure("success", foreground="green") + self.tree.tag_configure("failed", foreground="red") + self.tree.tag_configure("cancelled", foreground="gray") + self.tree.tag_configure("pending", foreground="orange") + + # 详情区域 + detail_frame = ttk.LabelFrame(main_frame, text="操作详情", padding="10") + detail_frame.pack(fill=tk.X, pady=(10, 0)) + + self.detail_text = tk.Text(detail_frame, height=6, wrap=tk.WORD) + self.detail_text.pack(fill=tk.BOTH, expand=True) + + # 绑定选择事件 + self.tree.bind("<>", self._on_select) + + # 关闭按钮 + ttk.Button(main_frame, text="关闭", command=self.dialog.destroy).pack(pady=10) + + def _load_history(self): + """加载历史记录""" + # 清空现有数据 + for item in self.tree.get_children(): + self.tree.delete(item) + + records = self.history.get_recent_records(limit=100) + + for record in records: + # 格式化时间 + try: + dt = datetime.fromisoformat(record.timestamp) + time_str = dt.strftime("%Y-%m-%d %H:%M:%S") + except: + time_str = record.timestamp + + # 格式化耗时 + time_cost = "" + if record.execution_time: + if record.execution_time < 1: + time_cost = f"{record.execution_time*1000:.0f}ms" + else: + time_cost = f"{record.execution_time:.1f}s" + + # 格式化详情 + details = str(record.details)[:50] + "..." if len(str(record.details)) > 50 else str(record.details) + + values = ( + time_str, + record.operation_type, + record.device_path, + record.status, + details, + time_cost + ) + + # 根据状态设置标签 + tag = "" + status_lower = record.status.lower() + if "成功" in status_lower or "success" in status_lower: + tag = "success" + elif "失败" in status_lower or "failed" in status_lower: + tag = "failed" + elif "取消" in status_lower or "cancelled" in status_lower: + tag = "cancelled" + elif "待" in status_lower or "pending" in status_lower: + tag = "pending" + + self.tree.insert("", tk.END, values=values, tags=(tag,)) + + def _on_select(self, event): + """选择记录时显示详情""" + selection = self.tree.selection() + if not selection: + return + + item = selection[0] + index = self.tree.index(item) + records = self.history.get_recent_records(limit=100) + + if index < len(records): + record = records[index] + + detail_text = f"操作ID: {record.id}\n" + detail_text += f"时间: {record.timestamp}\n" + detail_text += f"类型: {record.operation_type}\n" + detail_text += f"设备: {record.device_path}\n" + detail_text += f"状态: {record.status}\n" + + if record.execution_time: + detail_text += f"耗时: {record.execution_time:.2f}秒\n" + + detail_text += f"\n详情:\n{record.details}\n" + + if record.error_message: + detail_text += f"\n错误信息:\n{record.error_message}\n" + + if record.rollback_data: + detail_text += f"\n可撤销数据:\n{record.rollback_data}\n" + + self.detail_text.delete(1.0, tk.END) + self.detail_text.insert(1.0, detail_text) + + def _export_history(self): + """导出历史记录""" + file_path = filedialog.asksaveasfilename( + defaultextension=".json", + filetypes=[("JSON files", "*.json"), ("All files", "*.*")], + title="导出操作历史" + ) + + if file_path: + if self.history.export_history(file_path): + messagebox.showinfo("成功", f"历史记录已导出到:\n{file_path}") + else: + messagebox.showerror("错误", "导出失败") + + def _clear_old(self): + """清理旧记录""" + result = messagebox.askyesno( + "确认清理", + "是否清理 30 天之前的操作记录?" + ) + if result: + self.history.clear_history(days=30) + self._load_history() + messagebox.showinfo("成功", "旧记录已清理") + + def _clear_all(self): + """清空所有记录""" + result = messagebox.askyesno( + "确认清空", + "是否清空所有操作记录?\n此操作不可恢复!", + icon=messagebox.WARNING + ) + if result: + self.history.clear_history() + self._load_history() + messagebox.showinfo("成功", "所有记录已清空") + + +class PartitionBackupDialog: + """分区表备份管理对话框""" + + def __init__(self, parent): + self.parent = parent + self.backup_manager = PartitionTableBackup() + + self.dialog = tk.Toplevel(parent) + self.dialog.title("分区表备份管理") + self.dialog.geometry("800x500") + self.dialog.transient(parent) + + self._center_window() + self._create_widgets() + self._load_backups() + + def _center_window(self): + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + main_frame = ttk.Frame(self.dialog, padding="10") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 标题 + title_label = ttk.Label( + main_frame, + text="分区表备份管理", + font=("Arial", 14, "bold") + ) + title_label.pack(pady=(0, 10)) + + # 工具栏 + toolbar = ttk.Frame(main_frame) + toolbar.pack(fill=tk.X, pady=(0, 10)) + + ttk.Button(toolbar, text="刷新", command=self._load_backups).pack(side=tk.LEFT, padx=5) + ttk.Button(toolbar, text="删除选中", command=self._delete_selected).pack(side=tk.LEFT, padx=5) + + # 备份列表 + list_frame = ttk.Frame(main_frame) + list_frame.pack(fill=tk.BOTH, expand=True) + + columns = ("设备", "备份时间", "文件大小", "文件路径") + self.tree = ttk.Treeview(list_frame, columns=columns, show="headings") + + for col in columns: + self.tree.heading(col, text=col) + + self.tree.column("设备", width=100) + self.tree.column("备份时间", width=150) + self.tree.column("文件大小", width=100) + self.tree.column("文件路径", width=400) + + scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview) + self.tree.configure(yscrollcommand=scrollbar.set) + + self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + + # 详情和操作按钮 + detail_frame = ttk.LabelFrame(main_frame, text="操作", padding="10") + detail_frame.pack(fill=tk.X, pady=(10, 0)) + + ttk.Label(detail_frame, text="选中备份文件后可执行以下操作:").pack(anchor=tk.W) + + btn_frame = ttk.Frame(detail_frame) + btn_frame.pack(fill=tk.X, pady=5) + + ttk.Button(btn_frame, text="查看内容", command=self._view_backup).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="恢复此备份", command=self._restore_backup).pack(side=tk.LEFT, padx=5) + + # 关闭按钮 + ttk.Button(main_frame, text="关闭", command=self.dialog.destroy).pack(pady=10) + + def _load_backups(self): + """加载备份列表""" + for item in self.tree.get_children(): + self.tree.delete(item) + + backups = self.backup_manager.list_backups() + + for backup in backups: + size_str = self._format_size(backup['size']) + values = ( + backup['device'], + backup['created'], + size_str, + backup['file'] + ) + self.tree.insert("", tk.END, values=values) + + def _format_size(self, size_bytes): + """格式化文件大小""" + if size_bytes < 1024: + return f"{size_bytes} B" + elif size_bytes < 1024 * 1024: + return f"{size_bytes / 1024:.1f} KB" + else: + return f"{size_bytes / (1024 * 1024):.1f} MB" + + def _get_selected_backup(self): + """获取选中的备份""" + selection = self.tree.selection() + if not selection: + messagebox.showwarning("警告", "请先选择一个备份") + return None + + item = selection[0] + file_path = self.tree.item(item, "values")[3] + return file_path + + def _view_backup(self): + """查看备份内容""" + file_path = self._get_selected_backup() + if not file_path: + return + + try: + with open(file_path, 'r') as f: + content = f.read() + + # 显示内容对话框 + content_dialog = tk.Toplevel(self.dialog) + content_dialog.title(f"备份内容 - {file_path}") + content_dialog.geometry("600x400") + + text = tk.Text(content_dialog, wrap=tk.WORD, padx=10, pady=10) + text.pack(fill=tk.BOTH, expand=True) + text.insert(1.0, content) + text.config(state=tk.DISABLED) + + ttk.Button(content_dialog, text="关闭", command=content_dialog.destroy).pack(pady=10) + + except Exception as e: + messagebox.showerror("错误", f"无法读取备份文件:\n{e}") + + def _restore_backup(self): + """恢复备份""" + file_path = self._get_selected_backup() + if not file_path: + return + + # 解析设备名 + filename = os.path.basename(file_path) + device_name = filename.rsplit('_pt_', 1)[0] + device_path = f"/dev/{device_name}" + + result = messagebox.askyesno( + "确认恢复", + f"是否将备份恢复到 {device_path}?\n\n" + f"警告:这将覆盖当前的分区表!\n" + f"请确保您了解此操作的后果。", + icon=messagebox.WARNING + ) + + if result: + if self.backup_manager.restore_partition_table(device_path, file_path): + messagebox.showinfo("成功", f"分区表已恢复到 {device_path}") + else: + messagebox.showerror("错误", "恢复失败") + + def _delete_selected(self): + """删除选中的备份""" + file_path = self._get_selected_backup() + if not file_path: + return + + result = messagebox.askyesno( + "确认删除", + f"是否删除备份文件?\n{file_path}" + ) + + if result: + if self.backup_manager.delete_backup(file_path): + self._load_backups() + messagebox.showinfo("成功", "备份已删除") + else: + messagebox.showerror("错误", "删除失败") + + +class OperationQueueDialog: + """操作队列对话框""" + + def __init__(self, parent, operation_queue): + self.parent = parent + self.queue = operation_queue + + self.dialog = tk.Toplevel(parent) + self.dialog.title("操作队列") + self.dialog.geometry("600x400") + self.dialog.transient(parent) + + self._center_window() + self._create_widgets() + self._refresh_status() + + def _center_window(self): + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + main_frame = ttk.Frame(self.dialog, padding="10") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 标题 + title_label = ttk.Label( + main_frame, + text="待执行操作队列", + font=("Arial", 14, "bold") + ) + title_label.pack(pady=(0, 10)) + + # 状态显示 + self.status_label = ttk.Label(main_frame, text="队列状态: 空闲") + self.status_label.pack(anchor=tk.W, pady=(0, 10)) + + # 操作列表 + list_frame = ttk.Frame(main_frame) + list_frame.pack(fill=tk.BOTH, expand=True) + + columns = ("类型", "设备", "状态") + self.tree = ttk.Treeview(list_frame, columns=columns, show="headings") + + for col in columns: + self.tree.heading(col, text=col) + self.tree.column(col, width=150) + + scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview) + self.tree.configure(yscrollcommand=scrollbar.set) + + self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + + # 按钮 + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=tk.X, pady=10) + + ttk.Button(btn_frame, text="开始执行", command=self._start_execution).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="取消当前", command=self._cancel_current).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="清空队列", command=self._clear_queue).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="刷新", command=self._refresh_status).pack(side=tk.LEFT, padx=5) + ttk.Button(btn_frame, text="关闭", command=self.dialog.destroy).pack(side=tk.RIGHT, padx=5) + + # 注册回调 + self.queue.register_callback('on_start', self._on_operation_start) + self.queue.register_callback('on_complete', self._on_operation_complete) + self.queue.register_callback('on_error', self._on_operation_error) + + def _refresh_status(self): + """刷新队列状态""" + status = self.queue.get_queue_status() + + if status['running']: + self.status_label.configure(text=f"队列状态: 执行中 ({status['pending_count']} 个待执行)") + else: + self.status_label.configure(text=f"队列状态: 空闲 ({status['pending_count']} 个待执行)") + + # 刷新列表 + for item in self.tree.get_children(): + self.tree.delete(item) + + pending = self.queue.get_pending_operations() + for op in pending: + values = ( + op['type'].value if hasattr(op['type'], 'value') else str(op['type']), + op['device_path'], + op['status'].value if hasattr(op['status'], 'value') else str(op['status']) + ) + self.tree.insert("", tk.END, values=values) + + def _start_execution(self): + """开始执行队列""" + self.queue.start_execution() + self._refresh_status() + + def _cancel_current(self): + """取消当前操作""" + self.queue.cancel_current() + self._refresh_status() + + def _clear_queue(self): + """清空队列""" + result = messagebox.askyesno("确认", "是否清空所有待执行操作?") + if result: + self.queue.clear_queue() + self._refresh_status() + + def _on_operation_start(self, operation): + """操作开始回调""" + self._refresh_status() + + def _on_operation_complete(self, operation, success): + """操作完成回调""" + self._refresh_status() + + def _on_operation_error(self, operation, error): + """操作错误回调""" + self._refresh_status() + messagebox.showerror("操作错误", f"操作失败:\n{error}") diff --git a/dialogs_smart.py b/dialogs_smart.py new file mode 100644 index 0000000..83e4d50 --- /dev/null +++ b/dialogs_smart.py @@ -0,0 +1,386 @@ +# dialogs_smart.py +"""SMART 监控相关对话框""" + +import tkinter as tk +from tkinter import ttk, messagebox +from smart_monitor import SmartMonitor, HealthStatus + + +class SmartInfoDialog: + """SMART 信息对话框""" + + def __init__(self, parent, device_path, smart_info): + self.parent = parent + self.device_path = device_path + self.smart_info = smart_info + + # 创建对话框 + self.dialog = tk.Toplevel(parent) + self.dialog.title(f"SMART 信息 - {device_path}") + self.dialog.geometry("700x600") + self.dialog.transient(parent) + self.dialog.grab_set() + + self._center_window() + self._create_widgets() + + def _center_window(self): + """居中显示""" + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + """创建界面""" + main_frame = ttk.Frame(self.dialog, padding="10") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 基本信息区域 + info_frame = ttk.LabelFrame(main_frame, text="基本信息", padding="10") + info_frame.pack(fill=tk.X, pady=(0, 10)) + + # 健康状态(大图标) + status_frame = ttk.Frame(info_frame) + status_frame.pack(fill=tk.X, pady=5) + + status_colors = { + HealthStatus.GOOD: ("良好", "green"), + HealthStatus.WARNING: ("警告", "orange"), + HealthStatus.DANGER: ("危险", "red"), + HealthStatus.UNKNOWN: ("未知", "gray"), + } + status_text, status_color = status_colors.get( + self.smart_info.health_status, ("未知", "gray") + ) + + status_label = ttk.Label( + status_frame, + text=f"健康状态: {status_text}", + font=("Arial", 16, "bold"), + foreground=status_color + ) + status_label.pack(side=tk.LEFT, padx=10) + + # 健康评分 + monitor = SmartMonitor() + score = monitor.get_health_score(self.smart_info) + score_label = ttk.Label( + status_frame, + text=f"健康评分: {score}/100", + font=("Arial", 14) + ) + score_label.pack(side=tk.LEFT, padx=20) + + # 设备详细信息 + details_grid = ttk.Frame(info_frame) + details_grid.pack(fill=tk.X, pady=5) + + details = [ + ("设备", self.smart_info.device), + ("型号", self.smart_info.model or "N/A"), + ("序列号", self.smart_info.serial or "N/A"), + ("固件版本", self.smart_info.firmware or "N/A"), + ("整体健康", self.smart_info.overall_health or "N/A"), + ] + + for i, (label, value) in enumerate(details): + ttk.Label(details_grid, text=f"{label}:", font=("Arial", 10, "bold")).grid( + row=i, column=0, sticky=tk.W, padx=5, pady=2 + ) + ttk.Label(details_grid, text=value).grid( + row=i, column=1, sticky=tk.W, padx=5, pady=2 + ) + + # 使用统计 + usage_frame = ttk.LabelFrame(main_frame, text="使用统计", padding="10") + usage_frame.pack(fill=tk.X, pady=(0, 10)) + + # 温度 + temp = self.smart_info.temperature + if temp: + temp_status, temp_color = monitor.get_temperature_status(temp) + temp_text = f"{temp}°C ({temp_status})" + else: + temp_text = "N/A" + + usage_info = [ + ("温度", temp_text), + ("通电时间", f"{self.smart_info.power_on_hours} 小时" if self.smart_info.power_on_hours else "N/A"), + ("通电次数", f"{self.smart_info.power_cycle_count} 次" if self.smart_info.power_cycle_count else "N/A"), + ] + + for i, (label, value) in enumerate(usage_info): + ttk.Label(usage_frame, text=f"{label}:", font=("Arial", 10, "bold")).grid( + row=i, column=0, sticky=tk.W, padx=5, pady=2 + ) + temp_label = ttk.Label(usage_frame, text=value) + if label == "温度" and temp and temp > 45: + temp_label.configure(foreground="red") + temp_label.grid(row=i, column=1, sticky=tk.W, padx=5, pady=2) + + # SMART 属性表 + attr_frame = ttk.LabelFrame(main_frame, text="SMART 属性", padding="10") + attr_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10)) + + # 创建 Treeview + columns = ("ID", "名称", "当前值", "最差值", "阈值", "原始值", "状态") + tree = ttk.Treeview(attr_frame, columns=columns, show="headings", height=10) + + for col in columns: + tree.heading(col, text=col) + tree.column(col, width=80) + + tree.column("名称", width=150) + tree.column("原始值", width=100) + + # 添加滚动条 + scrollbar = ttk.Scrollbar(attr_frame, orient=tk.VERTICAL, command=tree.yview) + tree.configure(yscrollcommand=scrollbar.set) + + tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + + # 填充数据 + for attr in self.smart_info.attributes: + values = ( + attr.id, + attr.name, + attr.value, + attr.worst, + attr.threshold, + attr.raw_value, + attr.status + ) + # 根据状态设置颜色 + tag = "" + if attr.status == "预警": + tag = "warning" + elif attr.id in monitor.CRITICAL_ATTRIBUTES and attr.raw_value != "0": + tag = "danger" + + tree.insert("", tk.END, values=values, tags=(tag,)) + + # 设置标签颜色 + tree.tag_configure("warning", foreground="orange") + tree.tag_configure("danger", foreground="red") + + # 错误信息 + if self.smart_info.errors: + error_frame = ttk.LabelFrame(main_frame, text="警告/错误", padding="10") + error_frame.pack(fill=tk.X, pady=(0, 10)) + + for error in self.smart_info.errors: + ttk.Label(error_frame, text=f"⚠ {error}", foreground="red").pack( + anchor=tk.W, padx=5, pady=2 + ) + + # 关闭按钮 + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=tk.X, pady=10) + + ttk.Button(btn_frame, text="关闭", command=self.dialog.destroy).pack(side=tk.RIGHT, padx=5) + ttk.Button( + btn_frame, + text="刷新", + command=self._refresh + ).pack(side=tk.RIGHT, padx=5) + + def _refresh(self): + """刷新 SMART 信息""" + monitor = SmartMonitor() + new_info = monitor.get_disk_smart_info(self.device_path) + if new_info: + self.smart_info = new_info + # 重建界面 + for widget in self.dialog.winfo_children(): + widget.destroy() + self._create_widgets() + else: + messagebox.showerror("错误", "刷新 SMART 信息失败") + + +class SmartOverviewDialog: + """SMART 总览对话框 - 显示所有磁盘的健康状态""" + + def __init__(self, parent): + self.parent = parent + self.monitor = SmartMonitor() + + self.dialog = tk.Toplevel(parent) + self.dialog.title("SMART 健康监控 - 总览") + self.dialog.geometry("800x500") + self.dialog.transient(parent) + self.dialog.grab_set() + + self._center_window() + self._create_widgets() + self._refresh_data() + + def _center_window(self): + """居中显示""" + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + """创建界面""" + main_frame = ttk.Frame(self.dialog, padding="10") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 标题 + title_label = ttk.Label( + main_frame, + text="磁盘 SMART 健康状态总览", + font=("Arial", 14, "bold") + ) + title_label.pack(pady=(0, 10)) + + # 状态统计 + self.stats_frame = ttk.Frame(main_frame) + self.stats_frame.pack(fill=tk.X, pady=(0, 10)) + + # 磁盘列表 + list_frame = ttk.LabelFrame(main_frame, text="磁盘列表", padding="10") + list_frame.pack(fill=tk.BOTH, expand=True) + + columns = ("设备", "型号", "健康状态", "评分", "温度", "通电时间", "详情") + self.tree = ttk.Treeview(list_frame, columns=columns, show="headings") + + for col in columns: + self.tree.heading(col, text=col) + self.tree.column(col, width=100) + + self.tree.column("设备", width=80) + self.tree.column("型号", width=200) + self.tree.column("详情", width=60) + + # 滚动条 + scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview) + self.tree.configure(yscrollcommand=scrollbar.set) + + self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + + # 绑定双击事件 + self.tree.bind("", self._on_double_click) + + # 存储 item 数据的字典 + self._item_data = {} + + # 按钮 + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=tk.X, pady=10) + + ttk.Button(btn_frame, text="关闭", command=self.dialog.destroy).pack(side=tk.RIGHT, padx=5) + ttk.Button(btn_frame, text="刷新", command=self._refresh_data).pack(side=tk.RIGHT, padx=5) + + # 状态栏 + self.status_label = ttk.Label(main_frame, text="准备就绪") + self.status_label.pack(anchor=tk.W) + + def _refresh_data(self): + """刷新数据""" + self.status_label.configure(text="正在获取 SMART 信息...") + self.dialog.update() + + # 清空现有数据 + for item in self.tree.get_children(): + self.tree.delete(item) + + # 获取所有磁盘 SMART 信息 + smart_data = self.monitor.get_all_disks_smart() + + # 统计 + good_count = 0 + warning_count = 0 + danger_count = 0 + + for device_path, info in smart_data.items(): + score = self.monitor.get_health_score(info) + + # 状态文本和颜色 + status_colors = { + HealthStatus.GOOD: ("良好", "green"), + HealthStatus.WARNING: ("警告", "orange"), + HealthStatus.DANGER: ("危险", "red"), + HealthStatus.UNKNOWN: ("未知", "gray"), + } + status_text, _ = status_colors.get(info.health_status, ("未知", "gray")) + + # 统计 + if info.health_status == HealthStatus.GOOD: + good_count += 1 + elif info.health_status == HealthStatus.WARNING: + warning_count += 1 + elif info.health_status == HealthStatus.DANGER: + danger_count += 1 + + # 温度 + temp_text = f"{info.temperature}°C" if info.temperature else "N/A" + + # 通电时间 + hours_text = f"{info.power_on_hours}h" if info.power_on_hours else "N/A" + + values = ( + device_path, + info.model[:30] if info.model else "N/A", + status_text, + f"{score}/100", + temp_text, + hours_text, + "查看详情" + ) + + tag = "" + if info.health_status == HealthStatus.WARNING: + tag = "warning" + elif info.health_status == HealthStatus.DANGER: + tag = "danger" + + item_id = self.tree.insert("", tk.END, values=values, tags=(tag,)) + self._item_data[item_id] = {"device": device_path, "info": info} + + # 设置颜色 + self.tree.tag_configure("warning", foreground="orange") + self.tree.tag_configure("danger", foreground="red") + + # 更新统计 + for widget in self.stats_frame.winfo_children(): + widget.destroy() + + stats = [ + ("总磁盘", len(smart_data), "black"), + ("良好", good_count, "green"), + ("警告", warning_count, "orange"), + ("危险", danger_count, "red"), + ] + + for label, count, color in stats: + lbl = ttk.Label( + self.stats_frame, + text=f"{label}: {count}", + font=("Arial", 11, "bold"), + foreground=color + ) + lbl.pack(side=tk.LEFT, padx=10) + + self.status_label.configure(text=f"已更新 {len(smart_data)} 个磁盘的信息") + + def _on_double_click(self, event): + """双击查看详情""" + item = self.tree.selection()[0] + data = self._item_data.get(item) + if data: + # 暂时释放 grab,以便打开子对话框 + self.dialog.grab_release() + try: + SmartInfoDialog(self.dialog, data["device"], data["info"]) + finally: + # 恢复 grab + self.dialog.grab_set() diff --git a/dialogs_undo.py b/dialogs_undo.py new file mode 100644 index 0000000..b1316b7 --- /dev/null +++ b/dialogs_undo.py @@ -0,0 +1,250 @@ +# dialogs_undo.py +"""撤销功能对话框""" + +import tkinter as tk +from tkinter import ttk, messagebox +from operation_history import OperationHistory, OperationStatus, OperationType +from dialogs_history import PartitionBackupDialog + + +class UndoDialog: + """撤销操作对话框""" + + def __init__(self, parent, history, partition_backup): + self.parent = parent + self.history = history + self.partition_backup = partition_backup + self.result = False + + self.dialog = tk.Toplevel(parent) + self.dialog.title("撤销操作") + self.dialog.geometry("700x500") + self.dialog.transient(parent) + + self._center_window() + self._create_widgets() + self._load_undoable_operations() + + def _center_window(self): + self.dialog.update_idletasks() + width = self.dialog.winfo_width() + height = self.dialog.winfo_height() + x = (self.dialog.winfo_screenwidth() // 2) - (width // 2) + y = (self.dialog.winfo_screenheight() // 2) - (height // 2) + self.dialog.geometry(f"{width}x{height}+{x}+{y}") + + def _create_widgets(self): + main_frame = ttk.Frame(self.dialog, padding="15") + main_frame.pack(fill=tk.BOTH, expand=True) + + # 标题 + title_label = ttk.Label( + main_frame, + text="可撤销的操作", + font=("Arial", 14, "bold") + ) + title_label.pack(pady=(0, 10)) + + # 说明 + info_label = ttk.Label( + main_frame, + text="以下操作可以通过分区表备份进行回滚。选择操作查看详情并撤销。", + wraplength=600 + ) + info_label.pack(pady=(0, 10)) + + # 操作列表 + list_frame = ttk.Frame(main_frame) + list_frame.pack(fill=tk.BOTH, expand=True) + + columns = ("时间", "操作类型", "设备", "状态") + self.tree = ttk.Treeview(list_frame, columns=columns, show="headings") + + for col in columns: + self.tree.heading(col, text=col) + + self.tree.column("时间", width=150) + self.tree.column("操作类型", width=150) + self.tree.column("设备", width=150) + self.tree.column("状态", width=100) + + scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview) + self.tree.configure(yscrollcommand=scrollbar.set) + + self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) + scrollbar.pack(side=tk.RIGHT, fill=tk.Y) + + # 绑定选择事件 + self.tree.bind("<>", self._on_select) + + # 详情区域 + detail_frame = ttk.LabelFrame(main_frame, text="操作详情与撤销选项", padding="10") + detail_frame.pack(fill=tk.X, pady=(10, 0)) + + self.detail_text = tk.Text(detail_frame, height=8, wrap=tk.WORD) + self.detail_text.pack(fill=tk.BOTH, expand=True) + self.detail_text.config(state=tk.DISABLED) + + # 按钮 + btn_frame = ttk.Frame(main_frame) + btn_frame.pack(fill=tk.X, pady=10) + + ttk.Button(btn_frame, text="关闭", command=self.dialog.destroy).pack(side=tk.RIGHT, padx=5) + ttk.Button(btn_frame, text="刷新", command=self._load_undoable_operations).pack(side=tk.RIGHT, padx=5) + self.undo_btn = ttk.Button(btn_frame, text="撤销选中操作", command=self._undo_selected, state=tk.DISABLED) + self.undo_btn.pack(side=tk.RIGHT, padx=5) + + self.selected_record = None + + def _load_undoable_operations(self): + """加载可撤销的操作""" + # 清空列表 + for item in self.tree.get_children(): + self.tree.delete(item) + + # 获取所有成功完成的操作 + records = self.history.get_recent_records(limit=50) + self.undoable_records = [] + + for record in records: + # 检查是否有回滚数据 + if record.rollback_data and record.status == OperationStatus.SUCCESS.value: + # 检查是否是可撤销的操作类型 + if record.operation_type in [ + OperationType.CREATE_PARTITION.value, + OperationType.DELETE_PARTITION.value, + OperationType.WIPE_PARTITION_TABLE.value + ]: + self.undoable_records.append(record) + + # 格式化时间 + try: + from datetime import datetime + dt = datetime.fromisoformat(record.timestamp) + time_str = dt.strftime("%Y-%m-%d %H:%M:%S") + except: + time_str = record.timestamp + + values = ( + time_str, + record.operation_type, + record.device_path, + record.status + ) + + self.tree.insert("", tk.END, values=values) + + def _on_select(self, event): + """选择操作""" + selection = self.tree.selection() + if not selection: + self.undo_btn.config(state=tk.DISABLED) + return + + index = self.tree.index(selection[0]) + if index < len(self.undoable_records): + self.selected_record = self.undoable_records[index] + self._show_details() + self.undo_btn.config(state=tk.NORMAL) + else: + self.selected_record = None + self.undo_btn.config(state=tk.DISABLED) + + def _show_details(self): + """显示详情""" + if not self.selected_record: + return + + self.detail_text.config(state=tk.NORMAL) + self.detail_text.delete(1.0, tk.END) + + record = self.selected_record + + text = f"操作ID: {record.id}\n" + text += f"时间: {record.timestamp}\n" + text += f"类型: {record.operation_type}\n" + text += f"设备: {record.device_path}\n\n" + + text += "操作详情:\n" + text += f"{record.details}\n\n" + + if record.rollback_data: + text += "可撤销数据:\n" + if 'partition_table_backup' in record.rollback_data: + backup_file = record.rollback_data['partition_table_backup'] + text += f" 分区表备份: {backup_file}\n" + if backup_file: + import os + if os.path.exists(backup_file): + text += " 状态: 备份文件存在,可以恢复\n" + else: + text += " 状态: 备份文件不存在!无法恢复\n" + self.undo_btn.config(state=tk.DISABLED) + else: + text += f"{record.rollback_data}\n" + + self.detail_text.insert(1.0, text) + self.detail_text.config(state=tk.DISABLED) + + def _undo_selected(self): + """撤销选中的操作""" + if not self.selected_record: + return + + record = self.selected_record + + # 确认对话框 + result = messagebox.askyesno( + "确认撤销", + f"确定要撤销以下操作吗?\n\n" + f"操作类型: {record.operation_type}\n" + f"设备: {record.device_path}\n" + f"时间: {record.timestamp}\n\n" + f"警告:这将恢复分区表到操作前的状态!\n" + f"此操作可能导致数据丢失!", + icon=messagebox.WARNING + ) + + if not result: + return + + # 检查是否有备份 + if not record.rollback_data or 'partition_table_backup' not in record.rollback_data: + messagebox.showerror("错误", "没有找到可恢复的备份数据") + return + + backup_file = record.rollback_data['partition_table_backup'] + if not backup_file: + messagebox.showerror("错误", "备份文件路径为空") + return + + import os + if not os.path.exists(backup_file): + messagebox.showerror("错误", f"备份文件不存在:\n{backup_file}") + return + + # 执行恢复 + device_path = record.device_path + + # 对于分区操作,需要获取父磁盘 + if record.operation_type == OperationType.DELETE_PARTITION.value: + import re + match = re.match(r'(/dev/[a-z]+)\d+', device_path) + if match: + device_path = match.group(1) + + if self.partition_backup.restore_partition_table(device_path, backup_file): + # 更新操作状态 + self.history.update_status( + record.id, + OperationStatus.ROLLED_BACK + ) + messagebox.showinfo("成功", f"操作已成功撤销!\n分区表已恢复到 {device_path}") + self.result = True + self.dialog.destroy() + else: + messagebox.showerror("错误", "撤销操作失败!\n请检查日志获取详细信息。") + + def wait_for_result(self): + self.dialog.wait_window() + return self.result diff --git a/linux-storage-manager.desktop b/linux-storage-manager.desktop new file mode 100644 index 0000000..5f093f7 --- /dev/null +++ b/linux-storage-manager.desktop @@ -0,0 +1,9 @@ +[Desktop Entry] +Name=Linux 存储管理器 +Comment=Linux 存储管理工具 +Exec=/usr/local/bin/linux-storage-manager +Icon=drive-harddisk +Terminal=false +Type=Application +Categories=System;Utility; +Keywords=storage;disk;partition;raid;lvm; diff --git a/linux-storage-manager.spec b/linux-storage-manager.spec new file mode 100644 index 0000000..ffa091d --- /dev/null +++ b/linux-storage-manager.spec @@ -0,0 +1,38 @@ +# -*- mode: python ; coding: utf-8 -*- + + +a = Analysis( + ['mainwindow_tkinter.py'], + pathex=[], + binaries=[], + datas=[], + hiddenimports=[], + hookspath=[], + hooksconfig={}, + runtime_hooks=[], + excludes=[], + noarchive=False, + optimize=0, +) +pyz = PYZ(a.pure) + +exe = EXE( + pyz, + a.scripts, + a.binaries, + a.datas, + [], + name='linux-storage-manager', + debug=False, + bootloader_ignore_signals=False, + strip=False, + upx=True, + upx_exclude=[], + runtime_tmpdir=None, + console=True, + disable_windowed_traceback=False, + argv_emulation=False, + target_arch=None, + codesign_identity=None, + entitlements_file=None, +) diff --git a/mainwindow_tkinter.py b/mainwindow_tkinter.py index fd0ef65..59cd679 100644 --- a/mainwindow_tkinter.py +++ b/mainwindow_tkinter.py @@ -22,6 +22,11 @@ from occupation_resolver_tkinter import OccupationResolver # 导入自定义对话框 from dialogs_tkinter import (CreatePartitionDialog, MountDialog, CreateRaidDialog, CreatePvDialog, CreateVgDialog, CreateLvDialog) +# 导入 SMART 对话框 +from dialogs_smart import SmartInfoDialog, SmartOverviewDialog +from smart_monitor import SmartMonitor +# 导入增强对话框 +from dialogs_enhanced import EnhancedFormatDialog, EnhancedRaidDeleteDialog, EnhancedPartitionDialog class MainWindow: @@ -71,9 +76,20 @@ class MainWindow: def _create_widgets(self): """创建界面元素""" + # 顶部按钮栏 + btn_frame = ttk.Frame(self.main_frame) + btn_frame.pack(fill=tk.X, pady=(0, 5)) + # 刷新按钮 - self.refresh_btn = ttk.Button(self.main_frame, text="刷新数据", command=self.refresh_all_info) - self.refresh_btn.pack(fill=tk.X, pady=(0, 5)) + self.refresh_btn = ttk.Button(btn_frame, text="刷新数据", command=self.refresh_all_info) + self.refresh_btn.pack(side=tk.LEFT, padx=(0, 5)) + + # SMART 监控按钮 + self.smart_btn = ttk.Button(btn_frame, text="SMART 监控", command=self._show_smart_overview) + self.smart_btn.pack(side=tk.LEFT, padx=(0, 5)) + + # 菜单栏 + self._create_menu() # Notebook (Tab 控件) self.notebook = ttk.Notebook(self.main_frame) @@ -107,6 +123,38 @@ class MainWindow: self.log_text = ScrolledText(log_frame, height=8, state=tk.DISABLED) self.log_text.pack(fill=tk.BOTH, expand=True) + def _create_menu(self): + """创建菜单栏""" + menubar = tk.Menu(self.root) + self.root.config(menu=menubar) + + # 工具菜单 + tools_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="工具", menu=tools_menu) + tools_menu.add_command(label="SMART 监控", command=self._show_smart_overview) + tools_menu.add_separator() + tools_menu.add_command(label="刷新数据", command=self.refresh_all_info) + + # 帮助菜单 + help_menu = tk.Menu(menubar, tearoff=0) + menubar.add_cascade(label="帮助", menu=help_menu) + help_menu.add_command(label="关于", command=self._show_about) + + def _show_about(self): + """显示关于对话框""" + messagebox.showinfo( + "关于", + "Linux 存储管理工具\n\n" + "版本: 2.0 (Tkinter 版)\n" + "适配低版本系统 (CentOS 8+)\n\n" + "功能:\n" + "- 块设备管理\n" + "- RAID 管理\n" + "- LVM 管理\n" + "- SMART 监控\n\n" + "使用 Python 3.6+ 和 Tkinter 构建" + ) + def _create_block_devices_tree(self): """创建块设备 Treeview""" # 创建框架 @@ -359,6 +407,10 @@ class MainWindow: menu.add_command(label=f"擦除分区表 {device_path}...", command=lambda: self._handle_wipe_partition_table(device_path)) menu.add_separator() + # SMART 信息 + menu.add_command(label=f"查看 SMART 信息 {device_path}", + command=lambda: self._handle_show_smart(device_path)) + menu.add_separator() # 分区操作 if device_type == 'part': @@ -373,7 +425,24 @@ class MainWindow: menu.add_command(label=f"删除分区 {device_path}", command=lambda: self._handle_delete_partition(device_path)) menu.add_command(label=f"格式化分区 {device_path}...", - command=lambda: self._handle_format_partition(device_path)) + command=lambda dp=device_path, dd=dev_data: self._handle_format_partition(dp, dd)) + + # 文件系统检查和修复 + fstype = dev_data.get('fstype', '') + if fstype: + fs_menu = tk.Menu(menu, tearoff=0) + # 根据文件系统类型添加不同的检查/修复选项 + if fstype.startswith('ext'): + fs_menu.add_command(label=f"检查文件系统 (fsck) {device_path}", + command=lambda dp=device_path, ft=fstype: self._handle_fsck(dp, ft)) + fs_menu.add_command(label=f"调整文件系统大小 (resize2fs) {device_path}", + command=lambda dp=device_path, ft=fstype: self._handle_resize2fs(dp, ft)) + elif fstype == 'xfs': + fs_menu.add_command(label=f"修复文件系统 (xfs_repair) {device_path}", + command=lambda dp=device_path: self._handle_xfs_repair(dp)) + if fs_menu.index(tk.END) is not None: + menu.add_cascade(label=f"文件系统工具 {device_path}", menu=fs_menu) + menu.add_separator() # Loop 设备操作 @@ -459,9 +528,26 @@ class MainWindow: menu.add_command(label=f"停止阵列 {array_path}", command=lambda: self._handle_stop_raid_array(array_path)) menu.add_command(label=f"删除阵列 {array_path}", - command=lambda: self._handle_delete_active_raid_array(array_path, member_devices, array_uuid)) + command=lambda ap=array_path, md=member_devices, au=array_uuid, ad=array_data: + self._handle_delete_active_raid_array(ap, md, au)) menu.add_command(label=f"格式化阵列 {array_path}...", command=lambda: self._handle_format_raid_array(array_path)) + + # 文件系统检查和修复 + dev_details = self.system_manager.get_device_details_by_path(array_path) + fstype = dev_details.get('fstype', '') if dev_details else '' + if fstype: + fs_menu = tk.Menu(menu, tearoff=0) + if fstype.startswith('ext'): + fs_menu.add_command(label=f"检查文件系统 (fsck) {array_path}", + command=lambda dp=array_path, ft=fstype: self._handle_fsck(dp, ft)) + fs_menu.add_command(label=f"调整文件系统大小 (resize2fs) {array_path}", + command=lambda dp=array_path, ft=fstype: self._handle_resize2fs(dp, ft)) + elif fstype == 'xfs': + fs_menu.add_command(label=f"修复文件系统 (xfs_repair) {array_path}", + command=lambda dp=array_path: self._handle_xfs_repair(dp)) + if fs_menu.index(tk.END) is not None: + menu.add_cascade(label=f"文件系统工具 {array_path}", menu=fs_menu) if menu.index(tk.END) is not None: self._show_context_menu(menu, event.x_root, event.y_root) @@ -535,7 +621,24 @@ class MainWindow: menu.add_command(label=f"删除逻辑卷 {lv_name}", command=lambda: self._handle_delete_lv(lv_name, vg_name)) menu.add_command(label=f"格式化逻辑卷 {lv_name}...", - command=lambda: self._handle_format_partition(lv_path)) + command=lambda lp=lv_path, ln=lv_name: self._handle_format_partition( + lp, {'name': ln, 'path': lp})) + + # 文件系统检查和修复 + dev_details = self.system_manager.get_device_details_by_path(lv_path) + fstype = dev_details.get('fstype', '') if dev_details else '' + if fstype: + fs_menu = tk.Menu(menu, tearoff=0) + if fstype.startswith('ext'): + fs_menu.add_command(label=f"检查文件系统 (fsck) {lv_path}", + command=lambda dp=lv_path, ft=fstype: self._handle_fsck(dp, ft)) + fs_menu.add_command(label=f"调整文件系统大小 (resize2fs) {lv_path}", + command=lambda dp=lv_path, ft=fstype: self._handle_resize2fs(dp, ft)) + elif fstype == 'xfs': + fs_menu.add_command(label=f"修复文件系统 (xfs_repair) {lv_path}", + command=lambda dp=lv_path: self._handle_xfs_repair(dp)) + if fs_menu.index(tk.END) is not None: + menu.add_cascade(label=f"文件系统工具 {lv_path}", menu=fs_menu) if menu.index(tk.END) is not None: self._show_context_menu(menu, event.x_root, event.y_root) @@ -580,6 +683,41 @@ class MainWindow: """处理卸载 loop 设备""" if self.disk_ops.unmount_loop_device(device_path): self.refresh_all_info() + + def _handle_show_smart(self, device_path): + """显示指定设备的 SMART 信息""" + monitor = SmartMonitor() + if not monitor.is_available(): + messagebox.showwarning("警告", + "smartctl 工具未安装。\n" + "请安装 smartmontools:\n" + " CentOS/RHEL: sudo yum install smartmontools\n" + " Ubuntu/Debian: sudo apt-get install smartmontools") + return + + smart_info = monitor.get_disk_smart_info(device_path) + if smart_info: + SmartInfoDialog(self.root, device_path, smart_info) + else: + messagebox.showerror("错误", + f"无法获取 {device_path} 的 SMART 信息。\n" + f"可能的原因:\n" + f"1. 设备不支持 SMART\n" + f"2. 需要 root 权限\n" + f"3. 设备未正确连接") + + def _show_smart_overview(self): + """显示 SMART 总览""" + monitor = SmartMonitor() + if not monitor.is_available(): + messagebox.showwarning("警告", + "smartctl 工具未安装。\n" + "请安装 smartmontools:\n" + " CentOS/RHEL: sudo yum install smartmontools\n" + " Ubuntu/Debian: sudo apt-get install smartmontools") + return + + SmartOverviewDialog(self.root) def _handle_wipe_partition_table(self, device_path): """处理擦除物理盘分区表""" @@ -652,7 +790,8 @@ class MainWindow: max_available_mib = max(0.0, total_disk_mib - 1.0) start_position_mib = 1.0 - dialog = CreatePartitionDialog(self.root, disk_path, total_disk_mib, max_available_mib) + # 使用增强型分区创建对话框(带滑块可视化) + dialog = EnhancedPartitionDialog(self.root, disk_path, total_disk_mib, max_available_mib) info = dialog.wait_for_result() if info: @@ -684,9 +823,101 @@ class MainWindow: if self.disk_ops.delete_partition(device_path): self.refresh_all_info() - def _handle_format_partition(self, device_path): - """处理格式化分区""" - self.disk_ops.format_partition(device_path) + def _handle_format_partition(self, device_path, dev_data=None): + """处理格式化分区 - 使用增强型对话框""" + # 获取设备信息 + if dev_data is None: + dev_data = self.system_manager.get_device_details_by_path(device_path) + + # 使用增强型格式化对话框 + dialog = EnhancedFormatDialog(self.root, device_path, dev_data or {}) + fs_type = dialog.wait_for_result() + + if fs_type: + # 执行格式化 + self.disk_ops.format_partition(device_path, fs_type) + + def _handle_fsck(self, device_path, fstype): + """检查/修复 ext 文件系统""" + # 检查设备是否已挂载 + mount_point = self.system_manager.get_mountpoint_for_device(device_path) + if mount_point and mount_point != 'N/A': + messagebox.showwarning("警告", f"设备 {device_path} 已挂载到 {mount_point}\n请先卸载后再执行文件系统检查。") + return + + if not messagebox.askyesno("确认", + f"即将对 {device_path} ({fstype}) 执行文件系统检查。\n" + f"命令: fsck -f -y {device_path}\n\n" + f"注意:此操作可能需要较长时间,请勿中断。", + default=messagebox.NO): + return + + logger.info(f"开始检查文件系统: {device_path} ({fstype})") + success, stdout, stderr = self.lvm_ops._execute_shell_command( + ["fsck", "-f", "-y", device_path], + f"检查文件系统 {device_path} 失败" + ) + + if success: + messagebox.showinfo("成功", f"文件系统检查完成:\n{stdout[:500] if stdout else '无输出'}") + else: + messagebox.showerror("错误", f"文件系统检查失败:\n{stderr[:500] if stderr else '未知错误'}") + self.refresh_all_info() + + def _handle_xfs_repair(self, device_path): + """修复 XFS 文件系统""" + # 检查设备是否已挂载 + mount_point = self.system_manager.get_mountpoint_for_device(device_path) + if mount_point and mount_point != 'N/A': + messagebox.showwarning("警告", f"设备 {device_path} 已挂载到 {mount_point}\n请先卸载后再执行文件系统修复。") + return + + if not messagebox.askyesno("确认", + f"即将对 {device_path} (XFS) 执行文件系统修复。\n" + f"命令: xfs_repair {device_path}\n\n" + f"警告:此操作会尝试修复文件系统错误,但可能无法恢复所有数据。\n" + f"建议先备份重要数据。", + default=messagebox.NO): + return + + logger.info(f"开始修复 XFS 文件系统: {device_path}") + success, stdout, stderr = self.lvm_ops._execute_shell_command( + ["xfs_repair", device_path], + f"修复 XFS 文件系统 {device_path} 失败" + ) + + if success: + messagebox.showinfo("成功", f"XFS 文件系统修复完成:\n{stdout[:500] if stdout else '无输出'}") + else: + messagebox.showerror("错误", f"XFS 文件系统修复失败:\n{stderr[:500] if stderr else '未知错误'}") + self.refresh_all_info() + + def _handle_resize2fs(self, device_path, fstype): + """调整 ext 文件系统大小""" + # 检查设备是否已挂载 + mount_point = self.system_manager.get_mountpoint_for_device(device_path) + if mount_point and mount_point != 'N/A': + messagebox.showwarning("警告", f"设备 {device_path} 已挂载到 {mount_point}\n请先卸载后再执行调整大小操作。") + return + + if not messagebox.askyesno("确认", + f"即将对 {device_path} ({fstype}) 强制调整文件系统大小以匹配分区大小。\n" + f"命令: resize2fs -f {device_path}\n\n" + f"注意:此操作会强制调整文件系统大小,请确保分区大小已正确设置。", + default=messagebox.NO): + return + + logger.info(f"开始调整文件系统大小: {device_path} ({fstype})") + success, stdout, stderr = self.lvm_ops._execute_shell_command( + ["resize2fs", "-f", device_path], + f"调整文件系统大小 {device_path} 失败" + ) + + if success: + messagebox.showinfo("成功", f"文件系统大小调整完成:\n{stdout[:500] if stdout else '无输出'}") + else: + messagebox.showerror("错误", f"调整文件系统大小失败:\n{stderr[:500] if stderr else '未知错误'}") + self.refresh_all_info() def on_disk_formatting_finished(self, success, device_path, stdout, stderr): """接收格式化完成回调并刷新界面""" @@ -791,10 +1022,29 @@ class MainWindow: if self.raid_ops.stop_raid_array(array_path): self.refresh_all_info() - def _handle_delete_active_raid_array(self, array_path, member_devices, uuid): - """处理删除活动的 RAID 阵列""" - if self.raid_ops.delete_active_raid_array(array_path, member_devices, uuid): - self.refresh_all_info() + def _handle_delete_active_raid_array(self, array_path, member_devices, uuid, array_data=None): + """处理删除活动的 RAID 阵列 - 使用增强型对话框""" + # 获取阵列详情(如果未提供) + if array_data is None: + array_data = {} + try: + raid_arrays = self.system_manager.get_mdadm_arrays() + for array in raid_arrays: + if array.get('device') == array_path: + array_data = array + break + except: + pass + + # 使用增强型删除对话框 + dialog = EnhancedRaidDeleteDialog( + self.root, array_path, array_data, member_devices + ) + + if dialog.wait_for_result(): + # 执行删除 + if self.raid_ops.delete_active_raid_array(array_path, member_devices, uuid): + self.refresh_all_info() def _handle_delete_configured_raid_array(self, uuid): """处理删除停止状态 RAID 阵列的配置文件条目""" @@ -1051,3 +1301,10 @@ class MainWindow: self.refresh_all_info() else: messagebox.showerror("删除卷组失败", f"删除卷组 {vg_name} 失败,请检查日志。") + + +if __name__ == "__main__": + # 创建主窗口 + root = tk.Tk() + app = MainWindow(root) + root.mainloop() diff --git a/operation_history.py b/operation_history.py new file mode 100644 index 0000000..007984b --- /dev/null +++ b/operation_history.py @@ -0,0 +1,456 @@ +# operation_history.py +"""操作历史与撤销管理模块""" + +import json +import os +import time +import logging +import subprocess +from datetime import datetime +from typing import List, Dict, Optional, Callable +from dataclasses import dataclass, asdict +from enum import Enum + +logger = logging.getLogger(__name__) + + +class OperationStatus(Enum): + """操作状态""" + PENDING = "待执行" + RUNNING = "执行中" + SUCCESS = "成功" + FAILED = "失败" + CANCELLED = "已取消" + ROLLED_BACK = "已撤销" + + +class OperationType(Enum): + """操作类型""" + CREATE_PARTITION = "创建分区" + DELETE_PARTITION = "删除分区" + FORMAT_PARTITION = "格式化分区" + MOUNT_PARTITION = "挂载分区" + UNMOUNT_PARTITION = "卸载分区" + CREATE_RAID = "创建 RAID" + DELETE_RAID = "删除 RAID" + STOP_RAID = "停止 RAID" + CREATE_PV = "创建物理卷" + DELETE_PV = "删除物理卷" + CREATE_VG = "创建卷组" + DELETE_VG = "删除卷组" + CREATE_LV = "创建逻辑卷" + DELETE_LV = "删除逻辑卷" + ACTIVATE_LV = "激活逻辑卷" + DEACTIVATE_LV = "停用逻辑卷" + WIPE_PARTITION_TABLE = "擦除分区表" + + +@dataclass +class OperationRecord: + """操作记录""" + id: str + timestamp: str + operation_type: str + device_path: str + status: str + details: Dict + rollback_data: Optional[Dict] = None + error_message: Optional[str] = None + execution_time: Optional[float] = None + + +class OperationHistory: + """操作历史管理器""" + + def __init__(self, history_file: str = None): + if history_file is None: + # 默认存储在用户目录 + home_dir = os.path.expanduser("~") + self.history_dir = os.path.join(home_dir, ".linux_storage_manager") + os.makedirs(self.history_dir, exist_ok=True) + self.history_file = os.path.join(self.history_dir, "operation_history.json") + self.backup_dir = os.path.join(self.history_dir, "backups") + os.makedirs(self.backup_dir, exist_ok=True) + else: + self.history_file = history_file + self.backup_dir = os.path.join(os.path.dirname(history_file), "backups") + os.makedirs(self.backup_dir, exist_ok=True) + + self.records: List[OperationRecord] = [] + self.pending_operations: List[OperationRecord] = [] + self._load_history() + + def _load_history(self): + """加载历史记录""" + if os.path.exists(self.history_file): + try: + with open(self.history_file, 'r', encoding='utf-8') as f: + data = json.load(f) + self.records = [OperationRecord(**record) for record in data] + logger.info(f"已加载 {len(self.records)} 条历史记录") + except Exception as e: + logger.error(f"加载历史记录失败: {e}") + self.records = [] + else: + self.records = [] + + def _save_history(self): + """保存历史记录""" + try: + with open(self.history_file, 'w', encoding='utf-8') as f: + json.dump([asdict(r) for r in self.records], f, + ensure_ascii=False, indent=2) + except Exception as e: + logger.error(f"保存历史记录失败: {e}") + + def add_record(self, operation_type: OperationType, device_path: str, + details: Dict, rollback_data: Optional[Dict] = None) -> str: + """添加操作记录""" + record_id = f"{int(time.time() * 1000)}_{operation_type.name}" + record = OperationRecord( + id=record_id, + timestamp=datetime.now().isoformat(), + operation_type=operation_type.value, + device_path=device_path, + status=OperationStatus.SUCCESS.value, + details=details, + rollback_data=rollback_data + ) + self.records.append(record) + self._save_history() + logger.info(f"添加操作记录: {record_id} - {operation_type.value}") + return record_id + + def update_status(self, record_id: str, status: OperationStatus, + error_message: str = None, execution_time: float = None): + """更新操作状态""" + for record in self.records: + if record.id == record_id: + record.status = status.value + if error_message: + record.error_message = error_message + if execution_time: + record.execution_time = execution_time + + # 从待执行列表中移除 + if status in [OperationStatus.SUCCESS, OperationStatus.FAILED, + OperationStatus.CANCELLED]: + self.pending_operations = [ + r for r in self.pending_operations if r.id != record_id + ] + + self._save_history() + logger.info(f"更新操作状态: {record_id} -> {status.value}") + break + + def get_recent_records(self, limit: int = 50) -> List[OperationRecord]: + """获取最近的操作记录""" + return sorted(self.records, key=lambda r: r.timestamp, reverse=True)[:limit] + + def get_pending_operations(self) -> List[OperationRecord]: + """获取待执行的操作""" + return self.pending_operations + + def export_history(self, file_path: str): + """导出历史记录""" + try: + with open(file_path, 'w', encoding='utf-8') as f: + json.dump([asdict(r) for r in self.records], f, + ensure_ascii=False, indent=2) + return True + except Exception as e: + logger.error(f"导出历史记录失败: {e}") + return False + + def clear_history(self, days: int = None): + """清理历史记录""" + if days is None: + # 清空所有记录 + self.records = [] + else: + # 清理指定天数之前的记录 + cutoff = time.time() - (days * 24 * 60 * 60) + self.records = [ + r for r in self.records + if datetime.fromisoformat(r.timestamp).timestamp() > cutoff + ] + self._save_history() + + +class PartitionTableBackup: + """分区表备份管理器""" + + def __init__(self, backup_dir: str = None): + if backup_dir is None: + home_dir = os.path.expanduser("~") + self.backup_dir = os.path.join( + home_dir, ".linux_storage_manager", "partition_backups" + ) + else: + self.backup_dir = backup_dir + os.makedirs(self.backup_dir, exist_ok=True) + + def backup_partition_table(self, device_path: str) -> Optional[str]: + """备份分区表""" + timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") + device_name = os.path.basename(device_path) + backup_file = os.path.join( + self.backup_dir, + f"{device_name}_pt_{timestamp}.bin" + ) + + try: + # 使用 sfdisk 备份分区表 + result = subprocess.run( + ["sudo", "sfdisk", "-d", device_path], + capture_output=True, + text=True, + check=True, + timeout=30 + ) + + with open(backup_file, 'w') as f: + f.write(result.stdout) + + logger.info(f"分区表已备份: {backup_file}") + return backup_file + + except subprocess.CalledProcessError as e: + logger.error(f"备份分区表失败: {e}") + return None + except Exception as e: + logger.error(f"备份分区表出错: {e}") + return None + + def restore_partition_table(self, device_path: str, backup_file: str) -> bool: + """恢复分区表""" + try: + if not os.path.exists(backup_file): + logger.error(f"备份文件不存在: {backup_file}") + return False + + with open(backup_file, 'r') as f: + partition_data = f.read() + + # 使用 sfdisk 恢复分区表 + result = subprocess.run( + ["sudo", "sfdisk", device_path], + input=partition_data, + capture_output=True, + text=True, + check=True, + timeout=30 + ) + + logger.info(f"分区表已从 {backup_file} 恢复到 {device_path}") + return True + + except subprocess.CalledProcessError as e: + logger.error(f"恢复分区表失败: {e.stderr}") + return False + except Exception as e: + logger.error(f"恢复分区表出错: {e}") + return False + + def list_backups(self, device_path: str = None) -> List[Dict]: + """列出可用的备份""" + backups = [] + + try: + for filename in os.listdir(self.backup_dir): + if filename.endswith('.bin'): + filepath = os.path.join(self.backup_dir, filename) + stat = os.stat(filepath) + + # 解析文件名 + parts = filename.rsplit('_pt_', 1) + if len(parts) == 2: + device_name = parts[0] + timestamp = parts[1].replace('.bin', '') + + if device_path is None or device_name == os.path.basename(device_path): + backups.append({ + 'file': filepath, + 'device': device_name, + 'timestamp': timestamp, + 'size': stat.st_size, + 'created': datetime.fromtimestamp(stat.st_mtime).isoformat() + }) + + except Exception as e: + logger.error(f"列出备份失败: {e}") + + return sorted(backups, key=lambda x: x['created'], reverse=True) + + def delete_backup(self, backup_file: str) -> bool: + """删除备份""" + try: + if os.path.exists(backup_file): + os.remove(backup_file) + logger.info(f"备份已删除: {backup_file}") + return True + return False + except Exception as e: + logger.error(f"删除备份失败: {e}") + return False + + +class OperationQueue: + """操作队列管理器""" + + def __init__(self, history: OperationHistory): + self.history = history + self.queue: List[Dict] = [] + self.running = False + self.current_operation = None + self.callbacks: Dict[str, List[Callable]] = { + 'on_start': [], + 'on_complete': [], + 'on_error': [], + 'on_cancel': [] + } + + def add_operation(self, operation_type: OperationType, device_path: str, + execute_func: Callable, details: Dict, + rollback_data: Dict = None) -> str: + """添加操作到队列""" + record_id = self.history.add_record( + operation_type, device_path, details, rollback_data + ) + + operation = { + 'id': record_id, + 'type': operation_type, + 'device_path': device_path, + 'execute_func': execute_func, + 'status': OperationStatus.PENDING + } + self.queue.append(operation) + + logger.info(f"操作已加入队列: {record_id}") + return record_id + + def start_execution(self): + """开始执行队列""" + if self.running: + return + + self.running = True + self._execute_next() + + def _execute_next(self): + """执行下一个操作""" + if not self.queue: + self.running = False + return + + operation = self.queue[0] + self.current_operation = operation + + # 更新状态为运行中 + self.history.update_status( + operation['id'], + OperationStatus.RUNNING + ) + operation['status'] = OperationStatus.RUNNING + + # 触发开始回调 + for callback in self.callbacks['on_start']: + callback(operation) + + # 执行操作 + start_time = time.time() + try: + result = operation['execute_func']() + execution_time = time.time() - start_time + + if result: + operation['status'] = OperationStatus.SUCCESS + self.history.update_status( + operation['id'], + OperationStatus.SUCCESS, + execution_time=execution_time + ) + + # 触发完成回调 + for callback in self.callbacks['on_complete']: + callback(operation, True) + else: + operation['status'] = OperationStatus.FAILED + self.history.update_status( + operation['id'], + OperationStatus.FAILED, + error_message="操作执行失败", + execution_time=execution_time + ) + + # 触发错误回调 + for callback in self.callbacks['on_error']: + callback(operation, "操作执行失败") + + except Exception as e: + execution_time = time.time() - start_time + operation['status'] = OperationStatus.FAILED + self.history.update_status( + operation['id'], + OperationStatus.FAILED, + error_message=str(e), + execution_time=execution_time + ) + + # 触发错误回调 + for callback in self.callbacks['on_error']: + callback(operation, str(e)) + + finally: + # 从队列中移除 + if operation in self.queue: + self.queue.remove(operation) + self.current_operation = None + + # 继续执行下一个 + if self.running and self.queue: + self._execute_next() + else: + self.running = False + + def cancel_current(self): + """取消当前操作""" + if self.current_operation: + self.history.update_status( + self.current_operation['id'], + OperationStatus.CANCELLED + ) + self.current_operation['status'] = OperationStatus.CANCELLED + + # 触发取消回调 + for callback in self.callbacks['on_cancel']: + callback(self.current_operation) + + def clear_queue(self): + """清空队列""" + for op in self.queue: + if op['status'] == OperationStatus.PENDING: + self.history.update_status( + op['id'], + OperationStatus.CANCELLED + ) + self.queue = [] + self.running = False + + def register_callback(self, event: str, callback: Callable): + """注册回调函数""" + if event in self.callbacks: + self.callbacks[event].append(callback) + + def get_queue_status(self) -> Dict: + """获取队列状态""" + return { + 'running': self.running, + 'pending_count': len([o for o in self.queue if o['status'] == OperationStatus.PENDING]), + 'current_operation': self.current_operation + } + + def get_pending_operations(self) -> List[Dict]: + """获取待执行的操作列表""" + return [o for o in self.queue if o['status'] == OperationStatus.PENDING] diff --git a/smart_monitor.py b/smart_monitor.py new file mode 100644 index 0000000..ee35bc6 --- /dev/null +++ b/smart_monitor.py @@ -0,0 +1,290 @@ +# smart_monitor.py +"""SMART 磁盘健康监控模块""" + +import subprocess +import logging +import re +import json +from typing import Dict, List, Optional, Tuple +from dataclasses import dataclass +from enum import Enum + +logger = logging.getLogger(__name__) + + +class HealthStatus(Enum): + """磁盘健康状态""" + GOOD = "良好" + WARNING = "警告" + DANGER = "危险" + UNKNOWN = "未知" + + +@dataclass +class SmartAttribute: + """SMART 属性""" + id: int + name: str + value: int + worst: int + threshold: int + raw_value: str + status: str + + +@dataclass +class SmartInfo: + """SMART 信息""" + device: str + model: str + serial: str + firmware: str + health_status: HealthStatus + temperature: Optional[int] = None + power_on_hours: Optional[int] = None + power_cycle_count: Optional[int] = None + attributes: List[SmartAttribute] = None + overall_health: str = "" + errors: List[str] = None + + def __post_init__(self): + if self.attributes is None: + self.attributes = [] + if self.errors is None: + self.errors = [] + + +class SmartMonitor: + """SMART 监控类""" + + # 关键属性 ID + CRITICAL_ATTRIBUTES = { + 5: "Reallocated_Sector_Ct", # 重映射扇区数 + 10: "Spin_Retry_Count", # 旋转重试次数 + 184: "End-to-End_Error", # 端到端错误 + 187: "Reported_Uncorrect", # 报告不可纠正错误 + 188: "Command_Timeout", # 命令超时 + 196: "Reallocation_Event_Count", # 重映射事件计数 + 197: "Current_Pending_Sector", # 当前待处理扇区 + 198: "Offline_Uncorrectable", # 离线不可纠正 + } + + def __init__(self): + self._check_smartctl() + + def _check_smartctl(self) -> bool: + """检查 smartctl 是否可用""" + try: + result = subprocess.run( + ["which", "smartctl"], + capture_output=True, + text=True, + check=False + ) + return result.returncode == 0 + except Exception: + return False + + def is_available(self) -> bool: + """检查 SMART 监控是否可用""" + return self._check_smartctl() + + def get_disk_smart_info(self, device_path: str) -> Optional[SmartInfo]: + """获取指定设备的 SMART 信息""" + if not self.is_available(): + logger.warning("smartctl 不可用,无法获取 SMART 信息") + return None + + try: + # 获取基本信息 + result = subprocess.run( + ["sudo", "smartctl", "-a", device_path], + capture_output=True, + text=True, + check=False, + timeout=30 + ) + + if result.returncode not in [0, 4, 8]: # 4=磁盘处于故障状态, 8=无法获取部分信息 + logger.error(f"获取 SMART 信息失败: {result.stderr}") + return None + + return self._parse_smart_output(device_path, result.stdout) + + except subprocess.TimeoutExpired: + logger.error(f"获取 {device_path} SMART 信息超时") + return None + except Exception as e: + logger.error(f"获取 {device_path} SMART 信息出错: {e}") + return None + + def _parse_smart_output(self, device: str, output: str) -> SmartInfo: + """解析 smartctl 输出""" + info = SmartInfo( + device=device, + model="", + serial="", + firmware="", + health_status=HealthStatus.UNKNOWN + ) + + lines = output.split('\n') + in_attributes = False + + for line in lines: + line = line.strip() + + # 解析设备信息 + if line.startswith("Device Model:"): + info.model = line.split(":", 1)[1].strip() + elif line.startswith("Serial Number:"): + info.serial = line.split(":", 1)[1].strip() + elif line.startswith("Firmware Version:"): + info.firmware = line.split(":", 1)[1].strip() + + # 解析整体健康状态 + elif "SMART overall-health self-assessment test result:" in line: + result = line.split(":")[-1].strip().upper() + info.overall_health = result + if result == "PASSED": + info.health_status = HealthStatus.GOOD + else: + info.health_status = HealthStatus.DANGER + + # 解析温度 + elif "Temperature:" in line or "Current Drive Temperature:" in line: + match = re.search(r'(\d+)\s*(?:Celsius|°C|C)', line) + if match: + info.temperature = int(match.group(1)) + + # 解析通电时间 + elif line.startswith("Power_On_Hours"): + match = re.search(r'(\d+)\s*\(', line) + if match: + info.power_on_hours = int(match.group(1)) + + # 解析通电次数 + elif line.startswith("Power_Cycle_Count"): + match = re.search(r'(\d+)\s*\(', line) + if match: + info.power_cycle_count = int(match.group(1)) + + # 解析 SMART 属性表 + elif "Vendor Specific SMART Attributes" in line: + in_attributes = True + continue + + if in_attributes and line.startswith("0x"): + attr = self._parse_attribute_line(line) + if attr: + info.attributes.append(attr) + # 检查关键属性 + if attr.id in self.CRITICAL_ATTRIBUTES: + if attr.raw_value != "0" and int(attr.raw_value) > 0: + if info.health_status != HealthStatus.DANGER: + info.health_status = HealthStatus.WARNING + info.errors.append( + f"{attr.name}: {attr.raw_value}" + ) + + # 如果没有明确的整体健康状态,根据属性判断 + if info.health_status == HealthStatus.UNKNOWN: + if info.errors: + info.health_status = HealthStatus.WARNING + else: + info.health_status = HealthStatus.GOOD + + return info + + def _parse_attribute_line(self, line: str) -> Optional[SmartAttribute]: + """解析 SMART 属性行""" + # 格式: 0x05 0x64 0x64 0x64 0x0000 0x0000 0x0000 000 + parts = line.split() + if len(parts) < 10: + return None + + try: + attr_id = int(parts[0], 16) + name = parts[1] + value = int(parts[3]) + worst = int(parts[4]) + threshold = int(parts[5]) + raw_value = parts[9] + status = "正常" if value > threshold else "预警" + + return SmartAttribute( + id=attr_id, + name=name, + value=value, + worst=worst, + threshold=threshold, + raw_value=raw_value, + status=status + ) + except (ValueError, IndexError): + return None + + def get_all_disks_smart(self) -> Dict[str, SmartInfo]: + """获取所有支持 SMART 的磁盘信息""" + results = {} + + try: + # 获取所有块设备 + result = subprocess.run( + ["lsblk", "-d", "-n", "-o", "NAME,TYPE,ROTA"], + capture_output=True, + text=True, + check=True + ) + + for line in result.stdout.strip().split('\n'): + parts = line.split() + if len(parts) >= 3: + name = parts[0] + device_type = parts[1] + is_rotational = parts[2] == "1" + + # 只对物理磁盘检查 SMART + if device_type in ["disk", "rom"] and is_rotational: + device_path = f"/dev/{name}" + smart_info = self.get_disk_smart_info(device_path) + if smart_info: + results[device_path] = smart_info + + except Exception as e: + logger.error(f"获取磁盘列表失败: {e}") + + return results + + def get_health_score(self, smart_info: SmartInfo) -> int: + """计算健康评分 (0-100)""" + if not smart_info: + return 0 + + score = 100 + + # 根据整体健康状态调整 + if smart_info.health_status == HealthStatus.DANGER: + score = 30 + elif smart_info.health_status == HealthStatus.WARNING: + score = 70 + + # 根据错误数量调整 + error_count = len(smart_info.errors) + score -= error_count * 10 + + # 确保分数在合理范围内 + return max(0, min(100, score)) + + def get_temperature_status(self, temp: Optional[int]) -> Tuple[str, str]: + """获取温度状态""" + if temp is None: + return "未知", "gray" + elif temp < 35: + return "正常", "green" + elif temp < 45: + return "偏高", "orange" + elif temp < 55: + return "高温", "red" + else: + return "危险", "red"