This commit is contained in:
zj
2026-02-03 03:27:14 +08:00
parent 10cc7d1c91
commit c6c9ece1e9
7 changed files with 160 additions and 1389 deletions

View File

@@ -1,80 +1,78 @@
# disk_operations.py
import subprocess import subprocess
import logging import logging
import re import re
import os import os
from PySide6.QtWidgets import QMessageBox, QInputDialog from PySide6.QtWidgets import QMessageBox, QInputDialog
from system_info import SystemInfoManager from PySide6.QtCore import QObject, Signal, QThread # <--- 导入 QObject, Signal, QThread
from system_info import SystemInfoManager # 确保 SystemInfoManager 已导入
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
class DiskOperations: # --- 新增: 后台格式化工作线程 ---
def __init__(self, system_manager: SystemInfoManager): # NEW: 接收 system_manager 实例 class FormatWorker(QObject):
self.system_manager = system_manager # 定义信号,用于向主线程发送格式化结果
finished = Signal(bool, str, str, str) # 成功状态, 设备路径, 标准输出, 标准错误
started = Signal(str) # 设备路径
def _execute_shell_command(self, command_list, error_message, root_privilege=True, def __init__(self, device_path, fs_type, execute_shell_command_func, parent=None):
suppress_critical_dialog_on_stderr_match=None, input_data=None): super().__init__(parent)
self.device_path = device_path
self.fs_type = fs_type
# 接收一个可调用的函数用于执行shell命令 (这里是 DiskOperations._execute_shell_command)
self._execute_shell_command_func = execute_shell_command_func
def run(self):
""" """
通用地运行一个 shell 命令,并处理错误 在单独线程中执行格式化命令
:param command_list: 命令及其参数的列表。
:param error_message: 命令失败时显示给用户的错误消息。
:param root_privilege: 如果为 True则使用 sudo 执行命令。
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
可以是字符串或字符串元组。
:param input_data: 传递给命令stdin的数据 (str)。
:return: (True/False, stdout_str, stderr_str)
""" """
if not all(isinstance(arg, str) for arg in command_list): self.started.emit(self.device_path)
logger.error(f"命令列表包含非字符串元素: {command_list}") logger.info(f"后台格式化开始: 设备 {self.device_path}, 文件系统 {self.fs_type}")
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
return False, "", "内部错误:命令参数类型不正确。"
if root_privilege: command_list = []
command_list = ["sudo"] + command_list if self.fs_type == "ext4":
command_list = ["mkfs.ext4", "-F", self.device_path] # -F 强制执行
full_cmd_str = ' '.join(command_list) elif self.fs_type == "xfs":
logger.debug(f"执行命令: {full_cmd_str}") command_list = ["mkfs.xfs", "-f", self.device_path] # -f 强制执行
elif self.fs_type == "ntfs":
try: command_list = ["mkfs.ntfs", "-f", self.device_path] # -f 强制执行
result = subprocess.run( elif self.fs_type == "fat32":
command_list, command_list = ["mkfs.vfat", "-F", "32", self.device_path] # -F 32 指定FAT32
capture_output=True,
text=True,
check=True,
encoding='utf-8',
input=input_data
)
logger.info(f"命令成功: {full_cmd_str}")
return True, result.stdout.strip(), result.stderr.strip()
except subprocess.CalledProcessError as e:
stderr_output = e.stderr.strip()
logger.error(f"命令失败: {full_cmd_str}")
logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}")
# Determine if the error dialog should be suppressed by this function
should_suppress_dialog_here = False
if suppress_critical_dialog_on_stderr_match:
if isinstance(suppress_critical_dialog_on_stderr_match, str):
if suppress_critical_dialog_on_stderr_match in stderr_output:
should_suppress_dialog_here = True
elif isinstance(suppress_critical_dialog_on_stderr_match, tuple):
if any(s in stderr_output for s in suppress_critical_dialog_on_stderr_match):
should_suppress_dialog_here = True
if should_suppress_dialog_here:
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
else: else:
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}") logger.error(f"不支持的文件系统类型: {self.fs_type}")
return False, e.stdout.strip(), stderr_output self.finished.emit(False, self.device_path, "", f"不支持的文件系统类型: {self.fs_type}")
except FileNotFoundError: return
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
logger.error(f"命令 '{command_list[0]}' 未找到。") # 调用传入的shell命令执行函数并禁用其内部的QMessageBox显示
return False, "", f"命令 '{command_list[0]}' 未找到。" success, stdout, stderr = self._execute_shell_command_func(
except Exception as e: command_list,
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}") f"格式化设备 {self.device_path}{self.fs_type} 失败",
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}") root_privilege=True,
return False, "", str(e) show_dialog=False # <--- 关键:阻止工作线程弹出 QMessageBox
)
self.finished.emit(success, self.device_path, stdout, stderr)
logger.info(f"后台格式化完成: 设备 {self.device_path}, 成功: {success}")
class DiskOperations(QObject): # <--- 继承 QObject 以便发出信号
# 定义信号,用于通知主线程格式化操作的开始和结束
formatting_finished = Signal(bool, str, str, str) # 成功状态, 设备路径, 标准输出, 标准错误
formatting_started = Signal(str) # 设备路径
def __init__(self, system_manager: SystemInfoManager, lvm_ops, parent=None): # <--- 构造函数现在接收 lvm_ops 实例
super().__init__(parent)
self.system_manager = system_manager
self._lvm_ops = lvm_ops # 保存 LvmOperations 实例,以便调用其 _execute_shell_command
self.active_format_workers = {} # 用于跟踪正在运行的格式化线程
# DiskOperations 内部的 _execute_shell_command 只是对 LvmOperations._execute_shell_command 的包装
# 这样所有 shell 命令都通过 LvmOperations 的统一入口,可以控制 show_dialog
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
suppress_critical_dialog_on_stderr_match=None, input_data=None,
show_dialog=True):
return self._lvm_ops._execute_shell_command(
command_list, error_message, root_privilege, suppress_critical_dialog_on_stderr_match, input_data, show_dialog
)
def _add_to_fstab(self, device_path, mount_point, fstype, uuid): def _add_to_fstab(self, device_path, mount_point, fstype, uuid):
""" """
@@ -223,7 +221,8 @@ class DiskOperations:
success, _, stderr = self._execute_shell_command( success, _, stderr = self._execute_shell_command(
["umount", device_path], ["umount", device_path],
f"卸载设备 {device_path} 失败", f"卸载设备 {device_path} 失败",
suppress_critical_dialog_on_stderr_match=already_unmounted_errors suppress_critical_dialog_on_stderr_match=already_unmounted_errors,
show_dialog=show_dialog_on_error # <--- 将 show_dialog_on_error 传递给底层命令执行器
) )
if success: if success:
@@ -460,52 +459,67 @@ class DiskOperations:
def format_partition(self, device_path, fstype=None): def format_partition(self, device_path, fstype=None):
""" """
格式化指定分区 启动一个 QInputDialog 让用户选择文件系统类型,然后将格式化操作提交到后台线程
:param device_path: 要格式化的分区路径。 :param device_path: 要格式化的分区路径。
:param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None则弹出对话框让用户选择。 :param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None则弹出对话框让用户选择。
:return: True 如果成功,否则 False。 :return: True 如果成功启动后台任务,否则 False。
""" """
if fstype is None: if fstype is None:
items = ("ext4", "xfs", "fat32", "ntfs") items = ("ext4", "xfs", "fat32", "ntfs")
fstype, ok = QInputDialog.getItem(None, "选择文件系统", "请选择要使用的文件系统类型:", items, 0, False) fstype, ok = QInputDialog.getItem(None, "选择文件系统", f"请选择要使用的文件系统类型 for {device_path}:", items, 0, False)
if not ok or not fstype: if not ok or not fstype:
logger.info("用户取消了文件系统选择。") logger.info("用户取消了文件系统选择。")
return False return False
reply = QMessageBox.question(None, "确认格式化分区", reply = QMessageBox.question(None, "确认格式化",
f"您确定要将分区 {device_path} 格式化{fstype} 吗?此操作将擦除分区上的所有数据!", f"您确定要格式化设备 {device_path}{fstype} 吗?此操作将擦除所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No) QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No: if reply == QMessageBox.No:
logger.info(f"用户取消了格式化分区 {device_path} 的操作。") logger.info(f"用户取消了格式化 {device_path} 的操作。")
return False return False
# 尝试卸载分区 # 检查是否已有格式化任务正在进行
self.unmount_partition(device_path, show_dialog_on_error=False) # 静默卸载 if device_path in self.active_format_workers:
QMessageBox.warning(None, "警告", f"设备 {device_path} 正在格式化中,请勿重复操作。")
return False
# 尝试卸载分区(静默,不弹对话框)
self.unmount_partition(device_path, show_dialog_on_error=False)
# 从 fstab 中移除条目 (因为格式化会改变 UUID) # 从 fstab 中移除条目 (因为格式化会改变 UUID)
self._remove_fstab_entry(device_path) self._remove_fstab_entry(device_path)
logger.info(f"尝试将分区 {device_path} 格式化为 {fstype}") # 创建 QThread 和 FormatWorker 实例
format_cmd = [] thread = QThread()
if fstype == "ext4": # 将 self._execute_shell_command (它内部调用 LvmOperations._execute_shell_command) 传递给工作线程
format_cmd = ["mkfs.ext4", "-F", device_path] # -F 强制执行 worker = FormatWorker(device_path, fstype, self._execute_shell_command)
elif fstype == "xfs": worker.moveToThread(thread)
format_cmd = ["mkfs.xfs", "-f", device_path] # -f 强制执行
elif fstype == "fat32":
format_cmd = ["mkfs.fat", "-F", "32", device_path]
elif fstype == "ntfs":
format_cmd = ["mkfs.ntfs", "-f", device_path]
else:
QMessageBox.critical(None, "错误", f"不支持的文件系统类型: {fstype}")
logger.error(f"不支持的文件系统类型: {fstype}")
return False
success, _, stderr = self._execute_shell_command( # 连接信号和槽
format_cmd, thread.started.connect(worker.run) # 线程启动时执行 worker 的 run 方法
f"格式化分区 {device_path} 失败" worker.finished.connect(lambda s, dp, o, e: self._on_formatting_finished(s, dp, o, e)) # worker 完成时调用处理函数
) worker.finished.connect(thread.quit) # worker 完成时退出线程
if success: worker.finished.connect(worker.deleteLater) # worker 完成时自动删除 worker 对象
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功格式化为 {fstype}") thread.finished.connect(thread.deleteLater) # 线程退出时自动删除线程对象
self.active_format_workers[device_path] = thread # 存储线程以便管理
self.formatting_started.emit(device_path) # 发出信号通知主界面格式化已开始
thread.start() # 启动线程
QMessageBox.information(None, "开始格式化", f"设备 {device_path} 正在后台格式化为 {fstype}。完成后将通知您。")
return True return True
def _on_formatting_finished(self, success, device_path, stdout, stderr):
"""
处理格式化工作线程完成后的结果。此槽函数在主线程中执行。
"""
if device_path in self.active_format_workers:
del self.active_format_workers[device_path] # 从跟踪列表中移除
# 在主线程中显示最终结果的消息框
if success:
QMessageBox.information(None, "格式化成功", f"设备 {device_path} 已成功格式化。")
else: else:
return False QMessageBox.critical(None, "格式化失败", f"格式化设备 {device_path} 失败。\n错误详情: {stderr}")
self.formatting_finished.emit(success, device_path, stdout, stderr) # 发出信号通知 MainWindow 刷新界面

View File

@@ -1,512 +0,0 @@
# disk_operations.py
import subprocess
import logging
import re
import os
from PySide6.QtWidgets import QMessageBox, QInputDialog
from system_info import SystemInfoManager
logger = logging.getLogger(__name__)
class DiskOperations:
def __init__(self, system_manager: SystemInfoManager): # NEW: 接收 system_manager 实例
self.system_manager = system_manager
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
suppress_critical_dialog_on_stderr_match=None, input_data=None):
"""
通用地运行一个 shell 命令,并处理错误。
:param command_list: 命令及其参数的列表。
:param error_message: 命令失败时显示给用户的错误消息。
:param root_privilege: 如果为 True则使用 sudo 执行命令。
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
可以是字符串或字符串元组。
:param input_data: 传递给命令stdin的数据 (str)。
:return: (True/False, stdout_str, stderr_str)
"""
if not all(isinstance(arg, str) for arg in command_list):
logger.error(f"命令列表包含非字符串元素: {command_list}")
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
return False, "", "内部错误:命令参数类型不正确。"
if root_privilege:
command_list = ["sudo"] + command_list
full_cmd_str = ' '.join(command_list)
logger.debug(f"执行命令: {full_cmd_str}")
try:
result = subprocess.run(
command_list,
capture_output=True,
text=True,
check=True,
encoding='utf-8',
input=input_data
)
logger.info(f"命令成功: {full_cmd_str}")
return True, result.stdout.strip(), result.stderr.strip()
except subprocess.CalledProcessError as e:
stderr_output = e.stderr.strip()
logger.error(f"命令失败: {full_cmd_str}")
logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}")
# Determine if the error dialog should be suppressed by this function
should_suppress_dialog_here = False
if suppress_critical_dialog_on_stderr_match:
if isinstance(suppress_critical_dialog_on_stderr_match, str):
if suppress_critical_dialog_on_stderr_match in stderr_output:
should_suppress_dialog_here = True
elif isinstance(suppress_critical_dialog_on_stderr_match, tuple):
if any(s in stderr_output for s in suppress_critical_dialog_on_stderr_match):
should_suppress_dialog_here = True
if should_suppress_dialog_here:
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
else:
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
return False, e.stdout.strip(), stderr_output
except FileNotFoundError:
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
logger.error(f"命令 '{command_list[0]}' 未找到。")
return False, "", f"命令 '{command_list[0]}' 未找到。"
except Exception as e:
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
return False, "", str(e)
def _add_to_fstab(self, device_path, mount_point, fstype, uuid):
"""
将设备的挂载信息添加到 /etc/fstab。
使用 UUID 识别设备以提高稳定性。
"""
if not uuid or not mount_point or not fstype:
logger.error(f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
QMessageBox.warning(None, "警告", f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
return False
fstab_entry = f"UUID={uuid} {mount_point} {fstype} defaults 0 2"
fstab_path = "/etc/fstab"
try:
# 检查 fstab 中是否已存在相同 UUID 的条目
# 使用 _execute_shell_command 来读取 fstab尽管通常不需要 sudo
# 这里为了简化,直接读取文件,但写入时使用 _execute_shell_command
with open(fstab_path, 'r') as f:
fstab_content = f.readlines()
for line in fstab_content:
if f"UUID={uuid}" in line:
logger.warning(f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。跳过添加。")
QMessageBox.information(None, "信息", f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。")
return True # 认为成功,因为目标已达成
# 如果不存在,则追加到 fstab
# 使用 _execute_shell_command for sudo write
success, _, stderr = self._execute_shell_command(
["sh", "-c", f"echo '{fstab_entry}' >> {fstab_path}"],
f"将 {device_path} 添加到 {fstab_path} 失败",
root_privilege=True
)
if success:
logger.info(f"已将 {fstab_entry} 添加到 {fstab_path}。")
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功添加到 /etc/fstab。")
return True
else:
return False # Error handled by _execute_shell_command
except Exception as e:
logger.error(f"处理 {fstab_path} 失败: {e}")
QMessageBox.critical(None, "错误", f"处理 {fstab_path} 失败: {e}")
return False
def _remove_fstab_entry(self, device_path):
"""
从 /etc/fstab 中移除指定设备的条目。
此方法需要 SystemInfoManager 来获取设备的 UUID。
"""
# NEW: 使用 system_manager 获取 UUID它会处理路径解析
device_details = self.system_manager.get_device_details_by_path(device_path)
if not device_details or not device_details.get('uuid'):
logger.warning(f"无法获取设备 {device_path} 的 UUID无法从 fstab 中移除。")
return False
uuid = device_details.get('uuid')
fstab_path = "/etc/fstab"
# Use sed for robust removal with sudo
# suppress_critical_dialog_on_stderr_match is added to handle cases where fstab might not exist
# or sed reports no changes, which is not a critical error for removal.
command = ["sed", "-i", f"/UUID={uuid}/d", fstab_path]
success, _, stderr = self._execute_shell_command(
command,
f"从 {fstab_path} 中删除 UUID={uuid} 的条目失败",
root_privilege=True,
suppress_critical_dialog_on_stderr_match=(
f"sed: {fstab_path}: No such file or directory", # English
f"sed: {fstab_path}: 没有那个文件或目录", # Chinese
"no changes were made" # if sed finds nothing to delete
)
)
if success:
logger.info(f"已从 {fstab_path} 中删除 UUID={uuid} 的条目。")
return True
else:
# If sed failed, it might be because the entry wasn't found, which is fine.
# _execute_shell_command would have suppressed the dialog if it matched the suppress_critical_dialog_on_stderr_match.
# So, if we reach here, it's either a real error (dialog shown by _execute_shell_command)
# or a suppressed "no changes" type of error. In both cases, if no real error, we return True.
if any(s in stderr for s in (
f"sed: {fstab_path}: No such file or directory",
f"sed: {fstab_path}: 没有那个文件或目录",
"no changes were made",
"No such file or directory" # more general check
)):
logger.info(f"fstab 中未找到 UUID={uuid} 的条目,无需删除。")
return True # Consider it a success if the entry is not there
return False # Other errors are already handled by _execute_shell_command
def mount_partition(self, device_path, mount_point, add_to_fstab=False):
"""
挂载指定设备到指定挂载点。
:param device_path: 要挂载的设备路径。
:param mount_point: 挂载点。
:param add_to_fstab: 是否添加到 /etc/fstab。
:return: True 如果成功,否则 False。
"""
if not os.path.exists(mount_point):
try:
os.makedirs(mount_point)
logger.info(f"创建挂载点目录: {mount_point}")
except OSError as e:
QMessageBox.critical(None, "错误", f"创建挂载点目录 {mount_point} 失败: {e}")
logger.error(f"创建挂载点目录 {mount_point} 失败: {e}")
return False
logger.info(f"尝试挂载设备 {device_path} 到 {mount_point}。")
success, _, stderr = self._execute_shell_command(
["mount", device_path, mount_point],
f"挂载设备 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功挂载到 {mount_point}。")
if add_to_fstab:
# NEW: 使用 self.system_manager 获取 fstype 和 UUID
device_details = self.system_manager.get_device_details_by_path(device_path)
if device_details:
fstype = device_details.get('fstype')
uuid = device_details.get('uuid')
if fstype and uuid:
self._add_to_fstab(device_path, mount_point, fstype, uuid)
else:
logger.error(f"无法获取设备 {device_path} 的文件系统类型或 UUID 以添加到 fstab。")
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取文件系统类型或 UUID 以添加到 fstab。")
else:
logger.error(f"无法获取设备 {device_path} 的详细信息以添加到 fstab。")
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取详细信息以添加到 fstab。")
return True
else:
return False
def unmount_partition(self, device_path, show_dialog_on_error=True):
"""
卸载指定设备。
:param device_path: 要卸载的设备路径。
:param show_dialog_on_error: 是否在发生错误时显示对话框。
:return: True 如果成功,否则 False。
"""
logger.info(f"尝试卸载设备 {device_path}。")
# 定义表示设备已未挂载的错误信息(中英文)
# 增加了 "未指定挂载点"
already_unmounted_errors = ("not mounted", "未挂载", "未指定挂载点")
# 调用 _execute_shell_command并告诉它在遇到“已未挂载”错误时不要弹出其自身的关键错误对话框。
success, _, stderr = self._execute_shell_command(
["umount", device_path],
f"卸载设备 {device_path} 失败",
suppress_critical_dialog_on_stderr_match=already_unmounted_errors
)
if success:
# 如果命令成功执行,则卸载成功。
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。")
return True
else:
# 如果命令失败,检查是否是因为设备已经未挂载或挂载点未指定。
is_already_unmounted_error = any(s in stderr for s in already_unmounted_errors)
if is_already_unmounted_error:
logger.info(f"设备 {device_path} 已经处于未挂载状态(或挂载点未指定),无需重复卸载。")
# 这种情况下,操作结果符合预期(设备已是未挂载),不弹出对话框,并返回 True。
return True
else:
# 对于其他类型的卸载失败(例如设备忙、权限不足等),
# _execute_shell_command 应该已经弹出了关键错误对话框
# (因为它没有匹配到 `already_unmounted_errors` 进行抑制)。
# 所以,这里我们不需要再次弹出对话框,直接返回 False。
return False
def get_disk_free_space_info_mib(self, disk_path, total_disk_mib):
"""
获取磁盘上最大的空闲空间块的起始位置 (MiB) 和大小 (MiB)。
:param disk_path: 磁盘路径。
:param total_disk_mib: 磁盘的总大小 (MiB),用于全新磁盘的计算。
:return: (start_mib, size_mib) 元组。如果磁盘是全新的,返回 (0.0, total_disk_mib)。
如果磁盘有分区表但没有空闲空间,返回 (None, None)。
"""
logger.debug(f"尝试获取磁盘 {disk_path} 的空闲空间信息 (MiB)。")
# suppress_critical_dialog_on_stderr_match is added to handle cases where parted might complain about
# an unrecognized disk label, which is expected for a fresh disk.
success, stdout, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "unit", "MiB", "print", "free"],
f"获取磁盘 {disk_path} 分区信息失败",
root_privilege=True,
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label")
)
if not success:
# If parted failed and it wasn't due to an unrecognized label (handled by suppress_critical_dialog_on_stderr_match),
# then _execute_shell_command would have shown a dialog.
# We just log and return None.
logger.error(f"获取磁盘 {disk_path} 空闲空间信息失败: {stderr}")
return None, None
logger.debug(f"parted print free 命令原始输出:\n{stdout}")
free_spaces = []
lines = stdout.splitlines()
# Regex to capture StartMiB and SizeMiB from a "Free Space" line
# It's made more flexible to match "Free Space", "空闲空间", and "可用空间"
# Example: " 0.02MiB 8192MiB 8192MiB 可用空间"
# We need to capture the first numeric value (Start) and the third numeric value (Size).
free_space_line_pattern = re.compile(r'^\s*(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(?:Free Space|空闲空间|可用空间)')
for line in lines:
match = free_space_line_pattern.match(line)
if match:
try:
start_mib = float(match.group(1)) # Capture StartMiB
size_mib = float(match.group(3)) # Capture SizeMiB (the third numeric value)
free_spaces.append({'start_mib': start_mib, 'size_mib': size_mib})
except ValueError as ve:
logger.warning(f"解析 parted free space 行 '{line}' 失败: {ve}")
continue
if not free_spaces:
logger.warning(f"在磁盘 {disk_path} 上未找到空闲空间。")
return None, None
# 找到最大的空闲空间块
largest_free_space = max(free_spaces, key=lambda x: x['size_mib'])
start_mib = largest_free_space['start_mib']
size_mib = largest_free_space['size_mib']
logger.debug(f"磁盘 {disk_path} 的最大空闲空间块起始于 {start_mib:.2f} MiB大小为 {size_mib:.2f} MiB。")
return start_mib, size_mib
def create_partition(self, disk_path, partition_table_type, size_gb, total_disk_mib, use_max_space):
"""
在指定磁盘上创建分区。
:param disk_path: 磁盘路径 (例如 /dev/sdb)。
:param partition_table_type: 分区表类型 ('gpt' 或 'msdos')。
:param size_gb: 分区大小 (GB)。
:param total_disk_mib: 磁盘总大小 (MiB)。
:param use_max_space: 是否使用最大可用空间。
:return: True 如果成功,否则 False。
"""
if not isinstance(disk_path, str) or not isinstance(partition_table_type, str):
logger.error(f"尝试创建分区时传入无效参数。磁盘路径: {disk_path}, 分区表类型: {partition_table_type}")
QMessageBox.critical(None, "错误", "无效的磁盘路径或分区表类型。")
return False
# 1. 检查磁盘是否有分区表
has_partition_table = False
# Use suppress_critical_dialog_on_stderr_match for "unrecognized disk label" when checking
success_check, stdout_check, stderr_check = self._execute_shell_command(
["parted", "-s", disk_path, "print"],
f"检查磁盘 {disk_path} 分区表失败",
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label"),
root_privilege=True
)
# If success_check is False, it means _execute_shell_command encountered an error.
# If that error was "unrecognized disk label", it was suppressed, and we treat it as no partition table.
# If it was another error, a dialog was shown by _execute_shell_command, and we should stop.
if not success_check:
# Check if the failure was due to "unrecognized disk label" (which means no partition table)
if any(s in stderr_check for s in ("无法辨识的磁盘卷标", "unrecognized disk label")):
logger.info(f"磁盘 {disk_path} 没有可识别的分区表。")
has_partition_table = False # Explicitly set to False
else:
logger.error(f"检查磁盘 {disk_path} 分区表时发生非预期的错误: {stderr_check}")
return False # Other critical error, stop operation.
else: # success_check is True
if "Partition Table: unknown" not in stdout_check and "分区表unknown" not in stdout_check:
has_partition_table = True
else:
logger.info(f"parted print 报告磁盘 {disk_path} 的分区表为 'unknown'。")
has_partition_table = False
actual_start_mib_for_parted = 0.0
# 2. 如果没有分区表,则创建分区表
if not has_partition_table:
reply = QMessageBox.question(None, "确认创建分区表",
f"磁盘 {disk_path} 没有分区表。您确定要创建 {partition_table_type} 分区表吗?"
f"此操作将擦除磁盘上的所有数据。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了在 {disk_path} 上创建分区表的操作。")
return False
logger.info(f"尝试在 {disk_path} 上创建 {partition_table_type} 分区表。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "mklabel", partition_table_type],
f"创建 {partition_table_type} 分区表失败"
)
if not success:
return False
# 对于新创建分区表的磁盘,第一个分区通常从 1MiB 开始以确保兼容性和对齐
actual_start_mib_for_parted = 1.0
else:
# 如果有分区表,获取下一个可用分区的起始位置
start_mib_from_parted, _ = self.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if start_mib_from_parted is None:
QMessageBox.critical(None, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置或没有空闲空间。")
return False
# 如果 parted 报告的起始位置非常接近 0 MiB (例如 0.0 MiB 或 0.02 MiB)
# 为了安全和兼容性,也将其调整为 1.0 MiB。
# 否则,使用 parted 报告的精确起始位置。
if start_mib_from_parted < 1.0: # covers 0.0MiB and values like 0.017MiB (17.4kB)
actual_start_mib_for_parted = 1.0
else:
actual_start_mib_for_parted = start_mib_from_parted
# 3. 确定分区结束位置
if use_max_space:
end_pos = "100%"
size_for_log = "最大可用空间"
else:
# 计算结束 MiB
# 注意:这里计算的 end_mib 是基于用户请求的大小,
# parted 会根据实际可用空间和对齐进行微调。
end_mib = actual_start_mib_for_parted + size_gb * 1024
# 简单检查,如果请求的大小导致结束位置超出磁盘总大小,则使用最大可用
if end_mib > total_disk_mib:
QMessageBox.warning(None, "警告", f"请求的分区大小 ({size_gb}GB) 超出了可用空间。将调整为最大可用空间。")
end_pos = "100%"
size_for_log = "最大可用空间"
else:
end_pos = f"{end_mib}MiB"
size_for_log = f"{size_gb}GB"
# 4. 创建分区
create_cmd = ["parted", "-s", disk_path, "mkpart", "primary", f"{actual_start_mib_for_parted}MiB", end_pos]
logger.info(f"尝试在 {disk_path} 上创建 {size_for_log} 的主分区。命令: {' '.join(create_cmd)}")
success, _, stderr = self._execute_shell_command(
create_cmd,
f"在 {disk_path} 上创建分区失败"
)
if success:
QMessageBox.information(None, "成功", f"在 {disk_path} 上成功创建了 {size_for_log} 的分区。")
return True
else:
return False
def delete_partition(self, device_path):
"""
删除指定分区。
:param device_path: 要删除的分区路径。
:return: True 如果成功,否则 False。
"""
reply = QMessageBox.question(None, "确认删除分区",
f"您确定要删除分区 {device_path} 吗?此操作将擦除分区上的所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了删除分区 {device_path} 的操作。")
return False
# 尝试卸载分区
# The unmount_partition method now handles "not mounted" gracefully without dialog and returns True.
# So, we just call it.
self.unmount_partition(device_path, show_dialog_on_error=False) # show_dialog_on_error=False means no dialog for other errors too.
# 从 fstab 中移除条目
# _remove_fstab_entry also handles "not found" gracefully.
self._remove_fstab_entry(device_path)
# 获取父磁盘和分区号
# 例如 /dev/sdb1 -> /dev/sdb, 1
match = re.match(r'(/dev/[a-z]+)(\d+)', device_path)
if not match:
QMessageBox.critical(None, "错误", f"无法解析设备路径 {device_path}。")
logger.error(f"无法解析设备路径 {device_path}。")
return False
disk_path = match.group(1)
partition_number = match.group(2)
logger.info(f"尝试删除分区 {device_path} (磁盘: {disk_path}, 分区号: {partition_number})。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "rm", partition_number],
f"删除分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功删除。")
return True
else:
return False
def format_partition(self, device_path, fstype=None):
"""
格式化指定分区。
:param device_path: 要格式化的分区路径。
:param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None则弹出对话框让用户选择。
:return: True 如果成功,否则 False。
"""
if fstype is None:
items = ("ext4", "xfs", "fat32", "ntfs")
fstype, ok = QInputDialog.getItem(None, "选择文件系统", "请选择要使用的文件系统类型:", items, 0, False)
if not ok or not fstype:
logger.info("用户取消了文件系统选择。")
return False
reply = QMessageBox.question(None, "确认格式化分区",
f"您确定要将分区 {device_path} 格式化为 {fstype} 吗?此操作将擦除分区上的所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了格式化分区 {device_path} 的操作。")
return False
# 尝试卸载分区
self.unmount_partition(device_path, show_dialog_on_error=False) # 静默卸载
# 从 fstab 中移除条目 (因为格式化会改变 UUID)
self._remove_fstab_entry(device_path)
logger.info(f"尝试将分区 {device_path} 格式化为 {fstype}。")
format_cmd = []
if fstype == "ext4":
format_cmd = ["mkfs.ext4", "-F", device_path] # -F 强制执行
elif fstype == "xfs":
format_cmd = ["mkfs.xfs", "-f", device_path] # -f 强制执行
elif fstype == "fat32":
format_cmd = ["mkfs.fat", "-F", "32", device_path]
elif fstype == "ntfs":
format_cmd = ["mkfs.ntfs", "-f", device_path]
else:
QMessageBox.critical(None, "错误", f"不支持的文件系统类型: {fstype}")
logger.error(f"不支持的文件系统类型: {fstype}")
return False
success, _, stderr = self._execute_shell_command(
format_cmd,
f"格式化分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功格式化为 {fstype}。")
return True
else:
return False

View File

@@ -1,3 +1,4 @@
# lvm_operations.py
import subprocess import subprocess
import logging import logging
from PySide6.QtWidgets import QMessageBox from PySide6.QtWidgets import QMessageBox
@@ -6,10 +7,11 @@ logger = logging.getLogger(__name__)
class LvmOperations: class LvmOperations:
def __init__(self): def __init__(self):
pass # LVM操作不直接依赖SystemInfoManager通过参数传递所需信息 pass
def _execute_shell_command(self, command_list, error_message, root_privilege=True, def _execute_shell_command(self, command_list, error_message, root_privilege=True,
suppress_critical_dialog_on_stderr_match=None, input_data=None): suppress_critical_dialog_on_stderr_match=None, input_data=None,
show_dialog=True): # <--- 新增 show_dialog 参数,默认为 True
""" """
通用地运行一个 shell 命令,并处理错误。 通用地运行一个 shell 命令,并处理错误。
:param command_list: 命令及其参数的列表。 :param command_list: 命令及其参数的列表。
@@ -18,10 +20,12 @@ class LvmOperations:
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。 :param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
可以是字符串或字符串元组/列表。 可以是字符串或字符串元组/列表。
:param input_data: 传递给命令stdin的数据 (str)。 :param input_data: 传递给命令stdin的数据 (str)。
:param show_dialog: 如果为 False则不显示关键错误对话框。
:return: (True/False, stdout_str, stderr_str) :return: (True/False, stdout_str, stderr_str)
""" """
if not all(isinstance(arg, str) for arg in command_list): if not all(isinstance(arg, str) for arg in command_list):
logger.error(f"命令列表包含非字符串元素: {command_list}") logger.error(f"命令列表包含非字符串元素: {command_list}")
if show_dialog: # <--- 根据 show_dialog 决定是否弹出对话框
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}") QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
return False, "", "内部错误:命令参数类型不正确。" return False, "", "内部错误:命令参数类型不正确。"
@@ -49,7 +53,6 @@ class LvmOperations:
logger.error(f"标准输出: {e.stdout.strip()}") logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}") logger.error(f"标准错误: {stderr_output}")
# --- 修改开始:处理 suppress_critical_dialog_on_stderr_match 可以是字符串或元组/列表 ---
should_suppress_dialog = False should_suppress_dialog = False
if suppress_critical_dialog_on_stderr_match: if suppress_critical_dialog_on_stderr_match:
if isinstance(suppress_critical_dialog_on_stderr_match, str): if isinstance(suppress_critical_dialog_on_stderr_match, str):
@@ -59,24 +62,25 @@ class LvmOperations:
for pattern in suppress_critical_dialog_on_stderr_match: for pattern in suppress_critical_dialog_on_stderr_match:
if pattern in stderr_output: if pattern in stderr_output:
should_suppress_dialog = True should_suppress_dialog = True
break # 找到一个匹配就足够了 break
if should_suppress_dialog: if show_dialog and not should_suppress_dialog: # <--- 根据 show_dialog 决定是否弹出对话框
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
else:
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}") QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
# --- 修改结束 ---
return False, e.stdout.strip(), stderr_output return False, e.stdout.strip(), stderr_output
except FileNotFoundError: except FileNotFoundError:
if show_dialog: # <--- 根据 show_dialog 决定是否弹出对话框
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。") QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
logger.error(f"命令 '{command_list[0]}' 未找到。") logger.error(f"命令 '{command_list[0]}' 未找到。")
return False, "", f"命令 '{command_list[0]}' 未找到。" return False, "", f"命令 '{command_list[0]}' 未找到。"
except Exception as e: except Exception as e:
if show_dialog: # <--- 根据 show_dialog 决定是否弹出对话框
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}") QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}") logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
return False, "", str(e) return False, "", str(e)
# 以下是 LvmOperations 中其他方法的代码,保持不变。
# ... (create_pv, delete_pv, create_vg, delete_vg, create_lv, delete_lv, activate_lv, deactivate_lv) ...
def create_pv(self, device_path): def create_pv(self, device_path):
""" """
创建物理卷 (PV)。 创建物理卷 (PV)。
@@ -361,3 +365,4 @@ class LvmOperations:
return True return True
else: else:
return False return False

