This commit is contained in:
zj
2026-02-09 17:59:50 +08:00
parent 1a3a4746a3
commit 0112e4d3b1
11 changed files with 3223 additions and 13 deletions

308
build.py Normal file
View File

@@ -0,0 +1,308 @@
#!/usr/bin/env python3
"""
Linux 存储管理器 - PyInstaller 打包脚本
自动安装依赖并打包应用程序
"""
import subprocess
import sys
import os
import shutil
from pathlib import Path
def run_command(cmd, description, check=True):
"""运行命令并输出结果"""
print(f"\n{'='*60}")
print(f" {description}")
print(f" 命令: {' '.join(cmd)}")
print(f"{'='*60}")
try:
result = subprocess.run(cmd, check=check, capture_output=False, text=True)
print(f"{description} 完成")
return result.returncode == 0
except subprocess.CalledProcessError as e:
print(f"{description} 失败")
print(f" 错误: {e}")
return False
def check_python():
"""检查 Python 版本"""
print(f"\n{'='*60}")
print(" 检查 Python 版本")
print(f"{'='*60}")
version = sys.version_info
print(f" Python 版本: {version.major}.{version.minor}.{version.micro}")
if version.major < 3 or (version.major == 3 and version.minor < 6):
print("✗ Python 版本过低,需要 Python 3.6+")
return False
print("✓ Python 版本符合要求")
return True
def is_externally_managed():
"""检测是否为 PEP 668 外部管理环境"""
# 检查是否在虚拟环境中
if hasattr(sys, 'real_prefix') or (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix):
return False
if os.environ.get('VIRTUAL_ENV'):
return False
# 检查 EXTERNALLY-MANAGED 文件
try:
import sysconfig
stdlib_path = sysconfig.get_path('stdlib')
if os.path.exists(os.path.join(stdlib_path, 'EXTERNALLY-MANAGED')):
return True
except:
pass
return False
def install_dependencies():
"""安装依赖包"""
print(f"\n{'='*60}")
print(" 安装依赖包")
print(f"{'='*60}")
dependencies = [
"pyinstaller",
"pexpect",
]
# 尝试使用 pip3 或 pip
pip_cmd = None
for pip in ["pip3", "pip"]:
if shutil.which(pip):
pip_cmd = pip
break
if not pip_cmd:
print("✗ 未找到 pip请手动安装 pip")
return False
# 检测外部管理环境
is_managed = is_externally_managed()
if is_managed:
print(" 检测到外部管理环境 (PEP 668),使用 --break-system-packages 选项")
print(f" 使用 {pip_cmd} 安装依赖...")
# 构建安装参数
extra_args = ["--break-system-packages"] if is_managed else []
# 升级 pip 本身
run_command([pip_cmd, "install", "--upgrade", "pip"] + extra_args, "升级 pip", check=False)
# 安装依赖
all_success = True
for dep in dependencies:
if not run_command([pip_cmd, "install", dep] + extra_args, f"安装 {dep}"):
print(f" 警告: {dep} 安装失败,尝试使用 sudo...")
if not run_command(["sudo", pip_cmd, "install", dep] + extra_args, f"使用 sudo 安装 {dep}", check=False):
print(f"{dep} 安装失败")
all_success = False
if not all_success:
print(f"\n{'='*60}")
print(" 依赖安装失败")
print(f"{'='*60}")
print(" 您可以选择以下方式安装:")
print("")
print(" 方式 1 - 创建虚拟环境 (推荐):")
print(" python3 -m venv venv")
print(" source venv/bin/activate")
print(" pip install pyinstaller pexpect")
print(" python3 build.py")
print("")
print(" 方式 2 - 使用系统包管理器 (Arch):")
print(" sudo pacman -S python-pyinstaller python-pexpect")
print(" python3 build.py")
print("")
print(" 方式 3 - 强制安装 (有风险):")
print(" pip3 install pyinstaller pexpect --break-system-packages")
print(" python3 build.py")
print("")
return False
return True
def clean_build():
"""清理之前的构建文件"""
print(f"\n{'='*60}")
print(" 清理构建文件")
print(f"{'='*60}")
dirs_to_remove = ["build", "dist", "__pycache__"]
files_to_remove = ["*.spec"]
removed = []
for d in dirs_to_remove:
if os.path.exists(d):
try:
shutil.rmtree(d)
removed.append(d)
except Exception as e:
print(f" 警告: 无法删除 {d}: {e}")
# 保留用户自定义的 spec 文件,只删除自动生成的
for f in os.listdir("."):
if f.startswith("mainwindow") and f.endswith(".spec") and f != "disk-manager.spec":
try:
os.remove(f)
removed.append(f)
except Exception as e:
print(f" 警告: 无法删除 {f}: {e}")
if removed:
print(f" 已清理: {', '.join(removed)}")
else:
print(" 没有需要清理的文件")
return True
def build_app():
"""使用 PyInstaller 打包应用程序"""
print(f"\n{'='*60}")
print(" 开始打包应用程序")
print(f"{'='*60}")
# 确定要打包的主文件
# 默认使用 mainwindow_tkinter.py (Tkinter 版本)
main_script = "mainwindow_tkinter.py"
if not os.path.exists(main_script):
print(f"✗ 未找到 {main_script}")
main_script = "mainwindow.py"
if not os.path.exists(main_script):
print(f"✗ 也未找到 {main_script}")
return False
print(f" 使用替代文件: {main_script}")
# 构建命令
output_name = "linux-storage-manager"
cmd = [
sys.executable, "-m", "PyInstaller",
"--onefile", # 单文件模式
"--name", output_name, # 输出文件名
"--clean", # 清理临时文件
"--noconfirm", # 不询问确认
"--console", # 显示控制台 (用于调试)
main_script
]
if not run_command(cmd, "PyInstaller 打包"):
return False
# 检查输出
output_path = Path("dist") / output_name
if output_path.exists():
size = output_path.stat().st_size / (1024 * 1024) # MB
print(f"\n{'='*60}")
print(" 打包成功!")
print(f"{'='*60}")
print(f" 输出文件: {output_path}")
print(f" 文件大小: {size:.2f} MB")
print(f"\n 使用方法:")
print(f" sudo ./{output_path}")
return True
else:
print(f"✗ 未找到输出文件: {output_path}")
return False
def create_desktop_entry():
"""创建桌面快捷方式文件"""
print(f"\n{'='*60}")
print(" 创建桌面快捷方式文件")
print(f"{'='*60}")
desktop_content = """[Desktop Entry]
Name=Linux 存储管理器
Comment=Linux 存储管理工具
Exec=/usr/local/bin/linux-storage-manager
Icon=drive-harddisk
Terminal=false
Type=Application
Categories=System;Utility;
Keywords=storage;disk;partition;raid;lvm;
"""
desktop_file = "linux-storage-manager.desktop"
try:
with open(desktop_file, "w") as f:
f.write(desktop_content)
print(f" 已创建: {desktop_file}")
print(f" 安装方法: sudo cp {desktop_file} /usr/share/applications/")
return True
except Exception as e:
print(f" 警告: 无法创建桌面文件: {e}")
return False
def main():
"""主函数"""
print("""
╔══════════════════════════════════════════════════════════════╗
║ Linux 存储管理器 - PyInstaller 打包工具 ║
╚══════════════════════════════════════════════════════════════╝
""")
# 检查是否在正确的目录
if not os.path.exists("mainwindow_tkinter.py") and not os.path.exists("mainwindow.py"):
print("✗ 错误: 未找到主程序文件")
print(" 请确保在包含 mainwindow_tkinter.py 或 mainwindow.py 的目录中运行此脚本")
sys.exit(1)
# 执行步骤
steps = [
("检查 Python 版本", check_python),
("安装依赖包", install_dependencies),
("清理构建文件", clean_build),
("打包应用程序", build_app),
("创建桌面文件", create_desktop_entry),
]
results = []
for name, func in steps:
try:
result = func()
results.append((name, result))
except Exception as e:
print(f"\n{name} 出错: {e}")
results.append((name, False))
# 打印总结
print(f"\n{'='*60}")
print(" 打包总结")
print(f"{'='*60}")
for name, result in results:
status = "✓ 成功" if result else "✗ 失败"
print(f" {status} - {name}")
# 检查最终结果
if all(r[1] for r in results):
print(f"\n{'='*60}")
print(" 所有步骤完成!")
print(f"{'='*60}")
print(" 可执行文件位置: dist/linux-storage-manager")
print(" 运行命令: sudo dist/linux-storage-manager")
sys.exit(0)
else:
print(f"\n{'='*60}")
print(" 打包过程中有步骤失败,请检查错误信息")
print(f"{'='*60}")
sys.exit(1)
if __name__ == "__main__":
main()

164
build.sh Normal file
View File

@@ -0,0 +1,164 @@
#!/bin/bash
#
# Linux 存储管理器 - PyInstaller 打包脚本
# 自动安装依赖并打包应用程序
#
set -e
echo ""
echo "╔══════════════════════════════════════════════════════════════╗"
echo "║ Linux 存储管理器 - PyInstaller 打包工具 ║"
echo "╚══════════════════════════════════════════════════════════════╝"
echo ""
# 检查 Python3
if ! command -v python3 &> /dev/null; then
echo "✗ 错误: 未找到 python3"
echo " 请先安装 Python 3.6+"
exit 1
fi
PYTHON_VERSION=$(python3 -c 'import sys; print(f"{sys.version_info.major}.{sys.version_info.minor}")')
echo " Python 版本: $PYTHON_VERSION"
# 检查主程序文件
if [ ! -f "mainwindow_tkinter.py" ] && [ ! -f "mainwindow.py" ]; then
echo "✗ 错误: 未找到主程序文件"
echo " 请确保在包含 mainwindow_tkinter.py 或 mainwindow.py 的目录中运行此脚本"
exit 1
fi
echo ""
echo "═══════════════════════════════════════════════════════════════"
echo " 步骤 1/4: 安装依赖包"
echo "═══════════════════════════════════════════════════════════════"
# 检测是否为外部管理环境 (PEP 668)
IS_EXTERNALLY_MANAGED=0
if python3 -c "import sys; sys.exit(0 if (hasattr(sys, 'base_prefix') and sys.base_prefix != sys.prefix) or 'VIRTUAL_ENV' in __import__('os').environ else 1)" 2>/dev/null; then
echo " 检测到在虚拟环境中"
IS_EXTERNALLY_MANAGED=0
elif python3 -c "import sysconfig; import os; print(os.path.exists(os.path.join(sysconfig.get_path('stdlib'), 'EXTERNALLY-MANAGED')))" | grep -q "True"; then
echo " 检测到外部管理环境 (PEP 668)"
IS_EXTERNALLY_MANAGED=1
fi
# 安装依赖
install_deps() {
local EXTRA_ARGS=""
if [ "$IS_EXTERNALLY_MANAGED" -eq 1 ]; then
EXTRA_ARGS="--break-system-packages"
echo " 使用 --break-system-packages 选项"
fi
# 安装/升级 pip
python3 -m pip install --upgrade pip $EXTRA_ARGS 2>/dev/null || {
echo " pip 升级失败,继续安装..."
}
# 安装 pyinstaller 和 pexpect
pip3 install pyinstaller pexpect $EXTRA_ARGS || {
return 1
}
return 0
}
if ! install_deps; then
echo " 尝试使用 sudo 安装依赖..."
sudo pip3 install pyinstaller pexpect --break-system-packages 2>/dev/null || {
echo ""
echo "✗ 依赖安装失败"
echo ""
echo " 您可以选择以下方式安装:"
echo ""
echo " 方式 1 - 创建虚拟环境 (推荐):"
echo " python3 -m venv venv"
echo " source venv/bin/activate"
echo " pip install pyinstaller pexpect"
echo " ./build.sh"
echo ""
echo " 方式 2 - 使用系统包管理器 (Arch):"
echo " sudo pacman -S python-pyinstaller python-pexpect"
echo " ./build.sh"
echo ""
echo " 方式 3 - 强制安装 (有风险):"
echo " pip3 install pyinstaller pexpect --break-system-packages"
echo " ./build.sh"
echo ""
exit 1
}
fi
echo ""
echo "═══════════════════════════════════════════════════════════════"
echo " 步骤 2/4: 清理构建文件"
echo "═══════════════════════════════════════════════════════════════"
# 清理旧文件
rm -rf build dist __pycache__
rm -f mainwindow*.spec
echo " 已清理构建文件"
echo ""
echo "═══════════════════════════════════════════════════════════════"
echo " 步骤 3/4: PyInstaller 打包"
echo "═══════════════════════════════════════════════════════════════"
# 确定主文件
MAIN_SCRIPT="mainwindow_tkinter.py"
if [ ! -f "$MAIN_SCRIPT" ]; then
MAIN_SCRIPT="mainwindow.py"
fi
echo " 打包文件: $MAIN_SCRIPT"
# 执行打包
python3 -m PyInstaller \
--onefile \
--name "linux-storage-manager" \
--clean \
--noconfirm \
--console \
"$MAIN_SCRIPT"
echo ""
echo "═══════════════════════════════════════════════════════════════"
echo " 步骤 4/4: 创建桌面快捷方式"
echo "═══════════════════════════════════════════════════════════════"
cat > linux-storage-manager.desktop << 'EOF'
[Desktop Entry]
Name=Linux 存储管理器
Comment=Linux 存储管理工具
Exec=/usr/local/bin/linux-storage-manager
Icon=drive-harddisk
Terminal=false
Type=Application
Categories=System;Utility;
Keywords=storage;disk;partition;raid;lvm;
EOF
echo " 已创建桌面文件: linux-storage-manager.desktop"
echo " 安装命令: sudo cp linux-storage-manager.desktop /usr/share/applications/"
# 检查输出
if [ -f "dist/linux-storage-manager" ]; then
SIZE=$(du -h dist/linux-storage-manager | cut -f1)
echo ""
echo "╔══════════════════════════════════════════════════════════════╗"
echo "║ 打包成功! ║"
echo "╚══════════════════════════════════════════════════════════════╝"
echo ""
echo " 输出文件: dist/linux-storage-manager"
echo " 文件大小: $SIZE"
echo ""
echo " 使用方法:"
echo " 直接运行: sudo dist/linux-storage-manager"
echo " 安装到系统: sudo cp dist/linux-storage-manager /usr/local/bin/"
echo ""
else
echo "✗ 打包失败: 未找到输出文件"
exit 1
fi

