fix 33
This commit is contained in:
Binary file not shown.
Binary file not shown.
@@ -1,80 +1,78 @@
|
||||
# disk_operations.py
|
||||
import subprocess
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
from PySide6.QtWidgets import QMessageBox, QInputDialog
|
||||
from system_info import SystemInfoManager
|
||||
from PySide6.QtCore import QObject, Signal, QThread # <--- 导入 QObject, Signal, QThread
|
||||
from system_info import SystemInfoManager # 确保 SystemInfoManager 已导入
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DiskOperations:
|
||||
def __init__(self, system_manager: SystemInfoManager): # NEW: 接收 system_manager 实例
|
||||
self.system_manager = system_manager
|
||||
# --- 新增: 后台格式化工作线程 ---
|
||||
class FormatWorker(QObject):
|
||||
# 定义信号,用于向主线程发送格式化结果
|
||||
finished = Signal(bool, str, str, str) # 成功状态, 设备路径, 标准输出, 标准错误
|
||||
started = Signal(str) # 设备路径
|
||||
|
||||
def __init__(self, device_path, fs_type, execute_shell_command_func, parent=None):
|
||||
super().__init__(parent)
|
||||
self.device_path = device_path
|
||||
self.fs_type = fs_type
|
||||
# 接收一个可调用的函数,用于执行shell命令 (这里是 DiskOperations._execute_shell_command)
|
||||
self._execute_shell_command_func = execute_shell_command_func
|
||||
|
||||
def run(self):
|
||||
"""
|
||||
在单独线程中执行格式化命令。
|
||||
"""
|
||||
self.started.emit(self.device_path)
|
||||
logger.info(f"后台格式化开始: 设备 {self.device_path}, 文件系统 {self.fs_type}")
|
||||
|
||||
command_list = []
|
||||
if self.fs_type == "ext4":
|
||||
command_list = ["mkfs.ext4", "-F", self.device_path] # -F 强制执行
|
||||
elif self.fs_type == "xfs":
|
||||
command_list = ["mkfs.xfs", "-f", self.device_path] # -f 强制执行
|
||||
elif self.fs_type == "ntfs":
|
||||
command_list = ["mkfs.ntfs", "-f", self.device_path] # -f 强制执行
|
||||
elif self.fs_type == "fat32":
|
||||
command_list = ["mkfs.vfat", "-F", "32", self.device_path] # -F 32 指定FAT32
|
||||
else:
|
||||
logger.error(f"不支持的文件系统类型: {self.fs_type}")
|
||||
self.finished.emit(False, self.device_path, "", f"不支持的文件系统类型: {self.fs_type}")
|
||||
return
|
||||
|
||||
# 调用传入的shell命令执行函数,并禁用其内部的QMessageBox显示
|
||||
success, stdout, stderr = self._execute_shell_command_func(
|
||||
command_list,
|
||||
f"格式化设备 {self.device_path} 为 {self.fs_type} 失败",
|
||||
root_privilege=True,
|
||||
show_dialog=False # <--- 关键:阻止工作线程弹出 QMessageBox
|
||||
)
|
||||
self.finished.emit(success, self.device_path, stdout, stderr)
|
||||
logger.info(f"后台格式化完成: 设备 {self.device_path}, 成功: {success}")
|
||||
|
||||
|
||||
class DiskOperations(QObject): # <--- 继承 QObject 以便发出信号
|
||||
# 定义信号,用于通知主线程格式化操作的开始和结束
|
||||
formatting_finished = Signal(bool, str, str, str) # 成功状态, 设备路径, 标准输出, 标准错误
|
||||
formatting_started = Signal(str) # 设备路径
|
||||
|
||||
def __init__(self, system_manager: SystemInfoManager, lvm_ops, parent=None): # <--- 构造函数现在接收 lvm_ops 实例
|
||||
super().__init__(parent)
|
||||
self.system_manager = system_manager
|
||||
self._lvm_ops = lvm_ops # 保存 LvmOperations 实例,以便调用其 _execute_shell_command
|
||||
self.active_format_workers = {} # 用于跟踪正在运行的格式化线程
|
||||
|
||||
# DiskOperations 内部的 _execute_shell_command 只是对 LvmOperations._execute_shell_command 的包装
|
||||
# 这样所有 shell 命令都通过 LvmOperations 的统一入口,可以控制 show_dialog
|
||||
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
|
||||
suppress_critical_dialog_on_stderr_match=None, input_data=None):
|
||||
"""
|
||||
通用地运行一个 shell 命令,并处理错误。
|
||||
:param command_list: 命令及其参数的列表。
|
||||
:param error_message: 命令失败时显示给用户的错误消息。
|
||||
:param root_privilege: 如果为 True,则使用 sudo 执行命令。
|
||||
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
|
||||
可以是字符串或字符串元组。
|
||||
:param input_data: 传递给命令stdin的数据 (str)。
|
||||
:return: (True/False, stdout_str, stderr_str)
|
||||
"""
|
||||
if not all(isinstance(arg, str) for arg in command_list):
|
||||
logger.error(f"命令列表包含非字符串元素: {command_list}")
|
||||
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
|
||||
return False, "", "内部错误:命令参数类型不正确。"
|
||||
|
||||
if root_privilege:
|
||||
command_list = ["sudo"] + command_list
|
||||
|
||||
full_cmd_str = ' '.join(command_list)
|
||||
logger.debug(f"执行命令: {full_cmd_str}")
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
command_list,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
encoding='utf-8',
|
||||
input=input_data
|
||||
)
|
||||
logger.info(f"命令成功: {full_cmd_str}")
|
||||
return True, result.stdout.strip(), result.stderr.strip()
|
||||
except subprocess.CalledProcessError as e:
|
||||
stderr_output = e.stderr.strip()
|
||||
logger.error(f"命令失败: {full_cmd_str}")
|
||||
logger.error(f"退出码: {e.returncode}")
|
||||
logger.error(f"标准输出: {e.stdout.strip()}")
|
||||
logger.error(f"标准错误: {stderr_output}")
|
||||
|
||||
# Determine if the error dialog should be suppressed by this function
|
||||
should_suppress_dialog_here = False
|
||||
if suppress_critical_dialog_on_stderr_match:
|
||||
if isinstance(suppress_critical_dialog_on_stderr_match, str):
|
||||
if suppress_critical_dialog_on_stderr_match in stderr_output:
|
||||
should_suppress_dialog_here = True
|
||||
elif isinstance(suppress_critical_dialog_on_stderr_match, tuple):
|
||||
if any(s in stderr_output for s in suppress_critical_dialog_on_stderr_match):
|
||||
should_suppress_dialog_here = True
|
||||
|
||||
if should_suppress_dialog_here:
|
||||
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
|
||||
else:
|
||||
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
|
||||
return False, e.stdout.strip(), stderr_output
|
||||
except FileNotFoundError:
|
||||
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
|
||||
logger.error(f"命令 '{command_list[0]}' 未找到。")
|
||||
return False, "", f"命令 '{command_list[0]}' 未找到。"
|
||||
except Exception as e:
|
||||
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
|
||||
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
|
||||
return False, "", str(e)
|
||||
suppress_critical_dialog_on_stderr_match=None, input_data=None,
|
||||
show_dialog=True):
|
||||
return self._lvm_ops._execute_shell_command(
|
||||
command_list, error_message, root_privilege, suppress_critical_dialog_on_stderr_match, input_data, show_dialog
|
||||
)
|
||||
|
||||
def _add_to_fstab(self, device_path, mount_point, fstype, uuid):
|
||||
"""
|
||||
@@ -223,7 +221,8 @@ class DiskOperations:
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
["umount", device_path],
|
||||
f"卸载设备 {device_path} 失败",
|
||||
suppress_critical_dialog_on_stderr_match=already_unmounted_errors
|
||||
suppress_critical_dialog_on_stderr_match=already_unmounted_errors,
|
||||
show_dialog=show_dialog_on_error # <--- 将 show_dialog_on_error 传递给底层命令执行器
|
||||
)
|
||||
|
||||
if success:
|
||||
@@ -460,52 +459,67 @@ class DiskOperations:
|
||||
|
||||
def format_partition(self, device_path, fstype=None):
|
||||
"""
|
||||
格式化指定分区。
|
||||
启动一个 QInputDialog 让用户选择文件系统类型,然后将格式化操作提交到后台线程。
|
||||
:param device_path: 要格式化的分区路径。
|
||||
:param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None,则弹出对话框让用户选择。
|
||||
:return: True 如果成功,否则 False。
|
||||
:return: True 如果成功启动后台任务,否则 False。
|
||||
"""
|
||||
if fstype is None:
|
||||
items = ("ext4", "xfs", "fat32", "ntfs")
|
||||
fstype, ok = QInputDialog.getItem(None, "选择文件系统", "请选择要使用的文件系统类型:", items, 0, False)
|
||||
fstype, ok = QInputDialog.getItem(None, "选择文件系统", f"请选择要使用的文件系统类型 for {device_path}:", items, 0, False)
|
||||
if not ok or not fstype:
|
||||
logger.info("用户取消了文件系统选择。")
|
||||
return False
|
||||
|
||||
reply = QMessageBox.question(None, "确认格式化分区",
|
||||
f"您确定要将分区 {device_path} 格式化为 {fstype} 吗?此操作将擦除分区上的所有数据!",
|
||||
reply = QMessageBox.question(None, "确认格式化",
|
||||
f"您确定要格式化设备 {device_path} 为 {fstype} 吗?此操作将擦除所有数据!",
|
||||
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
||||
if reply == QMessageBox.No:
|
||||
logger.info(f"用户取消了格式化分区 {device_path} 的操作。")
|
||||
logger.info(f"用户取消了格式化 {device_path} 的操作。")
|
||||
return False
|
||||
|
||||
# 尝试卸载分区
|
||||
self.unmount_partition(device_path, show_dialog_on_error=False) # 静默卸载
|
||||
# 检查是否已有格式化任务正在进行
|
||||
if device_path in self.active_format_workers:
|
||||
QMessageBox.warning(None, "警告", f"设备 {device_path} 正在格式化中,请勿重复操作。")
|
||||
return False
|
||||
|
||||
# 尝试卸载分区(静默,不弹对话框)
|
||||
self.unmount_partition(device_path, show_dialog_on_error=False)
|
||||
# 从 fstab 中移除条目 (因为格式化会改变 UUID)
|
||||
self._remove_fstab_entry(device_path)
|
||||
|
||||
logger.info(f"尝试将分区 {device_path} 格式化为 {fstype}。")
|
||||
format_cmd = []
|
||||
if fstype == "ext4":
|
||||
format_cmd = ["mkfs.ext4", "-F", device_path] # -F 强制执行
|
||||
elif fstype == "xfs":
|
||||
format_cmd = ["mkfs.xfs", "-f", device_path] # -f 强制执行
|
||||
elif fstype == "fat32":
|
||||
format_cmd = ["mkfs.fat", "-F", "32", device_path]
|
||||
elif fstype == "ntfs":
|
||||
format_cmd = ["mkfs.ntfs", "-f", device_path]
|
||||
else:
|
||||
QMessageBox.critical(None, "错误", f"不支持的文件系统类型: {fstype}")
|
||||
logger.error(f"不支持的文件系统类型: {fstype}")
|
||||
return False
|
||||
# 创建 QThread 和 FormatWorker 实例
|
||||
thread = QThread()
|
||||
# 将 self._execute_shell_command (它内部调用 LvmOperations._execute_shell_command) 传递给工作线程
|
||||
worker = FormatWorker(device_path, fstype, self._execute_shell_command)
|
||||
worker.moveToThread(thread)
|
||||
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
format_cmd,
|
||||
f"格式化分区 {device_path} 失败"
|
||||
)
|
||||
# 连接信号和槽
|
||||
thread.started.connect(worker.run) # 线程启动时执行 worker 的 run 方法
|
||||
worker.finished.connect(lambda s, dp, o, e: self._on_formatting_finished(s, dp, o, e)) # worker 完成时调用处理函数
|
||||
worker.finished.connect(thread.quit) # worker 完成时退出线程
|
||||
worker.finished.connect(worker.deleteLater) # worker 完成时自动删除 worker 对象
|
||||
thread.finished.connect(thread.deleteLater) # 线程退出时自动删除线程对象
|
||||
|
||||
self.active_format_workers[device_path] = thread # 存储线程以便管理
|
||||
self.formatting_started.emit(device_path) # 发出信号通知主界面格式化已开始
|
||||
thread.start() # 启动线程
|
||||
|
||||
QMessageBox.information(None, "开始格式化", f"设备 {device_path} 正在后台格式化为 {fstype}。完成后将通知您。")
|
||||
return True
|
||||
|
||||
def _on_formatting_finished(self, success, device_path, stdout, stderr):
|
||||
"""
|
||||
处理格式化工作线程完成后的结果。此槽函数在主线程中执行。
|
||||
"""
|
||||
if device_path in self.active_format_workers:
|
||||
del self.active_format_workers[device_path] # 从跟踪列表中移除
|
||||
|
||||
# 在主线程中显示最终结果的消息框
|
||||
if success:
|
||||
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功格式化为 {fstype}。")
|
||||
return True
|
||||
QMessageBox.information(None, "格式化成功", f"设备 {device_path} 已成功格式化。")
|
||||
else:
|
||||
return False
|
||||
QMessageBox.critical(None, "格式化失败", f"格式化设备 {device_path} 失败。\n错误详情: {stderr}")
|
||||
|
||||
self.formatting_finished.emit(success, device_path, stdout, stderr) # 发出信号通知 MainWindow 刷新界面
|
||||
|
||||
|
||||
@@ -1,512 +0,0 @@
|
||||
# disk_operations.py
|
||||
import subprocess
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
from PySide6.QtWidgets import QMessageBox, QInputDialog
|
||||
from system_info import SystemInfoManager
|
||||
|
||||
logger = logging.getLogger(__name__)
|
||||
|
||||
class DiskOperations:
|
||||
def __init__(self, system_manager: SystemInfoManager): # NEW: 接收 system_manager 实例
|
||||
self.system_manager = system_manager
|
||||
|
||||
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
|
||||
suppress_critical_dialog_on_stderr_match=None, input_data=None):
|
||||
"""
|
||||
通用地运行一个 shell 命令,并处理错误。
|
||||
:param command_list: 命令及其参数的列表。
|
||||
:param error_message: 命令失败时显示给用户的错误消息。
|
||||
:param root_privilege: 如果为 True,则使用 sudo 执行命令。
|
||||
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
|
||||
可以是字符串或字符串元组。
|
||||
:param input_data: 传递给命令stdin的数据 (str)。
|
||||
:return: (True/False, stdout_str, stderr_str)
|
||||
"""
|
||||
if not all(isinstance(arg, str) for arg in command_list):
|
||||
logger.error(f"命令列表包含非字符串元素: {command_list}")
|
||||
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
|
||||
return False, "", "内部错误:命令参数类型不正确。"
|
||||
|
||||
if root_privilege:
|
||||
command_list = ["sudo"] + command_list
|
||||
|
||||
full_cmd_str = ' '.join(command_list)
|
||||
logger.debug(f"执行命令: {full_cmd_str}")
|
||||
|
||||
try:
|
||||
result = subprocess.run(
|
||||
command_list,
|
||||
capture_output=True,
|
||||
text=True,
|
||||
check=True,
|
||||
encoding='utf-8',
|
||||
input=input_data
|
||||
)
|
||||
logger.info(f"命令成功: {full_cmd_str}")
|
||||
return True, result.stdout.strip(), result.stderr.strip()
|
||||
except subprocess.CalledProcessError as e:
|
||||
stderr_output = e.stderr.strip()
|
||||
logger.error(f"命令失败: {full_cmd_str}")
|
||||
logger.error(f"退出码: {e.returncode}")
|
||||
logger.error(f"标准输出: {e.stdout.strip()}")
|
||||
logger.error(f"标准错误: {stderr_output}")
|
||||
|
||||
# Determine if the error dialog should be suppressed by this function
|
||||
should_suppress_dialog_here = False
|
||||
if suppress_critical_dialog_on_stderr_match:
|
||||
if isinstance(suppress_critical_dialog_on_stderr_match, str):
|
||||
if suppress_critical_dialog_on_stderr_match in stderr_output:
|
||||
should_suppress_dialog_here = True
|
||||
elif isinstance(suppress_critical_dialog_on_stderr_match, tuple):
|
||||
if any(s in stderr_output for s in suppress_critical_dialog_on_stderr_match):
|
||||
should_suppress_dialog_here = True
|
||||
|
||||
if should_suppress_dialog_here:
|
||||
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
|
||||
else:
|
||||
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
|
||||
return False, e.stdout.strip(), stderr_output
|
||||
except FileNotFoundError:
|
||||
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
|
||||
logger.error(f"命令 '{command_list[0]}' 未找到。")
|
||||
return False, "", f"命令 '{command_list[0]}' 未找到。"
|
||||
except Exception as e:
|
||||
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
|
||||
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
|
||||
return False, "", str(e)
|
||||
|
||||
def _add_to_fstab(self, device_path, mount_point, fstype, uuid):
|
||||
"""
|
||||
将设备的挂载信息添加到 /etc/fstab。
|
||||
使用 UUID 识别设备以提高稳定性。
|
||||
"""
|
||||
if not uuid or not mount_point or not fstype:
|
||||
logger.error(f"无法将设备 {device_path} 添加到 fstab:缺少 UUID/挂载点/文件系统类型。")
|
||||
QMessageBox.warning(None, "警告", f"无法将设备 {device_path} 添加到 fstab:缺少 UUID/挂载点/文件系统类型。")
|
||||
return False
|
||||
|
||||
fstab_entry = f"UUID={uuid} {mount_point} {fstype} defaults 0 2"
|
||||
fstab_path = "/etc/fstab"
|
||||
|
||||
try:
|
||||
# 检查 fstab 中是否已存在相同 UUID 的条目
|
||||
# 使用 _execute_shell_command 来读取 fstab,尽管通常不需要 sudo
|
||||
# 这里为了简化,直接读取文件,但写入时使用 _execute_shell_command
|
||||
with open(fstab_path, 'r') as f:
|
||||
fstab_content = f.readlines()
|
||||
|
||||
for line in fstab_content:
|
||||
if f"UUID={uuid}" in line:
|
||||
logger.warning(f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。跳过添加。")
|
||||
QMessageBox.information(None, "信息", f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。")
|
||||
return True # 认为成功,因为目标已达成
|
||||
|
||||
# 如果不存在,则追加到 fstab
|
||||
# 使用 _execute_shell_command for sudo write
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
["sh", "-c", f"echo '{fstab_entry}' >> {fstab_path}"],
|
||||
f"将 {device_path} 添加到 {fstab_path} 失败",
|
||||
root_privilege=True
|
||||
)
|
||||
if success:
|
||||
logger.info(f"已将 {fstab_entry} 添加到 {fstab_path}。")
|
||||
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功添加到 /etc/fstab。")
|
||||
return True
|
||||
else:
|
||||
return False # Error handled by _execute_shell_command
|
||||
except Exception as e:
|
||||
logger.error(f"处理 {fstab_path} 失败: {e}")
|
||||
QMessageBox.critical(None, "错误", f"处理 {fstab_path} 失败: {e}")
|
||||
return False
|
||||
|
||||
def _remove_fstab_entry(self, device_path):
|
||||
"""
|
||||
从 /etc/fstab 中移除指定设备的条目。
|
||||
此方法需要 SystemInfoManager 来获取设备的 UUID。
|
||||
"""
|
||||
# NEW: 使用 system_manager 获取 UUID,它会处理路径解析
|
||||
device_details = self.system_manager.get_device_details_by_path(device_path)
|
||||
if not device_details or not device_details.get('uuid'):
|
||||
logger.warning(f"无法获取设备 {device_path} 的 UUID,无法从 fstab 中移除。")
|
||||
return False
|
||||
uuid = device_details.get('uuid')
|
||||
|
||||
fstab_path = "/etc/fstab"
|
||||
# Use sed for robust removal with sudo
|
||||
# suppress_critical_dialog_on_stderr_match is added to handle cases where fstab might not exist
|
||||
# or sed reports no changes, which is not a critical error for removal.
|
||||
command = ["sed", "-i", f"/UUID={uuid}/d", fstab_path]
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
command,
|
||||
f"从 {fstab_path} 中删除 UUID={uuid} 的条目失败",
|
||||
root_privilege=True,
|
||||
suppress_critical_dialog_on_stderr_match=(
|
||||
f"sed: {fstab_path}: No such file or directory", # English
|
||||
f"sed: {fstab_path}: 没有那个文件或目录", # Chinese
|
||||
"no changes were made" # if sed finds nothing to delete
|
||||
)
|
||||
)
|
||||
if success:
|
||||
logger.info(f"已从 {fstab_path} 中删除 UUID={uuid} 的条目。")
|
||||
return True
|
||||
else:
|
||||
# If sed failed, it might be because the entry wasn't found, which is fine.
|
||||
# _execute_shell_command would have suppressed the dialog if it matched the suppress_critical_dialog_on_stderr_match.
|
||||
# So, if we reach here, it's either a real error (dialog shown by _execute_shell_command)
|
||||
# or a suppressed "no changes" type of error. In both cases, if no real error, we return True.
|
||||
if any(s in stderr for s in (
|
||||
f"sed: {fstab_path}: No such file or directory",
|
||||
f"sed: {fstab_path}: 没有那个文件或目录",
|
||||
"no changes were made",
|
||||
"No such file or directory" # more general check
|
||||
)):
|
||||
logger.info(f"fstab 中未找到 UUID={uuid} 的条目,无需删除。")
|
||||
return True # Consider it a success if the entry is not there
|
||||
return False # Other errors are already handled by _execute_shell_command
|
||||
|
||||
def mount_partition(self, device_path, mount_point, add_to_fstab=False):
|
||||
"""
|
||||
挂载指定设备到指定挂载点。
|
||||
:param device_path: 要挂载的设备路径。
|
||||
:param mount_point: 挂载点。
|
||||
:param add_to_fstab: 是否添加到 /etc/fstab。
|
||||
:return: True 如果成功,否则 False。
|
||||
"""
|
||||
if not os.path.exists(mount_point):
|
||||
try:
|
||||
os.makedirs(mount_point)
|
||||
logger.info(f"创建挂载点目录: {mount_point}")
|
||||
except OSError as e:
|
||||
QMessageBox.critical(None, "错误", f"创建挂载点目录 {mount_point} 失败: {e}")
|
||||
logger.error(f"创建挂载点目录 {mount_point} 失败: {e}")
|
||||
return False
|
||||
|
||||
logger.info(f"尝试挂载设备 {device_path} 到 {mount_point}。")
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
["mount", device_path, mount_point],
|
||||
f"挂载设备 {device_path} 失败"
|
||||
)
|
||||
if success:
|
||||
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功挂载到 {mount_point}。")
|
||||
if add_to_fstab:
|
||||
# NEW: 使用 self.system_manager 获取 fstype 和 UUID
|
||||
device_details = self.system_manager.get_device_details_by_path(device_path)
|
||||
if device_details:
|
||||
fstype = device_details.get('fstype')
|
||||
uuid = device_details.get('uuid')
|
||||
if fstype and uuid:
|
||||
self._add_to_fstab(device_path, mount_point, fstype, uuid)
|
||||
else:
|
||||
logger.error(f"无法获取设备 {device_path} 的文件系统类型或 UUID 以添加到 fstab。")
|
||||
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取文件系统类型或 UUID 以添加到 fstab。")
|
||||
else:
|
||||
logger.error(f"无法获取设备 {device_path} 的详细信息以添加到 fstab。")
|
||||
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取详细信息以添加到 fstab。")
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def unmount_partition(self, device_path, show_dialog_on_error=True):
|
||||
"""
|
||||
卸载指定设备。
|
||||
:param device_path: 要卸载的设备路径。
|
||||
:param show_dialog_on_error: 是否在发生错误时显示对话框。
|
||||
:return: True 如果成功,否则 False。
|
||||
"""
|
||||
logger.info(f"尝试卸载设备 {device_path}。")
|
||||
# 定义表示设备已未挂载的错误信息(中英文)
|
||||
# 增加了 "未指定挂载点"
|
||||
already_unmounted_errors = ("not mounted", "未挂载", "未指定挂载点")
|
||||
|
||||
# 调用 _execute_shell_command,并告诉它在遇到“已未挂载”错误时,不要弹出其自身的关键错误对话框。
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
["umount", device_path],
|
||||
f"卸载设备 {device_path} 失败",
|
||||
suppress_critical_dialog_on_stderr_match=already_unmounted_errors
|
||||
)
|
||||
|
||||
if success:
|
||||
# 如果命令成功执行,则卸载成功。
|
||||
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。")
|
||||
return True
|
||||
else:
|
||||
# 如果命令失败,检查是否是因为设备已经未挂载或挂载点未指定。
|
||||
is_already_unmounted_error = any(s in stderr for s in already_unmounted_errors)
|
||||
|
||||
if is_already_unmounted_error:
|
||||
logger.info(f"设备 {device_path} 已经处于未挂载状态(或挂载点未指定),无需重复卸载。")
|
||||
# 这种情况下,操作结果符合预期(设备已是未挂载),不弹出对话框,并返回 True。
|
||||
return True
|
||||
else:
|
||||
# 对于其他类型的卸载失败(例如设备忙、权限不足等),
|
||||
# _execute_shell_command 应该已经弹出了关键错误对话框
|
||||
# (因为它没有匹配到 `already_unmounted_errors` 进行抑制)。
|
||||
# 所以,这里我们不需要再次弹出对话框,直接返回 False。
|
||||
return False
|
||||
|
||||
def get_disk_free_space_info_mib(self, disk_path, total_disk_mib):
|
||||
"""
|
||||
获取磁盘上最大的空闲空间块的起始位置 (MiB) 和大小 (MiB)。
|
||||
:param disk_path: 磁盘路径。
|
||||
:param total_disk_mib: 磁盘的总大小 (MiB),用于全新磁盘的计算。
|
||||
:return: (start_mib, size_mib) 元组。如果磁盘是全新的,返回 (0.0, total_disk_mib)。
|
||||
如果磁盘有分区表但没有空闲空间,返回 (None, None)。
|
||||
"""
|
||||
logger.debug(f"尝试获取磁盘 {disk_path} 的空闲空间信息 (MiB)。")
|
||||
# suppress_critical_dialog_on_stderr_match is added to handle cases where parted might complain about
|
||||
# an unrecognized disk label, which is expected for a fresh disk.
|
||||
success, stdout, stderr = self._execute_shell_command(
|
||||
["parted", "-s", disk_path, "unit", "MiB", "print", "free"],
|
||||
f"获取磁盘 {disk_path} 分区信息失败",
|
||||
root_privilege=True,
|
||||
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label")
|
||||
)
|
||||
|
||||
if not success:
|
||||
# If parted failed and it wasn't due to an unrecognized label (handled by suppress_critical_dialog_on_stderr_match),
|
||||
# then _execute_shell_command would have shown a dialog.
|
||||
# We just log and return None.
|
||||
logger.error(f"获取磁盘 {disk_path} 空闲空间信息失败: {stderr}")
|
||||
return None, None
|
||||
|
||||
logger.debug(f"parted print free 命令原始输出:\n{stdout}")
|
||||
|
||||
free_spaces = []
|
||||
lines = stdout.splitlines()
|
||||
# Regex to capture StartMiB and SizeMiB from a "Free Space" line
|
||||
# It's made more flexible to match "Free Space", "空闲空间", and "可用空间"
|
||||
# Example: " 0.02MiB 8192MiB 8192MiB 可用空间"
|
||||
# We need to capture the first numeric value (Start) and the third numeric value (Size).
|
||||
free_space_line_pattern = re.compile(r'^\s*(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(?:Free Space|空闲空间|可用空间)')
|
||||
|
||||
for line in lines:
|
||||
match = free_space_line_pattern.match(line)
|
||||
if match:
|
||||
try:
|
||||
start_mib = float(match.group(1)) # Capture StartMiB
|
||||
size_mib = float(match.group(3)) # Capture SizeMiB (the third numeric value)
|
||||
free_spaces.append({'start_mib': start_mib, 'size_mib': size_mib})
|
||||
except ValueError as ve:
|
||||
logger.warning(f"解析 parted free space 行 '{line}' 失败: {ve}")
|
||||
continue
|
||||
|
||||
if not free_spaces:
|
||||
logger.warning(f"在磁盘 {disk_path} 上未找到空闲空间。")
|
||||
return None, None
|
||||
|
||||
# 找到最大的空闲空间块
|
||||
largest_free_space = max(free_spaces, key=lambda x: x['size_mib'])
|
||||
start_mib = largest_free_space['start_mib']
|
||||
size_mib = largest_free_space['size_mib']
|
||||
|
||||
logger.debug(f"磁盘 {disk_path} 的最大空闲空间块起始于 {start_mib:.2f} MiB,大小为 {size_mib:.2f} MiB。")
|
||||
return start_mib, size_mib
|
||||
|
||||
def create_partition(self, disk_path, partition_table_type, size_gb, total_disk_mib, use_max_space):
|
||||
"""
|
||||
在指定磁盘上创建分区。
|
||||
:param disk_path: 磁盘路径 (例如 /dev/sdb)。
|
||||
:param partition_table_type: 分区表类型 ('gpt' 或 'msdos')。
|
||||
:param size_gb: 分区大小 (GB)。
|
||||
:param total_disk_mib: 磁盘总大小 (MiB)。
|
||||
:param use_max_space: 是否使用最大可用空间。
|
||||
:return: True 如果成功,否则 False。
|
||||
"""
|
||||
if not isinstance(disk_path, str) or not isinstance(partition_table_type, str):
|
||||
logger.error(f"尝试创建分区时传入无效参数。磁盘路径: {disk_path}, 分区表类型: {partition_table_type}")
|
||||
QMessageBox.critical(None, "错误", "无效的磁盘路径或分区表类型。")
|
||||
return False
|
||||
|
||||
# 1. 检查磁盘是否有分区表
|
||||
has_partition_table = False
|
||||
# Use suppress_critical_dialog_on_stderr_match for "unrecognized disk label" when checking
|
||||
success_check, stdout_check, stderr_check = self._execute_shell_command(
|
||||
["parted", "-s", disk_path, "print"],
|
||||
f"检查磁盘 {disk_path} 分区表失败",
|
||||
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label"),
|
||||
root_privilege=True
|
||||
)
|
||||
# If success_check is False, it means _execute_shell_command encountered an error.
|
||||
# If that error was "unrecognized disk label", it was suppressed, and we treat it as no partition table.
|
||||
# If it was another error, a dialog was shown by _execute_shell_command, and we should stop.
|
||||
if not success_check:
|
||||
# Check if the failure was due to "unrecognized disk label" (which means no partition table)
|
||||
if any(s in stderr_check for s in ("无法辨识的磁盘卷标", "unrecognized disk label")):
|
||||
logger.info(f"磁盘 {disk_path} 没有可识别的分区表。")
|
||||
has_partition_table = False # Explicitly set to False
|
||||
else:
|
||||
logger.error(f"检查磁盘 {disk_path} 分区表时发生非预期的错误: {stderr_check}")
|
||||
return False # Other critical error, stop operation.
|
||||
else: # success_check is True
|
||||
if "Partition Table: unknown" not in stdout_check and "分区表:unknown" not in stdout_check:
|
||||
has_partition_table = True
|
||||
else:
|
||||
logger.info(f"parted print 报告磁盘 {disk_path} 的分区表为 'unknown'。")
|
||||
has_partition_table = False
|
||||
|
||||
|
||||
actual_start_mib_for_parted = 0.0
|
||||
|
||||
# 2. 如果没有分区表,则创建分区表
|
||||
if not has_partition_table:
|
||||
reply = QMessageBox.question(None, "确认创建分区表",
|
||||
f"磁盘 {disk_path} 没有分区表。您确定要创建 {partition_table_type} 分区表吗?"
|
||||
f"此操作将擦除磁盘上的所有数据。",
|
||||
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
||||
if reply == QMessageBox.No:
|
||||
logger.info(f"用户取消了在 {disk_path} 上创建分区表的操作。")
|
||||
return False
|
||||
|
||||
logger.info(f"尝试在 {disk_path} 上创建 {partition_table_type} 分区表。")
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
["parted", "-s", disk_path, "mklabel", partition_table_type],
|
||||
f"创建 {partition_table_type} 分区表失败"
|
||||
)
|
||||
if not success:
|
||||
return False
|
||||
# 对于新创建分区表的磁盘,第一个分区通常从 1MiB 开始以确保兼容性和对齐
|
||||
actual_start_mib_for_parted = 1.0
|
||||
else:
|
||||
# 如果有分区表,获取下一个可用分区的起始位置
|
||||
start_mib_from_parted, _ = self.get_disk_free_space_info_mib(disk_path, total_disk_mib)
|
||||
if start_mib_from_parted is None:
|
||||
QMessageBox.critical(None, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置或没有空闲空间。")
|
||||
return False
|
||||
|
||||
# 如果 parted 报告的起始位置非常接近 0 MiB (例如 0.0 MiB 或 0.02 MiB),
|
||||
# 为了安全和兼容性,也将其调整为 1.0 MiB。
|
||||
# 否则,使用 parted 报告的精确起始位置。
|
||||
if start_mib_from_parted < 1.0: # covers 0.0MiB and values like 0.017MiB (17.4kB)
|
||||
actual_start_mib_for_parted = 1.0
|
||||
else:
|
||||
actual_start_mib_for_parted = start_mib_from_parted
|
||||
|
||||
# 3. 确定分区结束位置
|
||||
if use_max_space:
|
||||
end_pos = "100%"
|
||||
size_for_log = "最大可用空间"
|
||||
else:
|
||||
# 计算结束 MiB
|
||||
# 注意:这里计算的 end_mib 是基于用户请求的大小,
|
||||
# parted 会根据实际可用空间和对齐进行微调。
|
||||
end_mib = actual_start_mib_for_parted + size_gb * 1024
|
||||
|
||||
# 简单检查,如果请求的大小导致结束位置超出磁盘总大小,则使用最大可用
|
||||
if end_mib > total_disk_mib:
|
||||
QMessageBox.warning(None, "警告", f"请求的分区大小 ({size_gb}GB) 超出了可用空间。将调整为最大可用空间。")
|
||||
end_pos = "100%"
|
||||
size_for_log = "最大可用空间"
|
||||
else:
|
||||
end_pos = f"{end_mib}MiB"
|
||||
size_for_log = f"{size_gb}GB"
|
||||
|
||||
# 4. 创建分区
|
||||
create_cmd = ["parted", "-s", disk_path, "mkpart", "primary", f"{actual_start_mib_for_parted}MiB", end_pos]
|
||||
logger.info(f"尝试在 {disk_path} 上创建 {size_for_log} 的主分区。命令: {' '.join(create_cmd)}")
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
create_cmd,
|
||||
f"在 {disk_path} 上创建分区失败"
|
||||
)
|
||||
if success:
|
||||
QMessageBox.information(None, "成功", f"在 {disk_path} 上成功创建了 {size_for_log} 的分区。")
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def delete_partition(self, device_path):
|
||||
"""
|
||||
删除指定分区。
|
||||
:param device_path: 要删除的分区路径。
|
||||
:return: True 如果成功,否则 False。
|
||||
"""
|
||||
reply = QMessageBox.question(None, "确认删除分区",
|
||||
f"您确定要删除分区 {device_path} 吗?此操作将擦除分区上的所有数据!",
|
||||
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
||||
if reply == QMessageBox.No:
|
||||
logger.info(f"用户取消了删除分区 {device_path} 的操作。")
|
||||
return False
|
||||
|
||||
# 尝试卸载分区
|
||||
# The unmount_partition method now handles "not mounted" gracefully without dialog and returns True.
|
||||
# So, we just call it.
|
||||
self.unmount_partition(device_path, show_dialog_on_error=False) # show_dialog_on_error=False means no dialog for other errors too.
|
||||
|
||||
# 从 fstab 中移除条目
|
||||
# _remove_fstab_entry also handles "not found" gracefully.
|
||||
self._remove_fstab_entry(device_path)
|
||||
|
||||
# 获取父磁盘和分区号
|
||||
# 例如 /dev/sdb1 -> /dev/sdb, 1
|
||||
match = re.match(r'(/dev/[a-z]+)(\d+)', device_path)
|
||||
if not match:
|
||||
QMessageBox.critical(None, "错误", f"无法解析设备路径 {device_path}。")
|
||||
logger.error(f"无法解析设备路径 {device_path}。")
|
||||
return False
|
||||
|
||||
disk_path = match.group(1)
|
||||
partition_number = match.group(2)
|
||||
|
||||
logger.info(f"尝试删除分区 {device_path} (磁盘: {disk_path}, 分区号: {partition_number})。")
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
["parted", "-s", disk_path, "rm", partition_number],
|
||||
f"删除分区 {device_path} 失败"
|
||||
)
|
||||
if success:
|
||||
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功删除。")
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
def format_partition(self, device_path, fstype=None):
|
||||
"""
|
||||
格式化指定分区。
|
||||
:param device_path: 要格式化的分区路径。
|
||||
:param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None,则弹出对话框让用户选择。
|
||||
:return: True 如果成功,否则 False。
|
||||
"""
|
||||
if fstype is None:
|
||||
items = ("ext4", "xfs", "fat32", "ntfs")
|
||||
fstype, ok = QInputDialog.getItem(None, "选择文件系统", "请选择要使用的文件系统类型:", items, 0, False)
|
||||
if not ok or not fstype:
|
||||
logger.info("用户取消了文件系统选择。")
|
||||
return False
|
||||
|
||||
reply = QMessageBox.question(None, "确认格式化分区",
|
||||
f"您确定要将分区 {device_path} 格式化为 {fstype} 吗?此操作将擦除分区上的所有数据!",
|
||||
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
||||
if reply == QMessageBox.No:
|
||||
logger.info(f"用户取消了格式化分区 {device_path} 的操作。")
|
||||
return False
|
||||
|
||||
# 尝试卸载分区
|
||||
self.unmount_partition(device_path, show_dialog_on_error=False) # 静默卸载
|
||||
|
||||
# 从 fstab 中移除条目 (因为格式化会改变 UUID)
|
||||
self._remove_fstab_entry(device_path)
|
||||
|
||||
logger.info(f"尝试将分区 {device_path} 格式化为 {fstype}。")
|
||||
format_cmd = []
|
||||
if fstype == "ext4":
|
||||
format_cmd = ["mkfs.ext4", "-F", device_path] # -F 强制执行
|
||||
elif fstype == "xfs":
|
||||
format_cmd = ["mkfs.xfs", "-f", device_path] # -f 强制执行
|
||||
elif fstype == "fat32":
|
||||
format_cmd = ["mkfs.fat", "-F", "32", device_path]
|
||||
elif fstype == "ntfs":
|
||||
format_cmd = ["mkfs.ntfs", "-f", device_path]
|
||||
else:
|
||||
QMessageBox.critical(None, "错误", f"不支持的文件系统类型: {fstype}")
|
||||
logger.error(f"不支持的文件系统类型: {fstype}")
|
||||
return False
|
||||
|
||||
success, _, stderr = self._execute_shell_command(
|
||||
format_cmd,
|
||||
f"格式化分区 {device_path} 失败"
|
||||
)
|
||||
if success:
|
||||
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功格式化为 {fstype}。")
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
@@ -1,3 +1,4 @@
|
||||
# lvm_operations.py
|
||||
import subprocess
|
||||
import logging
|
||||
from PySide6.QtWidgets import QMessageBox
|
||||
@@ -6,10 +7,11 @@ logger = logging.getLogger(__name__)
|
||||
|
||||
class LvmOperations:
|
||||
def __init__(self):
|
||||
pass # LVM操作不直接依赖SystemInfoManager,通过参数传递所需信息
|
||||
pass
|
||||
|
||||
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
|
||||
suppress_critical_dialog_on_stderr_match=None, input_data=None):
|
||||
suppress_critical_dialog_on_stderr_match=None, input_data=None,
|
||||
show_dialog=True): # <--- 新增 show_dialog 参数,默认为 True
|
||||
"""
|
||||
通用地运行一个 shell 命令,并处理错误。
|
||||
:param command_list: 命令及其参数的列表。
|
||||
@@ -18,11 +20,13 @@ class LvmOperations:
|
||||
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
|
||||
可以是字符串或字符串元组/列表。
|
||||
:param input_data: 传递给命令stdin的数据 (str)。
|
||||
:param show_dialog: 如果为 False,则不显示关键错误对话框。
|
||||
:return: (True/False, stdout_str, stderr_str)
|
||||
"""
|
||||
if not all(isinstance(arg, str) for arg in command_list):
|
||||
logger.error(f"命令列表包含非字符串元素: {command_list}")
|
||||
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
|
||||
if show_dialog: # <--- 根据 show_dialog 决定是否弹出对话框
|
||||
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
|
||||
return False, "", "内部错误:命令参数类型不正确。"
|
||||
|
||||
if root_privilege:
|
||||
@@ -49,7 +53,6 @@ class LvmOperations:
|
||||
logger.error(f"标准输出: {e.stdout.strip()}")
|
||||
logger.error(f"标准错误: {stderr_output}")
|
||||
|
||||
# --- 修改开始:处理 suppress_critical_dialog_on_stderr_match 可以是字符串或元组/列表 ---
|
||||
should_suppress_dialog = False
|
||||
if suppress_critical_dialog_on_stderr_match:
|
||||
if isinstance(suppress_critical_dialog_on_stderr_match, str):
|
||||
@@ -59,24 +62,25 @@ class LvmOperations:
|
||||
for pattern in suppress_critical_dialog_on_stderr_match:
|
||||
if pattern in stderr_output:
|
||||
should_suppress_dialog = True
|
||||
break # 找到一个匹配就足够了
|
||||
break
|
||||
|
||||
if should_suppress_dialog:
|
||||
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
|
||||
else:
|
||||
if show_dialog and not should_suppress_dialog: # <--- 根据 show_dialog 决定是否弹出对话框
|
||||
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
|
||||
# --- 修改结束 ---
|
||||
|
||||
return False, e.stdout.strip(), stderr_output
|
||||
except FileNotFoundError:
|
||||
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
|
||||
if show_dialog: # <--- 根据 show_dialog 决定是否弹出对话框
|
||||
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
|
||||
logger.error(f"命令 '{command_list[0]}' 未找到。")
|
||||
return False, "", f"命令 '{command_list[0]}' 未找到。"
|
||||
except Exception as e:
|
||||
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
|
||||
if show_dialog: # <--- 根据 show_dialog 决定是否弹出对话框
|
||||
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
|
||||
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
|
||||
return False, "", str(e)
|
||||
|
||||
# 以下是 LvmOperations 中其他方法的代码,保持不变。
|
||||
# ... (create_pv, delete_pv, create_vg, delete_vg, create_lv, delete_lv, activate_lv, deactivate_lv) ...
|
||||
def create_pv(self, device_path):
|
||||
"""
|
||||
创建物理卷 (PV)。
|
||||
@@ -361,3 +365,4 @@ class LvmOperations:
|
||||
return True
|
||||
else:
|
||||
return False
|
||||
|
||||
|
||||
@@ -1,3 +1,4 @@
|
||||
# mainwindow.py
|
||||
import sys
|
||||
import logging
|
||||
import re
|
||||
@@ -5,7 +6,7 @@ import os
|
||||
|
||||
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
|
||||
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
|
||||
from PySide6.QtCore import Qt, QPoint
|
||||
from PySide6.QtCore import Qt, QPoint, QThread # <--- 确保导入 QThread
|
||||
|
||||
# 导入自动生成的 UI 文件
|
||||
from ui_form import Ui_MainWindow
|
||||
@@ -35,9 +36,11 @@ class MainWindow(QMainWindow):
|
||||
|
||||
# 初始化管理器和操作类
|
||||
self.system_manager = SystemInfoManager()
|
||||
self.disk_ops = DiskOperations(self.system_manager)
|
||||
self.lvm_ops = LvmOperations() # <--- 先初始化 LvmOperations
|
||||
# <--- 将 lvm_ops 实例传递给 DiskOperations
|
||||
self.disk_ops = DiskOperations(self.system_manager, self.lvm_ops)
|
||||
self.raid_ops = RaidOperations()
|
||||
self.lvm_ops = LvmOperations() # LvmOperations 包含通用的 _execute_shell_command
|
||||
# self.lvm_ops = LvmOperations() # 这一行是重复的,可以删除
|
||||
|
||||
# 连接刷新按钮的信号到槽函数
|
||||
if hasattr(self.ui, 'refreshButton'):
|
||||
@@ -55,6 +58,11 @@ class MainWindow(QMainWindow):
|
||||
self.ui.treeWidget_lvm.setContextMenuPolicy(Qt.CustomContextMenu)
|
||||
self.ui.treeWidget_lvm.customContextMenuRequested.connect(self.show_lvm_context_menu)
|
||||
|
||||
# <--- 新增: 连接 DiskOperations 的格式化完成信号
|
||||
self.disk_ops.formatting_finished.connect(self.on_disk_formatting_finished)
|
||||
# 可选:连接格式化开始信号,用于显示进度或禁用相关操作
|
||||
# self.disk_ops.formatting_started.connect(self.on_disk_formatting_started)
|
||||
|
||||
# 初始化时刷新所有数据
|
||||
self.refresh_all_info()
|
||||
logger.info("所有设备信息已初始化加载。")
|
||||
@@ -195,6 +203,7 @@ class MainWindow(QMainWindow):
|
||||
|
||||
logger.warning(f"尝试擦除 {device_path} 上的分区表。")
|
||||
# 使用 LvmOperations 中通用的 _execute_shell_command 来执行 parted 命令
|
||||
# 这里不需要 show_dialog=False,因为这个操作本身就是同步且需要用户确认的
|
||||
success, _, stderr = self.lvm_ops._execute_shell_command(
|
||||
["parted", "-s", device_path, "mklabel", "gpt"], # 默认使用 gpt 分区表类型
|
||||
f"擦除 {device_path} 上的分区表失败"
|
||||
@@ -290,8 +299,21 @@ class MainWindow(QMainWindow):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_format_partition(self, device_path):
|
||||
if self.disk_ops.format_partition(device_path):
|
||||
self.refresh_all_info()
|
||||
"""
|
||||
调用 DiskOperations 的异步格式化方法。
|
||||
"""
|
||||
# DiskOperations.format_partition 会处理用户确认和启动后台线程
|
||||
self.disk_ops.format_partition(device_path)
|
||||
# 界面刷新将在格式化完成后由 on_disk_formatting_finished 槽函数触发,所以这里不需要 refresh_all_info()
|
||||
|
||||
# <--- 新增: 格式化完成后的槽函数
|
||||
def on_disk_formatting_finished(self, success, device_path, stdout, stderr):
|
||||
"""
|
||||
接收 DiskOperations 发出的格式化完成信号,并刷新界面。
|
||||
"""
|
||||
logger.info(f"格式化完成信号接收: 设备 {device_path}, 成功: {success}")
|
||||
# QMessageBox 已经在 DiskOperations 的 _on_formatting_finished 中处理
|
||||
self.refresh_all_info() # 刷新所有信息以显示更新后的文件系统类型等
|
||||
|
||||
# --- RAID 管理 Tab ---
|
||||
def refresh_raid_info(self):
|
||||
@@ -424,8 +446,12 @@ class MainWindow(QMainWindow):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_format_raid_array(self, array_path):
|
||||
if self.disk_ops.format_partition(array_path):
|
||||
self.refresh_all_info()
|
||||
"""
|
||||
处理 RAID 阵列的格式化。
|
||||
"""
|
||||
# 这将调用 DiskOperations 的异步格式化方法
|
||||
self.disk_ops.format_partition(array_path)
|
||||
# 刷新操作会在 on_disk_formatting_finished 中触发,所以这里不需要 refresh_all_info()
|
||||
|
||||
# --- LVM 管理 Tab ---
|
||||
def refresh_lvm_info(self):
|
||||
|
||||
@@ -1,762 +0,0 @@
|
||||
# mainwindow.py
|
||||
import sys
|
||||
import logging
|
||||
import re
|
||||
import os
|
||||
|
||||
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
|
||||
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
|
||||
from PySide6.QtCore import Qt, QPoint
|
||||
|
||||
# 导入自动生成的 UI 文件
|
||||
from ui_form import Ui_MainWindow
|
||||
# 导入我们自己编写的系统信息管理模块
|
||||
from system_info import SystemInfoManager
|
||||
# 导入日志配置
|
||||
from logger_config import setup_logging, logger
|
||||
# 导入磁盘操作模块
|
||||
from disk_operations import DiskOperations
|
||||
# 导入 RAID 操作模块
|
||||
from raid_operations import RaidOperations
|
||||
# 导入 LVM 操作模块
|
||||
from lvm_operations import LvmOperations
|
||||
# 导入自定义对话框
|
||||
from dialogs import (CreatePartitionDialog, MountDialog, CreateRaidDialog,
|
||||
CreatePvDialog, CreateVgDialog, CreateLvDialog)
|
||||
|
||||
|
||||
class MainWindow(QMainWindow):
|
||||
def __init__(self, parent=None):
|
||||
super().__init__(parent)
|
||||
self.ui = Ui_MainWindow()
|
||||
self.ui.setupUi(self)
|
||||
|
||||
setup_logging(self.ui.logOutputTextEdit)
|
||||
logger.info("应用程序启动。")
|
||||
|
||||
# 初始化管理器和操作类
|
||||
self.system_manager = SystemInfoManager()
|
||||
self.disk_ops = DiskOperations(self.system_manager)
|
||||
self.raid_ops = RaidOperations()
|
||||
self.lvm_ops = LvmOperations() # LvmOperations 包含通用的 _execute_shell_command
|
||||
|
||||
# 连接刷新按钮的信号到槽函数
|
||||
if hasattr(self.ui, 'refreshButton'):
|
||||
self.ui.refreshButton.clicked.connect(self.refresh_all_info)
|
||||
else:
|
||||
logger.warning("Warning: refreshButton not found in UI. Please add it in form.ui and regenerate ui_form.py.")
|
||||
|
||||
# 启用 treeWidget 的自定义上下文菜单
|
||||
self.ui.treeWidget_block_devices.setContextMenuPolicy(Qt.CustomContextMenu)
|
||||
self.ui.treeWidget_block_devices.customContextMenuRequested.connect(self.show_block_device_context_menu)
|
||||
|
||||
self.ui.treeWidget_raid.setContextMenuPolicy(Qt.CustomContextMenu)
|
||||
self.ui.treeWidget_raid.customContextMenuRequested.connect(self.show_raid_context_menu)
|
||||
|
||||
self.ui.treeWidget_lvm.setContextMenuPolicy(Qt.CustomContextMenu)
|
||||
self.ui.treeWidget_lvm.customContextMenuRequested.connect(self.show_lvm_context_menu)
|
||||
|
||||
# 初始化时刷新所有数据
|
||||
self.refresh_all_info()
|
||||
logger.info("所有设备信息已初始化加载。")
|
||||
|
||||
def refresh_all_info(self):
|
||||
"""
|
||||
刷新所有设备信息:块设备、RAID和LVM。
|
||||
"""
|
||||
logger.info("开始刷新所有设备信息...")
|
||||
self.refresh_block_devices_info()
|
||||
self.refresh_raid_info()
|
||||
self.refresh_lvm_info()
|
||||
logger.info("所有设备信息刷新完成。")
|
||||
|
||||
# --- 块设备概览 Tab ---
|
||||
def refresh_block_devices_info(self):
|
||||
self.ui.treeWidget_block_devices.clear()
|
||||
|
||||
columns = [
|
||||
("设备名", 'name'), ("类型", 'type'), ("大小", 'size'), ("挂载点", 'mountpoint'),
|
||||
("文件系统", 'fstype'), ("只读", 'ro'), ("UUID", 'uuid'), ("PARTUUID", 'partuuid'),
|
||||
("厂商", 'vendor'), ("型号", 'model'), ("序列号", 'serial'),
|
||||
("主次号", 'maj:min'), ("父设备名", 'pkname'),
|
||||
]
|
||||
|
||||
headers = [col[0] for col in columns]
|
||||
self.field_keys = [col[1] for col in columns]
|
||||
|
||||
self.ui.treeWidget_block_devices.setColumnCount(len(headers))
|
||||
self.ui.treeWidget_block_devices.setHeaderLabels(headers)
|
||||
|
||||
for i in range(len(headers)):
|
||||
self.ui.treeWidget_block_devices.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
|
||||
|
||||
try:
|
||||
devices = self.system_manager.get_block_devices()
|
||||
for dev in devices:
|
||||
self._add_device_to_tree(self.ui.treeWidget_block_devices, dev)
|
||||
|
||||
for i in range(len(headers)):
|
||||
self.ui.treeWidget_block_devices.resizeColumnToContents(i)
|
||||
logger.info("块设备信息刷新成功。")
|
||||
|
||||
except Exception as e:
|
||||
QMessageBox.critical(self, "错误", f"刷新块设备信息失败: {e}")
|
||||
logger.error(f"刷新块设备信息失败: {e}")
|
||||
|
||||
def _add_device_to_tree(self, parent_item, dev_data):
|
||||
item = QTreeWidgetItem(parent_item)
|
||||
for i, key in enumerate(self.field_keys):
|
||||
value = dev_data.get(key)
|
||||
if key == 'ro':
|
||||
item.setText(i, "是" if value else "否")
|
||||
elif value is None:
|
||||
item.setText(i, "")
|
||||
else:
|
||||
item.setText(i, str(value))
|
||||
|
||||
item.setData(0, Qt.UserRole, dev_data)
|
||||
|
||||
if 'children' in dev_data:
|
||||
for child in dev_data['children']:
|
||||
self._add_device_to_tree(item, child)
|
||||
item.setExpanded(True)
|
||||
|
||||
def show_block_device_context_menu(self, pos: QPoint):
|
||||
item = self.ui.treeWidget_block_devices.itemAt(pos)
|
||||
menu = QMenu(self)
|
||||
|
||||
create_menu = QMenu("创建...", self)
|
||||
create_raid_action = create_menu.addAction("创建 RAID 阵列...")
|
||||
create_raid_action.triggered.connect(self._handle_create_raid_array)
|
||||
create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
|
||||
create_pv_action.triggered.connect(self._handle_create_pv)
|
||||
menu.addMenu(create_menu)
|
||||
menu.addSeparator()
|
||||
|
||||
if item:
|
||||
dev_data = item.data(0, Qt.UserRole)
|
||||
if not dev_data:
|
||||
logger.warning(f"无法获取设备 {item.text(0)} 的详细数据。")
|
||||
return
|
||||
|
||||
device_name = dev_data.get('name')
|
||||
device_type = dev_data.get('type')
|
||||
mount_point = dev_data.get('mountpoint')
|
||||
device_path = dev_data.get('path')
|
||||
|
||||
if not device_path:
|
||||
device_path = f"/dev/{device_name}"
|
||||
|
||||
if device_type == 'disk':
|
||||
create_partition_action = menu.addAction(f"创建分区 {device_path}...")
|
||||
create_partition_action.triggered.connect(lambda: self._handle_create_partition(device_path, dev_data))
|
||||
# --- 新增功能:擦除分区表 ---
|
||||
wipe_action = menu.addAction(f"擦除分区表 {device_path}...")
|
||||
wipe_action.triggered.connect(lambda: self._handle_wipe_partition_table(device_path))
|
||||
# --- 新增功能结束 ---
|
||||
menu.addSeparator()
|
||||
|
||||
if device_type == 'part':
|
||||
if not mount_point or mount_point == '' or mount_point == 'N/A':
|
||||
mount_action = menu.addAction(f"挂载 {device_path}...")
|
||||
mount_action.triggered.connect(lambda: self._handle_mount(device_path))
|
||||
elif mount_point != '[SWAP]':
|
||||
unmount_action = menu.addAction(f"卸载 {device_path}")
|
||||
unmount_action.triggered.connect(lambda: self._unmount_and_refresh(device_path))
|
||||
menu.addSeparator()
|
||||
|
||||
delete_action = menu.addAction(f"删除分区 {device_path}")
|
||||
delete_action.triggered.connect(lambda: self._handle_delete_partition(device_path))
|
||||
|
||||
format_action = menu.addAction(f"格式化分区 {device_path}...")
|
||||
format_action.triggered.connect(lambda: self._handle_format_partition(device_path))
|
||||
|
||||
if menu.actions():
|
||||
menu.exec(self.ui.treeWidget_block_devices.mapToGlobal(pos))
|
||||
else:
|
||||
logger.info("右键点击了空白区域或设备没有可用的操作。")
|
||||
|
||||
# --- 新增方法:处理擦除分区表 ---
|
||||
def _handle_wipe_partition_table(self, device_path):
|
||||
"""
|
||||
处理擦除物理盘分区表的操作。
|
||||
"""
|
||||
reply = QMessageBox.question(
|
||||
self,
|
||||
"确认擦除分区表",
|
||||
f"您确定要擦除设备 {device_path} 上的所有分区表吗?\n"
|
||||
f"**此操作将永久删除设备上的所有分区和数据,且不可恢复!**\n"
|
||||
f"请谨慎操作!",
|
||||
QMessageBox.Yes | QMessageBox.No,
|
||||
QMessageBox.No
|
||||
)
|
||||
if reply == QMessageBox.No:
|
||||
logger.info(f"用户取消了擦除 {device_path} 分区表的操作。")
|
||||
return
|
||||
|
||||
logger.warning(f"尝试擦除 {device_path} 上的分区表。")
|
||||
# 使用 LvmOperations 中通用的 _execute_shell_command 来执行 parted 命令
|
||||
success, _, stderr = self.lvm_ops._execute_shell_command(
|
||||
["parted", "-s", device_path, "mklabel", "gpt"], # 默认使用 gpt 分区表类型
|
||||
f"擦除 {device_path} 上的分区表失败"
|
||||
)
|
||||
|
||||
if success:
|
||||
QMessageBox.information(self, "成功", f"设备 {device_path} 上的分区表已成功擦除。")
|
||||
self.refresh_all_info()
|
||||
else:
|
||||
# 错误信息已由 _execute_shell_command 处理并显示
|
||||
pass
|
||||
# --- 新增方法结束 ---
|
||||
|
||||
def _handle_create_partition(self, disk_path, dev_data):
|
||||
total_disk_mib = 0.0
|
||||
total_size_str = dev_data.get('size')
|
||||
logger.debug(f"尝试为磁盘 {disk_path} 创建分区。原始大小字符串: '{total_size_str}'")
|
||||
|
||||
if total_size_str:
|
||||
match = re.match(r'(\d+(\.\d+)?)\s*([KMGT]?B?)', total_size_str, re.IGNORECASE)
|
||||
if match:
|
||||
value = float(match.group(1))
|
||||
unit = match.group(3).upper() if match.group(3) else ''
|
||||
|
||||
if unit == 'KB' or unit == 'K': total_disk_mib = value / 1024
|
||||
elif unit == 'MB' or unit == 'M': total_disk_mib = value
|
||||
elif unit == 'GB' or unit == 'G': total_disk_mib = value * 1024
|
||||
elif unit == 'TB' or unit == 'T': total_disk_mib = value * 1024 * 1024
|
||||
elif unit == 'B':
|
||||
total_disk_mib = value / (1024 * 1024)
|
||||
else:
|
||||
logger.warning(f"无法识别磁盘 {disk_path} 的大小单位: '{unit}' (原始: '{total_size_str}')")
|
||||
total_disk_mib = 0.0
|
||||
logger.debug(f"解析后的磁盘总大小 (MiB): {total_disk_mib}")
|
||||
else:
|
||||
logger.warning(f"无法解析磁盘 {disk_path} 的大小字符串 '{total_size_str}'。正则表达式不匹配。")
|
||||
total_disk_mib = 0.0
|
||||
else:
|
||||
logger.warning(f"获取磁盘 {disk_path} 的大小字符串为空或None。")
|
||||
total_disk_mib = 0.0
|
||||
|
||||
if total_disk_mib <= 0.0:
|
||||
QMessageBox.critical(self, "错误", f"无法获取磁盘 {disk_path} 的有效总大小。")
|
||||
return
|
||||
|
||||
start_position_mib = 0.0
|
||||
max_available_mib = total_disk_mib
|
||||
|
||||
if dev_data.get('children'):
|
||||
logger.debug(f"磁盘 {disk_path} 存在现有分区,尝试计算下一个分区起始位置。")
|
||||
# 假设 disk_ops.get_disk_free_space_info_mib 能够正确处理
|
||||
calculated_start_mib, largest_free_space_mib = self.disk_ops.get_disk_free_space_info_mib(disk_path, total_disk_mib)
|
||||
if calculated_start_mib is None or largest_free_space_mib is None:
|
||||
QMessageBox.critical(self, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置。")
|
||||
return
|
||||
start_position_mib = calculated_start_mib
|
||||
max_available_mib = largest_free_space_mib
|
||||
if max_available_mib < 0:
|
||||
max_available_mib = 0.0
|
||||
else:
|
||||
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 0 MiB 开始,最大可用空间为整个磁盘。")
|
||||
max_available_mib = max(0.0, total_disk_mib - 1.0) # 留一点空间,避免边界问题
|
||||
start_position_mib = 1.0 # 现代分区表通常从1MB或更大偏移开始
|
||||
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 {start_position_mib} MiB 开始,最大可用空间为 {max_available_mib} MiB。")
|
||||
|
||||
dialog = CreatePartitionDialog(self, disk_path, total_disk_mib, max_available_mib)
|
||||
if dialog.exec() == QDialog.Accepted:
|
||||
info = dialog.get_partition_info()
|
||||
if info:
|
||||
if self.disk_ops.create_partition(
|
||||
info['disk_path'],
|
||||
info['partition_table_type'],
|
||||
info['size_gb'],
|
||||
info['total_disk_mib'],
|
||||
info['use_max_space']
|
||||
):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_mount(self, device_path):
|
||||
dialog = MountDialog(self, device_path)
|
||||
if dialog.exec() == QDialog.Accepted:
|
||||
info = dialog.get_mount_info()
|
||||
if info:
|
||||
if self.disk_ops.mount_partition(device_path, info['mount_point'], info['add_to_fstab']):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _unmount_and_refresh(self, device_path):
|
||||
if self.disk_ops.unmount_partition(device_path, show_dialog_on_error=True):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_delete_partition(self, device_path):
|
||||
if self.disk_ops.delete_partition(device_path):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_format_partition(self, device_path):
|
||||
if self.disk_ops.format_partition(device_path):
|
||||
self.refresh_all_info()
|
||||
|
||||
# --- RAID 管理 Tab ---
|
||||
def refresh_raid_info(self):
|
||||
self.ui.treeWidget_raid.clear()
|
||||
|
||||
raid_headers = [
|
||||
"阵列设备", "级别", "状态", "大小", "活动设备", "失败设备", "备用设备",
|
||||
"总设备数", "UUID", "名称", "Chunk Size", "挂载点"
|
||||
]
|
||||
self.ui.treeWidget_raid.setColumnCount(len(raid_headers))
|
||||
self.ui.treeWidget_raid.setHeaderLabels(raid_headers)
|
||||
|
||||
for i in range(len(raid_headers)):
|
||||
self.ui.treeWidget_raid.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
|
||||
|
||||
try:
|
||||
raid_arrays = self.system_manager.get_mdadm_arrays()
|
||||
if not raid_arrays:
|
||||
item = QTreeWidgetItem(self.ui.treeWidget_raid)
|
||||
item.setText(0, "未找到RAID阵列。")
|
||||
logger.info("未找到RAID阵列。")
|
||||
return
|
||||
|
||||
for array in raid_arrays:
|
||||
array_item = QTreeWidgetItem(self.ui.treeWidget_raid)
|
||||
array_path = array.get('device', 'N/A')
|
||||
if not array_path or array_path == 'N/A':
|
||||
logger.warning(f"RAID阵列 '{array.get('name', '未知')}' 的设备路径无效,跳过。")
|
||||
continue
|
||||
|
||||
current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
|
||||
|
||||
array_item.setText(0, array_path)
|
||||
array_item.setText(1, array.get('level', 'N/A'))
|
||||
array_item.setText(2, array.get('state', 'N/A'))
|
||||
array_item.setText(3, array.get('array_size', 'N/A'))
|
||||
array_item.setText(4, array.get('active_devices', 'N/A'))
|
||||
array_item.setText(5, array.get('failed_devices', 'N/A'))
|
||||
array_item.setText(6, array.get('spare_devices', 'N/A'))
|
||||
array_item.setText(7, array.get('total_devices', 'N/A'))
|
||||
array_item.setText(8, array.get('uuid', 'N/A'))
|
||||
array_item.setText(9, array.get('name', 'N/A'))
|
||||
array_item.setText(10, array.get('chunk_size', 'N/A'))
|
||||
array_item.setText(11, current_mount_point if current_mount_point else "")
|
||||
array_item.setExpanded(True)
|
||||
array_data_for_context = array.copy()
|
||||
array_data_for_context['device'] = array_path
|
||||
array_item.setData(0, Qt.UserRole, array_data_for_context)
|
||||
|
||||
for member in array.get('member_devices', []):
|
||||
member_item = QTreeWidgetItem(array_item)
|
||||
member_item.setText(0, f" {member.get('device_path', 'N/A')}")
|
||||
member_item.setText(1, f"成员: {member.get('raid_device', 'N/A')}")
|
||||
member_item.setText(2, member.get('state', 'N/A'))
|
||||
member_item.setText(3, f"Major: {member.get('major', 'N/A')}, Minor: {member.get('minor', 'N/A')}")
|
||||
|
||||
for i in range(len(raid_headers)):
|
||||
self.ui.treeWidget_raid.resizeColumnToContents(i)
|
||||
logger.info("RAID阵列信息刷新成功。")
|
||||
|
||||
except Exception as e:
|
||||
QMessageBox.critical(self, "错误", f"刷新RAID阵列信息失败: {e}")
|
||||
logger.error(f"刷新RAID阵列信息失败: {e}")
|
||||
|
||||
def show_raid_context_menu(self, pos: QPoint):
|
||||
item = self.ui.treeWidget_raid.itemAt(pos)
|
||||
menu = QMenu(self)
|
||||
|
||||
create_raid_action = menu.addAction("创建 RAID 阵列...")
|
||||
create_raid_action.triggered.connect(self._handle_create_raid_array)
|
||||
menu.addSeparator()
|
||||
|
||||
if item and item.parent() is None:
|
||||
array_data = item.data(0, Qt.UserRole)
|
||||
if not array_data:
|
||||
logger.warning(f"无法获取 RAID 阵列 {item.text(0)} 的详细数据。")
|
||||
return
|
||||
|
||||
array_path = array_data.get('device')
|
||||
member_devices = [m.get('device_path') for m in array_data.get('member_devices', [])]
|
||||
|
||||
if not array_path or array_path == 'N/A':
|
||||
logger.warning(f"RAID阵列 '{array_data.get('name', '未知')}' 的设备路径无效,无法显示操作。")
|
||||
return
|
||||
|
||||
current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
|
||||
|
||||
if current_mount_point and current_mount_point != '[SWAP]' and current_mount_point != '':
|
||||
unmount_action = menu.addAction(f"卸载 {array_path} ({current_mount_point})")
|
||||
unmount_action.triggered.connect(lambda: self._unmount_and_refresh(array_path))
|
||||
else:
|
||||
mount_action = menu.addAction(f"挂载 {array_path}...")
|
||||
mount_action.triggered.connect(lambda: self._handle_mount(array_path))
|
||||
menu.addSeparator()
|
||||
|
||||
stop_action = menu.addAction(f"停止阵列 {array_path}")
|
||||
stop_action.triggered.connect(lambda: self._handle_stop_raid_array(array_path))
|
||||
|
||||
delete_action = menu.addAction(f"删除阵列 {array_path}")
|
||||
delete_action.triggered.connect(lambda: self._handle_delete_raid_array(array_path, member_devices))
|
||||
|
||||
format_action = menu.addAction(f"格式化阵列 {array_path}...")
|
||||
format_action.triggered.connect(lambda: self._handle_format_raid_array(array_path))
|
||||
|
||||
if menu.actions():
|
||||
menu.exec(self.ui.treeWidget_raid.mapToGlobal(pos))
|
||||
else:
|
||||
logger.info("右键点击了空白区域或没有可用的RAID操作。")
|
||||
|
||||
def _handle_create_raid_array(self):
|
||||
available_devices = self.system_manager.get_unallocated_partitions()
|
||||
|
||||
if not available_devices:
|
||||
QMessageBox.warning(self, "警告", "没有可用于创建 RAID 阵列的设备。请确保有未挂载、未被LVM或RAID使用的磁盘或分区。")
|
||||
return
|
||||
|
||||
dialog = CreateRaidDialog(self, available_devices)
|
||||
if dialog.exec() == QDialog.Accepted:
|
||||
info = dialog.get_raid_info()
|
||||
if info:
|
||||
if self.raid_ops.create_raid_array(info['devices'], info['level'], info['chunk_size']):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_stop_raid_array(self, array_path):
|
||||
if self.raid_ops.stop_raid_array(array_path):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_delete_raid_array(self, array_path, member_devices):
|
||||
if self.raid_ops.delete_raid_array(array_path, member_devices):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_format_raid_array(self, array_path):
|
||||
if self.disk_ops.format_partition(array_path):
|
||||
self.refresh_all_info()
|
||||
|
||||
# --- LVM 管理 Tab ---
|
||||
def refresh_lvm_info(self):
|
||||
self.ui.treeWidget_lvm.clear()
|
||||
|
||||
lvm_headers = ["名称", "大小", "属性", "UUID", "关联", "空闲/已用", "路径/格式", "挂载点"]
|
||||
self.ui.treeWidget_lvm.setColumnCount(len(lvm_headers))
|
||||
self.ui.treeWidget_lvm.setHeaderLabels(lvm_headers)
|
||||
|
||||
for i in range(len(lvm_headers)):
|
||||
self.ui.treeWidget_lvm.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
|
||||
|
||||
try:
|
||||
lvm_data = self.system_manager.get_lvm_info()
|
||||
|
||||
if not lvm_data.get('pvs') and not lvm_data.get('vgs') and not lvm_data.get('lvs'):
|
||||
item = QTreeWidgetItem(self.ui.treeWidget_lvm)
|
||||
item.setText(0, "未找到LVM信息。")
|
||||
logger.info("未找到LVM信息。")
|
||||
return
|
||||
|
||||
# 物理卷 (PVs)
|
||||
pv_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
|
||||
pv_root_item.setText(0, "物理卷 (PVs)")
|
||||
pv_root_item.setExpanded(True)
|
||||
pv_root_item.setData(0, Qt.UserRole, {'type': 'pv_root'})
|
||||
if lvm_data.get('pvs'):
|
||||
for pv in lvm_data['pvs']:
|
||||
pv_item = QTreeWidgetItem(pv_root_item)
|
||||
pv_name = pv.get('pv_name', 'N/A')
|
||||
if pv_name.startswith('/dev/'):
|
||||
pv_path = pv_name
|
||||
else:
|
||||
pv_path = f"/dev/{pv_name}" if pv_name != 'N/A' else 'N/A'
|
||||
|
||||
pv_item.setText(0, pv_name)
|
||||
pv_item.setText(1, pv.get('pv_size', 'N/A'))
|
||||
pv_item.setText(2, pv.get('pv_attr', 'N/A'))
|
||||
pv_item.setText(3, pv.get('pv_uuid', 'N/A'))
|
||||
pv_item.setText(4, f"VG: {pv.get('vg_name', 'N/A')}")
|
||||
pv_item.setText(5, f"空闲: {pv.get('pv_free', 'N/A')}")
|
||||
pv_item.setText(6, pv.get('pv_fmt', 'N/A'))
|
||||
pv_item.setText(7, "")
|
||||
pv_data_for_context = pv.copy()
|
||||
pv_data_for_context['pv_name'] = pv_path
|
||||
pv_item.setData(0, Qt.UserRole, {'type': 'pv', 'data': pv_data_for_context})
|
||||
else:
|
||||
item = QTreeWidgetItem(pv_root_item)
|
||||
item.setText(0, "未找到物理卷。")
|
||||
|
||||
# 卷组 (VGs)
|
||||
vg_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
|
||||
vg_root_item.setText(0, "卷组 (VGs)")
|
||||
vg_root_item.setExpanded(True)
|
||||
vg_root_item.setData(0, Qt.UserRole, {'type': 'vg_root'})
|
||||
if lvm_data.get('vgs'):
|
||||
for vg in lvm_data['vgs']:
|
||||
vg_item = QTreeWidgetItem(vg_root_item)
|
||||
vg_name = vg.get('vg_name', 'N/A')
|
||||
vg_item.setText(0, vg_name)
|
||||
vg_item.setText(1, vg.get('vg_size', 'N/A'))
|
||||
vg_item.setText(2, vg.get('vg_attr', 'N/A'))
|
||||
vg_item.setText(3, vg.get('vg_uuid', 'N/A'))
|
||||
vg_item.setText(4, f"PVs: {vg.get('pv_count', 'N/A')}, LVs: {vg.get('lv_count', 'N/A')}")
|
||||
vg_item.setText(5, f"空闲: {vg.get('vg_free', 'N/A')}, 已分配: {vg.get('vg_alloc_percent', 'N/A')}%")
|
||||
vg_item.setText(6, vg.get('vg_fmt', 'N/A'))
|
||||
vg_item.setText(7, "")
|
||||
vg_data_for_context = vg.copy()
|
||||
vg_data_for_context['vg_name'] = vg_name
|
||||
vg_item.setData(0, Qt.UserRole, {'type': 'vg', 'data': vg_data_for_context})
|
||||
else:
|
||||
item = QTreeWidgetItem(vg_root_item)
|
||||
item.setText(0, "未找到卷组。")
|
||||
|
||||
# 逻辑卷 (LVs)
|
||||
lv_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
|
||||
lv_root_item.setText(0, "逻辑卷 (LVs)")
|
||||
lv_root_item.setExpanded(True)
|
||||
lv_root_item.setData(0, Qt.UserRole, {'type': 'lv_root'})
|
||||
if lvm_data.get('lvs'):
|
||||
for lv in lvm_data['lvs']:
|
||||
lv_item = QTreeWidgetItem(lv_root_item)
|
||||
lv_name = lv.get('lv_name', 'N/A')
|
||||
vg_name = lv.get('vg_name', 'N/A')
|
||||
lv_attr = lv.get('lv_attr', '')
|
||||
|
||||
lv_path = lv.get('lv_path')
|
||||
if not lv_path or lv_path == 'N/A':
|
||||
if vg_name != 'N/A' and lv_name != 'N/A':
|
||||
lv_path = f"/dev/{vg_name}/{lv_name}"
|
||||
else:
|
||||
lv_path = 'N/A'
|
||||
|
||||
current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
|
||||
|
||||
lv_item.setText(0, lv_name)
|
||||
lv_item.setText(1, lv.get('lv_size', 'N/A'))
|
||||
lv_item.setText(2, lv_attr)
|
||||
lv_item.setText(3, lv.get('lv_uuid', 'N/A'))
|
||||
lv_item.setText(4, f"VG: {vg_name}, Origin: {lv.get('origin', 'N/A')}")
|
||||
lv_item.setText(5, f"快照: {lv.get('snap_percent', 'N/A')}%")
|
||||
lv_item.setText(6, lv_path)
|
||||
lv_item.setText(7, current_mount_point if current_mount_point else "")
|
||||
|
||||
lv_data_for_context = lv.copy()
|
||||
lv_data_for_context['lv_path'] = lv_path
|
||||
lv_data_for_context['lv_name'] = lv_name
|
||||
lv_data_for_context['vg_name'] = vg_name
|
||||
lv_data_for_context['lv_attr'] = lv_attr
|
||||
lv_item.setData(0, Qt.UserRole, {'type': 'lv', 'data': lv_data_for_context})
|
||||
else:
|
||||
item = QTreeWidgetItem(lv_root_item)
|
||||
item.setText(0, "未找到逻辑卷。")
|
||||
|
||||
for i in range(len(lvm_headers)):
|
||||
self.ui.treeWidget_lvm.resizeColumnToContents(i)
|
||||
logger.info("LVM信息刷新成功。")
|
||||
|
||||
except Exception as e:
|
||||
QMessageBox.critical(self, "错误", f"刷新LVM信息失败: {e}")
|
||||
logger.error(f"刷新LVM信息失败: {e}")
|
||||
|
||||
def show_lvm_context_menu(self, pos: QPoint):
|
||||
item = self.ui.treeWidget_lvm.itemAt(pos)
|
||||
menu = QMenu(self)
|
||||
|
||||
create_menu = QMenu("创建...", self)
|
||||
create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
|
||||
create_pv_action.triggered.connect(self._handle_create_pv)
|
||||
create_vg_action = create_menu.addAction("创建卷组 (VG)...")
|
||||
create_vg_action.triggered.connect(self._handle_create_vg)
|
||||
create_lv_action = create_menu.addAction("创建逻辑卷 (LV)...")
|
||||
create_lv_action.triggered.connect(self._handle_create_lv)
|
||||
menu.addMenu(create_menu)
|
||||
menu.addSeparator()
|
||||
|
||||
if item:
|
||||
item_data = item.data(0, Qt.UserRole)
|
||||
if not item_data:
|
||||
logger.warning(f"无法获取 LVM 项 {item.text(0)} 的详细数据。")
|
||||
return
|
||||
|
||||
item_type = item_data.get('type')
|
||||
data = item_data.get('data', {})
|
||||
|
||||
if item_type == 'pv':
|
||||
pv_name = data.get('pv_name')
|
||||
if pv_name and pv_name != 'N/A':
|
||||
delete_pv_action = menu.addAction(f"删除物理卷 {pv_name}")
|
||||
delete_pv_action.triggered.connect(lambda: self._handle_delete_pv(pv_name))
|
||||
elif item_type == 'vg':
|
||||
vg_name = data.get('vg_name')
|
||||
if vg_name and vg_name != 'N/A':
|
||||
delete_vg_action = menu.addAction(f"删除卷组 {vg_name}")
|
||||
delete_vg_action.triggered.connect(lambda: self._handle_delete_vg(vg_name))
|
||||
elif item_type == 'lv':
|
||||
lv_name = data.get('lv_name')
|
||||
vg_name = data.get('vg_name')
|
||||
lv_attr = data.get('lv_attr', '')
|
||||
lv_path = data.get('lv_path')
|
||||
|
||||
if lv_name and vg_name and lv_path and lv_path != 'N/A':
|
||||
if 'a' in lv_attr:
|
||||
deactivate_lv_action = menu.addAction(f"停用逻辑卷 {lv_name}")
|
||||
deactivate_lv_action.triggered.connect(lambda: self._handle_deactivate_lv(lv_name, vg_name))
|
||||
else:
|
||||
activate_lv_action = menu.addAction(f"激活逻辑卷 {lv_name}")
|
||||
activate_lv_action.triggered.connect(lambda: self._handle_activate_lv(lv_name, vg_name))
|
||||
|
||||
current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
|
||||
if current_mount_point and current_mount_point != '[SWAP]' and current_mount_point != '':
|
||||
unmount_lv_action = menu.addAction(f"卸载 {lv_name} ({current_mount_point})")
|
||||
unmount_lv_action.triggered.connect(lambda: self._unmount_and_refresh(lv_path))
|
||||
else:
|
||||
mount_lv_action = menu.addAction(f"挂载 {lv_name}...")
|
||||
mount_lv_action.triggered.connect(lambda: self._handle_mount(lv_path))
|
||||
menu.addSeparator()
|
||||
|
||||
delete_lv_action = menu.addAction(f"删除逻辑卷 {lv_name}")
|
||||
delete_lv_action.triggered.connect(lambda: self._handle_delete_lv(lv_name, vg_name))
|
||||
|
||||
format_lv_action = menu.addAction(f"格式化逻辑卷 {lv_name}...")
|
||||
format_lv_action.triggered.connect(lambda: self._handle_format_partition(lv_path))
|
||||
else:
|
||||
logger.warning(f"逻辑卷 '{lv_name}' (VG: {vg_name}) 的路径无效,无法显示操作。Lv Path: {lv_path}")
|
||||
|
||||
if menu.actions():
|
||||
menu.exec(self.ui.treeWidget_lvm.mapToGlobal(pos))
|
||||
else:
|
||||
logger.info("右键点击了空白区域或没有可用的LVM操作。")
|
||||
|
||||
def _handle_create_pv(self):
|
||||
available_partitions = self.system_manager.get_unallocated_partitions()
|
||||
if not available_partitions:
|
||||
QMessageBox.warning(self, "警告", "没有可用于创建物理卷的未分配分区。")
|
||||
return
|
||||
|
||||
dialog = CreatePvDialog(self, available_partitions)
|
||||
if dialog.exec() == QDialog.Accepted:
|
||||
info = dialog.get_pv_info()
|
||||
if info:
|
||||
if self.lvm_ops.create_pv(info['device_path']):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_delete_pv(self, device_path):
|
||||
if self.lvm_ops.delete_pv(device_path):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_create_vg(self):
|
||||
lvm_info = self.system_manager.get_lvm_info()
|
||||
available_pvs = []
|
||||
for pv in lvm_info.get('pvs', []):
|
||||
pv_name = pv.get('pv_name')
|
||||
if pv_name and pv_name != 'N/A' and not pv.get('vg_name'):
|
||||
if pv_name.startswith('/dev/'):
|
||||
available_pvs.append(pv_name)
|
||||
else:
|
||||
available_pvs.append(f"/dev/{pv_name}")
|
||||
|
||||
if not available_pvs:
|
||||
QMessageBox.warning(self, "警告", "没有可用于创建卷组的物理卷。请确保有未分配给任何卷组的物理卷。")
|
||||
return
|
||||
|
||||
dialog = CreateVgDialog(self, available_pvs)
|
||||
if dialog.exec() == QDialog.Accepted:
|
||||
info = dialog.get_vg_info()
|
||||
if info:
|
||||
if self.lvm_ops.create_vg(info['vg_name'], info['pvs']):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_create_lv(self):
|
||||
lvm_info = self.system_manager.get_lvm_info()
|
||||
available_vgs = []
|
||||
vg_sizes = {}
|
||||
for vg in lvm_info.get('vgs', []):
|
||||
vg_name = vg.get('vg_name')
|
||||
if vg_name and vg_name != 'N/A':
|
||||
available_vgs.append(vg_name)
|
||||
|
||||
free_size_str = vg.get('vg_free', '0B').strip()
|
||||
current_vg_size_gb = 0.0
|
||||
|
||||
match = re.match(r'(\d+\.?\d*)\s*([gmktb])?', free_size_str, re.IGNORECASE)
|
||||
if match:
|
||||
value = float(match.group(1))
|
||||
unit = match.group(2).lower() if match.group(2) else ''
|
||||
|
||||
if unit == 'k':
|
||||
current_vg_size_gb = value / (1024 * 1024)
|
||||
elif unit == 'm':
|
||||
current_vg_size_gb = value / 1024
|
||||
elif unit == 'g':
|
||||
current_vg_size_gb = value
|
||||
elif unit == 't':
|
||||
current_vg_size_gb = value * 1024
|
||||
elif unit == 'b' or unit == '':
|
||||
current_vg_size_gb = value / (1024 * 1024 * 1024)
|
||||
else:
|
||||
logger.warning(f"未知LVM单位: '{unit}' for '{free_size_str}'")
|
||||
else:
|
||||
logger.warning(f"无法解析LVM空闲大小字符串: '{free_size_str}'")
|
||||
|
||||
vg_sizes[vg_name] = current_vg_size_gb
|
||||
|
||||
if not available_vgs:
|
||||
QMessageBox.warning(self, "警告", "没有可用于创建逻辑卷的卷组。")
|
||||
return
|
||||
|
||||
dialog = CreateLvDialog(self, available_vgs, vg_sizes)
|
||||
if dialog.exec() == QDialog.Accepted:
|
||||
info = dialog.get_lv_info()
|
||||
if info:
|
||||
if self.lvm_ops.create_lv(info['lv_name'], info['vg_name'], info['size_gb'], info['use_max_space']):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_delete_lv(self, lv_name, vg_name):
|
||||
lv_path = f"/dev/{vg_name}/{lv_name}"
|
||||
self.disk_ops.unmount_partition(lv_path, show_dialog_on_error=False)
|
||||
self.disk_ops._remove_fstab_entry(lv_path)
|
||||
|
||||
if self.lvm_ops.delete_lv(lv_name, vg_name):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_activate_lv(self, lv_name, vg_name):
|
||||
if self.lvm_ops.activate_lv(lv_name, vg_name):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_deactivate_lv(self, lv_name, vg_name):
|
||||
if self.lvm_ops.deactivate_lv(lv_name, vg_name):
|
||||
self.refresh_all_info()
|
||||
|
||||
def _handle_delete_vg(self, vg_name):
|
||||
"""
|
||||
处理删除卷组 (VG) 的操作。
|
||||
"""
|
||||
logger.info(f"尝试删除卷组 (VG) {vg_name}。")
|
||||
reply = QMessageBox.question(
|
||||
self,
|
||||
"确认删除卷组",
|
||||
f"您确定要删除卷组 {vg_name} 吗?此操作将永久删除该卷组及其所有逻辑卷和数据!",
|
||||
QMessageBox.Yes | QMessageBox.No,
|
||||
QMessageBox.No
|
||||
)
|
||||
if reply == QMessageBox.No:
|
||||
logger.info(f"用户取消了删除卷组 {vg_name} 的操作。")
|
||||
return
|
||||
|
||||
lvm_info = self.system_manager.get_lvm_info()
|
||||
vg_info = next((vg for vg in lvm_info.get('vgs', []) if vg.get('vg_name') == vg_name), None)
|
||||
|
||||
lv_count_raw = vg_info.get('lv_count', 0) if vg_info else 0
|
||||
try:
|
||||
lv_count = int(float(lv_count_raw))
|
||||
except (ValueError, TypeError):
|
||||
lv_count = 0
|
||||
|
||||
if lv_count > 0:
|
||||
QMessageBox.critical(self, "删除失败", f"卷组 {vg_name} 中仍包含逻辑卷。请先删除所有逻辑卷。")
|
||||
logger.error(f"尝试删除包含逻辑卷的卷组 {vg_name}。操作被阻止。")
|
||||
return
|
||||
|
||||
success = self.lvm_ops.delete_vg(vg_name)
|
||||
if success:
|
||||
self.refresh_all_info()
|
||||
else:
|
||||
QMessageBox.critical(self, "删除卷组失败", f"删除卷组 {vg_name} 失败,请检查日志。")
|
||||
|
||||
|
||||
if __name__ == "__main__":
|
||||
app = QApplication(sys.argv)
|
||||
widget = MainWindow()
|
||||
widget.show()
|
||||
sys.exit(app.exec())
|
||||
Reference in New Issue
Block a user