View File

@@ -1,3 +1,4 @@
# mainwindow.py
import sys import sys
import logging import logging
import re import re
@@ -5,7 +6,7 @@ import os
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem, from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog) QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
from PySide6.QtCore import Qt, QPoint from PySide6.QtCore import Qt, QPoint, QThread # <--- 确保导入 QThread
# 导入自动生成的 UI 文件 # 导入自动生成的 UI 文件
from ui_form import Ui_MainWindow from ui_form import Ui_MainWindow
@@ -35,9 +36,11 @@ class MainWindow(QMainWindow):
# 初始化管理器和操作类 # 初始化管理器和操作类
self.system_manager = SystemInfoManager() self.system_manager = SystemInfoManager()
self.disk_ops = DiskOperations(self.system_manager) self.lvm_ops = LvmOperations() # <--- 先初始化 LvmOperations
# <--- 将 lvm_ops 实例传递给 DiskOperations
self.disk_ops = DiskOperations(self.system_manager, self.lvm_ops)
self.raid_ops = RaidOperations() self.raid_ops = RaidOperations()
self.lvm_ops = LvmOperations() # LvmOperations 包含通用的 _execute_shell_command # self.lvm_ops = LvmOperations() # 这一行是重复的,可以删除
# 连接刷新按钮的信号到槽函数 # 连接刷新按钮的信号到槽函数
if hasattr(self.ui, 'refreshButton'): if hasattr(self.ui, 'refreshButton'):
@@ -55,6 +58,11 @@ class MainWindow(QMainWindow):
self.ui.treeWidget_lvm.setContextMenuPolicy(Qt.CustomContextMenu) self.ui.treeWidget_lvm.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_lvm.customContextMenuRequested.connect(self.show_lvm_context_menu) self.ui.treeWidget_lvm.customContextMenuRequested.connect(self.show_lvm_context_menu)
# <--- 新增: 连接 DiskOperations 的格式化完成信号
self.disk_ops.formatting_finished.connect(self.on_disk_formatting_finished)
# 可选:连接格式化开始信号,用于显示进度或禁用相关操作
# self.disk_ops.formatting_started.connect(self.on_disk_formatting_started)
# 初始化时刷新所有数据 # 初始化时刷新所有数据
self.refresh_all_info() self.refresh_all_info()
logger.info("所有设备信息已初始化加载。") logger.info("所有设备信息已初始化加载。")
@@ -195,6 +203,7 @@ class MainWindow(QMainWindow):
logger.warning(f"尝试擦除 {device_path} 上的分区表。") logger.warning(f"尝试擦除 {device_path} 上的分区表。")
# 使用 LvmOperations 中通用的 _execute_shell_command 来执行 parted 命令 # 使用 LvmOperations 中通用的 _execute_shell_command 来执行 parted 命令
# 这里不需要 show_dialog=False因为这个操作本身就是同步且需要用户确认的
success, _, stderr = self.lvm_ops._execute_shell_command( success, _, stderr = self.lvm_ops._execute_shell_command(
["parted", "-s", device_path, "mklabel", "gpt"], # 默认使用 gpt 分区表类型 ["parted", "-s", device_path, "mklabel", "gpt"], # 默认使用 gpt 分区表类型
f"擦除 {device_path} 上的分区表失败" f"擦除 {device_path} 上的分区表失败"
@@ -290,8 +299,21 @@ class MainWindow(QMainWindow):
self.refresh_all_info() self.refresh_all_info()
def _handle_format_partition(self, device_path): def _handle_format_partition(self, device_path):
if self.disk_ops.format_partition(device_path): """
self.refresh_all_info() 调用 DiskOperations 的异步格式化方法。
"""
# DiskOperations.format_partition 会处理用户确认和启动后台线程
self.disk_ops.format_partition(device_path)
# 界面刷新将在格式化完成后由 on_disk_formatting_finished 槽函数触发,所以这里不需要 refresh_all_info()
# <--- 新增: 格式化完成后的槽函数
def on_disk_formatting_finished(self, success, device_path, stdout, stderr):
"""
接收 DiskOperations 发出的格式化完成信号,并刷新界面。
"""
logger.info(f"格式化完成信号接收: 设备 {device_path}, 成功: {success}")
# QMessageBox 已经在 DiskOperations 的 _on_formatting_finished 中处理
self.refresh_all_info() # 刷新所有信息以显示更新后的文件系统类型等
# --- RAID 管理 Tab --- # --- RAID 管理 Tab ---
def refresh_raid_info(self): def refresh_raid_info(self):
@@ -424,8 +446,12 @@ class MainWindow(QMainWindow):
self.refresh_all_info() self.refresh_all_info()
def _handle_format_raid_array(self, array_path): def _handle_format_raid_array(self, array_path):
if self.disk_ops.format_partition(array_path): """
self.refresh_all_info() 处理 RAID 阵列的格式化。
"""
# 这将调用 DiskOperations 的异步格式化方法
self.disk_ops.format_partition(array_path)
# 刷新操作会在 on_disk_formatting_finished 中触发,所以这里不需要 refresh_all_info()
# --- LVM 管理 Tab --- # --- LVM 管理 Tab ---
def refresh_lvm_info(self): def refresh_lvm_info(self):

View File

@@ -1,762 +0,0 @@
# mainwindow.py
import sys
import logging
import re
import os
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
from PySide6.QtCore import Qt, QPoint
# 导入自动生成的 UI 文件
from ui_form import Ui_MainWindow
# 导入我们自己编写的系统信息管理模块
from system_info import SystemInfoManager
# 导入日志配置
from logger_config import setup_logging, logger
# 导入磁盘操作模块
from disk_operations import DiskOperations
# 导入 RAID 操作模块
from raid_operations import RaidOperations
# 导入 LVM 操作模块
from lvm_operations import LvmOperations
# 导入自定义对话框
from dialogs import (CreatePartitionDialog, MountDialog, CreateRaidDialog,
CreatePvDialog, CreateVgDialog, CreateLvDialog)
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
setup_logging(self.ui.logOutputTextEdit)
logger.info("应用程序启动。")
# 初始化管理器和操作类
self.system_manager = SystemInfoManager()
self.disk_ops = DiskOperations(self.system_manager)
self.raid_ops = RaidOperations()
self.lvm_ops = LvmOperations() # LvmOperations 包含通用的 _execute_shell_command
# 连接刷新按钮的信号到槽函数
if hasattr(self.ui, 'refreshButton'):
self.ui.refreshButton.clicked.connect(self.refresh_all_info)
else:
logger.warning("Warning: refreshButton not found in UI. Please add it in form.ui and regenerate ui_form.py.")
# 启用 treeWidget 的自定义上下文菜单
self.ui.treeWidget_block_devices.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_block_devices.customContextMenuRequested.connect(self.show_block_device_context_menu)
self.ui.treeWidget_raid.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_raid.customContextMenuRequested.connect(self.show_raid_context_menu)
self.ui.treeWidget_lvm.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_lvm.customContextMenuRequested.connect(self.show_lvm_context_menu)
# 初始化时刷新所有数据
self.refresh_all_info()
logger.info("所有设备信息已初始化加载。")
def refresh_all_info(self):
"""
刷新所有设备信息块设备、RAID和LVM。
"""
logger.info("开始刷新所有设备信息...")
self.refresh_block_devices_info()
self.refresh_raid_info()
self.refresh_lvm_info()
logger.info("所有设备信息刷新完成。")
# --- 块设备概览 Tab ---
def refresh_block_devices_info(self):
self.ui.treeWidget_block_devices.clear()
columns = [
("设备名", 'name'), ("类型", 'type'), ("大小", 'size'), ("挂载点", 'mountpoint'),
("文件系统", 'fstype'), ("只读", 'ro'), ("UUID", 'uuid'), ("PARTUUID", 'partuuid'),
("厂商", 'vendor'), ("型号", 'model'), ("序列号", 'serial'),
("主次号", 'maj:min'), ("父设备名", 'pkname'),
]
headers = [col[0] for col in columns]
self.field_keys = [col[1] for col in columns]
self.ui.treeWidget_block_devices.setColumnCount(len(headers))
self.ui.treeWidget_block_devices.setHeaderLabels(headers)
for i in range(len(headers)):
self.ui.treeWidget_block_devices.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
devices = self.system_manager.get_block_devices()
for dev in devices:
self._add_device_to_tree(self.ui.treeWidget_block_devices, dev)
for i in range(len(headers)):
self.ui.treeWidget_block_devices.resizeColumnToContents(i)
logger.info("块设备信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新块设备信息失败: {e}")
logger.error(f"刷新块设备信息失败: {e}")
def _add_device_to_tree(self, parent_item, dev_data):
item = QTreeWidgetItem(parent_item)
for i, key in enumerate(self.field_keys):
value = dev_data.get(key)
if key == 'ro':
item.setText(i, "是" if value else "否")
elif value is None:
item.setText(i, "")
else:
item.setText(i, str(value))
item.setData(0, Qt.UserRole, dev_data)
if 'children' in dev_data:
for child in dev_data['children']:
self._add_device_to_tree(item, child)
item.setExpanded(True)
def show_block_device_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_block_devices.itemAt(pos)
menu = QMenu(self)
create_menu = QMenu("创建...", self)
create_raid_action = create_menu.addAction("创建 RAID 阵列...")
create_raid_action.triggered.connect(self._handle_create_raid_array)
create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
create_pv_action.triggered.connect(self._handle_create_pv)
menu.addMenu(create_menu)
menu.addSeparator()
if item:
dev_data = item.data(0, Qt.UserRole)
if not dev_data:
logger.warning(f"无法获取设备 {item.text(0)} 的详细数据。")
return
device_name = dev_data.get('name')
device_type = dev_data.get('type')
mount_point = dev_data.get('mountpoint')
device_path = dev_data.get('path')
if not device_path:
device_path = f"/dev/{device_name}"
if device_type == 'disk':
create_partition_action = menu.addAction(f"创建分区 {device_path}...")
create_partition_action.triggered.connect(lambda: self._handle_create_partition(device_path, dev_data))
# --- 新增功能:擦除分区表 ---
wipe_action = menu.addAction(f"擦除分区表 {device_path}...")
wipe_action.triggered.connect(lambda: self._handle_wipe_partition_table(device_path))
# --- 新增功能结束 ---
menu.addSeparator()
if device_type == 'part':
if not mount_point or mount_point == '' or mount_point == 'N/A':
mount_action = menu.addAction(f"挂载 {device_path}...")
mount_action.triggered.connect(lambda: self._handle_mount(device_path))
elif mount_point != '[SWAP]':
unmount_action = menu.addAction(f"卸载 {device_path}")
unmount_action.triggered.connect(lambda: self._unmount_and_refresh(device_path))
menu.addSeparator()
delete_action = menu.addAction(f"删除分区 {device_path}")
delete_action.triggered.connect(lambda: self._handle_delete_partition(device_path))
format_action = menu.addAction(f"格式化分区 {device_path}...")
format_action.triggered.connect(lambda: self._handle_format_partition(device_path))
if menu.actions():
menu.exec(self.ui.treeWidget_block_devices.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或设备没有可用的操作。")
# --- 新增方法:处理擦除分区表 ---
def _handle_wipe_partition_table(self, device_path):
"""
处理擦除物理盘分区表的操作。
"""
reply = QMessageBox.question(
self,
"确认擦除分区表",
f"您确定要擦除设备 {device_path} 上的所有分区表吗?\n"
f"**此操作将永久删除设备上的所有分区和数据,且不可恢复!**\n"
f"请谨慎操作!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了擦除 {device_path} 分区表的操作。")
return
logger.warning(f"尝试擦除 {device_path} 上的分区表。")
# 使用 LvmOperations 中通用的 _execute_shell_command 来执行 parted 命令
success, _, stderr = self.lvm_ops._execute_shell_command(
["parted", "-s", device_path, "mklabel", "gpt"], # 默认使用 gpt 分区表类型
f"擦除 {device_path} 上的分区表失败"
)
if success:
QMessageBox.information(self, "成功", f"设备 {device_path} 上的分区表已成功擦除。")
self.refresh_all_info()
else:
# 错误信息已由 _execute_shell_command 处理并显示
pass
# --- 新增方法结束 ---
def _handle_create_partition(self, disk_path, dev_data):
total_disk_mib = 0.0
total_size_str = dev_data.get('size')
logger.debug(f"尝试为磁盘 {disk_path} 创建分区。原始大小字符串: '{total_size_str}'")
if total_size_str:
match = re.match(r'(\d+(\.\d+)?)\s*([KMGT]?B?)', total_size_str, re.IGNORECASE)
if match:
value = float(match.group(1))
unit = match.group(3).upper() if match.group(3) else ''
if unit == 'KB' or unit == 'K': total_disk_mib = value / 1024
elif unit == 'MB' or unit == 'M': total_disk_mib = value
elif unit == 'GB' or unit == 'G': total_disk_mib = value * 1024
elif unit == 'TB' or unit == 'T': total_disk_mib = value * 1024 * 1024
elif unit == 'B':
total_disk_mib = value / (1024 * 1024)
else:
logger.warning(f"无法识别磁盘 {disk_path} 的大小单位: '{unit}' (原始: '{total_size_str}')")
total_disk_mib = 0.0
logger.debug(f"解析后的磁盘总大小 (MiB): {total_disk_mib}")
else:
logger.warning(f"无法解析磁盘 {disk_path} 的大小字符串 '{total_size_str}'。正则表达式不匹配。")
total_disk_mib = 0.0
else:
logger.warning(f"获取磁盘 {disk_path} 的大小字符串为空或None。")
total_disk_mib = 0.0
if total_disk_mib <= 0.0:
QMessageBox.critical(self, "错误", f"无法获取磁盘 {disk_path} 的有效总大小。")
return
start_position_mib = 0.0
max_available_mib = total_disk_mib
if dev_data.get('children'):
logger.debug(f"磁盘 {disk_path} 存在现有分区,尝试计算下一个分区起始位置。")
# 假设 disk_ops.get_disk_free_space_info_mib 能够正确处理
calculated_start_mib, largest_free_space_mib = self.disk_ops.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if calculated_start_mib is None or largest_free_space_mib is None:
QMessageBox.critical(self, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置。")
return
start_position_mib = calculated_start_mib
max_available_mib = largest_free_space_mib
if max_available_mib < 0:
max_available_mib = 0.0
else:
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 0 MiB 开始,最大可用空间为整个磁盘。")
max_available_mib = max(0.0, total_disk_mib - 1.0) # 留一点空间,避免边界问题
start_position_mib = 1.0 # 现代分区表通常从1MB或更大偏移开始
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 {start_position_mib} MiB 开始,最大可用空间为 {max_available_mib} MiB。")
dialog = CreatePartitionDialog(self, disk_path, total_disk_mib, max_available_mib)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_partition_info()
if info:
if self.disk_ops.create_partition(
info['disk_path'],
info['partition_table_type'],
info['size_gb'],
info['total_disk_mib'],
info['use_max_space']
):
self.refresh_all_info()
def _handle_mount(self, device_path):
dialog = MountDialog(self, device_path)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_mount_info()
if info:
if self.disk_ops.mount_partition(device_path, info['mount_point'], info['add_to_fstab']):
self.refresh_all_info()
def _unmount_and_refresh(self, device_path):
if self.disk_ops.unmount_partition(device_path, show_dialog_on_error=True):
self.refresh_all_info()
def _handle_delete_partition(self, device_path):
if self.disk_ops.delete_partition(device_path):
self.refresh_all_info()
def _handle_format_partition(self, device_path):
if self.disk_ops.format_partition(device_path):
self.refresh_all_info()
# --- RAID 管理 Tab ---
def refresh_raid_info(self):
self.ui.treeWidget_raid.clear()
raid_headers = [
"阵列设备", "级别", "状态", "大小", "活动设备", "失败设备", "备用设备",
"总设备数", "UUID", "名称", "Chunk Size", "挂载点"
]
self.ui.treeWidget_raid.setColumnCount(len(raid_headers))
self.ui.treeWidget_raid.setHeaderLabels(raid_headers)
for i in range(len(raid_headers)):
self.ui.treeWidget_raid.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
raid_arrays = self.system_manager.get_mdadm_arrays()
if not raid_arrays:
item = QTreeWidgetItem(self.ui.treeWidget_raid)
item.setText(0, "未找到RAID阵列。")
logger.info("未找到RAID阵列。")
return
for array in raid_arrays:
array_item = QTreeWidgetItem(self.ui.treeWidget_raid)
array_path = array.get('device', 'N/A')
if not array_path or array_path == 'N/A':
logger.warning(f"RAID阵列 '{array.get('name', '未知')}' 的设备路径无效,跳过。")
continue
current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
array_item.setText(0, array_path)
array_item.setText(1, array.get('level', 'N/A'))
array_item.setText(2, array.get('state', 'N/A'))
array_item.setText(3, array.get('array_size', 'N/A'))
array_item.setText(4, array.get('active_devices', 'N/A'))
array_item.setText(5, array.get('failed_devices', 'N/A'))
array_item.setText(6, array.get('spare_devices', 'N/A'))
array_item.setText(7, array.get('total_devices', 'N/A'))
array_item.setText(8, array.get('uuid', 'N/A'))
array_item.setText(9, array.get('name', 'N/A'))
array_item.setText(10, array.get('chunk_size', 'N/A'))
array_item.setText(11, current_mount_point if current_mount_point else "")
array_item.setExpanded(True)
array_data_for_context = array.copy()
array_data_for_context['device'] = array_path
array_item.setData(0, Qt.UserRole, array_data_for_context)
for member in array.get('member_devices', []):
member_item = QTreeWidgetItem(array_item)
member_item.setText(0, f" {member.get('device_path', 'N/A')}")
member_item.setText(1, f"成员: {member.get('raid_device', 'N/A')}")
member_item.setText(2, member.get('state', 'N/A'))
member_item.setText(3, f"Major: {member.get('major', 'N/A')}, Minor: {member.get('minor', 'N/A')}")
for i in range(len(raid_headers)):
self.ui.treeWidget_raid.resizeColumnToContents(i)
logger.info("RAID阵列信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新RAID阵列信息失败: {e}")
logger.error(f"刷新RAID阵列信息失败: {e}")
def show_raid_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_raid.itemAt(pos)
menu = QMenu(self)
create_raid_action = menu.addAction("创建 RAID 阵列...")
create_raid_action.triggered.connect(self._handle_create_raid_array)
menu.addSeparator()
if item and item.parent() is None:
array_data = item.data(0, Qt.UserRole)
if not array_data:
logger.warning(f"无法获取 RAID 阵列 {item.text(0)} 的详细数据。")
return
array_path = array_data.get('device')
member_devices = [m.get('device_path') for m in array_data.get('member_devices', [])]
if not array_path or array_path == 'N/A':
logger.warning(f"RAID阵列 '{array_data.get('name', '未知')}' 的设备路径无效,无法显示操作。")
return
current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
if current_mount_point and current_mount_point != '[SWAP]' and current_mount_point != '':
unmount_action = menu.addAction(f"卸载 {array_path} ({current_mount_point})")
unmount_action.triggered.connect(lambda: self._unmount_and_refresh(array_path))
else:
mount_action = menu.addAction(f"挂载 {array_path}...")
mount_action.triggered.connect(lambda: self._handle_mount(array_path))
menu.addSeparator()
stop_action = menu.addAction(f"停止阵列 {array_path}")
stop_action.triggered.connect(lambda: self._handle_stop_raid_array(array_path))
delete_action = menu.addAction(f"删除阵列 {array_path}")
delete_action.triggered.connect(lambda: self._handle_delete_raid_array(array_path, member_devices))
format_action = menu.addAction(f"格式化阵列 {array_path}...")
format_action.triggered.connect(lambda: self._handle_format_raid_array(array_path))
if menu.actions():
menu.exec(self.ui.treeWidget_raid.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或没有可用的RAID操作。")
def _handle_create_raid_array(self):
available_devices = self.system_manager.get_unallocated_partitions()
if not available_devices:
QMessageBox.warning(self, "警告", "没有可用于创建 RAID 阵列的设备。请确保有未挂载、未被LVM或RAID使用的磁盘或分区。")
return
dialog = CreateRaidDialog(self, available_devices)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_raid_info()
if info:
if self.raid_ops.create_raid_array(info['devices'], info['level'], info['chunk_size']):
self.refresh_all_info()
def _handle_stop_raid_array(self, array_path):
if self.raid_ops.stop_raid_array(array_path):
self.refresh_all_info()
def _handle_delete_raid_array(self, array_path, member_devices):
if self.raid_ops.delete_raid_array(array_path, member_devices):
self.refresh_all_info()
def _handle_format_raid_array(self, array_path):
if self.disk_ops.format_partition(array_path):
self.refresh_all_info()
# --- LVM 管理 Tab ---
def refresh_lvm_info(self):
self.ui.treeWidget_lvm.clear()
lvm_headers = ["名称", "大小", "属性", "UUID", "关联", "空闲/已用", "路径/格式", "挂载点"]
self.ui.treeWidget_lvm.setColumnCount(len(lvm_headers))
self.ui.treeWidget_lvm.setHeaderLabels(lvm_headers)
for i in range(len(lvm_headers)):
self.ui.treeWidget_lvm.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
lvm_data = self.system_manager.get_lvm_info()
if not lvm_data.get('pvs') and not lvm_data.get('vgs') and not lvm_data.get('lvs'):
item = QTreeWidgetItem(self.ui.treeWidget_lvm)
item.setText(0, "未找到LVM信息。")
logger.info("未找到LVM信息。")
return
# 物理卷 (PVs)
pv_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
pv_root_item.setText(0, "物理卷 (PVs)")
pv_root_item.setExpanded(True)
pv_root_item.setData(0, Qt.UserRole, {'type': 'pv_root'})
if lvm_data.get('pvs'):
for pv in lvm_data['pvs']:
pv_item = QTreeWidgetItem(pv_root_item)
pv_name = pv.get('pv_name', 'N/A')
if pv_name.startswith('/dev/'):
pv_path = pv_name
else:
pv_path = f"/dev/{pv_name}" if pv_name != 'N/A' else 'N/A'
pv_item.setText(0, pv_name)
pv_item.setText(1, pv.get('pv_size', 'N/A'))
pv_item.setText(2, pv.get('pv_attr', 'N/A'))
pv_item.setText(3, pv.get('pv_uuid', 'N/A'))
pv_item.setText(4, f"VG: {pv.get('vg_name', 'N/A')}")
pv_item.setText(5, f"空闲: {pv.get('pv_free', 'N/A')}")
pv_item.setText(6, pv.get('pv_fmt', 'N/A'))
pv_item.setText(7, "")
pv_data_for_context = pv.copy()
pv_data_for_context['pv_name'] = pv_path
pv_item.setData(0, Qt.UserRole, {'type': 'pv', 'data': pv_data_for_context})
else:
item = QTreeWidgetItem(pv_root_item)
item.setText(0, "未找到物理卷。")
# 卷组 (VGs)
vg_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
vg_root_item.setText(0, "卷组 (VGs)")
vg_root_item.setExpanded(True)
vg_root_item.setData(0, Qt.UserRole, {'type': 'vg_root'})
if lvm_data.get('vgs'):
for vg in lvm_data['vgs']:
vg_item = QTreeWidgetItem(vg_root_item)
vg_name = vg.get('vg_name', 'N/A')
vg_item.setText(0, vg_name)
vg_item.setText(1, vg.get('vg_size', 'N/A'))
vg_item.setText(2, vg.get('vg_attr', 'N/A'))
vg_item.setText(3, vg.get('vg_uuid', 'N/A'))
vg_item.setText(4, f"PVs: {vg.get('pv_count', 'N/A')}, LVs: {vg.get('lv_count', 'N/A')}")
vg_item.setText(5, f"空闲: {vg.get('vg_free', 'N/A')}, 已分配: {vg.get('vg_alloc_percent', 'N/A')}%")
vg_item.setText(6, vg.get('vg_fmt', 'N/A'))
vg_item.setText(7, "")
vg_data_for_context = vg.copy()
vg_data_for_context['vg_name'] = vg_name
vg_item.setData(0, Qt.UserRole, {'type': 'vg', 'data': vg_data_for_context})
else:
item = QTreeWidgetItem(vg_root_item)
item.setText(0, "未找到卷组。")
# 逻辑卷 (LVs)
lv_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
lv_root_item.setText(0, "逻辑卷 (LVs)")
lv_root_item.setExpanded(True)
lv_root_item.setData(0, Qt.UserRole, {'type': 'lv_root'})
if lvm_data.get('lvs'):
for lv in lvm_data['lvs']:
lv_item = QTreeWidgetItem(lv_root_item)
lv_name = lv.get('lv_name', 'N/A')
vg_name = lv.get('vg_name', 'N/A')
lv_attr = lv.get('lv_attr', '')
lv_path = lv.get('lv_path')
if not lv_path or lv_path == 'N/A':
if vg_name != 'N/A' and lv_name != 'N/A':
lv_path = f"/dev/{vg_name}/{lv_name}"
else:
lv_path = 'N/A'
current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
lv_item.setText(0, lv_name)
lv_item.setText(1, lv.get('lv_size', 'N/A'))
lv_item.setText(2, lv_attr)
lv_item.setText(3, lv.get('lv_uuid', 'N/A'))
lv_item.setText(4, f"VG: {vg_name}, Origin: {lv.get('origin', 'N/A')}")
lv_item.setText(5, f"快照: {lv.get('snap_percent', 'N/A')}%")
lv_item.setText(6, lv_path)
lv_item.setText(7, current_mount_point if current_mount_point else "")
lv_data_for_context = lv.copy()
lv_data_for_context['lv_path'] = lv_path
lv_data_for_context['lv_name'] = lv_name
lv_data_for_context['vg_name'] = vg_name
lv_data_for_context['lv_attr'] = lv_attr
lv_item.setData(0, Qt.UserRole, {'type': 'lv', 'data': lv_data_for_context})
else:
item = QTreeWidgetItem(lv_root_item)
item.setText(0, "未找到逻辑卷。")
for i in range(len(lvm_headers)):
self.ui.treeWidget_lvm.resizeColumnToContents(i)
logger.info("LVM信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新LVM信息失败: {e}")
logger.error(f"刷新LVM信息失败: {e}")
def show_lvm_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_lvm.itemAt(pos)
menu = QMenu(self)
create_menu = QMenu("创建...", self)
create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
create_pv_action.triggered.connect(self._handle_create_pv)
create_vg_action = create_menu.addAction("创建卷组 (VG)...")
create_vg_action.triggered.connect(self._handle_create_vg)
create_lv_action = create_menu.addAction("创建逻辑卷 (LV)...")
create_lv_action.triggered.connect(self._handle_create_lv)
menu.addMenu(create_menu)
menu.addSeparator()
if item:
item_data = item.data(0, Qt.UserRole)
if not item_data:
logger.warning(f"无法获取 LVM 项 {item.text(0)} 的详细数据。")
return
item_type = item_data.get('type')
data = item_data.get('data', {})
if item_type == 'pv':
pv_name = data.get('pv_name')
if pv_name and pv_name != 'N/A':
delete_pv_action = menu.addAction(f"删除物理卷 {pv_name}")
delete_pv_action.triggered.connect(lambda: self._handle_delete_pv(pv_name))
elif item_type == 'vg':
vg_name = data.get('vg_name')
if vg_name and vg_name != 'N/A':
delete_vg_action = menu.addAction(f"删除卷组 {vg_name}")
delete_vg_action.triggered.connect(lambda: self._handle_delete_vg(vg_name))
elif item_type == 'lv':
lv_name = data.get('lv_name')
vg_name = data.get('vg_name')
lv_attr = data.get('lv_attr', '')
lv_path = data.get('lv_path')
if lv_name and vg_name and lv_path and lv_path != 'N/A':
if 'a' in lv_attr:
deactivate_lv_action = menu.addAction(f"停用逻辑卷 {lv_name}")
deactivate_lv_action.triggered.connect(lambda: self._handle_deactivate_lv(lv_name, vg_name))
else:
activate_lv_action = menu.addAction(f"激活逻辑卷 {lv_name}")
activate_lv_action.triggered.connect(lambda: self._handle_activate_lv(lv_name, vg_name))
current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
if current_mount_point and current_mount_point != '[SWAP]' and current_mount_point != '':
unmount_lv_action = menu.addAction(f"卸载 {lv_name} ({current_mount_point})")
unmount_lv_action.triggered.connect(lambda: self._unmount_and_refresh(lv_path))
else:
mount_lv_action = menu.addAction(f"挂载 {lv_name}...")
mount_lv_action.triggered.connect(lambda: self._handle_mount(lv_path))
menu.addSeparator()
delete_lv_action = menu.addAction(f"删除逻辑卷 {lv_name}")
delete_lv_action.triggered.connect(lambda: self._handle_delete_lv(lv_name, vg_name))
format_lv_action = menu.addAction(f"格式化逻辑卷 {lv_name}...")
format_lv_action.triggered.connect(lambda: self._handle_format_partition(lv_path))
else:
logger.warning(f"逻辑卷 '{lv_name}' (VG: {vg_name}) 的路径无效无法显示操作。Lv Path: {lv_path}")
if menu.actions():
menu.exec(self.ui.treeWidget_lvm.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或没有可用的LVM操作。")
def _handle_create_pv(self):
available_partitions = self.system_manager.get_unallocated_partitions()
if not available_partitions:
QMessageBox.warning(self, "警告", "没有可用于创建物理卷的未分配分区。")
return
dialog = CreatePvDialog(self, available_partitions)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_pv_info()
if info:
if self.lvm_ops.create_pv(info['device_path']):
self.refresh_all_info()
def _handle_delete_pv(self, device_path):
if self.lvm_ops.delete_pv(device_path):
self.refresh_all_info()
def _handle_create_vg(self):
lvm_info = self.system_manager.get_lvm_info()
available_pvs = []
for pv in lvm_info.get('pvs', []):
pv_name = pv.get('pv_name')
if pv_name and pv_name != 'N/A' and not pv.get('vg_name'):
if pv_name.startswith('/dev/'):
available_pvs.append(pv_name)
else:
available_pvs.append(f"/dev/{pv_name}")
if not available_pvs:
QMessageBox.warning(self, "警告", "没有可用于创建卷组的物理卷。请确保有未分配给任何卷组的物理卷。")
return
dialog = CreateVgDialog(self, available_pvs)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_vg_info()
if info:
if self.lvm_ops.create_vg(info['vg_name'], info['pvs']):
self.refresh_all_info()
def _handle_create_lv(self):
lvm_info = self.system_manager.get_lvm_info()
available_vgs = []
vg_sizes = {}
for vg in lvm_info.get('vgs', []):
vg_name = vg.get('vg_name')
if vg_name and vg_name != 'N/A':
available_vgs.append(vg_name)
free_size_str = vg.get('vg_free', '0B').strip()
current_vg_size_gb = 0.0
match = re.match(r'(\d+\.?\d*)\s*([gmktb])?', free_size_str, re.IGNORECASE)
if match:
value = float(match.group(1))
unit = match.group(2).lower() if match.group(2) else ''
if unit == 'k':
current_vg_size_gb = value / (1024 * 1024)
elif unit == 'm':
current_vg_size_gb = value / 1024
elif unit == 'g':
current_vg_size_gb = value
elif unit == 't':
current_vg_size_gb = value * 1024
elif unit == 'b' or unit == '':
current_vg_size_gb = value / (1024 * 1024 * 1024)
else:
logger.warning(f"未知LVM单位: '{unit}' for '{free_size_str}'")
else:
logger.warning(f"无法解析LVM空闲大小字符串: '{free_size_str}'")
vg_sizes[vg_name] = current_vg_size_gb
if not available_vgs:
QMessageBox.warning(self, "警告", "没有可用于创建逻辑卷的卷组。")
return
dialog = CreateLvDialog(self, available_vgs, vg_sizes)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_lv_info()
if info:
if self.lvm_ops.create_lv(info['lv_name'], info['vg_name'], info['size_gb'], info['use_max_space']):
self.refresh_all_info()
def _handle_delete_lv(self, lv_name, vg_name):
lv_path = f"/dev/{vg_name}/{lv_name}"
self.disk_ops.unmount_partition(lv_path, show_dialog_on_error=False)
self.disk_ops._remove_fstab_entry(lv_path)
if self.lvm_ops.delete_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_activate_lv(self, lv_name, vg_name):
if self.lvm_ops.activate_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_deactivate_lv(self, lv_name, vg_name):
if self.lvm_ops.deactivate_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_delete_vg(self, vg_name):
"""
处理删除卷组 (VG) 的操作。
"""
logger.info(f"尝试删除卷组 (VG) {vg_name}。")
reply = QMessageBox.question(
self,
"确认删除卷组",
f"您确定要删除卷组 {vg_name} 吗?此操作将永久删除该卷组及其所有逻辑卷和数据!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了删除卷组 {vg_name} 的操作。")
return
lvm_info = self.system_manager.get_lvm_info()
vg_info = next((vg for vg in lvm_info.get('vgs', []) if vg.get('vg_name') == vg_name), None)
lv_count_raw = vg_info.get('lv_count', 0) if vg_info else 0
try:
lv_count = int(float(lv_count_raw))
except (ValueError, TypeError):
lv_count = 0
if lv_count > 0:
QMessageBox.critical(self, "删除失败", f"卷组 {vg_name} 中仍包含逻辑卷。请先删除所有逻辑卷。")
logger.error(f"尝试删除包含逻辑卷的卷组 {vg_name}。操作被阻止。")
return
success = self.lvm_ops.delete_vg(vg_name)
if success:
self.refresh_all_info()
else:
QMessageBox.critical(self, "删除卷组失败", f"删除卷组 {vg_name} 失败,请检查日志。")
if __name__ == "__main__":
app = QApplication(sys.argv)
widget = MainWindow()
widget.show()
sys.exit(app.exec())