521
dialogs_enhanced.py Normal file
View File

@@ -0,0 +1,521 @@
# dialogs_enhanced.py
"""增强型对话框 - 智能提示与确认"""
import tkinter as tk
from tkinter import ttk, messagebox
import logging
logger = logging.getLogger(__name__)
class EnhancedFormatDialog:
"""增强格式化对话框 - 显示详细警告信息"""
def __init__(self, parent, device_path, device_info):
self.parent = parent
self.device_path = device_path
self.device_info = device_info
self.result = None
self.dialog = tk.Toplevel(parent)
self.dialog.title(f"格式化确认 - {device_path}")
self.dialog.geometry("500x575")
self.dialog.minsize(480, 450)
self.dialog.transient(parent)
self.dialog.grab_set()
self._center_window()
self._create_widgets()
def _center_window(self):
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
main_frame = ttk.Frame(self.dialog, padding="15")
main_frame.pack(fill=tk.BOTH, expand=True)
# 警告标题
warning_frame = ttk.Frame(main_frame)
warning_frame.pack(fill=tk.X, pady=(0, 15))
warning_label = ttk.Label(
warning_frame,
text="⚠ 警告:此操作将永久删除所有数据!",
font=("Arial", 12, "bold"),
foreground="red"
)
warning_label.pack()
# 设备信息区域
info_frame = ttk.LabelFrame(main_frame, text="设备信息", padding="10")
info_frame.pack(fill=tk.X, pady=(0, 15))
# 显示详细信息
infos = [
("设备路径", self.device_path),
("设备名称", self.device_info.get('name', 'N/A')),
("文件系统", self.device_info.get('fstype', 'N/A')),
("大小", self.device_info.get('size', 'N/A')),
("UUID", self.device_info.get('uuid', 'N/A')),
("挂载点", self.device_info.get('mountpoint', '未挂载')),
]
for i, (label, value) in enumerate(infos):
ttk.Label(info_frame, text=f"{label}:", font=("Arial", 10, "bold")).grid(
row=i, column=0, sticky=tk.W, padx=5, pady=3
)
val_label = ttk.Label(info_frame, text=str(value))
val_label.grid(row=i, column=1, sticky=tk.W, padx=5, pady=3)
# 如果有挂载点,高亮显示
if label == "挂载点" and value and value != '未挂载':
val_label.configure(foreground="orange", font=("Arial", 9, "bold"))
# 数据风险提示
risk_frame = ttk.LabelFrame(main_frame, text="数据风险提示", padding="10")
risk_frame.pack(fill=tk.X, pady=(0, 15))
risks = []
if self.device_info.get('fstype'):
risks.append(f"• 此分区包含 {self.device_info['fstype']} 文件系统")
if self.device_info.get('uuid'):
risks.append(f"• 分区有 UUID: {self.device_info['uuid'][:20]}...")
if self.device_info.get('mountpoint'):
risks.append(f"• 分区当前挂载在: {self.device_info['mountpoint']}")
if not risks:
risks.append("• 无法获取分区详细信息")
for risk in risks:
ttk.Label(risk_frame, text=risk, foreground="red").pack(
anchor=tk.W, pady=2
)
# 文件系统选择
fs_frame = ttk.LabelFrame(main_frame, text="选择新文件系统", padding="10")
fs_frame.pack(fill=tk.X, pady=(0, 15))
self.fs_var = tk.StringVar(value="ext4")
fs_options = ["ext4", "xfs", "fat32", "ntfs"]
for fs in fs_options:
ttk.Radiobutton(fs_frame, text=fs, variable=self.fs_var,
value=fs).pack(side=tk.LEFT, padx=10)
# 确认输入
confirm_frame = ttk.Frame(main_frame)
confirm_frame.pack(fill=tk.X, pady=(0, 15))
ttk.Label(confirm_frame,
text=f"请输入设备名 '{self.device_info.get('name', '确认')}' 以继续:",
foreground="red").pack(anchor=tk.W, pady=5)
self.confirm_var = tk.StringVar()
self.confirm_entry = ttk.Entry(confirm_frame, textvariable=self.confirm_var, width=20)
self.confirm_entry.pack(anchor=tk.W, pady=5)
self.confirm_entry.focus()
# 按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=10)
ttk.Button(btn_frame, text="取消",
command=self._on_cancel).pack(side=tk.RIGHT, padx=5)
ttk.Button(btn_frame, text="确认格式化",
command=self._on_confirm).pack(side=tk.RIGHT, padx=5)
def _on_confirm(self):
expected = self.device_info.get('name', '确认')
if self.confirm_var.get() != expected:
messagebox.showerror("错误",
f"输入错误!请输入 '{expected}' 以确认格式化操作。")
return
self.result = self.fs_var.get()
self.dialog.destroy()
def _on_cancel(self):
self.dialog.destroy()
def wait_for_result(self):
self.dialog.wait_window()
return self.result
class EnhancedRaidDeleteDialog:
"""增强 RAID 删除对话框 - 显示阵列详情"""
def __init__(self, parent, array_path, array_data, member_devices):
self.parent = parent
self.array_path = array_path
self.array_data = array_data
self.member_devices = member_devices
self.result = False
self.dialog = tk.Toplevel(parent)
self.dialog.title(f"删除 RAID 阵列确认 - {array_path}")
self.dialog.geometry("550x750")
self.dialog.minsize(520, 750)
self.dialog.transient(parent)
self.dialog.grab_set()
self._center_window()
self._create_widgets()
def _center_window(self):
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
main_frame = ttk.Frame(self.dialog, padding="15")
main_frame.pack(fill=tk.BOTH, expand=True)
# 危险警告
warning_frame = ttk.Frame(main_frame)
warning_frame.pack(fill=tk.X, pady=(0, 15))
warning_label = ttk.Label(
warning_frame,
text="⚠ 危险操作:此操作将永久删除 RAID 阵列!",
font=("Arial", 13, "bold"),
foreground="red"
)
warning_label.pack()
# 阵列详情
array_frame = ttk.LabelFrame(main_frame, text="阵列详情", padding="10")
array_frame.pack(fill=tk.X, pady=(0, 15))
details = [
("阵列设备", self.array_path),
("RAID 级别", self.array_data.get('level', 'N/A')),
("阵列状态", self.array_data.get('state', 'N/A')),
("阵列大小", self.array_data.get('array_size', 'N/A')),
("UUID", self.array_data.get('uuid', 'N/A')),
("Chunk 大小", self.array_data.get('chunk_size', 'N/A')),
]
for i, (label, value) in enumerate(details):
ttk.Label(array_frame, text=f"{label}:",
font=("Arial", 10, "bold")).grid(
row=i, column=0, sticky=tk.W, padx=5, pady=3
)
ttk.Label(array_frame, text=str(value)).grid(
row=i, column=1, sticky=tk.W, padx=5, pady=3
)
# 成员设备列表
member_frame = ttk.LabelFrame(main_frame, text="成员设备(将被清除 RAID 超级块)",
padding="10")
member_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 15))
if self.member_devices:
for i, dev in enumerate(self.member_devices):
if dev:
ttk.Label(member_frame, text=f"{dev}",
font=("Arial", 10), foreground="red").pack(
anchor=tk.W, pady=2
)
else:
ttk.Label(member_frame, text=" 无法获取成员设备列表",
foreground="orange").pack(anchor=tk.W, pady=5)
# 后果警告
consequence_frame = ttk.LabelFrame(main_frame, text="操作后果", padding="10")
consequence_frame.pack(fill=tk.X, pady=(0, 15))
consequences = [
"• 阵列将被停止并从系统中移除",
"• 所有成员设备的 RAID 超级块将被清除",
"• 阵列上的所有数据将永久丢失",
"• /etc/mdadm.conf 中的配置将被删除",
"• 此操作无法撤销!"
]
for con in consequences:
ttk.Label(consequence_frame, text=con,
foreground="red").pack(anchor=tk.W, pady=2)
# 二次确认
confirm_frame = ttk.Frame(main_frame)
confirm_frame.pack(fill=tk.X, pady=(0, 10))
ttk.Label(confirm_frame,
text="请输入 'DELETE' 以确认删除操作:",
foreground="red", font=("Arial", 10, "bold")).pack(
anchor=tk.W, pady=5)
self.confirm_var = tk.StringVar()
self.confirm_entry = ttk.Entry(confirm_frame, textvariable=self.confirm_var, width=15)
self.confirm_entry.pack(anchor=tk.W, pady=5)
self.confirm_entry.focus()
# 按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=10)
ttk.Button(btn_frame, text="取消",
command=self._on_cancel).pack(side=tk.RIGHT, padx=5)
ttk.Button(btn_frame, text="确认删除",
command=self._on_confirm,
style="Danger.TButton").pack(side=tk.RIGHT, padx=5)
# 配置危险按钮样式
style = ttk.Style()
style.configure("Danger.TButton", foreground="red")
def _on_confirm(self):
if self.confirm_var.get() != "DELETE":
messagebox.showerror("错误",
"输入错误!请输入 'DELETE'(全大写)以确认删除操作。")
return
self.result = True
self.dialog.destroy()
def _on_cancel(self):
self.dialog.destroy()
def wait_for_result(self):
self.dialog.wait_window()
return self.result
class EnhancedPartitionDialog:
"""增强分区创建对话框 - 可视化滑块选择大小"""
def __init__(self, parent, disk_path, total_disk_mib, max_available_mib):
self.parent = parent
self.disk_path = disk_path
self.total_disk_mib = total_disk_mib
self.max_available_mib = max_available_mib
self.result = None
self.dialog = tk.Toplevel(parent)
self.dialog.title(f"创建分区 - {disk_path}")
self.dialog.geometry("550x575")
self.dialog.minsize(500, 575)
self.dialog.transient(parent)
self.dialog.grab_set()
self._center_window()
self._create_widgets()
def _center_window(self):
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
main_frame = ttk.Frame(self.dialog, padding="15")
main_frame.pack(fill=tk.BOTH, expand=True)
# 磁盘信息
info_frame = ttk.LabelFrame(main_frame, text="磁盘信息", padding="10")
info_frame.pack(fill=tk.X, pady=(0, 15))
total_gb = self.total_disk_mib / 1024
max_gb = self.max_available_mib / 1024
ttk.Label(info_frame, text=f"磁盘: {self.disk_path}").pack(anchor=tk.W, pady=2)
ttk.Label(info_frame, text=f"总容量: {total_gb:.2f} GB").pack(anchor=tk.W, pady=2)
ttk.Label(info_frame, text=f"可用空间: {max_gb:.2f} GB",
foreground="green").pack(anchor=tk.W, pady=2)
# 分区表类型
pt_frame = ttk.LabelFrame(main_frame, text="分区表类型", padding="10")
pt_frame.pack(fill=tk.X, pady=(0, 15))
self.part_type_var = tk.StringVar(value="gpt")
ttk.Radiobutton(pt_frame, text="GPT (推荐,支持大容量磁盘)",
variable=self.part_type_var, value="gpt").pack(anchor=tk.W, pady=2)
ttk.Radiobutton(pt_frame, text="MBR/MSDOS (兼容旧系统)",
variable=self.part_type_var, value="msdos").pack(anchor=tk.W, pady=2)
# 大小选择
size_frame = ttk.LabelFrame(main_frame, text="分区大小", padding="10")
size_frame.pack(fill=tk.X, pady=(0, 15))
# 使用最大空间复选框
self.use_max_var = tk.BooleanVar(value=True)
ttk.Checkbutton(size_frame, text="使用最大可用空间",
variable=self.use_max_var,
command=self._toggle_size_input).pack(anchor=tk.W, pady=5)
# 滑块和输入框容器
self.size_control_frame = ttk.Frame(size_frame)
self.size_control_frame.pack(fill=tk.X, pady=5)
# 滑块
self.size_var = tk.DoubleVar(value=max_gb)
self.size_slider = ttk.Scale(
self.size_control_frame,
from_=0.1,
to=max_gb,
orient=tk.HORIZONTAL,
variable=self.size_var,
command=self._on_slider_change
)
self.size_slider.pack(fill=tk.X, pady=5)
# 输入框和标签
input_frame = ttk.Frame(self.size_control_frame)
input_frame.pack(fill=tk.X, pady=5)
ttk.Label(input_frame, text="大小:").pack(side=tk.LEFT, padx=5)
self.size_spin = tk.Spinbox(
input_frame,
from_=0.1,
to=max_gb,
increment=0.1,
textvariable=self.size_var,
width=10,
command=self._on_spin_change
)
self.size_spin.pack(side=tk.LEFT, padx=5)
ttk.Label(input_frame, text="GB").pack(side=tk.LEFT, padx=5)
# 显示百分比
self.percent_label = ttk.Label(input_frame, text="(100%)")
self.percent_label.pack(side=tk.LEFT, padx=10)
# 可视化空间显示
self._create_space_visualizer(main_frame, max_gb)
# 初始状态
self._toggle_size_input()
# 按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=10)
ttk.Button(btn_frame, text="取消",
command=self._on_cancel).pack(side=tk.RIGHT, padx=5)
ttk.Button(btn_frame, text="创建分区",
command=self._on_confirm).pack(side=tk.RIGHT, padx=5)
def _create_space_visualizer(self, parent, max_gb):
"""创建空间可视化条"""
viz_frame = ttk.LabelFrame(parent, text="空间使用预览", padding="10")
viz_frame.pack(fill=tk.X, pady=(0, 15))
# Canvas 用于绘制
self.viz_canvas = tk.Canvas(viz_frame, height=40, bg="white")
self.viz_canvas.pack(fill=tk.X, pady=5)
# 等待 Canvas 渲染完成后再更新
self.viz_canvas.update_idletasks()
# 绘制初始状态
self._update_visualizer(max_gb, max_gb)
def _update_visualizer(self, current_gb, max_gb):
"""更新空间可视化"""
self.viz_canvas.delete("all")
width = self.viz_canvas.winfo_width() or 400
height = self.viz_canvas.winfo_height() or 40
# 计算比例
if max_gb > 0:
ratio = current_gb / max_gb
else:
ratio = 0
fill_width = int(width * ratio)
# 绘制背景
self.viz_canvas.create_rectangle(0, 0, width, height, fill="lightgray", outline="")
# 绘制已用空间
if fill_width > 0:
color = "green" if ratio < 0.8 else "orange" if ratio < 0.95 else "red"
self.viz_canvas.create_rectangle(0, 0, fill_width, height, fill=color, outline="")
# 绘制文字
self.viz_canvas.create_text(
width // 2, height // 2,
text=f"{current_gb:.2f} GB / {max_gb:.2f} GB",
font=("Arial", 10, "bold")
)
def _on_slider_change(self, value):
"""滑块变化回调"""
try:
gb = float(value)
self._update_percent(gb)
self._update_visualizer(gb, self.max_available_mib / 1024)
except:
pass
def _on_spin_change(self):
"""输入框变化回调"""
try:
gb = self.size_var.get()
self._update_percent(gb)
self._update_visualizer(gb, self.max_available_mib / 1024)
except:
pass
def _update_percent(self, gb):
"""更新百分比显示"""
max_gb = self.max_available_mib / 1024
if max_gb > 0:
percent = (gb / max_gb) * 100
self.percent_label.configure(text=f"({percent:.1f}%)")
def _toggle_size_input(self):
"""切换大小输入状态"""
if self.use_max_var.get():
self.size_slider.configure(state=tk.DISABLED)
self.size_spin.configure(state=tk.DISABLED)
max_gb = self.max_available_mib / 1024
self.size_var.set(max_gb)
self._update_percent(max_gb)
self._update_visualizer(max_gb, max_gb)
else:
self.size_slider.configure(state=tk.NORMAL)
self.size_spin.configure(state=tk.NORMAL)
def _on_confirm(self):
size_gb = self.size_var.get()
use_max_space = self.use_max_var.get()
if not use_max_space and size_gb <= 0:
messagebox.showwarning("输入错误", "分区大小必须大于0。")
return
max_gb = self.max_available_mib / 1024
if not use_max_space and size_gb > max_gb:
messagebox.showwarning("输入错误", "分区大小不能超过最大可用空间。")
return
self.result = {
'disk_path': self.disk_path,
'partition_table_type': self.part_type_var.get(),
'size_gb': size_gb,
'total_disk_mib': self.total_disk_mib,
'use_max_space': use_max_space
}
self.dialog.destroy()
def _on_cancel(self):
self.dialog.destroy()
def wait_for_result(self):
self.dialog.wait_window()
return self.result

531
dialogs_history.py Normal file
View File

@@ -0,0 +1,531 @@
# dialogs_history.py
"""操作历史对话框"""
import tkinter as tk
from tkinter import ttk, messagebox, filedialog
from datetime import datetime
from operation_history import OperationHistory, PartitionTableBackup, OperationStatus
class OperationHistoryDialog:
"""操作历史对话框"""
def __init__(self, parent):
self.parent = parent
self.history = OperationHistory()
self.dialog = tk.Toplevel(parent)
self.dialog.title("操作历史")
self.dialog.geometry("900x600")
self.dialog.transient(parent)
self._center_window()
self._create_widgets()
self._load_history()
def _center_window(self):
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
main_frame = ttk.Frame(self.dialog, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 标题
title_label = ttk.Label(
main_frame,
text="操作历史记录",
font=("Arial", 14, "bold")
)
title_label.pack(pady=(0, 10))
# 工具栏
toolbar = ttk.Frame(main_frame)
toolbar.pack(fill=tk.X, pady=(0, 10))
ttk.Button(toolbar, text="刷新", command=self._load_history).pack(side=tk.LEFT, padx=5)
ttk.Button(toolbar, text="导出", command=self._export_history).pack(side=tk.LEFT, padx=5)
ttk.Button(toolbar, text="清理旧记录", command=self._clear_old).pack(side=tk.LEFT, padx=5)
ttk.Button(toolbar, text="清空所有", command=self._clear_all).pack(side=tk.LEFT, padx=5)
# 历史记录列表
list_frame = ttk.Frame(main_frame)
list_frame.pack(fill=tk.BOTH, expand=True)
columns = ("时间", "操作类型", "设备", "状态", "详情", "耗时")
self.tree = ttk.Treeview(list_frame, columns=columns, show="headings")
for col in columns:
self.tree.heading(col, text=col)
self.tree.column("时间", width=150)
self.tree.column("操作类型", width=120)
self.tree.column("设备", width=100)
self.tree.column("状态", width=80)
self.tree.column("详情", width=300)
self.tree.column("耗时", width=80)
# 滚动条
scrollbar_y = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview)
scrollbar_x = ttk.Scrollbar(list_frame, orient=tk.HORIZONTAL, command=self.tree.xview)
self.tree.configure(yscrollcommand=scrollbar_y.set, xscrollcommand=scrollbar_x.set)
self.tree.grid(row=0, column=0, sticky="nsew")
scrollbar_y.grid(row=0, column=1, sticky="ns")
scrollbar_x.grid(row=1, column=0, sticky="ew")
list_frame.grid_rowconfigure(0, weight=1)
list_frame.grid_columnconfigure(0, weight=1)
# 颜色标签
self.tree.tag_configure("success", foreground="green")
self.tree.tag_configure("failed", foreground="red")
self.tree.tag_configure("cancelled", foreground="gray")
self.tree.tag_configure("pending", foreground="orange")
# 详情区域
detail_frame = ttk.LabelFrame(main_frame, text="操作详情", padding="10")
detail_frame.pack(fill=tk.X, pady=(10, 0))
self.detail_text = tk.Text(detail_frame, height=6, wrap=tk.WORD)
self.detail_text.pack(fill=tk.BOTH, expand=True)
# 绑定选择事件
self.tree.bind("<<TreeviewSelect>>", self._on_select)
# 关闭按钮
ttk.Button(main_frame, text="关闭", command=self.dialog.destroy).pack(pady=10)
def _load_history(self):
"""加载历史记录"""
# 清空现有数据
for item in self.tree.get_children():
self.tree.delete(item)
records = self.history.get_recent_records(limit=100)
for record in records:
# 格式化时间
try:
dt = datetime.fromisoformat(record.timestamp)
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
except:
time_str = record.timestamp
# 格式化耗时
time_cost = ""
if record.execution_time:
if record.execution_time < 1:
time_cost = f"{record.execution_time*1000:.0f}ms"
else:
time_cost = f"{record.execution_time:.1f}s"
# 格式化详情
details = str(record.details)[:50] + "..." if len(str(record.details)) > 50 else str(record.details)
values = (
time_str,
record.operation_type,
record.device_path,
record.status,
details,
time_cost
)
# 根据状态设置标签
tag = ""
status_lower = record.status.lower()
if "成功" in status_lower or "success" in status_lower:
tag = "success"
elif "失败" in status_lower or "failed" in status_lower:
tag = "failed"
elif "取消" in status_lower or "cancelled" in status_lower:
tag = "cancelled"
elif "" in status_lower or "pending" in status_lower:
tag = "pending"
self.tree.insert("", tk.END, values=values, tags=(tag,))
def _on_select(self, event):
"""选择记录时显示详情"""
selection = self.tree.selection()
if not selection:
return
item = selection[0]
index = self.tree.index(item)
records = self.history.get_recent_records(limit=100)
if index < len(records):
record = records[index]
detail_text = f"操作ID: {record.id}\n"
detail_text += f"时间: {record.timestamp}\n"
detail_text += f"类型: {record.operation_type}\n"
detail_text += f"设备: {record.device_path}\n"
detail_text += f"状态: {record.status}\n"
if record.execution_time:
detail_text += f"耗时: {record.execution_time:.2f}\n"
detail_text += f"\n详情:\n{record.details}\n"
if record.error_message:
detail_text += f"\n错误信息:\n{record.error_message}\n"
if record.rollback_data:
detail_text += f"\n可撤销数据:\n{record.rollback_data}\n"
self.detail_text.delete(1.0, tk.END)
self.detail_text.insert(1.0, detail_text)
def _export_history(self):
"""导出历史记录"""
file_path = filedialog.asksaveasfilename(
defaultextension=".json",
filetypes=[("JSON files", "*.json"), ("All files", "*.*")],
title="导出操作历史"
)
if file_path:
if self.history.export_history(file_path):
messagebox.showinfo("成功", f"历史记录已导出到:\n{file_path}")
else:
messagebox.showerror("错误", "导出失败")
def _clear_old(self):
"""清理旧记录"""
result = messagebox.askyesno(
"确认清理",
"是否清理 30 天之前的操作记录?"
)
if result:
self.history.clear_history(days=30)
self._load_history()
messagebox.showinfo("成功", "旧记录已清理")
def _clear_all(self):
"""清空所有记录"""
result = messagebox.askyesno(
"确认清空",
"是否清空所有操作记录?\n此操作不可恢复!",
icon=messagebox.WARNING
)
if result:
self.history.clear_history()
self._load_history()
messagebox.showinfo("成功", "所有记录已清空")
class PartitionBackupDialog:
"""分区表备份管理对话框"""
def __init__(self, parent):
self.parent = parent
self.backup_manager = PartitionTableBackup()
self.dialog = tk.Toplevel(parent)
self.dialog.title("分区表备份管理")
self.dialog.geometry("800x500")
self.dialog.transient(parent)
self._center_window()
self._create_widgets()
self._load_backups()
def _center_window(self):
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
main_frame = ttk.Frame(self.dialog, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 标题
title_label = ttk.Label(
main_frame,
text="分区表备份管理",
font=("Arial", 14, "bold")
)
title_label.pack(pady=(0, 10))
# 工具栏
toolbar = ttk.Frame(main_frame)
toolbar.pack(fill=tk.X, pady=(0, 10))
ttk.Button(toolbar, text="刷新", command=self._load_backups).pack(side=tk.LEFT, padx=5)
ttk.Button(toolbar, text="删除选中", command=self._delete_selected).pack(side=tk.LEFT, padx=5)
# 备份列表
list_frame = ttk.Frame(main_frame)
list_frame.pack(fill=tk.BOTH, expand=True)
columns = ("设备", "备份时间", "文件大小", "文件路径")
self.tree = ttk.Treeview(list_frame, columns=columns, show="headings")
for col in columns:
self.tree.heading(col, text=col)
self.tree.column("设备", width=100)
self.tree.column("备份时间", width=150)
self.tree.column("文件大小", width=100)
self.tree.column("文件路径", width=400)
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 详情和操作按钮
detail_frame = ttk.LabelFrame(main_frame, text="操作", padding="10")
detail_frame.pack(fill=tk.X, pady=(10, 0))
ttk.Label(detail_frame, text="选中备份文件后可执行以下操作:").pack(anchor=tk.W)
btn_frame = ttk.Frame(detail_frame)
btn_frame.pack(fill=tk.X, pady=5)
ttk.Button(btn_frame, text="查看内容", command=self._view_backup).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="恢复此备份", command=self._restore_backup).pack(side=tk.LEFT, padx=5)
# 关闭按钮
ttk.Button(main_frame, text="关闭", command=self.dialog.destroy).pack(pady=10)
def _load_backups(self):
"""加载备份列表"""
for item in self.tree.get_children():
self.tree.delete(item)
backups = self.backup_manager.list_backups()
for backup in backups:
size_str = self._format_size(backup['size'])
values = (
backup['device'],
backup['created'],
size_str,
backup['file']
)
self.tree.insert("", tk.END, values=values)
def _format_size(self, size_bytes):
"""格式化文件大小"""
if size_bytes < 1024:
return f"{size_bytes} B"
elif size_bytes < 1024 * 1024:
return f"{size_bytes / 1024:.1f} KB"
else:
return f"{size_bytes / (1024 * 1024):.1f} MB"
def _get_selected_backup(self):
"""获取选中的备份"""
selection = self.tree.selection()
if not selection:
messagebox.showwarning("警告", "请先选择一个备份")
return None
item = selection[0]
file_path = self.tree.item(item, "values")[3]
return file_path
def _view_backup(self):
"""查看备份内容"""
file_path = self._get_selected_backup()
if not file_path:
return
try:
with open(file_path, 'r') as f:
content = f.read()
# 显示内容对话框
content_dialog = tk.Toplevel(self.dialog)
content_dialog.title(f"备份内容 - {file_path}")
content_dialog.geometry("600x400")
text = tk.Text(content_dialog, wrap=tk.WORD, padx=10, pady=10)
text.pack(fill=tk.BOTH, expand=True)
text.insert(1.0, content)
text.config(state=tk.DISABLED)
ttk.Button(content_dialog, text="关闭", command=content_dialog.destroy).pack(pady=10)
except Exception as e:
messagebox.showerror("错误", f"无法读取备份文件:\n{e}")
def _restore_backup(self):
"""恢复备份"""
file_path = self._get_selected_backup()
if not file_path:
return
# 解析设备名
filename = os.path.basename(file_path)
device_name = filename.rsplit('_pt_', 1)[0]
device_path = f"/dev/{device_name}"
result = messagebox.askyesno(
"确认恢复",
f"是否将备份恢复到 {device_path}\n\n"
f"警告:这将覆盖当前的分区表!\n"
f"请确保您了解此操作的后果。",
icon=messagebox.WARNING
)
if result:
if self.backup_manager.restore_partition_table(device_path, file_path):
messagebox.showinfo("成功", f"分区表已恢复到 {device_path}")
else:
messagebox.showerror("错误", "恢复失败")
def _delete_selected(self):
"""删除选中的备份"""
file_path = self._get_selected_backup()
if not file_path:
return
result = messagebox.askyesno(
"确认删除",
f"是否删除备份文件?\n{file_path}"
)
if result:
if self.backup_manager.delete_backup(file_path):
self._load_backups()
messagebox.showinfo("成功", "备份已删除")
else:
messagebox.showerror("错误", "删除失败")
class OperationQueueDialog:
"""操作队列对话框"""
def __init__(self, parent, operation_queue):
self.parent = parent
self.queue = operation_queue
self.dialog = tk.Toplevel(parent)
self.dialog.title("操作队列")
self.dialog.geometry("600x400")
self.dialog.transient(parent)
self._center_window()
self._create_widgets()
self._refresh_status()
def _center_window(self):
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
main_frame = ttk.Frame(self.dialog, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 标题
title_label = ttk.Label(
main_frame,
text="待执行操作队列",
font=("Arial", 14, "bold")
)
title_label.pack(pady=(0, 10))
# 状态显示
self.status_label = ttk.Label(main_frame, text="队列状态: 空闲")
self.status_label.pack(anchor=tk.W, pady=(0, 10))
# 操作列表
list_frame = ttk.Frame(main_frame)
list_frame.pack(fill=tk.BOTH, expand=True)
columns = ("类型", "设备", "状态")
self.tree = ttk.Treeview(list_frame, columns=columns, show="headings")
for col in columns:
self.tree.heading(col, text=col)
self.tree.column(col, width=150)
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=10)
ttk.Button(btn_frame, text="开始执行", command=self._start_execution).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="取消当前", command=self._cancel_current).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="清空队列", command=self._clear_queue).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="刷新", command=self._refresh_status).pack(side=tk.LEFT, padx=5)
ttk.Button(btn_frame, text="关闭", command=self.dialog.destroy).pack(side=tk.RIGHT, padx=5)
# 注册回调
self.queue.register_callback('on_start', self._on_operation_start)
self.queue.register_callback('on_complete', self._on_operation_complete)
self.queue.register_callback('on_error', self._on_operation_error)
def _refresh_status(self):
"""刷新队列状态"""
status = self.queue.get_queue_status()
if status['running']:
self.status_label.configure(text=f"队列状态: 执行中 ({status['pending_count']} 个待执行)")
else:
self.status_label.configure(text=f"队列状态: 空闲 ({status['pending_count']} 个待执行)")
# 刷新列表
for item in self.tree.get_children():
self.tree.delete(item)
pending = self.queue.get_pending_operations()
for op in pending:
values = (
op['type'].value if hasattr(op['type'], 'value') else str(op['type']),
op['device_path'],
op['status'].value if hasattr(op['status'], 'value') else str(op['status'])
)
self.tree.insert("", tk.END, values=values)
def _start_execution(self):
"""开始执行队列"""
self.queue.start_execution()
self._refresh_status()
def _cancel_current(self):
"""取消当前操作"""
self.queue.cancel_current()
self._refresh_status()
def _clear_queue(self):
"""清空队列"""
result = messagebox.askyesno("确认", "是否清空所有待执行操作?")
if result:
self.queue.clear_queue()
self._refresh_status()
def _on_operation_start(self, operation):
"""操作开始回调"""
self._refresh_status()
def _on_operation_complete(self, operation, success):
"""操作完成回调"""
self._refresh_status()
def _on_operation_error(self, operation, error):
"""操作错误回调"""
self._refresh_status()
messagebox.showerror("操作错误", f"操作失败:\n{error}")

386
dialogs_smart.py Normal file
View File

@@ -0,0 +1,386 @@
# dialogs_smart.py
"""SMART 监控相关对话框"""
import tkinter as tk
from tkinter import ttk, messagebox
from smart_monitor import SmartMonitor, HealthStatus
class SmartInfoDialog:
"""SMART 信息对话框"""
def __init__(self, parent, device_path, smart_info):
self.parent = parent
self.device_path = device_path
self.smart_info = smart_info
# 创建对话框
self.dialog = tk.Toplevel(parent)
self.dialog.title(f"SMART 信息 - {device_path}")
self.dialog.geometry("700x600")
self.dialog.transient(parent)
self.dialog.grab_set()
self._center_window()
self._create_widgets()
def _center_window(self):
"""居中显示"""
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
"""创建界面"""
main_frame = ttk.Frame(self.dialog, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 基本信息区域
info_frame = ttk.LabelFrame(main_frame, text="基本信息", padding="10")
info_frame.pack(fill=tk.X, pady=(0, 10))
# 健康状态(大图标)
status_frame = ttk.Frame(info_frame)
status_frame.pack(fill=tk.X, pady=5)
status_colors = {
HealthStatus.GOOD: ("良好", "green"),
HealthStatus.WARNING: ("警告", "orange"),
HealthStatus.DANGER: ("危险", "red"),
HealthStatus.UNKNOWN: ("未知", "gray"),
}
status_text, status_color = status_colors.get(
self.smart_info.health_status, ("未知", "gray")
)
status_label = ttk.Label(
status_frame,
text=f"健康状态: {status_text}",
font=("Arial", 16, "bold"),
foreground=status_color
)
status_label.pack(side=tk.LEFT, padx=10)
# 健康评分
monitor = SmartMonitor()
score = monitor.get_health_score(self.smart_info)
score_label = ttk.Label(
status_frame,
text=f"健康评分: {score}/100",
font=("Arial", 14)
)
score_label.pack(side=tk.LEFT, padx=20)
# 设备详细信息
details_grid = ttk.Frame(info_frame)
details_grid.pack(fill=tk.X, pady=5)
details = [
("设备", self.smart_info.device),
("型号", self.smart_info.model or "N/A"),
("序列号", self.smart_info.serial or "N/A"),
("固件版本", self.smart_info.firmware or "N/A"),
("整体健康", self.smart_info.overall_health or "N/A"),
]
for i, (label, value) in enumerate(details):
ttk.Label(details_grid, text=f"{label}:", font=("Arial", 10, "bold")).grid(
row=i, column=0, sticky=tk.W, padx=5, pady=2
)
ttk.Label(details_grid, text=value).grid(
row=i, column=1, sticky=tk.W, padx=5, pady=2
)
# 使用统计
usage_frame = ttk.LabelFrame(main_frame, text="使用统计", padding="10")
usage_frame.pack(fill=tk.X, pady=(0, 10))
# 温度
temp = self.smart_info.temperature
if temp:
temp_status, temp_color = monitor.get_temperature_status(temp)
temp_text = f"{temp}°C ({temp_status})"
else:
temp_text = "N/A"
usage_info = [
("温度", temp_text),
("通电时间", f"{self.smart_info.power_on_hours} 小时" if self.smart_info.power_on_hours else "N/A"),
("通电次数", f"{self.smart_info.power_cycle_count}" if self.smart_info.power_cycle_count else "N/A"),
]
for i, (label, value) in enumerate(usage_info):
ttk.Label(usage_frame, text=f"{label}:", font=("Arial", 10, "bold")).grid(
row=i, column=0, sticky=tk.W, padx=5, pady=2
)
temp_label = ttk.Label(usage_frame, text=value)
if label == "温度" and temp and temp > 45:
temp_label.configure(foreground="red")
temp_label.grid(row=i, column=1, sticky=tk.W, padx=5, pady=2)
# SMART 属性表
attr_frame = ttk.LabelFrame(main_frame, text="SMART 属性", padding="10")
attr_frame.pack(fill=tk.BOTH, expand=True, pady=(0, 10))
# 创建 Treeview
columns = ("ID", "名称", "当前值", "最差值", "阈值", "原始值", "状态")
tree = ttk.Treeview(attr_frame, columns=columns, show="headings", height=10)
for col in columns:
tree.heading(col, text=col)
tree.column(col, width=80)
tree.column("名称", width=150)
tree.column("原始值", width=100)
# 添加滚动条
scrollbar = ttk.Scrollbar(attr_frame, orient=tk.VERTICAL, command=tree.yview)
tree.configure(yscrollcommand=scrollbar.set)
tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 填充数据
for attr in self.smart_info.attributes:
values = (
attr.id,
attr.name,
attr.value,
attr.worst,
attr.threshold,
attr.raw_value,
attr.status
)
# 根据状态设置颜色
tag = ""
if attr.status == "预警":
tag = "warning"
elif attr.id in monitor.CRITICAL_ATTRIBUTES and attr.raw_value != "0":
tag = "danger"
tree.insert("", tk.END, values=values, tags=(tag,))
# 设置标签颜色
tree.tag_configure("warning", foreground="orange")
tree.tag_configure("danger", foreground="red")
# 错误信息
if self.smart_info.errors:
error_frame = ttk.LabelFrame(main_frame, text="警告/错误", padding="10")
error_frame.pack(fill=tk.X, pady=(0, 10))
for error in self.smart_info.errors:
ttk.Label(error_frame, text=f"{error}", foreground="red").pack(
anchor=tk.W, padx=5, pady=2
)
# 关闭按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=10)
ttk.Button(btn_frame, text="关闭", command=self.dialog.destroy).pack(side=tk.RIGHT, padx=5)
ttk.Button(
btn_frame,
text="刷新",
command=self._refresh
).pack(side=tk.RIGHT, padx=5)
def _refresh(self):
"""刷新 SMART 信息"""
monitor = SmartMonitor()
new_info = monitor.get_disk_smart_info(self.device_path)
if new_info:
self.smart_info = new_info
# 重建界面
for widget in self.dialog.winfo_children():
widget.destroy()
self._create_widgets()
else:
messagebox.showerror("错误", "刷新 SMART 信息失败")
class SmartOverviewDialog:
"""SMART 总览对话框 - 显示所有磁盘的健康状态"""
def __init__(self, parent):
self.parent = parent
self.monitor = SmartMonitor()
self.dialog = tk.Toplevel(parent)
self.dialog.title("SMART 健康监控 - 总览")
self.dialog.geometry("800x500")
self.dialog.transient(parent)
self.dialog.grab_set()
self._center_window()
self._create_widgets()
self._refresh_data()
def _center_window(self):
"""居中显示"""
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
"""创建界面"""
main_frame = ttk.Frame(self.dialog, padding="10")
main_frame.pack(fill=tk.BOTH, expand=True)
# 标题
title_label = ttk.Label(
main_frame,
text="磁盘 SMART 健康状态总览",
font=("Arial", 14, "bold")
)
title_label.pack(pady=(0, 10))
# 状态统计
self.stats_frame = ttk.Frame(main_frame)
self.stats_frame.pack(fill=tk.X, pady=(0, 10))
# 磁盘列表
list_frame = ttk.LabelFrame(main_frame, text="磁盘列表", padding="10")
list_frame.pack(fill=tk.BOTH, expand=True)
columns = ("设备", "型号", "健康状态", "评分", "温度", "通电时间", "详情")
self.tree = ttk.Treeview(list_frame, columns=columns, show="headings")
for col in columns:
self.tree.heading(col, text=col)
self.tree.column(col, width=100)
self.tree.column("设备", width=80)
self.tree.column("型号", width=200)
self.tree.column("详情", width=60)
# 滚动条
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 绑定双击事件
self.tree.bind("<Double-1>", self._on_double_click)
# 存储 item 数据的字典
self._item_data = {}
# 按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=10)
ttk.Button(btn_frame, text="关闭", command=self.dialog.destroy).pack(side=tk.RIGHT, padx=5)
ttk.Button(btn_frame, text="刷新", command=self._refresh_data).pack(side=tk.RIGHT, padx=5)
# 状态栏
self.status_label = ttk.Label(main_frame, text="准备就绪")
self.status_label.pack(anchor=tk.W)
def _refresh_data(self):
"""刷新数据"""
self.status_label.configure(text="正在获取 SMART 信息...")
self.dialog.update()
# 清空现有数据
for item in self.tree.get_children():
self.tree.delete(item)
# 获取所有磁盘 SMART 信息
smart_data = self.monitor.get_all_disks_smart()
# 统计
good_count = 0
warning_count = 0
danger_count = 0
for device_path, info in smart_data.items():
score = self.monitor.get_health_score(info)
# 状态文本和颜色
status_colors = {
HealthStatus.GOOD: ("良好", "green"),
HealthStatus.WARNING: ("警告", "orange"),
HealthStatus.DANGER: ("危险", "red"),
HealthStatus.UNKNOWN: ("未知", "gray"),
}
status_text, _ = status_colors.get(info.health_status, ("未知", "gray"))
# 统计
if info.health_status == HealthStatus.GOOD:
good_count += 1
elif info.health_status == HealthStatus.WARNING:
warning_count += 1
elif info.health_status == HealthStatus.DANGER:
danger_count += 1
# 温度
temp_text = f"{info.temperature}°C" if info.temperature else "N/A"
# 通电时间
hours_text = f"{info.power_on_hours}h" if info.power_on_hours else "N/A"
values = (
device_path,
info.model[:30] if info.model else "N/A",
status_text,
f"{score}/100",
temp_text,
hours_text,
"查看详情"
)
tag = ""
if info.health_status == HealthStatus.WARNING:
tag = "warning"
elif info.health_status == HealthStatus.DANGER:
tag = "danger"
item_id = self.tree.insert("", tk.END, values=values, tags=(tag,))
self._item_data[item_id] = {"device": device_path, "info": info}
# 设置颜色
self.tree.tag_configure("warning", foreground="orange")
self.tree.tag_configure("danger", foreground="red")
# 更新统计
for widget in self.stats_frame.winfo_children():
widget.destroy()
stats = [
("总磁盘", len(smart_data), "black"),
("良好", good_count, "green"),
("警告", warning_count, "orange"),
("危险", danger_count, "red"),
]
for label, count, color in stats:
lbl = ttk.Label(
self.stats_frame,
text=f"{label}: {count}",
font=("Arial", 11, "bold"),
foreground=color
)
lbl.pack(side=tk.LEFT, padx=10)
self.status_label.configure(text=f"已更新 {len(smart_data)} 个磁盘的信息")
def _on_double_click(self, event):
"""双击查看详情"""
item = self.tree.selection()[0]
data = self._item_data.get(item)
if data:
# 暂时释放 grab以便打开子对话框
self.dialog.grab_release()
try:
SmartInfoDialog(self.dialog, data["device"], data["info"])
finally:
# 恢复 grab
self.dialog.grab_set()

250
dialogs_undo.py Normal file
View File

@@ -0,0 +1,250 @@
# dialogs_undo.py
"""撤销功能对话框"""
import tkinter as tk
from tkinter import ttk, messagebox
from operation_history import OperationHistory, OperationStatus, OperationType
from dialogs_history import PartitionBackupDialog
class UndoDialog:
"""撤销操作对话框"""
def __init__(self, parent, history, partition_backup):
self.parent = parent
self.history = history
self.partition_backup = partition_backup
self.result = False
self.dialog = tk.Toplevel(parent)
self.dialog.title("撤销操作")
self.dialog.geometry("700x500")
self.dialog.transient(parent)
self._center_window()
self._create_widgets()
self._load_undoable_operations()
def _center_window(self):
self.dialog.update_idletasks()
width = self.dialog.winfo_width()
height = self.dialog.winfo_height()
x = (self.dialog.winfo_screenwidth() // 2) - (width // 2)
y = (self.dialog.winfo_screenheight() // 2) - (height // 2)
self.dialog.geometry(f"{width}x{height}+{x}+{y}")
def _create_widgets(self):
main_frame = ttk.Frame(self.dialog, padding="15")
main_frame.pack(fill=tk.BOTH, expand=True)
# 标题
title_label = ttk.Label(
main_frame,
text="可撤销的操作",
font=("Arial", 14, "bold")
)
title_label.pack(pady=(0, 10))
# 说明
info_label = ttk.Label(
main_frame,
text="以下操作可以通过分区表备份进行回滚。选择操作查看详情并撤销。",
wraplength=600
)
info_label.pack(pady=(0, 10))
# 操作列表
list_frame = ttk.Frame(main_frame)
list_frame.pack(fill=tk.BOTH, expand=True)
columns = ("时间", "操作类型", "设备", "状态")
self.tree = ttk.Treeview(list_frame, columns=columns, show="headings")
for col in columns:
self.tree.heading(col, text=col)
self.tree.column("时间", width=150)
self.tree.column("操作类型", width=150)
self.tree.column("设备", width=150)
self.tree.column("状态", width=100)
scrollbar = ttk.Scrollbar(list_frame, orient=tk.VERTICAL, command=self.tree.yview)
self.tree.configure(yscrollcommand=scrollbar.set)
self.tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
# 绑定选择事件
self.tree.bind("<<TreeviewSelect>>", self._on_select)
# 详情区域
detail_frame = ttk.LabelFrame(main_frame, text="操作详情与撤销选项", padding="10")
detail_frame.pack(fill=tk.X, pady=(10, 0))
self.detail_text = tk.Text(detail_frame, height=8, wrap=tk.WORD)
self.detail_text.pack(fill=tk.BOTH, expand=True)
self.detail_text.config(state=tk.DISABLED)
# 按钮
btn_frame = ttk.Frame(main_frame)
btn_frame.pack(fill=tk.X, pady=10)
ttk.Button(btn_frame, text="关闭", command=self.dialog.destroy).pack(side=tk.RIGHT, padx=5)
ttk.Button(btn_frame, text="刷新", command=self._load_undoable_operations).pack(side=tk.RIGHT, padx=5)
self.undo_btn = ttk.Button(btn_frame, text="撤销选中操作", command=self._undo_selected, state=tk.DISABLED)
self.undo_btn.pack(side=tk.RIGHT, padx=5)
self.selected_record = None
def _load_undoable_operations(self):
"""加载可撤销的操作"""
# 清空列表
for item in self.tree.get_children():
self.tree.delete(item)
# 获取所有成功完成的操作
records = self.history.get_recent_records(limit=50)
self.undoable_records = []
for record in records:
# 检查是否有回滚数据
if record.rollback_data and record.status == OperationStatus.SUCCESS.value:
# 检查是否是可撤销的操作类型
if record.operation_type in [
OperationType.CREATE_PARTITION.value,
OperationType.DELETE_PARTITION.value,
OperationType.WIPE_PARTITION_TABLE.value
]:
self.undoable_records.append(record)
# 格式化时间
try:
from datetime import datetime
dt = datetime.fromisoformat(record.timestamp)
time_str = dt.strftime("%Y-%m-%d %H:%M:%S")
except:
time_str = record.timestamp
values = (
time_str,
record.operation_type,
record.device_path,
record.status
)
self.tree.insert("", tk.END, values=values)
def _on_select(self, event):
"""选择操作"""
selection = self.tree.selection()
if not selection:
self.undo_btn.config(state=tk.DISABLED)
return
index = self.tree.index(selection[0])
if index < len(self.undoable_records):
self.selected_record = self.undoable_records[index]
self._show_details()
self.undo_btn.config(state=tk.NORMAL)
else:
self.selected_record = None
self.undo_btn.config(state=tk.DISABLED)
def _show_details(self):
"""显示详情"""
if not self.selected_record:
return
self.detail_text.config(state=tk.NORMAL)
self.detail_text.delete(1.0, tk.END)
record = self.selected_record
text = f"操作ID: {record.id}\n"
text += f"时间: {record.timestamp}\n"
text += f"类型: {record.operation_type}\n"
text += f"设备: {record.device_path}\n\n"
text += "操作详情:\n"
text += f"{record.details}\n\n"
if record.rollback_data:
text += "可撤销数据:\n"
if 'partition_table_backup' in record.rollback_data:
backup_file = record.rollback_data['partition_table_backup']
text += f" 分区表备份: {backup_file}\n"
if backup_file:
import os
if os.path.exists(backup_file):
text += " 状态: 备份文件存在,可以恢复\n"
else:
text += " 状态: 备份文件不存在!无法恢复\n"
self.undo_btn.config(state=tk.DISABLED)
else:
text += f"{record.rollback_data}\n"
self.detail_text.insert(1.0, text)
self.detail_text.config(state=tk.DISABLED)
def _undo_selected(self):
"""撤销选中的操作"""
if not self.selected_record:
return
record = self.selected_record
# 确认对话框
result = messagebox.askyesno(
"确认撤销",
f"确定要撤销以下操作吗?\n\n"
f"操作类型: {record.operation_type}\n"
f"设备: {record.device_path}\n"
f"时间: {record.timestamp}\n\n"
f"警告:这将恢复分区表到操作前的状态!\n"
f"此操作可能导致数据丢失!",
icon=messagebox.WARNING
)
if not result:
return
# 检查是否有备份
if not record.rollback_data or 'partition_table_backup' not in record.rollback_data:
messagebox.showerror("错误", "没有找到可恢复的备份数据")
return
backup_file = record.rollback_data['partition_table_backup']
if not backup_file:
messagebox.showerror("错误", "备份文件路径为空")
return
import os
if not os.path.exists(backup_file):
messagebox.showerror("错误", f"备份文件不存在:\n{backup_file}")
return
# 执行恢复
device_path = record.device_path
# 对于分区操作,需要获取父磁盘
if record.operation_type == OperationType.DELETE_PARTITION.value:
import re
match = re.match(r'(/dev/[a-z]+)\d+', device_path)
if match:
device_path = match.group(1)
if self.partition_backup.restore_partition_table(device_path, backup_file):
# 更新操作状态
self.history.update_status(
record.id,
OperationStatus.ROLLED_BACK
)
messagebox.showinfo("成功", f"操作已成功撤销!\n分区表已恢复到 {device_path}")
self.result = True
self.dialog.destroy()
else:
messagebox.showerror("错误", "撤销操作失败!\n请检查日志获取详细信息。")
def wait_for_result(self):
self.dialog.wait_window()
return self.result

View File

@@ -0,0 +1,9 @@
[Desktop Entry]
Name=Linux 存储管理器
Comment=Linux 存储管理工具
Exec=/usr/local/bin/linux-storage-manager
Icon=drive-harddisk
Terminal=false
Type=Application
Categories=System;Utility;
Keywords=storage;disk;partition;raid;lvm;

View File

@@ -0,0 +1,38 @@
# -*- mode: python ; coding: utf-8 -*-
a = Analysis(
['mainwindow_tkinter.py'],
pathex=[],
binaries=[],
datas=[],
hiddenimports=[],
hookspath=[],
hooksconfig={},
runtime_hooks=[],
excludes=[],
noarchive=False,
optimize=0,
)
pyz = PYZ(a.pure)
exe = EXE(
pyz,
a.scripts,
a.binaries,
a.datas,
[],
name='linux-storage-manager',
debug=False,
bootloader_ignore_signals=False,
strip=False,
upx=True,
upx_exclude=[],
runtime_tmpdir=None,
console=True,
disable_windowed_traceback=False,
argv_emulation=False,
target_arch=None,
codesign_identity=None,
entitlements_file=None,
)

View File

@@ -22,6 +22,11 @@ from occupation_resolver_tkinter import OccupationResolver
# 导入自定义对话框
from dialogs_tkinter import (CreatePartitionDialog, MountDialog, CreateRaidDialog,
CreatePvDialog, CreateVgDialog, CreateLvDialog)
# 导入 SMART 对话框
from dialogs_smart import SmartInfoDialog, SmartOverviewDialog
from smart_monitor import SmartMonitor
# 导入增强对话框
from dialogs_enhanced import EnhancedFormatDialog, EnhancedRaidDeleteDialog, EnhancedPartitionDialog
class MainWindow:
@@ -71,9 +76,20 @@ class MainWindow:
def _create_widgets(self):
"""创建界面元素"""
# 顶部按钮栏
btn_frame = ttk.Frame(self.main_frame)
btn_frame.pack(fill=tk.X, pady=(0, 5))
# 刷新按钮
self.refresh_btn = ttk.Button(self.main_frame, text="刷新数据", command=self.refresh_all_info)
self.refresh_btn.pack(fill=tk.X, pady=(0, 5))
self.refresh_btn = ttk.Button(btn_frame, text="刷新数据", command=self.refresh_all_info)
self.refresh_btn.pack(side=tk.LEFT, padx=(0, 5))
# SMART 监控按钮
self.smart_btn = ttk.Button(btn_frame, text="SMART 监控", command=self._show_smart_overview)
self.smart_btn.pack(side=tk.LEFT, padx=(0, 5))
# 菜单栏
self._create_menu()
# Notebook (Tab 控件)
self.notebook = ttk.Notebook(self.main_frame)
@@ -107,6 +123,38 @@ class MainWindow:
self.log_text = ScrolledText(log_frame, height=8, state=tk.DISABLED)
self.log_text.pack(fill=tk.BOTH, expand=True)
def _create_menu(self):
"""创建菜单栏"""
menubar = tk.Menu(self.root)
self.root.config(menu=menubar)
# 工具菜单
tools_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="工具", menu=tools_menu)
tools_menu.add_command(label="SMART 监控", command=self._show_smart_overview)
tools_menu.add_separator()
tools_menu.add_command(label="刷新数据", command=self.refresh_all_info)
# 帮助菜单
help_menu = tk.Menu(menubar, tearoff=0)
menubar.add_cascade(label="帮助", menu=help_menu)
help_menu.add_command(label="关于", command=self._show_about)
def _show_about(self):
"""显示关于对话框"""
messagebox.showinfo(
"关于",
"Linux 存储管理工具\n\n"
"版本: 2.0 (Tkinter 版)\n"
"适配低版本系统 (CentOS 8+)\n\n"
"功能:\n"
"- 块设备管理\n"
"- RAID 管理\n"
"- LVM 管理\n"
"- SMART 监控\n\n"
"使用 Python 3.6+ 和 Tkinter 构建"
)
def _create_block_devices_tree(self):
"""创建块设备 Treeview"""
# 创建框架
@@ -359,6 +407,10 @@ class MainWindow:
menu.add_command(label=f"擦除分区表 {device_path}...",
command=lambda: self._handle_wipe_partition_table(device_path))
menu.add_separator()
# SMART 信息
menu.add_command(label=f"查看 SMART 信息 {device_path}",
command=lambda: self._handle_show_smart(device_path))
menu.add_separator()
# 分区操作
if device_type == 'part':
@@ -373,7 +425,24 @@ class MainWindow:
menu.add_command(label=f"删除分区 {device_path}",
command=lambda: self._handle_delete_partition(device_path))
menu.add_command(label=f"格式化分区 {device_path}...",
command=lambda: self._handle_format_partition(device_path))
command=lambda dp=device_path, dd=dev_data: self._handle_format_partition(dp, dd))
# 文件系统检查和修复
fstype = dev_data.get('fstype', '')
if fstype:
fs_menu = tk.Menu(menu, tearoff=0)
# 根据文件系统类型添加不同的检查/修复选项
if fstype.startswith('ext'):
fs_menu.add_command(label=f"检查文件系统 (fsck) {device_path}",
command=lambda dp=device_path, ft=fstype: self._handle_fsck(dp, ft))
fs_menu.add_command(label=f"调整文件系统大小 (resize2fs) {device_path}",
command=lambda dp=device_path, ft=fstype: self._handle_resize2fs(dp, ft))
elif fstype == 'xfs':
fs_menu.add_command(label=f"修复文件系统 (xfs_repair) {device_path}",
command=lambda dp=device_path: self._handle_xfs_repair(dp))
if fs_menu.index(tk.END) is not None:
menu.add_cascade(label=f"文件系统工具 {device_path}", menu=fs_menu)
menu.add_separator()
# Loop 设备操作
@@ -459,10 +528,27 @@ class MainWindow:
menu.add_command(label=f"停止阵列 {array_path}",
command=lambda: self._handle_stop_raid_array(array_path))
menu.add_command(label=f"删除阵列 {array_path}",
command=lambda: self._handle_delete_active_raid_array(array_path, member_devices, array_uuid))
command=lambda ap=array_path, md=member_devices, au=array_uuid, ad=array_data:
self._handle_delete_active_raid_array(ap, md, au))
menu.add_command(label=f"格式化阵列 {array_path}...",
command=lambda: self._handle_format_raid_array(array_path))
# 文件系统检查和修复
dev_details = self.system_manager.get_device_details_by_path(array_path)
fstype = dev_details.get('fstype', '') if dev_details else ''
if fstype:
fs_menu = tk.Menu(menu, tearoff=0)
if fstype.startswith('ext'):
fs_menu.add_command(label=f"检查文件系统 (fsck) {array_path}",
command=lambda dp=array_path, ft=fstype: self._handle_fsck(dp, ft))
fs_menu.add_command(label=f"调整文件系统大小 (resize2fs) {array_path}",
command=lambda dp=array_path, ft=fstype: self._handle_resize2fs(dp, ft))
elif fstype == 'xfs':
fs_menu.add_command(label=f"修复文件系统 (xfs_repair) {array_path}",
command=lambda dp=array_path: self._handle_xfs_repair(dp))
if fs_menu.index(tk.END) is not None:
menu.add_cascade(label=f"文件系统工具 {array_path}", menu=fs_menu)
if menu.index(tk.END) is not None:
self._show_context_menu(menu, event.x_root, event.y_root)
else:
@@ -535,7 +621,24 @@ class MainWindow:
menu.add_command(label=f"删除逻辑卷 {lv_name}",
command=lambda: self._handle_delete_lv(lv_name, vg_name))
menu.add_command(label=f"格式化逻辑卷 {lv_name}...",
command=lambda: self._handle_format_partition(lv_path))
command=lambda lp=lv_path, ln=lv_name: self._handle_format_partition(
lp, {'name': ln, 'path': lp}))
# 文件系统检查和修复
dev_details = self.system_manager.get_device_details_by_path(lv_path)
fstype = dev_details.get('fstype', '') if dev_details else ''
if fstype:
fs_menu = tk.Menu(menu, tearoff=0)
if fstype.startswith('ext'):
fs_menu.add_command(label=f"检查文件系统 (fsck) {lv_path}",
command=lambda dp=lv_path, ft=fstype: self._handle_fsck(dp, ft))
fs_menu.add_command(label=f"调整文件系统大小 (resize2fs) {lv_path}",
command=lambda dp=lv_path, ft=fstype: self._handle_resize2fs(dp, ft))
elif fstype == 'xfs':
fs_menu.add_command(label=f"修复文件系统 (xfs_repair) {lv_path}",
command=lambda dp=lv_path: self._handle_xfs_repair(dp))
if fs_menu.index(tk.END) is not None:
menu.add_cascade(label=f"文件系统工具 {lv_path}", menu=fs_menu)
if menu.index(tk.END) is not None:
self._show_context_menu(menu, event.x_root, event.y_root)
@@ -581,6 +684,41 @@ class MainWindow:
if self.disk_ops.unmount_loop_device(device_path):
self.refresh_all_info()
def _handle_show_smart(self, device_path):
"""显示指定设备的 SMART 信息"""
monitor = SmartMonitor()
if not monitor.is_available():
messagebox.showwarning("警告",
"smartctl 工具未安装。\n"
"请安装 smartmontools:\n"
" CentOS/RHEL: sudo yum install smartmontools\n"
" Ubuntu/Debian: sudo apt-get install smartmontools")
return
smart_info = monitor.get_disk_smart_info(device_path)
if smart_info:
SmartInfoDialog(self.root, device_path, smart_info)
else:
messagebox.showerror("错误",
f"无法获取 {device_path} 的 SMART 信息。\n"
f"可能的原因:\n"
f"1. 设备不支持 SMART\n"
f"2. 需要 root 权限\n"
f"3. 设备未正确连接")
def _show_smart_overview(self):
"""显示 SMART 总览"""
monitor = SmartMonitor()
if not monitor.is_available():
messagebox.showwarning("警告",
"smartctl 工具未安装。\n"
"请安装 smartmontools:\n"
" CentOS/RHEL: sudo yum install smartmontools\n"
" Ubuntu/Debian: sudo apt-get install smartmontools")
return
SmartOverviewDialog(self.root)
def _handle_wipe_partition_table(self, device_path):
"""处理擦除物理盘分区表"""
if messagebox.askyesno("确认擦除分区表",
@@ -652,7 +790,8 @@ class MainWindow:
max_available_mib = max(0.0, total_disk_mib - 1.0)
start_position_mib = 1.0
dialog = CreatePartitionDialog(self.root, disk_path, total_disk_mib, max_available_mib)
# 使用增强型分区创建对话框(带滑块可视化)
dialog = EnhancedPartitionDialog(self.root, disk_path, total_disk_mib, max_available_mib)
info = dialog.wait_for_result()
if info:
@@ -684,9 +823,101 @@ class MainWindow:
if self.disk_ops.delete_partition(device_path):
self.refresh_all_info()
def _handle_format_partition(self, device_path):
"""处理格式化分区"""
self.disk_ops.format_partition(device_path)
def _handle_format_partition(self, device_path, dev_data=None):
"""处理格式化分区 - 使用增强型对话框"""
# 获取设备信息
if dev_data is None:
dev_data = self.system_manager.get_device_details_by_path(device_path)
# 使用增强型格式化对话框
dialog = EnhancedFormatDialog(self.root, device_path, dev_data or {})
fs_type = dialog.wait_for_result()
if fs_type:
# 执行格式化
self.disk_ops.format_partition(device_path, fs_type)
def _handle_fsck(self, device_path, fstype):
"""检查/修复 ext 文件系统"""
# 检查设备是否已挂载
mount_point = self.system_manager.get_mountpoint_for_device(device_path)
if mount_point and mount_point != 'N/A':
messagebox.showwarning("警告", f"设备 {device_path} 已挂载到 {mount_point}\n请先卸载后再执行文件系统检查。")
return
if not messagebox.askyesno("确认",
f"即将对 {device_path} ({fstype}) 执行文件系统检查。\n"
f"命令: fsck -f -y {device_path}\n\n"
f"注意:此操作可能需要较长时间,请勿中断。",
default=messagebox.NO):
return
logger.info(f"开始检查文件系统: {device_path} ({fstype})")
success, stdout, stderr = self.lvm_ops._execute_shell_command(
["fsck", "-f", "-y", device_path],
f"检查文件系统 {device_path} 失败"
)
if success:
messagebox.showinfo("成功", f"文件系统检查完成:\n{stdout[:500] if stdout else '无输出'}")
else:
messagebox.showerror("错误", f"文件系统检查失败:\n{stderr[:500] if stderr else '未知错误'}")
self.refresh_all_info()
def _handle_xfs_repair(self, device_path):
"""修复 XFS 文件系统"""
# 检查设备是否已挂载
mount_point = self.system_manager.get_mountpoint_for_device(device_path)
if mount_point and mount_point != 'N/A':
messagebox.showwarning("警告", f"设备 {device_path} 已挂载到 {mount_point}\n请先卸载后再执行文件系统修复。")
return
if not messagebox.askyesno("确认",
f"即将对 {device_path} (XFS) 执行文件系统修复。\n"
f"命令: xfs_repair {device_path}\n\n"
f"警告:此操作会尝试修复文件系统错误,但可能无法恢复所有数据。\n"
f"建议先备份重要数据。",
default=messagebox.NO):
return
logger.info(f"开始修复 XFS 文件系统: {device_path}")
success, stdout, stderr = self.lvm_ops._execute_shell_command(
["xfs_repair", device_path],
f"修复 XFS 文件系统 {device_path} 失败"
)
if success:
messagebox.showinfo("成功", f"XFS 文件系统修复完成:\n{stdout[:500] if stdout else '无输出'}")
else:
messagebox.showerror("错误", f"XFS 文件系统修复失败:\n{stderr[:500] if stderr else '未知错误'}")
self.refresh_all_info()
def _handle_resize2fs(self, device_path, fstype):
"""调整 ext 文件系统大小"""
# 检查设备是否已挂载
mount_point = self.system_manager.get_mountpoint_for_device(device_path)
if mount_point and mount_point != 'N/A':
messagebox.showwarning("警告", f"设备 {device_path} 已挂载到 {mount_point}\n请先卸载后再执行调整大小操作。")
return
if not messagebox.askyesno("确认",
f"即将对 {device_path} ({fstype}) 强制调整文件系统大小以匹配分区大小。\n"
f"命令: resize2fs -f {device_path}\n\n"
f"注意:此操作会强制调整文件系统大小,请确保分区大小已正确设置。",
default=messagebox.NO):
return
logger.info(f"开始调整文件系统大小: {device_path} ({fstype})")
success, stdout, stderr = self.lvm_ops._execute_shell_command(
["resize2fs", "-f", device_path],
f"调整文件系统大小 {device_path} 失败"
)
if success:
messagebox.showinfo("成功", f"文件系统大小调整完成:\n{stdout[:500] if stdout else '无输出'}")
else:
messagebox.showerror("错误", f"调整文件系统大小失败:\n{stderr[:500] if stderr else '未知错误'}")
self.refresh_all_info()
def on_disk_formatting_finished(self, success, device_path, stdout, stderr):
"""接收格式化完成回调并刷新界面"""
@@ -791,10 +1022,29 @@ class MainWindow:
if self.raid_ops.stop_raid_array(array_path):
self.refresh_all_info()
def _handle_delete_active_raid_array(self, array_path, member_devices, uuid):
"""处理删除活动的 RAID 阵列"""
if self.raid_ops.delete_active_raid_array(array_path, member_devices, uuid):
self.refresh_all_info()
def _handle_delete_active_raid_array(self, array_path, member_devices, uuid, array_data=None):
"""处理删除活动的 RAID 阵列 - 使用增强型对话框"""
# 获取阵列详情(如果未提供)
if array_data is None:
array_data = {}
try:
raid_arrays = self.system_manager.get_mdadm_arrays()
for array in raid_arrays:
if array.get('device') == array_path:
array_data = array
break
except:
pass
# 使用增强型删除对话框
dialog = EnhancedRaidDeleteDialog(
self.root, array_path, array_data, member_devices
)
if dialog.wait_for_result():
# 执行删除
if self.raid_ops.delete_active_raid_array(array_path, member_devices, uuid):
self.refresh_all_info()
def _handle_delete_configured_raid_array(self, uuid):
"""处理删除停止状态 RAID 阵列的配置文件条目"""
@@ -1051,3 +1301,10 @@ class MainWindow:
self.refresh_all_info()
else:
messagebox.showerror("删除卷组失败", f"删除卷组 {vg_name} 失败,请检查日志。")
if __name__ == "__main__":
# 创建主窗口
root = tk.Tk()
app = MainWindow(root)
root.mainloop()

456
operation_history.py Normal file
View File

@@ -0,0 +1,456 @@
# operation_history.py
"""操作历史与撤销管理模块"""
import json
import os
import time
import logging
import subprocess
from datetime import datetime
from typing import List, Dict, Optional, Callable
from dataclasses import dataclass, asdict
from enum import Enum
logger = logging.getLogger(__name__)
class OperationStatus(Enum):
"""操作状态"""
PENDING = "待执行"
RUNNING = "执行中"
SUCCESS = "成功"
FAILED = "失败"
CANCELLED = "已取消"
ROLLED_BACK = "已撤销"
class OperationType(Enum):
"""操作类型"""
CREATE_PARTITION = "创建分区"
DELETE_PARTITION = "删除分区"
FORMAT_PARTITION = "格式化分区"
MOUNT_PARTITION = "挂载分区"
UNMOUNT_PARTITION = "卸载分区"
CREATE_RAID = "创建 RAID"
DELETE_RAID = "删除 RAID"
STOP_RAID = "停止 RAID"
CREATE_PV = "创建物理卷"
DELETE_PV = "删除物理卷"
CREATE_VG = "创建卷组"
DELETE_VG = "删除卷组"
CREATE_LV = "创建逻辑卷"
DELETE_LV = "删除逻辑卷"
ACTIVATE_LV = "激活逻辑卷"
DEACTIVATE_LV = "停用逻辑卷"
WIPE_PARTITION_TABLE = "擦除分区表"
@dataclass
class OperationRecord:
"""操作记录"""
id: str
timestamp: str
operation_type: str
device_path: str
status: str
details: Dict
rollback_data: Optional[Dict] = None
error_message: Optional[str] = None
execution_time: Optional[float] = None
class OperationHistory:
"""操作历史管理器"""
def __init__(self, history_file: str = None):
if history_file is None:
# 默认存储在用户目录
home_dir = os.path.expanduser("~")
self.history_dir = os.path.join(home_dir, ".linux_storage_manager")
os.makedirs(self.history_dir, exist_ok=True)
self.history_file = os.path.join(self.history_dir, "operation_history.json")
self.backup_dir = os.path.join(self.history_dir, "backups")
os.makedirs(self.backup_dir, exist_ok=True)
else:
self.history_file = history_file
self.backup_dir = os.path.join(os.path.dirname(history_file), "backups")
os.makedirs(self.backup_dir, exist_ok=True)
self.records: List[OperationRecord] = []
self.pending_operations: List[OperationRecord] = []
self._load_history()
def _load_history(self):
"""加载历史记录"""
if os.path.exists(self.history_file):
try:
with open(self.history_file, 'r', encoding='utf-8') as f:
data = json.load(f)
self.records = [OperationRecord(**record) for record in data]
logger.info(f"已加载 {len(self.records)} 条历史记录")
except Exception as e:
logger.error(f"加载历史记录失败: {e}")
self.records = []
else:
self.records = []
def _save_history(self):
"""保存历史记录"""
try:
with open(self.history_file, 'w', encoding='utf-8') as f:
json.dump([asdict(r) for r in self.records], f,
ensure_ascii=False, indent=2)
except Exception as e:
logger.error(f"保存历史记录失败: {e}")
def add_record(self, operation_type: OperationType, device_path: str,
details: Dict, rollback_data: Optional[Dict] = None) -> str:
"""添加操作记录"""
record_id = f"{int(time.time() * 1000)}_{operation_type.name}"
record = OperationRecord(
id=record_id,
timestamp=datetime.now().isoformat(),
operation_type=operation_type.value,
device_path=device_path,
status=OperationStatus.SUCCESS.value,
details=details,
rollback_data=rollback_data
)
self.records.append(record)
self._save_history()
logger.info(f"添加操作记录: {record_id} - {operation_type.value}")
return record_id
def update_status(self, record_id: str, status: OperationStatus,
error_message: str = None, execution_time: float = None):
"""更新操作状态"""
for record in self.records:
if record.id == record_id:
record.status = status.value
if error_message:
record.error_message = error_message
if execution_time:
record.execution_time = execution_time
# 从待执行列表中移除
if status in [OperationStatus.SUCCESS, OperationStatus.FAILED,
OperationStatus.CANCELLED]:
self.pending_operations = [
r for r in self.pending_operations if r.id != record_id
]
self._save_history()
logger.info(f"更新操作状态: {record_id} -> {status.value}")
break
def get_recent_records(self, limit: int = 50) -> List[OperationRecord]:
"""获取最近的操作记录"""
return sorted(self.records, key=lambda r: r.timestamp, reverse=True)[:limit]
def get_pending_operations(self) -> List[OperationRecord]:
"""获取待执行的操作"""
return self.pending_operations
def export_history(self, file_path: str):
"""导出历史记录"""
try:
with open(file_path, 'w', encoding='utf-8') as f:
json.dump([asdict(r) for r in self.records], f,
ensure_ascii=False, indent=2)
return True
except Exception as e:
logger.error(f"导出历史记录失败: {e}")
return False
def clear_history(self, days: int = None):
"""清理历史记录"""
if days is None:
# 清空所有记录
self.records = []
else:
# 清理指定天数之前的记录
cutoff = time.time() - (days * 24 * 60 * 60)
self.records = [
r for r in self.records
if datetime.fromisoformat(r.timestamp).timestamp() > cutoff
]
self._save_history()
class PartitionTableBackup:
"""分区表备份管理器"""
def __init__(self, backup_dir: str = None):
if backup_dir is None:
home_dir = os.path.expanduser("~")
self.backup_dir = os.path.join(
home_dir, ".linux_storage_manager", "partition_backups"
)
else:
self.backup_dir = backup_dir
os.makedirs(self.backup_dir, exist_ok=True)
def backup_partition_table(self, device_path: str) -> Optional[str]:
"""备份分区表"""
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
device_name = os.path.basename(device_path)
backup_file = os.path.join(
self.backup_dir,
f"{device_name}_pt_{timestamp}.bin"
)
try:
# 使用 sfdisk 备份分区表
result = subprocess.run(
["sudo", "sfdisk", "-d", device_path],
capture_output=True,
text=True,
check=True,
timeout=30
)
with open(backup_file, 'w') as f:
f.write(result.stdout)
logger.info(f"分区表已备份: {backup_file}")
return backup_file
except subprocess.CalledProcessError as e:
logger.error(f"备份分区表失败: {e}")
return None
except Exception as e:
logger.error(f"备份分区表出错: {e}")
return None
def restore_partition_table(self, device_path: str, backup_file: str) -> bool:
"""恢复分区表"""
try:
if not os.path.exists(backup_file):
logger.error(f"备份文件不存在: {backup_file}")
return False
with open(backup_file, 'r') as f:
partition_data = f.read()
# 使用 sfdisk 恢复分区表
result = subprocess.run(
["sudo", "sfdisk", device_path],
input=partition_data,
capture_output=True,
text=True,
check=True,
timeout=30
)
logger.info(f"分区表已从 {backup_file} 恢复到 {device_path}")
return True
except subprocess.CalledProcessError as e:
logger.error(f"恢复分区表失败: {e.stderr}")
return False
except Exception as e:
logger.error(f"恢复分区表出错: {e}")
return False
def list_backups(self, device_path: str = None) -> List[Dict]:
"""列出可用的备份"""
backups = []
try:
for filename in os.listdir(self.backup_dir):
if filename.endswith('.bin'):
filepath = os.path.join(self.backup_dir, filename)
stat = os.stat(filepath)
# 解析文件名
parts = filename.rsplit('_pt_', 1)
if len(parts) == 2:
device_name = parts[0]
timestamp = parts[1].replace('.bin', '')
if device_path is None or device_name == os.path.basename(device_path):
backups.append({
'file': filepath,
'device': device_name,
'timestamp': timestamp,
'size': stat.st_size,
'created': datetime.fromtimestamp(stat.st_mtime).isoformat()
})
except Exception as e:
logger.error(f"列出备份失败: {e}")
return sorted(backups, key=lambda x: x['created'], reverse=True)
def delete_backup(self, backup_file: str) -> bool:
"""删除备份"""
try:
if os.path.exists(backup_file):
os.remove(backup_file)
logger.info(f"备份已删除: {backup_file}")
return True
return False
except Exception as e:
logger.error(f"删除备份失败: {e}")
return False
class OperationQueue:
"""操作队列管理器"""
def __init__(self, history: OperationHistory):
self.history = history
self.queue: List[Dict] = []
self.running = False
self.current_operation = None
self.callbacks: Dict[str, List[Callable]] = {
'on_start': [],
'on_complete': [],
'on_error': [],
'on_cancel': []
}
def add_operation(self, operation_type: OperationType, device_path: str,
execute_func: Callable, details: Dict,
rollback_data: Dict = None) -> str:
"""添加操作到队列"""
record_id = self.history.add_record(
operation_type, device_path, details, rollback_data
)
operation = {
'id': record_id,
'type': operation_type,
'device_path': device_path,
'execute_func': execute_func,
'status': OperationStatus.PENDING
}
self.queue.append(operation)
logger.info(f"操作已加入队列: {record_id}")
return record_id
def start_execution(self):
"""开始执行队列"""
if self.running:
return
self.running = True
self._execute_next()
def _execute_next(self):
"""执行下一个操作"""
if not self.queue:
self.running = False
return
operation = self.queue[0]
self.current_operation = operation
# 更新状态为运行中
self.history.update_status(
operation['id'],
OperationStatus.RUNNING
)
operation['status'] = OperationStatus.RUNNING
# 触发开始回调
for callback in self.callbacks['on_start']:
callback(operation)
# 执行操作
start_time = time.time()
try:
result = operation['execute_func']()
execution_time = time.time() - start_time
if result:
operation['status'] = OperationStatus.SUCCESS
self.history.update_status(
operation['id'],
OperationStatus.SUCCESS,
execution_time=execution_time
)
# 触发完成回调
for callback in self.callbacks['on_complete']:
callback(operation, True)
else:
operation['status'] = OperationStatus.FAILED
self.history.update_status(
operation['id'],
OperationStatus.FAILED,
error_message="操作执行失败",
execution_time=execution_time
)
# 触发错误回调
for callback in self.callbacks['on_error']:
callback(operation, "操作执行失败")
except Exception as e:
execution_time = time.time() - start_time
operation['status'] = OperationStatus.FAILED
self.history.update_status(
operation['id'],
OperationStatus.FAILED,
error_message=str(e),
execution_time=execution_time
)
# 触发错误回调
for callback in self.callbacks['on_error']:
callback(operation, str(e))
finally:
# 从队列中移除
if operation in self.queue:
self.queue.remove(operation)
self.current_operation = None
# 继续执行下一个
if self.running and self.queue:
self._execute_next()
else:
self.running = False
def cancel_current(self):
"""取消当前操作"""
if self.current_operation:
self.history.update_status(
self.current_operation['id'],
OperationStatus.CANCELLED
)
self.current_operation['status'] = OperationStatus.CANCELLED
# 触发取消回调
for callback in self.callbacks['on_cancel']:
callback(self.current_operation)
def clear_queue(self):
"""清空队列"""
for op in self.queue:
if op['status'] == OperationStatus.PENDING:
self.history.update_status(
op['id'],
OperationStatus.CANCELLED
)
self.queue = []
self.running = False
def register_callback(self, event: str, callback: Callable):
"""注册回调函数"""
if event in self.callbacks:
self.callbacks[event].append(callback)
def get_queue_status(self) -> Dict:
"""获取队列状态"""
return {
'running': self.running,
'pending_count': len([o for o in self.queue if o['status'] == OperationStatus.PENDING]),
'current_operation': self.current_operation
}
def get_pending_operations(self) -> List[Dict]:
"""获取待执行的操作列表"""
return [o for o in self.queue if o['status'] == OperationStatus.PENDING]

290
smart_monitor.py Normal file
View File

@@ -0,0 +1,290 @@
# smart_monitor.py
"""SMART 磁盘健康监控模块"""
import subprocess
import logging
import re
import json
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass
from enum import Enum
logger = logging.getLogger(__name__)
class HealthStatus(Enum):
"""磁盘健康状态"""
GOOD = "良好"
WARNING = "警告"
DANGER = "危险"
UNKNOWN = "未知"
@dataclass
class SmartAttribute:
"""SMART 属性"""
id: int
name: str
value: int
worst: int
threshold: int
raw_value: str
status: str
@dataclass
class SmartInfo:
"""SMART 信息"""
device: str
model: str
serial: str
firmware: str
health_status: HealthStatus
temperature: Optional[int] = None
power_on_hours: Optional[int] = None
power_cycle_count: Optional[int] = None
attributes: List[SmartAttribute] = None
overall_health: str = ""
errors: List[str] = None
def __post_init__(self):
if self.attributes is None:
self.attributes = []
if self.errors is None:
self.errors = []
class SmartMonitor:
"""SMART 监控类"""
# 关键属性 ID
CRITICAL_ATTRIBUTES = {
5: "Reallocated_Sector_Ct", # 重映射扇区数
10: "Spin_Retry_Count", # 旋转重试次数
184: "End-to-End_Error", # 端到端错误
187: "Reported_Uncorrect", # 报告不可纠正错误
188: "Command_Timeout", # 命令超时
196: "Reallocation_Event_Count", # 重映射事件计数
197: "Current_Pending_Sector", # 当前待处理扇区
198: "Offline_Uncorrectable", # 离线不可纠正
}
def __init__(self):
self._check_smartctl()
def _check_smartctl(self) -> bool:
"""检查 smartctl 是否可用"""
try:
result = subprocess.run(
["which", "smartctl"],
capture_output=True,
text=True,
check=False
)
return result.returncode == 0
except Exception:
return False
def is_available(self) -> bool:
"""检查 SMART 监控是否可用"""
return self._check_smartctl()
def get_disk_smart_info(self, device_path: str) -> Optional[SmartInfo]:
"""获取指定设备的 SMART 信息"""
if not self.is_available():
logger.warning("smartctl 不可用,无法获取 SMART 信息")
return None
try:
# 获取基本信息
result = subprocess.run(
["sudo", "smartctl", "-a", device_path],
capture_output=True,
text=True,
check=False,
timeout=30
)
if result.returncode not in [0, 4, 8]: # 4=磁盘处于故障状态, 8=无法获取部分信息
logger.error(f"获取 SMART 信息失败: {result.stderr}")
return None
return self._parse_smart_output(device_path, result.stdout)
except subprocess.TimeoutExpired:
logger.error(f"获取 {device_path} SMART 信息超时")
return None
except Exception as e:
logger.error(f"获取 {device_path} SMART 信息出错: {e}")
return None
def _parse_smart_output(self, device: str, output: str) -> SmartInfo:
"""解析 smartctl 输出"""
info = SmartInfo(
device=device,
model="",
serial="",
firmware="",
health_status=HealthStatus.UNKNOWN
)
lines = output.split('\n')
in_attributes = False
for line in lines:
line = line.strip()
# 解析设备信息
if line.startswith("Device Model:"):
info.model = line.split(":", 1)[1].strip()
elif line.startswith("Serial Number:"):
info.serial = line.split(":", 1)[1].strip()
elif line.startswith("Firmware Version:"):
info.firmware = line.split(":", 1)[1].strip()
# 解析整体健康状态
elif "SMART overall-health self-assessment test result:" in line:
result = line.split(":")[-1].strip().upper()
info.overall_health = result
if result == "PASSED":
info.health_status = HealthStatus.GOOD
else:
info.health_status = HealthStatus.DANGER
# 解析温度
elif "Temperature:" in line or "Current Drive Temperature:" in line:
match = re.search(r'(\d+)\s*(?:Celsius|°C|C)', line)
if match:
info.temperature = int(match.group(1))
# 解析通电时间
elif line.startswith("Power_On_Hours"):
match = re.search(r'(\d+)\s*\(', line)
if match:
info.power_on_hours = int(match.group(1))
# 解析通电次数
elif line.startswith("Power_Cycle_Count"):
match = re.search(r'(\d+)\s*\(', line)
if match:
info.power_cycle_count = int(match.group(1))
# 解析 SMART 属性表
elif "Vendor Specific SMART Attributes" in line:
in_attributes = True
continue
if in_attributes and line.startswith("0x"):
attr = self._parse_attribute_line(line)
if attr:
info.attributes.append(attr)
# 检查关键属性
if attr.id in self.CRITICAL_ATTRIBUTES:
if attr.raw_value != "0" and int(attr.raw_value) > 0:
if info.health_status != HealthStatus.DANGER:
info.health_status = HealthStatus.WARNING
info.errors.append(
f"{attr.name}: {attr.raw_value}"
)
# 如果没有明确的整体健康状态,根据属性判断
if info.health_status == HealthStatus.UNKNOWN:
if info.errors:
info.health_status = HealthStatus.WARNING
else:
info.health_status = HealthStatus.GOOD
return info
def _parse_attribute_line(self, line: str) -> Optional[SmartAttribute]:
"""解析 SMART 属性行"""
# 格式: 0x05 0x64 0x64 0x64 0x0000 0x0000 0x0000 000
parts = line.split()
if len(parts) < 10:
return None
try:
attr_id = int(parts[0], 16)
name = parts[1]
value = int(parts[3])
worst = int(parts[4])
threshold = int(parts[5])
raw_value = parts[9]
status = "正常" if value > threshold else "预警"
return SmartAttribute(
id=attr_id,
name=name,
value=value,
worst=worst,
threshold=threshold,
raw_value=raw_value,
status=status
)
except (ValueError, IndexError):
return None
def get_all_disks_smart(self) -> Dict[str, SmartInfo]:
"""获取所有支持 SMART 的磁盘信息"""
results = {}
try:
# 获取所有块设备
result = subprocess.run(
["lsblk", "-d", "-n", "-o", "NAME,TYPE,ROTA"],
capture_output=True,
text=True,
check=True
)
for line in result.stdout.strip().split('\n'):
parts = line.split()
if len(parts) >= 3:
name = parts[0]
device_type = parts[1]
is_rotational = parts[2] == "1"
# 只对物理磁盘检查 SMART
if device_type in ["disk", "rom"] and is_rotational:
device_path = f"/dev/{name}"
smart_info = self.get_disk_smart_info(device_path)
if smart_info:
results[device_path] = smart_info
except Exception as e:
logger.error(f"获取磁盘列表失败: {e}")
return results
def get_health_score(self, smart_info: SmartInfo) -> int:
"""计算健康评分 (0-100)"""
if not smart_info:
return 0
score = 100
# 根据整体健康状态调整
if smart_info.health_status == HealthStatus.DANGER:
score = 30
elif smart_info.health_status == HealthStatus.WARNING:
score = 70
# 根据错误数量调整
error_count = len(smart_info.errors)
score -= error_count * 10
# 确保分数在合理范围内
return max(0, min(100, score))
def get_temperature_status(self, temp: Optional[int]) -> Tuple[str, str]:
"""获取温度状态"""
if temp is None:
return "未知", "gray"
elif temp < 35:
return "正常", "green"
elif temp < 45:
return "偏高", "orange"
elif temp < 55:
return "高温", "red"
else:
return "危险", "red"