This commit is contained in:
zj
2026-02-03 03:25:29 +08:00
parent adee002bd3
commit 74feafdc20
9 changed files with 1624 additions and 83 deletions

Binary file not shown.

View File

@@ -6,6 +6,8 @@ from PySide6.QtWidgets import (QDialog, QVBoxLayout, QHBoxLayout, QLabel,
QMessageBox, QCheckBox, QListWidget, QListWidgetItem) QMessageBox, QCheckBox, QListWidget, QListWidgetItem)
from PySide6.QtCore import Qt from PySide6.QtCore import Qt
from logger_config import setup_logging, logger
class CreatePartitionDialog(QDialog): class CreatePartitionDialog(QDialog):
def __init__(self, parent=None, disk_path="", total_disk_mib=0.0, max_available_mib=0.0): def __init__(self, parent=None, disk_path="", total_disk_mib=0.0, max_available_mib=0.0):
super().__init__(parent) super().__init__(parent)
@@ -15,14 +17,17 @@ class CreatePartitionDialog(QDialog):
self.disk_path = disk_path self.disk_path = disk_path
self.total_disk_mib = total_disk_mib # 磁盘总大小 (MiB) self.total_disk_mib = total_disk_mib # 磁盘总大小 (MiB)
self.max_available_mib = max_available_mib # 可用空间 (MiB) self.max_available_mib = max_available_mib # 可用空间 (MiB)
logger.debug(f"CreatePartitionDialog initialized for {disk_path}. Total MiB: {total_disk_mib}, Max Available MiB: {max_available_mib}")
self.partition_table_type_combo = QComboBox() self.partition_table_type_combo = QComboBox()
self.partition_table_type_combo.addItems(["gpt", "msdos"]) self.partition_table_type_combo.addItems(["gpt", "msdos"])
self.size_spinbox = QDoubleSpinBox() self.size_spinbox = QDoubleSpinBox()
# 调整最小值为 0.01 GB (约 10MB),并确保最大值不小于最小值 # 调整最小值为 0.01 GB (约 10MB),并确保最大值不小于最小值
calculated_max_gb = self.max_available_mib / 1024.0
self.size_spinbox.setMinimum(0.01) self.size_spinbox.setMinimum(0.01)
self.size_spinbox.setMaximum(max(0.01, self.max_available_mib / 1024.0)) # 将 MiB 转换为 GB 显示 self.size_spinbox.setMaximum(max(0.01, calculated_max_gb)) # 将 MiB 转换为 GB 显示
logger.debug(f"Size spinbox max set to: {self.size_spinbox.maximum()} GB (calculated from {calculated_max_gb} GB)")
self.size_spinbox.setSuffix(" GB") self.size_spinbox.setSuffix(" GB")
self.size_spinbox.setDecimals(2) self.size_spinbox.setDecimals(2)
@@ -54,21 +59,34 @@ class CreatePartitionDialog(QDialog):
def _connect_signals(self): def _connect_signals(self):
self.confirm_button.clicked.connect(self.accept) self.confirm_button.clicked.connect(self.accept)
self.cancel_button.clicked.connect(self.reject) self.cancel_button.clicked.connect(self.reject)
# 信号连接保持不变,但 _toggle_size_input 方法内部将直接查询复选框状态
self.use_max_space_checkbox.stateChanged.connect(self._toggle_size_input) self.use_max_space_checkbox.stateChanged.connect(self._toggle_size_input)
def _initialize_state(self): def _initialize_state(self):
# 根据默认选中状态设置 spinbox # 根据默认选中状态设置 spinbox
self._toggle_size_input(self.use_max_space_checkbox.checkState()) self._toggle_size_input(self.use_max_space_checkbox.checkState())
def _toggle_size_input(self, state): def _toggle_size_input(self, state): # state 参数仍然接收,但不再直接用于判断
if state == Qt.Checked: # 直接查询复选框的当前状态,而不是依赖信号传递的 state 参数
self.size_spinbox.setDisabled(True) is_checked = self.use_max_space_checkbox.isChecked()
self.size_spinbox.setValue(self.size_spinbox.maximum()) # 设置为最大可用 GB logger.debug(f"[_toggle_size_input] Called. Checkbox isChecked(): {is_checked}, signal state: {state}")
else:
self.size_spinbox.setDisabled(False) if is_checked:
# 如果之前是最大值,取消勾选后,恢复到最小值或一个合理值 logger.debug(f"[_toggle_size_input] Checkbox is CHECKED. Disabling spinbox and setting value to max.")
self.size_spinbox.setEnabled(False) # 禁用 spinbox
max_val = self.size_spinbox.maximum()
self.size_spinbox.setValue(max_val) # 设置为最大可用 GB
logger.debug(f"[_toggle_size_input] Spinbox value set to max: {max_val} GB.")
else: # is_checked is False
logger.debug(f"[_toggle_size_input] Checkbox is UNCHECKED. Enabling spinbox.")
self.size_spinbox.setEnabled(True) # 启用 spinbox
# 如果之前是最大值,取消勾选后,恢复到最小值,方便用户输入自定义值
if self.size_spinbox.value() == self.size_spinbox.maximum(): if self.size_spinbox.value() == self.size_spinbox.maximum():
self.size_spinbox.setValue(self.size_spinbox.minimum()) min_val = self.size_spinbox.minimum()
self.size_spinbox.setValue(min_val)
logger.debug(f"[_toggle_size_input] Spinbox was at max, reset to min: {min_val} GB.")
else:
logger.debug(f"[_toggle_size_input] Spinbox was not at max, keeping current value: {self.size_spinbox.value()} GB.")
def get_partition_info(self): def get_partition_info(self):
size_gb = self.size_spinbox.value() size_gb = self.size_spinbox.value()

512
disk_operations.py.autosave Normal file
View File

@@ -0,0 +1,512 @@
# disk_operations.py
import subprocess
import logging
import re
import os
from PySide6.QtWidgets import QMessageBox, QInputDialog
from system_info import SystemInfoManager
logger = logging.getLogger(__name__)
class DiskOperations:
def __init__(self, system_manager: SystemInfoManager): # NEW: 接收 system_manager 实例
self.system_manager = system_manager
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
suppress_critical_dialog_on_stderr_match=None, input_data=None):
"""
通用地运行一个 shell 命令,并处理错误。
:param command_list: 命令及其参数的列表。
:param error_message: 命令失败时显示给用户的错误消息。
:param root_privilege: 如果为 True则使用 sudo 执行命令。
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
可以是字符串或字符串元组。
:param input_data: 传递给命令stdin的数据 (str)。
:return: (True/False, stdout_str, stderr_str)
"""
if not all(isinstance(arg, str) for arg in command_list):
logger.error(f"命令列表包含非字符串元素: {command_list}")
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
return False, "", "内部错误:命令参数类型不正确。"
if root_privilege:
command_list = ["sudo"] + command_list
full_cmd_str = ' '.join(command_list)
logger.debug(f"执行命令: {full_cmd_str}")
try:
result = subprocess.run(
command_list,
capture_output=True,
text=True,
check=True,
encoding='utf-8',
input=input_data
)
logger.info(f"命令成功: {full_cmd_str}")
return True, result.stdout.strip(), result.stderr.strip()
except subprocess.CalledProcessError as e:
stderr_output = e.stderr.strip()
logger.error(f"命令失败: {full_cmd_str}")
logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}")
# Determine if the error dialog should be suppressed by this function
should_suppress_dialog_here = False
if suppress_critical_dialog_on_stderr_match:
if isinstance(suppress_critical_dialog_on_stderr_match, str):
if suppress_critical_dialog_on_stderr_match in stderr_output:
should_suppress_dialog_here = True
elif isinstance(suppress_critical_dialog_on_stderr_match, tuple):
if any(s in stderr_output for s in suppress_critical_dialog_on_stderr_match):
should_suppress_dialog_here = True
if should_suppress_dialog_here:
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
else:
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
return False, e.stdout.strip(), stderr_output
except FileNotFoundError:
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
logger.error(f"命令 '{command_list[0]}' 未找到。")
return False, "", f"命令 '{command_list[0]}' 未找到。"
except Exception as e:
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
return False, "", str(e)
def _add_to_fstab(self, device_path, mount_point, fstype, uuid):
"""
将设备的挂载信息添加到 /etc/fstab。
使用 UUID 识别设备以提高稳定性。
"""
if not uuid or not mount_point or not fstype:
logger.error(f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
QMessageBox.warning(None, "警告", f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
return False
fstab_entry = f"UUID={uuid} {mount_point} {fstype} defaults 0 2"
fstab_path = "/etc/fstab"
try:
# 检查 fstab 中是否已存在相同 UUID 的条目
# 使用 _execute_shell_command 来读取 fstab尽管通常不需要 sudo
# 这里为了简化,直接读取文件,但写入时使用 _execute_shell_command
with open(fstab_path, 'r') as f:
fstab_content = f.readlines()
for line in fstab_content:
if f"UUID={uuid}" in line:
logger.warning(f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。跳过添加。")
QMessageBox.information(None, "信息", f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。")
return True # 认为成功,因为目标已达成
# 如果不存在,则追加到 fstab
# 使用 _execute_shell_command for sudo write
success, _, stderr = self._execute_shell_command(
["sh", "-c", f"echo '{fstab_entry}' >> {fstab_path}"],
f"将 {device_path} 添加到 {fstab_path} 失败",
root_privilege=True
)
if success:
logger.info(f"已将 {fstab_entry} 添加到 {fstab_path}。")
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功添加到 /etc/fstab。")
return True
else:
return False # Error handled by _execute_shell_command
except Exception as e:
logger.error(f"处理 {fstab_path} 失败: {e}")
QMessageBox.critical(None, "错误", f"处理 {fstab_path} 失败: {e}")
return False
def _remove_fstab_entry(self, device_path):
"""
从 /etc/fstab 中移除指定设备的条目。
此方法需要 SystemInfoManager 来获取设备的 UUID。
"""
# NEW: 使用 system_manager 获取 UUID它会处理路径解析
device_details = self.system_manager.get_device_details_by_path(device_path)
if not device_details or not device_details.get('uuid'):
logger.warning(f"无法获取设备 {device_path} 的 UUID无法从 fstab 中移除。")
return False
uuid = device_details.get('uuid')
fstab_path = "/etc/fstab"
# Use sed for robust removal with sudo
# suppress_critical_dialog_on_stderr_match is added to handle cases where fstab might not exist
# or sed reports no changes, which is not a critical error for removal.
command = ["sed", "-i", f"/UUID={uuid}/d", fstab_path]
success, _, stderr = self._execute_shell_command(
command,
f"从 {fstab_path} 中删除 UUID={uuid} 的条目失败",
root_privilege=True,
suppress_critical_dialog_on_stderr_match=(
f"sed: {fstab_path}: No such file or directory", # English
f"sed: {fstab_path}: 没有那个文件或目录", # Chinese
"no changes were made" # if sed finds nothing to delete
)
)
if success:
logger.info(f"已从 {fstab_path} 中删除 UUID={uuid} 的条目。")
return True
else:
# If sed failed, it might be because the entry wasn't found, which is fine.
# _execute_shell_command would have suppressed the dialog if it matched the suppress_critical_dialog_on_stderr_match.
# So, if we reach here, it's either a real error (dialog shown by _execute_shell_command)
# or a suppressed "no changes" type of error. In both cases, if no real error, we return True.
if any(s in stderr for s in (
f"sed: {fstab_path}: No such file or directory",
f"sed: {fstab_path}: 没有那个文件或目录",
"no changes were made",
"No such file or directory" # more general check
)):
logger.info(f"fstab 中未找到 UUID={uuid} 的条目,无需删除。")
return True # Consider it a success if the entry is not there
return False # Other errors are already handled by _execute_shell_command
def mount_partition(self, device_path, mount_point, add_to_fstab=False):
"""
挂载指定设备到指定挂载点。
:param device_path: 要挂载的设备路径。
:param mount_point: 挂载点。
:param add_to_fstab: 是否添加到 /etc/fstab。
:return: True 如果成功,否则 False。
"""
if not os.path.exists(mount_point):
try:
os.makedirs(mount_point)
logger.info(f"创建挂载点目录: {mount_point}")
except OSError as e:
QMessageBox.critical(None, "错误", f"创建挂载点目录 {mount_point} 失败: {e}")
logger.error(f"创建挂载点目录 {mount_point} 失败: {e}")
return False
logger.info(f"尝试挂载设备 {device_path} 到 {mount_point}。")
success, _, stderr = self._execute_shell_command(
["mount", device_path, mount_point],
f"挂载设备 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功挂载到 {mount_point}。")
if add_to_fstab:
# NEW: 使用 self.system_manager 获取 fstype 和 UUID
device_details = self.system_manager.get_device_details_by_path(device_path)
if device_details:
fstype = device_details.get('fstype')
uuid = device_details.get('uuid')
if fstype and uuid:
self._add_to_fstab(device_path, mount_point, fstype, uuid)
else:
logger.error(f"无法获取设备 {device_path} 的文件系统类型或 UUID 以添加到 fstab。")
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取文件系统类型或 UUID 以添加到 fstab。")
else:
logger.error(f"无法获取设备 {device_path} 的详细信息以添加到 fstab。")
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取详细信息以添加到 fstab。")
return True
else:
return False
def unmount_partition(self, device_path, show_dialog_on_error=True):
"""
卸载指定设备。
:param device_path: 要卸载的设备路径。
:param show_dialog_on_error: 是否在发生错误时显示对话框。
:return: True 如果成功,否则 False。
"""
logger.info(f"尝试卸载设备 {device_path}。")
# 定义表示设备已未挂载的错误信息(中英文)
# 增加了 "未指定挂载点"
already_unmounted_errors = ("not mounted", "未挂载", "未指定挂载点")
# 调用 _execute_shell_command并告诉它在遇到“已未挂载”错误时不要弹出其自身的关键错误对话框。
success, _, stderr = self._execute_shell_command(
["umount", device_path],
f"卸载设备 {device_path} 失败",
suppress_critical_dialog_on_stderr_match=already_unmounted_errors
)
if success:
# 如果命令成功执行,则卸载成功。
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。")
return True
else:
# 如果命令失败,检查是否是因为设备已经未挂载或挂载点未指定。
is_already_unmounted_error = any(s in stderr for s in already_unmounted_errors)
if is_already_unmounted_error:
logger.info(f"设备 {device_path} 已经处于未挂载状态(或挂载点未指定),无需重复卸载。")
# 这种情况下,操作结果符合预期(设备已是未挂载),不弹出对话框,并返回 True。
return True
else:
# 对于其他类型的卸载失败(例如设备忙、权限不足等),
# _execute_shell_command 应该已经弹出了关键错误对话框
# (因为它没有匹配到 `already_unmounted_errors` 进行抑制)。
# 所以,这里我们不需要再次弹出对话框,直接返回 False。
return False
def get_disk_free_space_info_mib(self, disk_path, total_disk_mib):
"""
获取磁盘上最大的空闲空间块的起始位置 (MiB) 和大小 (MiB)。
:param disk_path: 磁盘路径。
:param total_disk_mib: 磁盘的总大小 (MiB),用于全新磁盘的计算。
:return: (start_mib, size_mib) 元组。如果磁盘是全新的,返回 (0.0, total_disk_mib)。
如果磁盘有分区表但没有空闲空间,返回 (None, None)。
"""
logger.debug(f"尝试获取磁盘 {disk_path} 的空闲空间信息 (MiB)。")
# suppress_critical_dialog_on_stderr_match is added to handle cases where parted might complain about
# an unrecognized disk label, which is expected for a fresh disk.
success, stdout, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "unit", "MiB", "print", "free"],
f"获取磁盘 {disk_path} 分区信息失败",
root_privilege=True,
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label")
)
if not success:
# If parted failed and it wasn't due to an unrecognized label (handled by suppress_critical_dialog_on_stderr_match),
# then _execute_shell_command would have shown a dialog.
# We just log and return None.
logger.error(f"获取磁盘 {disk_path} 空闲空间信息失败: {stderr}")
return None, None
logger.debug(f"parted print free 命令原始输出:\n{stdout}")
free_spaces = []
lines = stdout.splitlines()
# Regex to capture StartMiB and SizeMiB from a "Free Space" line
# It's made more flexible to match "Free Space", "空闲空间", and "可用空间"
# Example: " 0.02MiB 8192MiB 8192MiB 可用空间"
# We need to capture the first numeric value (Start) and the third numeric value (Size).
free_space_line_pattern = re.compile(r'^\s*(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(?:Free Space|空闲空间|可用空间)')
for line in lines:
match = free_space_line_pattern.match(line)
if match:
try:
start_mib = float(match.group(1)) # Capture StartMiB
size_mib = float(match.group(3)) # Capture SizeMiB (the third numeric value)
free_spaces.append({'start_mib': start_mib, 'size_mib': size_mib})
except ValueError as ve:
logger.warning(f"解析 parted free space 行 '{line}' 失败: {ve}")
continue
if not free_spaces:
logger.warning(f"在磁盘 {disk_path} 上未找到空闲空间。")
return None, None
# 找到最大的空闲空间块
largest_free_space = max(free_spaces, key=lambda x: x['size_mib'])
start_mib = largest_free_space['start_mib']
size_mib = largest_free_space['size_mib']
logger.debug(f"磁盘 {disk_path} 的最大空闲空间块起始于 {start_mib:.2f} MiB大小为 {size_mib:.2f} MiB。")
return start_mib, size_mib
def create_partition(self, disk_path, partition_table_type, size_gb, total_disk_mib, use_max_space):
"""
在指定磁盘上创建分区。
:param disk_path: 磁盘路径 (例如 /dev/sdb)。
:param partition_table_type: 分区表类型 ('gpt' 或 'msdos')。
:param size_gb: 分区大小 (GB)。
:param total_disk_mib: 磁盘总大小 (MiB)。
:param use_max_space: 是否使用最大可用空间。
:return: True 如果成功,否则 False。
"""
if not isinstance(disk_path, str) or not isinstance(partition_table_type, str):
logger.error(f"尝试创建分区时传入无效参数。磁盘路径: {disk_path}, 分区表类型: {partition_table_type}")
QMessageBox.critical(None, "错误", "无效的磁盘路径或分区表类型。")
return False
# 1. 检查磁盘是否有分区表
has_partition_table = False
# Use suppress_critical_dialog_on_stderr_match for "unrecognized disk label" when checking
success_check, stdout_check, stderr_check = self._execute_shell_command(
["parted", "-s", disk_path, "print"],
f"检查磁盘 {disk_path} 分区表失败",
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label"),
root_privilege=True
)
# If success_check is False, it means _execute_shell_command encountered an error.
# If that error was "unrecognized disk label", it was suppressed, and we treat it as no partition table.
# If it was another error, a dialog was shown by _execute_shell_command, and we should stop.
if not success_check:
# Check if the failure was due to "unrecognized disk label" (which means no partition table)
if any(s in stderr_check for s in ("无法辨识的磁盘卷标", "unrecognized disk label")):
logger.info(f"磁盘 {disk_path} 没有可识别的分区表。")
has_partition_table = False # Explicitly set to False
else:
logger.error(f"检查磁盘 {disk_path} 分区表时发生非预期的错误: {stderr_check}")
return False # Other critical error, stop operation.
else: # success_check is True
if "Partition Table: unknown" not in stdout_check and "分区表unknown" not in stdout_check:
has_partition_table = True
else:
logger.info(f"parted print 报告磁盘 {disk_path} 的分区表为 'unknown'。")
has_partition_table = False
actual_start_mib_for_parted = 0.0
# 2. 如果没有分区表,则创建分区表
if not has_partition_table:
reply = QMessageBox.question(None, "确认创建分区表",
f"磁盘 {disk_path} 没有分区表。您确定要创建 {partition_table_type} 分区表吗?"
f"此操作将擦除磁盘上的所有数据。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了在 {disk_path} 上创建分区表的操作。")
return False
logger.info(f"尝试在 {disk_path} 上创建 {partition_table_type} 分区表。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "mklabel", partition_table_type],
f"创建 {partition_table_type} 分区表失败"
)
if not success:
return False
# 对于新创建分区表的磁盘,第一个分区通常从 1MiB 开始以确保兼容性和对齐
actual_start_mib_for_parted = 1.0
else:
# 如果有分区表,获取下一个可用分区的起始位置
start_mib_from_parted, _ = self.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if start_mib_from_parted is None:
QMessageBox.critical(None, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置或没有空闲空间。")
return False
# 如果 parted 报告的起始位置非常接近 0 MiB (例如 0.0 MiB 或 0.02 MiB)
# 为了安全和兼容性,也将其调整为 1.0 MiB。
# 否则,使用 parted 报告的精确起始位置。
if start_mib_from_parted < 1.0: # covers 0.0MiB and values like 0.017MiB (17.4kB)
actual_start_mib_for_parted = 1.0
else:
actual_start_mib_for_parted = start_mib_from_parted
# 3. 确定分区结束位置
if use_max_space:
end_pos = "100%"
size_for_log = "最大可用空间"
else:
# 计算结束 MiB
# 注意:这里计算的 end_mib 是基于用户请求的大小,
# parted 会根据实际可用空间和对齐进行微调。
end_mib = actual_start_mib_for_parted + size_gb * 1024
# 简单检查,如果请求的大小导致结束位置超出磁盘总大小,则使用最大可用
if end_mib > total_disk_mib:
QMessageBox.warning(None, "警告", f"请求的分区大小 ({size_gb}GB) 超出了可用空间。将调整为最大可用空间。")
end_pos = "100%"
size_for_log = "最大可用空间"
else:
end_pos = f"{end_mib}MiB"
size_for_log = f"{size_gb}GB"
# 4. 创建分区
create_cmd = ["parted", "-s", disk_path, "mkpart", "primary", f"{actual_start_mib_for_parted}MiB", end_pos]
logger.info(f"尝试在 {disk_path} 上创建 {size_for_log} 的主分区。命令: {' '.join(create_cmd)}")
success, _, stderr = self._execute_shell_command(
create_cmd,
f"在 {disk_path} 上创建分区失败"
)
if success:
QMessageBox.information(None, "成功", f"在 {disk_path} 上成功创建了 {size_for_log} 的分区。")
return True
else:
return False
def delete_partition(self, device_path):
"""
删除指定分区。
:param device_path: 要删除的分区路径。
:return: True 如果成功,否则 False。
"""
reply = QMessageBox.question(None, "确认删除分区",
f"您确定要删除分区 {device_path} 吗?此操作将擦除分区上的所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了删除分区 {device_path} 的操作。")
return False
# 尝试卸载分区
# The unmount_partition method now handles "not mounted" gracefully without dialog and returns True.
# So, we just call it.
self.unmount_partition(device_path, show_dialog_on_error=False) # show_dialog_on_error=False means no dialog for other errors too.
# 从 fstab 中移除条目
# _remove_fstab_entry also handles "not found" gracefully.
self._remove_fstab_entry(device_path)
# 获取父磁盘和分区号
# 例如 /dev/sdb1 -> /dev/sdb, 1
match = re.match(r'(/dev/[a-z]+)(\d+)', device_path)
if not match:
QMessageBox.critical(None, "错误", f"无法解析设备路径 {device_path}。")
logger.error(f"无法解析设备路径 {device_path}。")
return False
disk_path = match.group(1)
partition_number = match.group(2)
logger.info(f"尝试删除分区 {device_path} (磁盘: {disk_path}, 分区号: {partition_number})。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "rm", partition_number],
f"删除分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功删除。")
return True
else:
return False
def format_partition(self, device_path, fstype=None):
"""
格式化指定分区。
:param device_path: 要格式化的分区路径。
:param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None则弹出对话框让用户选择。
:return: True 如果成功,否则 False。
"""
if fstype is None:
items = ("ext4", "xfs", "fat32", "ntfs")
fstype, ok = QInputDialog.getItem(None, "选择文件系统", "请选择要使用的文件系统类型:", items, 0, False)
if not ok or not fstype:
logger.info("用户取消了文件系统选择。")
return False
reply = QMessageBox.question(None, "确认格式化分区",
f"您确定要将分区 {device_path} 格式化为 {fstype} 吗?此操作将擦除分区上的所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了格式化分区 {device_path} 的操作。")
return False
# 尝试卸载分区
self.unmount_partition(device_path, show_dialog_on_error=False) # 静默卸载
# 从 fstab 中移除条目 (因为格式化会改变 UUID)
self._remove_fstab_entry(device_path)
logger.info(f"尝试将分区 {device_path} 格式化为 {fstype}。")
format_cmd = []
if fstype == "ext4":
format_cmd = ["mkfs.ext4", "-F", device_path] # -F 强制执行
elif fstype == "xfs":
format_cmd = ["mkfs.xfs", "-f", device_path] # -f 强制执行
elif fstype == "fat32":
format_cmd = ["mkfs.fat", "-F", "32", device_path]
elif fstype == "ntfs":
format_cmd = ["mkfs.ntfs", "-f", device_path]
else:
QMessageBox.critical(None, "错误", f"不支持的文件系统类型: {fstype}")
logger.error(f"不支持的文件系统类型: {fstype}")
return False
success, _, stderr = self._execute_shell_command(
format_cmd,
f"格式化分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功格式化为 {fstype}。")
return True
else:
return False

View File

@@ -16,6 +16,7 @@ class LvmOperations:
:param error_message: 命令失败时显示给用户的错误消息。 :param error_message: 命令失败时显示给用户的错误消息。
:param root_privilege: 如果为 True则使用 sudo 执行命令。 :param root_privilege: 如果为 True则使用 sudo 执行命令。
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。 :param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
可以是字符串或字符串元组/列表。
:param input_data: 传递给命令stdin的数据 (str)。 :param input_data: 传递给命令stdin的数据 (str)。
:return: (True/False, stdout_str, stderr_str) :return: (True/False, stdout_str, stderr_str)
""" """
@@ -48,11 +49,24 @@ class LvmOperations:
logger.error(f"标准输出: {e.stdout.strip()}") logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}") logger.error(f"标准错误: {stderr_output}")
if suppress_critical_dialog_on_stderr_match and \ # --- 修改开始:处理 suppress_critical_dialog_on_stderr_match 可以是字符串或元组/列表 ---
suppress_critical_dialog_on_stderr_match in stderr_output: should_suppress_dialog = False
if suppress_critical_dialog_on_stderr_match:
if isinstance(suppress_critical_dialog_on_stderr_match, str):
if suppress_critical_dialog_on_stderr_match in stderr_output:
should_suppress_dialog = True
elif isinstance(suppress_critical_dialog_on_stderr_match, (list, tuple)):
for pattern in suppress_critical_dialog_on_stderr_match:
if pattern in stderr_output:
should_suppress_dialog = True
break # 找到一个匹配就足够了
if should_suppress_dialog:
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。") logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
else: else:
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}") QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
# --- 修改结束 ---
return False, e.stdout.strip(), stderr_output return False, e.stdout.strip(), stderr_output
except FileNotFoundError: except FileNotFoundError:
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。") QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
@@ -66,7 +80,7 @@ class LvmOperations:
def create_pv(self, device_path): def create_pv(self, device_path):
""" """
创建物理卷 (PV)。 创建物理卷 (PV)。
:param device_path: 设备的路径,例如 /dev/sdb1。 :param device_path: 设备的路径,例如 /dev/sdb1 或 /dev/sdb
:return: True 如果成功,否则 False。 :return: True 如果成功,否则 False。
""" """
if not isinstance(device_path, str): if not isinstance(device_path, str):
@@ -82,15 +96,58 @@ class LvmOperations:
return False return False
logger.info(f"尝试在 {device_path} 上创建物理卷。") logger.info(f"尝试在 {device_path} 上创建物理卷。")
# 第一次尝试创建物理卷,抑制 "device is partitioned" 错误,因为我们将在代码中处理它
success, _, stderr = self._execute_shell_command( success, _, stderr = self._execute_shell_command(
["pvcreate", "-y", device_path], # -y 自动确认 ["pvcreate", "-y", device_path], # -y 自动确认
f"{device_path} 上创建物理卷失败" f"{device_path} 上创建物理卷失败",
suppress_critical_dialog_on_stderr_match="device is partitioned" # 抑制此特定错误
) )
if success: if success:
QMessageBox.information(None, "成功", f"物理卷已在 {device_path} 上成功创建。") QMessageBox.information(None, "成功", f"物理卷已在 {device_path} 上成功创建。")
return True return True
else: else:
return False # 如果 pvcreate 失败,检查是否是 "device is partitioned" 错误
if "device is partitioned" in stderr:
# 提示用户是否要擦除分区表
wipe_reply = QMessageBox.question(
None, "设备已分区",
f"设备 {device_path} 已分区。您是否要擦除设备上的所有分区表,"
f"并将其整个用于物理卷?此操作将导致所有数据丢失!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No
)
if wipe_reply == QMessageBox.Yes:
logger.warning(f"用户选择擦除 {device_path} 上的分区表并创建物理卷。")
# 尝试擦除分区表 (使用 GPT 作为默认,通常更现代且支持大容量磁盘)
# 注意:这里需要 parted 命令,确保系统已安装 parted
mklabel_success, _, mklabel_stderr = self._execute_shell_command(
["parted", "-s", device_path, "mklabel", "gpt"], # 使用 gpt 分区表
f"擦除 {device_path} 上的分区表失败"
)
if mklabel_success:
logger.info(f"已成功擦除 {device_path} 上的分区表。重试创建物理卷。")
# 再次尝试创建物理卷
retry_success, _, retry_stderr = self._execute_shell_command(
["pvcreate", "-y", device_path],
f"{device_path} 上创建物理卷失败 (重试)"
# 重试时不再抑制如果再次失败_execute_shell_command 会显示错误
)
if retry_success:
QMessageBox.information(None, "成功", f"物理卷已在 {device_path} 上成功创建。")
return True
else:
# 如果重试失败_execute_shell_command 已经显示错误
return False
else:
# mklabel 失败_execute_shell_command 已经显示错误
return False
else:
logger.info(f"用户取消了擦除 {device_path} 分区表的操作。")
QMessageBox.information(None, "信息", f"未在已分区设备 {device_path} 上创建物理卷。")
return False
else:
# 对于其他类型的 pvcreate 失败_execute_shell_command 已经显示了错误
return False
def delete_pv(self, device_path): def delete_pv(self, device_path):
""" """

View File

@@ -37,7 +37,7 @@ class MainWindow(QMainWindow):
self.system_manager = SystemInfoManager() self.system_manager = SystemInfoManager()
self.disk_ops = DiskOperations(self.system_manager) self.disk_ops = DiskOperations(self.system_manager)
self.raid_ops = RaidOperations() self.raid_ops = RaidOperations()
self.lvm_ops = LvmOperations() self.lvm_ops = LvmOperations() # LvmOperations 包含通用的 _execute_shell_command
# 连接刷新按钮的信号到槽函数 # 连接刷新按钮的信号到槽函数
if hasattr(self.ui, 'refreshButton'): if hasattr(self.ui, 'refreshButton'):
@@ -149,6 +149,10 @@ class MainWindow(QMainWindow):
if device_type == 'disk': if device_type == 'disk':
create_partition_action = menu.addAction(f"创建分区 {device_path}...") create_partition_action = menu.addAction(f"创建分区 {device_path}...")
create_partition_action.triggered.connect(lambda: self._handle_create_partition(device_path, dev_data)) create_partition_action.triggered.connect(lambda: self._handle_create_partition(device_path, dev_data))
# --- 新增功能:擦除分区表 ---
wipe_action = menu.addAction(f"擦除分区表 {device_path}...")
wipe_action.triggered.connect(lambda: self._handle_wipe_partition_table(device_path))
# --- 新增功能结束 ---
menu.addSeparator() menu.addSeparator()
if device_type == 'part': if device_type == 'part':
@@ -171,6 +175,39 @@ class MainWindow(QMainWindow):
else: else:
logger.info("右键点击了空白区域或设备没有可用的操作。") logger.info("右键点击了空白区域或设备没有可用的操作。")
# --- 新增方法:处理擦除分区表 ---
def _handle_wipe_partition_table(self, device_path):
"""
处理擦除物理盘分区表的操作。
"""
reply = QMessageBox.question(
self,
"确认擦除分区表",
f"您确定要擦除设备 {device_path} 上的所有分区表吗?\n"
f"**此操作将永久删除设备上的所有分区和数据,且不可恢复!**\n"
f"请谨慎操作!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了擦除 {device_path} 分区表的操作。")
return
logger.warning(f"尝试擦除 {device_path} 上的分区表。")
# 使用 LvmOperations 中通用的 _execute_shell_command 来执行 parted 命令
success, _, stderr = self.lvm_ops._execute_shell_command(
["parted", "-s", device_path, "mklabel", "gpt"], # 默认使用 gpt 分区表类型
f"擦除 {device_path} 上的分区表失败"
)
if success:
QMessageBox.information(self, "成功", f"设备 {device_path} 上的分区表已成功擦除。")
self.refresh_all_info()
else:
# 错误信息已由 _execute_shell_command 处理并显示
pass
# --- 新增方法结束 ---
def _handle_create_partition(self, disk_path, dev_data): def _handle_create_partition(self, disk_path, dev_data):
total_disk_mib = 0.0 total_disk_mib = 0.0
total_size_str = dev_data.get('size') total_size_str = dev_data.get('size')
@@ -208,6 +245,7 @@ class MainWindow(QMainWindow):
if dev_data.get('children'): if dev_data.get('children'):
logger.debug(f"磁盘 {disk_path} 存在现有分区,尝试计算下一个分区起始位置。") logger.debug(f"磁盘 {disk_path} 存在现有分区,尝试计算下一个分区起始位置。")
# 假设 disk_ops.get_disk_free_space_info_mib 能够正确处理
calculated_start_mib, largest_free_space_mib = self.disk_ops.get_disk_free_space_info_mib(disk_path, total_disk_mib) calculated_start_mib, largest_free_space_mib = self.disk_ops.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if calculated_start_mib is None or largest_free_space_mib is None: if calculated_start_mib is None or largest_free_space_mib is None:
QMessageBox.critical(self, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置。") QMessageBox.critical(self, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置。")
@@ -218,9 +256,9 @@ class MainWindow(QMainWindow):
max_available_mib = 0.0 max_available_mib = 0.0
else: else:
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 0 MiB 开始,最大可用空间为整个磁盘。") logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 0 MiB 开始,最大可用空间为整个磁盘。")
max_available_mib = max(0.0, total_disk_mib - 1.0) max_available_mib = max(0.0, total_disk_mib - 1.0) # 留一点空间,避免边界问题
start_position_mib = 1.0 start_position_mib = 1.0 # 现代分区表通常从1MB或更大偏移开始
logger.debug(f"磁盘 /dev/sdd 没有现有分区,假定从 {start_position_mib} MiB 开始,最大可用空间为 {max_available_mib} MiB。") logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 {start_position_mib} MiB 开始,最大可用空间为 {max_available_mib} MiB。")
dialog = CreatePartitionDialog(self, disk_path, total_disk_mib, max_available_mib) dialog = CreatePartitionDialog(self, disk_path, total_disk_mib, max_available_mib)
if dialog.exec() == QDialog.Accepted: if dialog.exec() == QDialog.Accepted:

762
mainwindow.py.autosave Normal file
View File

@@ -0,0 +1,762 @@
# mainwindow.py
import sys
import logging
import re
import os
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
from PySide6.QtCore import Qt, QPoint
# 导入自动生成的 UI 文件
from ui_form import Ui_MainWindow
# 导入我们自己编写的系统信息管理模块
from system_info import SystemInfoManager
# 导入日志配置
from logger_config import setup_logging, logger
# 导入磁盘操作模块
from disk_operations import DiskOperations
# 导入 RAID 操作模块
from raid_operations import RaidOperations
# 导入 LVM 操作模块
from lvm_operations import LvmOperations
# 导入自定义对话框
from dialogs import (CreatePartitionDialog, MountDialog, CreateRaidDialog,
CreatePvDialog, CreateVgDialog, CreateLvDialog)
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
setup_logging(self.ui.logOutputTextEdit)
logger.info("应用程序启动。")
# 初始化管理器和操作类
self.system_manager = SystemInfoManager()
self.disk_ops = DiskOperations(self.system_manager)
self.raid_ops = RaidOperations()
self.lvm_ops = LvmOperations() # LvmOperations 包含通用的 _execute_shell_command
# 连接刷新按钮的信号到槽函数
if hasattr(self.ui, 'refreshButton'):
self.ui.refreshButton.clicked.connect(self.refresh_all_info)
else:
logger.warning("Warning: refreshButton not found in UI. Please add it in form.ui and regenerate ui_form.py.")
# 启用 treeWidget 的自定义上下文菜单
self.ui.treeWidget_block_devices.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_block_devices.customContextMenuRequested.connect(self.show_block_device_context_menu)
self.ui.treeWidget_raid.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_raid.customContextMenuRequested.connect(self.show_raid_context_menu)
self.ui.treeWidget_lvm.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_lvm.customContextMenuRequested.connect(self.show_lvm_context_menu)
# 初始化时刷新所有数据
self.refresh_all_info()
logger.info("所有设备信息已初始化加载。")
def refresh_all_info(self):
"""
刷新所有设备信息块设备、RAID和LVM。
"""
logger.info("开始刷新所有设备信息...")
self.refresh_block_devices_info()
self.refresh_raid_info()
self.refresh_lvm_info()
logger.info("所有设备信息刷新完成。")
# --- 块设备概览 Tab ---
def refresh_block_devices_info(self):
self.ui.treeWidget_block_devices.clear()
columns = [
("设备名", 'name'), ("类型", 'type'), ("大小", 'size'), ("挂载点", 'mountpoint'),
("文件系统", 'fstype'), ("只读", 'ro'), ("UUID", 'uuid'), ("PARTUUID", 'partuuid'),
("厂商", 'vendor'), ("型号", 'model'), ("序列号", 'serial'),
("主次号", 'maj:min'), ("父设备名", 'pkname'),
]
headers = [col[0] for col in columns]
self.field_keys = [col[1] for col in columns]
self.ui.treeWidget_block_devices.setColumnCount(len(headers))
self.ui.treeWidget_block_devices.setHeaderLabels(headers)
for i in range(len(headers)):
self.ui.treeWidget_block_devices.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
devices = self.system_manager.get_block_devices()
for dev in devices:
self._add_device_to_tree(self.ui.treeWidget_block_devices, dev)
for i in range(len(headers)):
self.ui.treeWidget_block_devices.resizeColumnToContents(i)
logger.info("块设备信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新块设备信息失败: {e}")
logger.error(f"刷新块设备信息失败: {e}")
def _add_device_to_tree(self, parent_item, dev_data):
item = QTreeWidgetItem(parent_item)
for i, key in enumerate(self.field_keys):
value = dev_data.get(key)
if key == 'ro':
item.setText(i, "是" if value else "否")
elif value is None:
item.setText(i, "")
else:
item.setText(i, str(value))
item.setData(0, Qt.UserRole, dev_data)
if 'children' in dev_data:
for child in dev_data['children']:
self._add_device_to_tree(item, child)
item.setExpanded(True)
def show_block_device_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_block_devices.itemAt(pos)
menu = QMenu(self)
create_menu = QMenu("创建...", self)
create_raid_action = create_menu.addAction("创建 RAID 阵列...")
create_raid_action.triggered.connect(self._handle_create_raid_array)
create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
create_pv_action.triggered.connect(self._handle_create_pv)
menu.addMenu(create_menu)
menu.addSeparator()
if item:
dev_data = item.data(0, Qt.UserRole)
if not dev_data:
logger.warning(f"无法获取设备 {item.text(0)} 的详细数据。")
return
device_name = dev_data.get('name')
device_type = dev_data.get('type')
mount_point = dev_data.get('mountpoint')
device_path = dev_data.get('path')
if not device_path:
device_path = f"/dev/{device_name}"
if device_type == 'disk':
create_partition_action = menu.addAction(f"创建分区 {device_path}...")
create_partition_action.triggered.connect(lambda: self._handle_create_partition(device_path, dev_data))
# --- 新增功能:擦除分区表 ---
wipe_action = menu.addAction(f"擦除分区表 {device_path}...")
wipe_action.triggered.connect(lambda: self._handle_wipe_partition_table(device_path))
# --- 新增功能结束 ---
menu.addSeparator()
if device_type == 'part':
if not mount_point or mount_point == '' or mount_point == 'N/A':
mount_action = menu.addAction(f"挂载 {device_path}...")
mount_action.triggered.connect(lambda: self._handle_mount(device_path))
elif mount_point != '[SWAP]':
unmount_action = menu.addAction(f"卸载 {device_path}")
unmount_action.triggered.connect(lambda: self._unmount_and_refresh(device_path))
menu.addSeparator()
delete_action = menu.addAction(f"删除分区 {device_path}")
delete_action.triggered.connect(lambda: self._handle_delete_partition(device_path))
format_action = menu.addAction(f"格式化分区 {device_path}...")
format_action.triggered.connect(lambda: self._handle_format_partition(device_path))
if menu.actions():
menu.exec(self.ui.treeWidget_block_devices.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或设备没有可用的操作。")
# --- 新增方法:处理擦除分区表 ---
def _handle_wipe_partition_table(self, device_path):
"""
处理擦除物理盘分区表的操作。
"""
reply = QMessageBox.question(
self,
"确认擦除分区表",
f"您确定要擦除设备 {device_path} 上的所有分区表吗?\n"
f"**此操作将永久删除设备上的所有分区和数据,且不可恢复!**\n"
f"请谨慎操作!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了擦除 {device_path} 分区表的操作。")
return
logger.warning(f"尝试擦除 {device_path} 上的分区表。")
# 使用 LvmOperations 中通用的 _execute_shell_command 来执行 parted 命令
success, _, stderr = self.lvm_ops._execute_shell_command(
["parted", "-s", device_path, "mklabel", "gpt"], # 默认使用 gpt 分区表类型
f"擦除 {device_path} 上的分区表失败"
)
if success:
QMessageBox.information(self, "成功", f"设备 {device_path} 上的分区表已成功擦除。")
self.refresh_all_info()
else:
# 错误信息已由 _execute_shell_command 处理并显示
pass
# --- 新增方法结束 ---
def _handle_create_partition(self, disk_path, dev_data):
total_disk_mib = 0.0
total_size_str = dev_data.get('size')
logger.debug(f"尝试为磁盘 {disk_path} 创建分区。原始大小字符串: '{total_size_str}'")
if total_size_str:
match = re.match(r'(\d+(\.\d+)?)\s*([KMGT]?B?)', total_size_str, re.IGNORECASE)
if match:
value = float(match.group(1))
unit = match.group(3).upper() if match.group(3) else ''
if unit == 'KB' or unit == 'K': total_disk_mib = value / 1024
elif unit == 'MB' or unit == 'M': total_disk_mib = value
elif unit == 'GB' or unit == 'G': total_disk_mib = value * 1024
elif unit == 'TB' or unit == 'T': total_disk_mib = value * 1024 * 1024
elif unit == 'B':
total_disk_mib = value / (1024 * 1024)
else:
logger.warning(f"无法识别磁盘 {disk_path} 的大小单位: '{unit}' (原始: '{total_size_str}')")
total_disk_mib = 0.0
logger.debug(f"解析后的磁盘总大小 (MiB): {total_disk_mib}")
else:
logger.warning(f"无法解析磁盘 {disk_path} 的大小字符串 '{total_size_str}'。正则表达式不匹配。")
total_disk_mib = 0.0
else:
logger.warning(f"获取磁盘 {disk_path} 的大小字符串为空或None。")
total_disk_mib = 0.0
if total_disk_mib <= 0.0:
QMessageBox.critical(self, "错误", f"无法获取磁盘 {disk_path} 的有效总大小。")
return
start_position_mib = 0.0
max_available_mib = total_disk_mib
if dev_data.get('children'):
logger.debug(f"磁盘 {disk_path} 存在现有分区,尝试计算下一个分区起始位置。")
# 假设 disk_ops.get_disk_free_space_info_mib 能够正确处理
calculated_start_mib, largest_free_space_mib = self.disk_ops.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if calculated_start_mib is None or largest_free_space_mib is None:
QMessageBox.critical(self, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置。")
return
start_position_mib = calculated_start_mib
max_available_mib = largest_free_space_mib
if max_available_mib < 0:
max_available_mib = 0.0
else:
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 0 MiB 开始,最大可用空间为整个磁盘。")
max_available_mib = max(0.0, total_disk_mib - 1.0) # 留一点空间,避免边界问题
start_position_mib = 1.0 # 现代分区表通常从1MB或更大偏移开始
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 {start_position_mib} MiB 开始,最大可用空间为 {max_available_mib} MiB。")
dialog = CreatePartitionDialog(self, disk_path, total_disk_mib, max_available_mib)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_partition_info()
if info:
if self.disk_ops.create_partition(
info['disk_path'],
info['partition_table_type'],
info['size_gb'],
info['total_disk_mib'],
info['use_max_space']
):
self.refresh_all_info()
def _handle_mount(self, device_path):
dialog = MountDialog(self, device_path)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_mount_info()
if info:
if self.disk_ops.mount_partition(device_path, info['mount_point'], info['add_to_fstab']):
self.refresh_all_info()
def _unmount_and_refresh(self, device_path):
if self.disk_ops.unmount_partition(device_path, show_dialog_on_error=True):
self.refresh_all_info()
def _handle_delete_partition(self, device_path):
if self.disk_ops.delete_partition(device_path):
self.refresh_all_info()
def _handle_format_partition(self, device_path):
if self.disk_ops.format_partition(device_path):
self.refresh_all_info()
# --- RAID 管理 Tab ---
def refresh_raid_info(self):
self.ui.treeWidget_raid.clear()
raid_headers = [
"阵列设备", "级别", "状态", "大小", "活动设备", "失败设备", "备用设备",
"总设备数", "UUID", "名称", "Chunk Size", "挂载点"
]
self.ui.treeWidget_raid.setColumnCount(len(raid_headers))
self.ui.treeWidget_raid.setHeaderLabels(raid_headers)
for i in range(len(raid_headers)):
self.ui.treeWidget_raid.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
raid_arrays = self.system_manager.get_mdadm_arrays()
if not raid_arrays:
item = QTreeWidgetItem(self.ui.treeWidget_raid)
item.setText(0, "未找到RAID阵列。")
logger.info("未找到RAID阵列。")
return
for array in raid_arrays:
array_item = QTreeWidgetItem(self.ui.treeWidget_raid)
array_path = array.get('device', 'N/A')
if not array_path or array_path == 'N/A':
logger.warning(f"RAID阵列 '{array.get('name', '未知')}' 的设备路径无效,跳过。")
continue
current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
array_item.setText(0, array_path)
array_item.setText(1, array.get('level', 'N/A'))
array_item.setText(2, array.get('state', 'N/A'))
array_item.setText(3, array.get('array_size', 'N/A'))
array_item.setText(4, array.get('active_devices', 'N/A'))
array_item.setText(5, array.get('failed_devices', 'N/A'))
array_item.setText(6, array.get('spare_devices', 'N/A'))
array_item.setText(7, array.get('total_devices', 'N/A'))
array_item.setText(8, array.get('uuid', 'N/A'))
array_item.setText(9, array.get('name', 'N/A'))
array_item.setText(10, array.get('chunk_size', 'N/A'))
array_item.setText(11, current_mount_point if current_mount_point else "")
array_item.setExpanded(True)
array_data_for_context = array.copy()
array_data_for_context['device'] = array_path
array_item.setData(0, Qt.UserRole, array_data_for_context)
for member in array.get('member_devices', []):
member_item = QTreeWidgetItem(array_item)
member_item.setText(0, f" {member.get('device_path', 'N/A')}")
member_item.setText(1, f"成员: {member.get('raid_device', 'N/A')}")
member_item.setText(2, member.get('state', 'N/A'))
member_item.setText(3, f"Major: {member.get('major', 'N/A')}, Minor: {member.get('minor', 'N/A')}")
for i in range(len(raid_headers)):
self.ui.treeWidget_raid.resizeColumnToContents(i)
logger.info("RAID阵列信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新RAID阵列信息失败: {e}")
logger.error(f"刷新RAID阵列信息失败: {e}")
def show_raid_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_raid.itemAt(pos)
menu = QMenu(self)
create_raid_action = menu.addAction("创建 RAID 阵列...")
create_raid_action.triggered.connect(self._handle_create_raid_array)
menu.addSeparator()
if item and item.parent() is None:
array_data = item.data(0, Qt.UserRole)
if not array_data:
logger.warning(f"无法获取 RAID 阵列 {item.text(0)} 的详细数据。")
return
array_path = array_data.get('device')
member_devices = [m.get('device_path') for m in array_data.get('member_devices', [])]
if not array_path or array_path == 'N/A':
logger.warning(f"RAID阵列 '{array_data.get('name', '未知')}' 的设备路径无效,无法显示操作。")
return
current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
if current_mount_point and current_mount_point != '[SWAP]' and current_mount_point != '':
unmount_action = menu.addAction(f"卸载 {array_path} ({current_mount_point})")
unmount_action.triggered.connect(lambda: self._unmount_and_refresh(array_path))
else:
mount_action = menu.addAction(f"挂载 {array_path}...")
mount_action.triggered.connect(lambda: self._handle_mount(array_path))
menu.addSeparator()
stop_action = menu.addAction(f"停止阵列 {array_path}")
stop_action.triggered.connect(lambda: self._handle_stop_raid_array(array_path))
delete_action = menu.addAction(f"删除阵列 {array_path}")
delete_action.triggered.connect(lambda: self._handle_delete_raid_array(array_path, member_devices))
format_action = menu.addAction(f"格式化阵列 {array_path}...")
format_action.triggered.connect(lambda: self._handle_format_raid_array(array_path))
if menu.actions():
menu.exec(self.ui.treeWidget_raid.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或没有可用的RAID操作。")
def _handle_create_raid_array(self):
available_devices = self.system_manager.get_unallocated_partitions()
if not available_devices:
QMessageBox.warning(self, "警告", "没有可用于创建 RAID 阵列的设备。请确保有未挂载、未被LVM或RAID使用的磁盘或分区。")
return
dialog = CreateRaidDialog(self, available_devices)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_raid_info()
if info:
if self.raid_ops.create_raid_array(info['devices'], info['level'], info['chunk_size']):
self.refresh_all_info()
def _handle_stop_raid_array(self, array_path):
if self.raid_ops.stop_raid_array(array_path):
self.refresh_all_info()
def _handle_delete_raid_array(self, array_path, member_devices):
if self.raid_ops.delete_raid_array(array_path, member_devices):
self.refresh_all_info()
def _handle_format_raid_array(self, array_path):
if self.disk_ops.format_partition(array_path):
self.refresh_all_info()
# --- LVM 管理 Tab ---
def refresh_lvm_info(self):
self.ui.treeWidget_lvm.clear()
lvm_headers = ["名称", "大小", "属性", "UUID", "关联", "空闲/已用", "路径/格式", "挂载点"]
self.ui.treeWidget_lvm.setColumnCount(len(lvm_headers))
self.ui.treeWidget_lvm.setHeaderLabels(lvm_headers)
for i in range(len(lvm_headers)):
self.ui.treeWidget_lvm.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
lvm_data = self.system_manager.get_lvm_info()
if not lvm_data.get('pvs') and not lvm_data.get('vgs') and not lvm_data.get('lvs'):
item = QTreeWidgetItem(self.ui.treeWidget_lvm)
item.setText(0, "未找到LVM信息。")
logger.info("未找到LVM信息。")
return
# 物理卷 (PVs)
pv_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
pv_root_item.setText(0, "物理卷 (PVs)")
pv_root_item.setExpanded(True)
pv_root_item.setData(0, Qt.UserRole, {'type': 'pv_root'})
if lvm_data.get('pvs'):
for pv in lvm_data['pvs']:
pv_item = QTreeWidgetItem(pv_root_item)
pv_name = pv.get('pv_name', 'N/A')
if pv_name.startswith('/dev/'):
pv_path = pv_name
else:
pv_path = f"/dev/{pv_name}" if pv_name != 'N/A' else 'N/A'
pv_item.setText(0, pv_name)
pv_item.setText(1, pv.get('pv_size', 'N/A'))
pv_item.setText(2, pv.get('pv_attr', 'N/A'))
pv_item.setText(3, pv.get('pv_uuid', 'N/A'))
pv_item.setText(4, f"VG: {pv.get('vg_name', 'N/A')}")
pv_item.setText(5, f"空闲: {pv.get('pv_free', 'N/A')}")
pv_item.setText(6, pv.get('pv_fmt', 'N/A'))
pv_item.setText(7, "")
pv_data_for_context = pv.copy()
pv_data_for_context['pv_name'] = pv_path
pv_item.setData(0, Qt.UserRole, {'type': 'pv', 'data': pv_data_for_context})
else:
item = QTreeWidgetItem(pv_root_item)
item.setText(0, "未找到物理卷。")
# 卷组 (VGs)
vg_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
vg_root_item.setText(0, "卷组 (VGs)")
vg_root_item.setExpanded(True)
vg_root_item.setData(0, Qt.UserRole, {'type': 'vg_root'})
if lvm_data.get('vgs'):
for vg in lvm_data['vgs']:
vg_item = QTreeWidgetItem(vg_root_item)
vg_name = vg.get('vg_name', 'N/A')
vg_item.setText(0, vg_name)
vg_item.setText(1, vg.get('vg_size', 'N/A'))
vg_item.setText(2, vg.get('vg_attr', 'N/A'))
vg_item.setText(3, vg.get('vg_uuid', 'N/A'))
vg_item.setText(4, f"PVs: {vg.get('pv_count', 'N/A')}, LVs: {vg.get('lv_count', 'N/A')}")
vg_item.setText(5, f"空闲: {vg.get('vg_free', 'N/A')}, 已分配: {vg.get('vg_alloc_percent', 'N/A')}%")
vg_item.setText(6, vg.get('vg_fmt', 'N/A'))
vg_item.setText(7, "")
vg_data_for_context = vg.copy()
vg_data_for_context['vg_name'] = vg_name
vg_item.setData(0, Qt.UserRole, {'type': 'vg', 'data': vg_data_for_context})
else:
item = QTreeWidgetItem(vg_root_item)
item.setText(0, "未找到卷组。")
# 逻辑卷 (LVs)
lv_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
lv_root_item.setText(0, "逻辑卷 (LVs)")
lv_root_item.setExpanded(True)
lv_root_item.setData(0, Qt.UserRole, {'type': 'lv_root'})
if lvm_data.get('lvs'):
for lv in lvm_data['lvs']:
lv_item = QTreeWidgetItem(lv_root_item)
lv_name = lv.get('lv_name', 'N/A')
vg_name = lv.get('vg_name', 'N/A')
lv_attr = lv.get('lv_attr', '')
lv_path = lv.get('lv_path')
if not lv_path or lv_path == 'N/A':
if vg_name != 'N/A' and lv_name != 'N/A':
lv_path = f"/dev/{vg_name}/{lv_name}"
else:
lv_path = 'N/A'
current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
lv_item.setText(0, lv_name)
lv_item.setText(1, lv.get('lv_size', 'N/A'))
lv_item.setText(2, lv_attr)
lv_item.setText(3, lv.get('lv_uuid', 'N/A'))
lv_item.setText(4, f"VG: {vg_name}, Origin: {lv.get('origin', 'N/A')}")
lv_item.setText(5, f"快照: {lv.get('snap_percent', 'N/A')}%")
lv_item.setText(6, lv_path)
lv_item.setText(7, current_mount_point if current_mount_point else "")
lv_data_for_context = lv.copy()
lv_data_for_context['lv_path'] = lv_path
lv_data_for_context['lv_name'] = lv_name
lv_data_for_context['vg_name'] = vg_name
lv_data_for_context['lv_attr'] = lv_attr
lv_item.setData(0, Qt.UserRole, {'type': 'lv', 'data': lv_data_for_context})
else:
item = QTreeWidgetItem(lv_root_item)
item.setText(0, "未找到逻辑卷。")
for i in range(len(lvm_headers)):
self.ui.treeWidget_lvm.resizeColumnToContents(i)
logger.info("LVM信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新LVM信息失败: {e}")
logger.error(f"刷新LVM信息失败: {e}")
def show_lvm_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_lvm.itemAt(pos)
menu = QMenu(self)
create_menu = QMenu("创建...", self)
create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
create_pv_action.triggered.connect(self._handle_create_pv)
create_vg_action = create_menu.addAction("创建卷组 (VG)...")
create_vg_action.triggered.connect(self._handle_create_vg)
create_lv_action = create_menu.addAction("创建逻辑卷 (LV)...")
create_lv_action.triggered.connect(self._handle_create_lv)
menu.addMenu(create_menu)
menu.addSeparator()
if item:
item_data = item.data(0, Qt.UserRole)
if not item_data:
logger.warning(f"无法获取 LVM 项 {item.text(0)} 的详细数据。")
return
item_type = item_data.get('type')
data = item_data.get('data', {})
if item_type == 'pv':
pv_name = data.get('pv_name')
if pv_name and pv_name != 'N/A':
delete_pv_action = menu.addAction(f"删除物理卷 {pv_name}")
delete_pv_action.triggered.connect(lambda: self._handle_delete_pv(pv_name))
elif item_type == 'vg':
vg_name = data.get('vg_name')
if vg_name and vg_name != 'N/A':
delete_vg_action = menu.addAction(f"删除卷组 {vg_name}")
delete_vg_action.triggered.connect(lambda: self._handle_delete_vg(vg_name))
elif item_type == 'lv':
lv_name = data.get('lv_name')
vg_name = data.get('vg_name')
lv_attr = data.get('lv_attr', '')
lv_path = data.get('lv_path')
if lv_name and vg_name and lv_path and lv_path != 'N/A':
if 'a' in lv_attr:
deactivate_lv_action = menu.addAction(f"停用逻辑卷 {lv_name}")
deactivate_lv_action.triggered.connect(lambda: self._handle_deactivate_lv(lv_name, vg_name))
else:
activate_lv_action = menu.addAction(f"激活逻辑卷 {lv_name}")
activate_lv_action.triggered.connect(lambda: self._handle_activate_lv(lv_name, vg_name))
current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
if current_mount_point and current_mount_point != '[SWAP]' and current_mount_point != '':
unmount_lv_action = menu.addAction(f"卸载 {lv_name} ({current_mount_point})")
unmount_lv_action.triggered.connect(lambda: self._unmount_and_refresh(lv_path))
else:
mount_lv_action = menu.addAction(f"挂载 {lv_name}...")
mount_lv_action.triggered.connect(lambda: self._handle_mount(lv_path))
menu.addSeparator()
delete_lv_action = menu.addAction(f"删除逻辑卷 {lv_name}")
delete_lv_action.triggered.connect(lambda: self._handle_delete_lv(lv_name, vg_name))
format_lv_action = menu.addAction(f"格式化逻辑卷 {lv_name}...")
format_lv_action.triggered.connect(lambda: self._handle_format_partition(lv_path))
else:
logger.warning(f"逻辑卷 '{lv_name}' (VG: {vg_name}) 的路径无效无法显示操作。Lv Path: {lv_path}")
if menu.actions():
menu.exec(self.ui.treeWidget_lvm.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或没有可用的LVM操作。")
def _handle_create_pv(self):
available_partitions = self.system_manager.get_unallocated_partitions()
if not available_partitions:
QMessageBox.warning(self, "警告", "没有可用于创建物理卷的未分配分区。")
return
dialog = CreatePvDialog(self, available_partitions)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_pv_info()
if info:
if self.lvm_ops.create_pv(info['device_path']):
self.refresh_all_info()
def _handle_delete_pv(self, device_path):
if self.lvm_ops.delete_pv(device_path):
self.refresh_all_info()
def _handle_create_vg(self):
lvm_info = self.system_manager.get_lvm_info()
available_pvs = []
for pv in lvm_info.get('pvs', []):
pv_name = pv.get('pv_name')
if pv_name and pv_name != 'N/A' and not pv.get('vg_name'):
if pv_name.startswith('/dev/'):
available_pvs.append(pv_name)
else:
available_pvs.append(f"/dev/{pv_name}")
if not available_pvs:
QMessageBox.warning(self, "警告", "没有可用于创建卷组的物理卷。请确保有未分配给任何卷组的物理卷。")
return
dialog = CreateVgDialog(self, available_pvs)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_vg_info()
if info:
if self.lvm_ops.create_vg(info['vg_name'], info['pvs']):
self.refresh_all_info()
def _handle_create_lv(self):
lvm_info = self.system_manager.get_lvm_info()
available_vgs = []
vg_sizes = {}
for vg in lvm_info.get('vgs', []):
vg_name = vg.get('vg_name')
if vg_name and vg_name != 'N/A':
available_vgs.append(vg_name)
free_size_str = vg.get('vg_free', '0B').strip()
current_vg_size_gb = 0.0
match = re.match(r'(\d+\.?\d*)\s*([gmktb])?', free_size_str, re.IGNORECASE)
if match:
value = float(match.group(1))
unit = match.group(2).lower() if match.group(2) else ''
if unit == 'k':
current_vg_size_gb = value / (1024 * 1024)
elif unit == 'm':
current_vg_size_gb = value / 1024
elif unit == 'g':
current_vg_size_gb = value
elif unit == 't':
current_vg_size_gb = value * 1024
elif unit == 'b' or unit == '':
current_vg_size_gb = value / (1024 * 1024 * 1024)
else:
logger.warning(f"未知LVM单位: '{unit}' for '{free_size_str}'")
else:
logger.warning(f"无法解析LVM空闲大小字符串: '{free_size_str}'")
vg_sizes[vg_name] = current_vg_size_gb
if not available_vgs:
QMessageBox.warning(self, "警告", "没有可用于创建逻辑卷的卷组。")
return
dialog = CreateLvDialog(self, available_vgs, vg_sizes)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_lv_info()
if info:
if self.lvm_ops.create_lv(info['lv_name'], info['vg_name'], info['size_gb'], info['use_max_space']):
self.refresh_all_info()
def _handle_delete_lv(self, lv_name, vg_name):
lv_path = f"/dev/{vg_name}/{lv_name}"
self.disk_ops.unmount_partition(lv_path, show_dialog_on_error=False)
self.disk_ops._remove_fstab_entry(lv_path)
if self.lvm_ops.delete_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_activate_lv(self, lv_name, vg_name):
if self.lvm_ops.activate_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_deactivate_lv(self, lv_name, vg_name):
if self.lvm_ops.deactivate_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_delete_vg(self, vg_name):
"""
处理删除卷组 (VG) 的操作。
"""
logger.info(f"尝试删除卷组 (VG) {vg_name}。")
reply = QMessageBox.question(
self,
"确认删除卷组",
f"您确定要删除卷组 {vg_name} 吗?此操作将永久删除该卷组及其所有逻辑卷和数据!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了删除卷组 {vg_name} 的操作。")
return
lvm_info = self.system_manager.get_lvm_info()
vg_info = next((vg for vg in lvm_info.get('vgs', []) if vg.get('vg_name') == vg_name), None)
lv_count_raw = vg_info.get('lv_count', 0) if vg_info else 0
try:
lv_count = int(float(lv_count_raw))
except (ValueError, TypeError):
lv_count = 0
if lv_count > 0:
QMessageBox.critical(self, "删除失败", f"卷组 {vg_name} 中仍包含逻辑卷。请先删除所有逻辑卷。")
logger.error(f"尝试删除包含逻辑卷的卷组 {vg_name}。操作被阻止。")
return
success = self.lvm_ops.delete_vg(vg_name)
if success:
self.refresh_all_info()
else:
QMessageBox.critical(self, "删除卷组失败", f"删除卷组 {vg_name} 失败,请检查日志。")
if __name__ == "__main__":
app = QApplication(sys.argv)
widget = MainWindow()
widget.show()
sys.exit(app.exec())

View File

@@ -1,9 +1,12 @@
# raid_operations.py # raid_operations.py
import logging import logging
import subprocess import subprocess
from PySide6.QtWidgets import QMessageBox from PySide6.QtWidgets import QMessageBox, QInputDialog, QLineEdit # 导入 QInputDialog 和 QLineEdit
from PySide6.QtCore import Qt
from system_info import SystemInfoManager from system_info import SystemInfoManager
import re # 导入正则表达式模块 import re
import pexpect # <--- 新增导入 pexpect
import sys
logger = logging.getLogger(__name__) logger = logging.getLogger(__name__)
@@ -11,14 +14,16 @@ class RaidOperations:
def __init__(self): def __init__(self):
self.system_manager = SystemInfoManager() self.system_manager = SystemInfoManager()
def _execute_shell_command(self, command_list, error_msg_prefix, suppress_critical_dialog_on_stderr_match=None, input_to_command=None): # <--- 添加 input_to_command 参数 def _execute_shell_command(self, command_list, error_msg_prefix, suppress_critical_dialog_on_stderr_match=None, input_to_command=None):
""" """
执行一个shell命令并返回stdout和stderr。 执行一个shell命令并返回stdout和stderr。
这个方法包装了 SystemInfoManager._run_command并统一处理日志和错误消息框。 这个方法包装了 SystemInfoManager._run_command并统一处理日志和错误消息框。
它会通过 SystemInfoManager 自动处理 sudo。
:param command_list: 命令及其参数的列表。 :param command_list: 命令及其参数的列表。
:param error_msg_prefix: 命令失败时显示给用户的错误消息前缀。 :param error_msg_prefix: 命令失败时显示给用户的错误消息前缀。
:param suppress_critical_dialog_on_stderr_match: 一个字符串,如果在 stderr 中找到此字符串, :param suppress_critical_dialog_on_stderr_match: 一个字符串或字符串元组。
如果在 stderr 中找到此字符串(或元组中的任一字符串),
则在 CalledProcessError 发生时,只记录日志,不弹出 QMessagebox.critical。 则在 CalledProcessError 发生时,只记录日志,不弹出 QMessagebox.critical。
:param input_to_command: 传递给命令stdin的数据 (str)。 :param input_to_command: 传递给命令stdin的数据 (str)。
:return: (bool success, str stdout, str stderr) :return: (bool success, str stdout, str stderr)
@@ -27,11 +32,12 @@ class RaidOperations:
logger.info(f"执行命令: {full_cmd_str}") logger.info(f"执行命令: {full_cmd_str}")
try: try:
# SystemInfoManager._run_command 应该负责在 root_privilege=True 时添加 sudo
stdout, stderr = self.system_manager._run_command( stdout, stderr = self.system_manager._run_command(
command_list, command_list,
root_privilege=True, root_privilege=True, # 假设所有 RAID 操作都需要 root 权限
check_output=True, # RAID操作通常需要检查输出 check_output=True,
input_data=input_to_command # <--- 将 input_to_command 传递给 _run_command input_data=input_to_command
) )
if stdout: logger.debug(f"命令 '{full_cmd_str}' 标准输出:\n{stdout.strip()}") if stdout: logger.debug(f"命令 '{full_cmd_str}' 标准输出:\n{stdout.strip()}")
@@ -39,18 +45,31 @@ class RaidOperations:
return True, stdout, stderr return True, stdout, stderr
except subprocess.CalledProcessError as e: except subprocess.CalledProcessError as e:
stderr_output = e.stderr.strip() # 获取标准错误输出 stderr_output = e.stderr.strip()
logger.error(f"{error_msg_prefix} 命令: {full_cmd_str}") logger.error(f"{error_msg_prefix} 命令: {full_cmd_str}")
logger.error(f"退出码: {e.returncode}") logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}") logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}") logger.error(f"标准错误: {stderr_output}")
# 根据 suppress_critical_dialog_on_stderr_match 参数决定是否弹出错误对话框 # --- 修改开始 ---
if suppress_critical_dialog_on_stderr_match and suppress_critical_dialog_on_stderr_match in stderr_output: should_suppress_dialog = False
logger.info(f"特定错误 '{suppress_critical_dialog_on_stderr_match}' 匹配,已抑制错误对话框。") if suppress_critical_dialog_on_stderr_match:
if isinstance(suppress_critical_dialog_on_stderr_match, str):
if suppress_critical_dialog_on_stderr_match in stderr_output:
should_suppress_dialog = True
elif isinstance(suppress_critical_dialog_on_stderr_match, (list, tuple)):
for pattern in suppress_critical_dialog_on_stderr_match:
if pattern in stderr_output:
should_suppress_dialog = True
break # 找到一个匹配就足够了
if should_suppress_dialog:
logger.info(f"特定错误 '{stderr_output}' 匹配抑制条件,已抑制错误对话框。")
else: else:
QMessageBox.critical(None, "错误", f"{error_msg_prefix}\n详细信息: {stderr_output}") QMessageBox.critical(None, "错误", f"{error_msg_prefix}\n详细信息: {stderr_output}")
return False, e.stdout, stderr_output # 返回 stderr_output # --- 修改结束 ---
return False, e.stdout, stderr_output
except FileNotFoundError: except FileNotFoundError:
logger.error(f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。") logger.error(f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。") QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
@@ -60,6 +79,143 @@ class RaidOperations:
QMessageBox.critical(None, "错误", f"{error_msg_prefix}\n未知错误: {e}") QMessageBox.critical(None, "错误", f"{error_msg_prefix}\n未知错误: {e}")
return False, "", str(e) return False, "", str(e)
def _execute_interactive_mdadm_command(self, command_list, error_msg_prefix):
"""
专门处理 mdadm --create 命令的交互式执行。
通过 pexpect 监听提示并弹出 QMessageBox 让用户选择。
"""
# pexpect.spawn 需要一个字符串命令,并且 sudo 应该包含在内
# 假设 SystemInfoManager._run_command 在 root_privilege=True 时会添加 sudo
# 但 pexpect.spawn 需要完整的命令字符串,所以这里手动添加 sudo
full_cmd_str = "sudo " + ' '.join(command_list)
logger.info(f"执行交互式命令: {full_cmd_str}")
stdout_buffer = []
try:
# 增加 timeout因为用户交互可能需要时间
child = pexpect.spawn(full_cmd_str, encoding='utf-8', timeout=300) # 增加超时到 5 分钟
child.logfile = sys.stdout
# --- 1. 优先处理 sudo 密码提示 ---
try:
# 尝试匹配 sudo 密码提示,设置较短的超时,如果没出现就继续
index = child.expect(['[pP]assword for', pexpect.TIMEOUT], timeout=5)
if index == 0: # 匹配到密码提示
stdout_buffer.append(child.before) # 捕获提示前的输出
password, ok = QInputDialog.getText(
None, "Sudo 密码", "请输入您的 sudo 密码:", QLineEdit.Password
)
if not ok or not password:
QMessageBox.critical(None, "错误", "sudo 密码未提供或取消。")
child.close()
return False, "".join(stdout_buffer), "Sudo 密码未提供或取消。"
child.sendline(password)
stdout_buffer.append(child.after) # 捕获发送密码后的输出
logger.info("已发送 sudo 密码。")
elif index == 1: # 超时,未出现密码提示 (可能是 NOPASSWD 或已缓存)
logger.info("Sudo 未提示输入密码(可能已配置 NOPASSWD 或密码已缓存)。")
stdout_buffer.append(child.before) # 捕获任何初始输出
except pexpect.exceptions.EOF:
stdout_buffer.append(child.before)
logger.error("命令在 sudo 密码提示前退出。")
child.close()
return False, "".join(stdout_buffer), "命令在 sudo 密码提示前退出。"
except pexpect.exceptions.TIMEOUT:
# 再次捕获超时,确保日志记录
logger.info("Sudo 密码提示超时,可能已配置 NOPASSWD 或密码已缓存。")
stdout_buffer.append(child.before)
# 定义交互式提示及其处理方式
prompts_data = [ # 重命名为 prompts_data 以避免与循环变量混淆
(r"To optimalize recovery speed, .* write-(?:intent|indent) bitmap, do you want to enable it now\? \[y/N\]\s*",
"mdadm 建议启用写入意图位图以优化恢复速度。您希望现在启用它吗?"),
(r"Continue creating array \[y/N\]\s*",
"mdadm 警告:检测到驱动器大小不一致或分区表。创建阵列将覆盖数据。是否仍要继续创建阵列?")
]
# 提取所有模式,以便 pexpect.expect 可以同时监听它们
patterns_to_expect = [p for p, _ in prompts_data]
# 循环处理所有可能的提示,直到没有更多提示或命令结束
while True:
try:
# 尝试匹配任何一个预定义的提示
# pexpect 会等待直到匹配到其中一个模式,或者超时,或者遇到 EOF
index = child.expect(patterns_to_expect, timeout=10) # 每次等待10秒
stdout_buffer.append(child.before) # 捕获到提示前的输出
# 获取匹配到的提示的描述
matched_description = prompts_data[index][1]
# 弹出对话框询问用户
reply = QMessageBox.question(None, "mdadm 提示", matched_description,
QMessageBox.Yes | QMessageBox.No, QMessageBox.Yes)
if reply == QMessageBox.Yes:
user_choice = 'y'
logger.info(f"用户对提示 '{matched_description}' 选择 ''")
else:
user_choice = 'n'
logger.info(f"用户对提示 '{matched_description}' 选择 ''")
child.sendline(user_choice)
stdout_buffer.append(user_choice + '\n') # 记录用户输入
# 等待一小段时间,确保 mdadm 接收并处理了输入,并清空缓冲区
# 避免在下一次 expect 之前,旧的输出再次被匹配
child.expect(pexpect.TIMEOUT, timeout=1)
except pexpect.exceptions.TIMEOUT:
# 如果在等待任何提示时超时,说明没有更多提示了,或者命令已完成。
logger.debug("在等待 mdadm 提示时超时,可能没有更多交互。")
stdout_buffer.append(child.before) # 捕获当前为止的输出
break # 跳出 while 循环,进入等待 EOF 阶段
except pexpect.exceptions.EOF:
# 如果在等待提示时遇到 EOF说明命令已经执行完毕或失败
logger.warning("在等待 mdadm 提示时遇到 EOF。")
stdout_buffer.append(child.before)
break # 跳出 while 循环,进入等待 EOF 阶段
try:
# 增加一个合理的超时时间,以防 mdadm 在后台进行长时间初始化
child.expect(pexpect.EOF, timeout=120) # 例如,等待 120 秒
stdout_buffer.append(child.before) # 捕获命令结束前的任何剩余输出
logger.info("mdadm 命令已正常结束 (通过 EOF 捕获)。")
except pexpect.exceptions.TIMEOUT:
stdout_buffer.append(child.before)
logger.error(f"mdadm 命令在所有交互完成后,等待 EOF 超时。")
QMessageBox.critical(None, "错误", f"{error_msg_prefix}\n详细信息: mdadm 命令在交互完成后未正常退出,等待 EOF 超时。")
child.close()
return False, "".join(stdout_buffer), "mdadm 命令等待 EOF 超时。"
except pexpect.exceptions.EOF:
stdout_buffer.append(child.before)
logger.info("mdadm 命令已正常结束 (通过 EOF 捕获)。")
child.close()
final_output = "".join(stdout_buffer)
if child.exitstatus == 0:
logger.info(f"交互式命令 {full_cmd_str} 成功完成。")
return True, final_output, "" # pexpect 很难区分 stdout/stderr都视为 stdout
else:
logger.error(f"交互式命令 {full_cmd_str} 失败,退出码: {child.exitstatus}")
QMessageBox.critical(None, "错误", f"{error_msg_prefix}\n详细信息: 命令执行失败,退出码 {child.exitstatus}\n输出: {final_output}")
return False, final_output, f"命令执行失败,退出码 {child.exitstatus}"
except pexpect.exceptions.ExceptionPexpect as e:
logger.error(f"pexpect 内部错误: {e}")
QMessageBox.critical(None, "错误", f"{error_msg_prefix}\n详细信息: pexpect 内部错误: {e}")
return False, "", str(e)
except Exception as e:
logger.error(f"执行交互式命令 {full_cmd_str} 时发生未知错误: {e}")
QMessageBox.critical(None, "错误", f"{error_msg_prefix}\n未知错误: {e}")
return False, "", str(e)
def _get_next_available_md_device_name(self): def _get_next_available_md_device_name(self):
""" """
查找下一个可用的 /dev/mdX 设备名称(例如 /dev/md0, /dev/md1, ...)。 查找下一个可用的 /dev/mdX 设备名称(例如 /dev/md0, /dev/md1, ...)。
@@ -70,7 +226,6 @@ class RaidOperations:
for array in raid_arrays: for array in raid_arrays:
device_path = array.get('device') device_path = array.get('device')
if device_path and device_path.startswith('/dev/md'): if device_path and device_path.startswith('/dev/md'):
# 匹配 /dev/mdX 形式的设备名
match = re.match(r'/dev/md(\d+)', device_path) match = re.match(r'/dev/md(\d+)', device_path)
if match: if match:
existing_md_numbers.add(int(match.group(1))) existing_md_numbers.add(int(match.group(1)))
@@ -94,7 +249,7 @@ class RaidOperations:
if isinstance(level, int): if isinstance(level, int):
level_str = f"raid{level}" level_str = f"raid{level}"
elif isinstance(level, str): elif isinstance(level, str):
if level in ["0", "1", "5", "6", "10"]: # 增加 "6", "10" 确保全面 if level in ["0", "1", "5", "6", "10"]:
level_str = f"raid{level}" level_str = f"raid{level}"
else: else:
level_str = level level_str = level
@@ -104,22 +259,21 @@ class RaidOperations:
level = level_str level = level_str
# --- 标准化 RAID 级别输入结束 --- # --- 标准化 RAID 级别输入结束 ---
if not devices or len(devices) < 2: if not devices or len(devices) < 1: # RAID0 理论上可以一个设备,但通常至少两个
QMessageBox.critical(None, "错误", "创建 RAID 阵列至少需要两个成员设备") QMessageBox.critical(None, "错误", "创建 RAID 阵列至少需要一个设备RAID0")
return False return False
# 检查 RAID 5 至少需要 3 个设备 # 检查其他 RAID 级别所需的设备数量
if level == "raid5" and len(devices) < 3:
QMessageBox.critical(None, "错误", "RAID5 至少需要三个设备。")
return False
# 补充其他 RAID 级别设备数量检查,与 dialogs.py 保持一致
if level == "raid1" and len(devices) < 2: if level == "raid1" and len(devices) < 2:
QMessageBox.critical(None, "错误", "RAID1 至少需要两个设备。") QMessageBox.critical(None, "错误", "RAID1 至少需要两个设备。")
return False return False
if level == "raid5" and len(devices) < 3:
QMessageBox.critical(None, "错误", "RAID5 至少需要三个设备。")
return False
if level == "raid6" and len(devices) < 4: if level == "raid6" and len(devices) < 4:
QMessageBox.critical(None, "错误", "RAID6 至少需要四个设备。") QMessageBox.critical(None, "错误", "RAID6 至少需要四个设备。")
return False return False
if level == "raid10" and len(devices) < 2: if level == "raid10" and len(devices) < 2: # RAID10 至少需要 2 个设备 (例如 1+0)
QMessageBox.critical(None, "错误", "RAID10 至少需要两个设备。") QMessageBox.critical(None, "错误", "RAID10 至少需要两个设备。")
return False return False
@@ -137,10 +291,7 @@ class RaidOperations:
# 1. 清除设备上的旧 RAID 超级块(如果有) # 1. 清除设备上的旧 RAID 超级块(如果有)
for dev in devices: for dev in devices:
# 使用 --force 选项,避免交互式提示
clear_cmd = ["mdadm", "--zero-superblock", "--force", dev] clear_cmd = ["mdadm", "--zero-superblock", "--force", dev]
# 定义表示设备上没有超级块的错误信息,根据日志调整
no_superblock_error_match = "Unrecognised md component device" no_superblock_error_match = "Unrecognised md component device"
success, _, stderr = self._execute_shell_command( success, _, stderr = self._execute_shell_command(
@@ -149,7 +300,6 @@ class RaidOperations:
suppress_critical_dialog_on_stderr_match=no_superblock_error_match suppress_critical_dialog_on_stderr_match=no_superblock_error_match
) )
# 检查 stderr 是否包含“未识别的 MD 组件设备”信息
if no_superblock_error_match in stderr: if no_superblock_error_match in stderr:
logger.info(f"设备 {dev} 上未找到旧 RAID 超级块,已确保其干净。") logger.info(f"设备 {dev} 上未找到旧 RAID 超级块,已确保其干净。")
elif success: elif success:
@@ -159,41 +309,30 @@ class RaidOperations:
return False return False
# 2. 创建 RAID 阵列 # 2. 创建 RAID 阵列
# --- 修改点:自动生成唯一的阵列名称 ---
array_name = self._get_next_available_md_device_name() array_name = self._get_next_available_md_device_name()
logger.info(f"将使用自动生成的 RAID 阵列名称: {array_name}") logger.info(f"将使用自动生成的 RAID 阵列名称: {array_name}")
# --- 修改点结束 ---
if level == "raid0": create_cmd_base = ["mdadm", "--create", array_name, "--level=" + level.replace("raid", ""),
create_cmd = ["mdadm", "--create", array_name, "--level=raid0", f"--raid-devices={len(devices)}"]
f"--raid-devices={len(devices)}", f"--chunk={chunk_size}K"] + devices
elif level == "raid1": # 添加 chunk size (RAID0, RAID5, RAID6, RAID10 通常需要)
create_cmd = ["mdadm", "--create", array_name, "--level=raid1", if level in ["raid0", "raid5", "raid6", "raid10"]:
f"--raid-devices={len(devices)}"] + devices create_cmd_base.append(f"--chunk={chunk_size}K")
elif level == "raid5":
create_cmd = ["mdadm", "--create", array_name, "--level=raid5", create_cmd = create_cmd_base + devices
f"--raid-devices={len(devices)}"] + devices
elif level == "raid6": # 增加 RAID6
create_cmd = ["mdadm", "--create", array_name, "--level=raid6",
f"--raid-devices={len(devices)}", f"--chunk={chunk_size}K"] + devices
elif level == "raid10": # 增加 RAID10
create_cmd = ["mdadm", "--create", array_name, "--level=raid10",
f"--raid-devices={len(devices)}", f"--chunk={chunk_size}K"] + devices
else:
QMessageBox.critical(None, "错误", f"不支持的 RAID 级别: {level}")
return False
# 在 --create 命令中添加 --force 选项 # 在 --create 命令中添加 --force 选项
create_cmd.insert(2, "--force") create_cmd.insert(2, "--force")
success_create, stdout_create, stderr_create = self._execute_shell_command( # 调用新的交互式方法
success_create, stdout_create, stderr_create = self._execute_interactive_mdadm_command(
create_cmd, create_cmd,
f"创建 RAID 阵列失败", f"创建 RAID 阵列失败"
input_to_command='y\n' # 尝试通过 stdin 传递 'y'
) )
if not success_create: if not success_create:
# 检查是否是由于 "Array name ... is in use already." 导致的失败 # pexpect 捕获的输出可能都在 stdout_create 中
if "Array name" in stderr_create and "is in use already" in stderr_create: if "Array name" in stdout_create and "is in use already" in stdout_create:
QMessageBox.critical(None, "错误", f"创建 RAID 阵列失败:阵列名称 {array_name} 已被占用。请尝试停止或删除现有阵列。") QMessageBox.critical(None, "错误", f"创建 RAID 阵列失败:阵列名称 {array_name} 已被占用。请尝试停止或删除现有阵列。")
return False return False
@@ -204,9 +343,29 @@ class RaidOperations:
examine_scan_cmd = ["mdadm", "--examine", "--scan"] examine_scan_cmd = ["mdadm", "--examine", "--scan"]
success_scan, scan_stdout, _ = self._execute_shell_command(examine_scan_cmd, "扫描 mdadm 配置失败") success_scan, scan_stdout, _ = self._execute_shell_command(examine_scan_cmd, "扫描 mdadm 配置失败")
if success_scan: if success_scan:
append_to_conf_cmd = ["bash", "-c", f"echo '{scan_stdout.strip()}' >> /etc/mdadm/mdadm.conf"] # --- 新增:确保 /etc/mdadm 目录存在 ---
if not self._execute_shell_command(append_to_conf_cmd, "更新 /etc/mdadm/mdadm.conf 失败")[0]: mkdir_cmd = ["mkdir", "-p", "/etc/mdadm"]
success_mkdir, _, stderr_mkdir = self._execute_shell_command(
mkdir_cmd,
"创建 /etc/mdadm 目录失败"
)
if not success_mkdir:
logger.error(f"无法创建 /etc/mdadm 目录,更新 mdadm.conf 失败。错误: {stderr_mkdir}")
QMessageBox.critical(None, "错误", f"无法创建 /etc/mdadm 目录,更新 mdadm.conf 失败。\n详细信息: {stderr_mkdir}")
return False # 如果连目录都无法创建,则认为创建阵列失败,因为无法保证重启后自动识别
logger.info("已确保 /etc/mdadm 目录存在。")
# --- 新增结束 ---
# 这里需要确保 /etc/mdadm/mdadm.conf 存在且可写入
# _execute_shell_command 会自动添加 sudo
success_append, _, _ = self._execute_shell_command(
["bash", "-c", f"echo '{scan_stdout.strip()}' | tee -a /etc/mdadm/mdadm.conf > /dev/null"],
"更新 /etc/mdadm/mdadm.conf 失败"
)
if not success_append:
logger.warning("更新 /etc/mdadm/mdadm.conf 失败。") logger.warning("更新 /etc/mdadm/mdadm.conf 失败。")
else:
logger.info("已成功更新 /etc/mdadm/mdadm.conf。")
else: else:
logger.warning("未能扫描到 mdadm 配置,跳过更新 mdadm.conf。") logger.warning("未能扫描到 mdadm 配置,跳过更新 mdadm.conf。")
@@ -227,15 +386,14 @@ class RaidOperations:
return False return False
logger.info(f"尝试停止 RAID 阵列: {array_path}") logger.info(f"尝试停止 RAID 阵列: {array_path}")
# 尝试卸载阵列(如果已挂载),不显示错误对话框 # 尝试卸载阵列(如果已挂载),不显示错误对话框
# 注意:这里需要调用 disk_operations 的 unmount_partition # _execute_shell_command 会自动添加 sudo
# 由于 RaidOperations 不直接持有 DiskOperations 实例,需要通过某种方式获取或传递 self._execute_shell_command(
# 暂时先直接调用 umount 命令,不处理 fstab ["umount", array_path],
# 或者,更好的方式是让 MainWindow 调用 disk_ops.unmount_partition f"尝试卸载 {array_path} 失败",
# 此处简化处理,只执行 umount 命令 suppress_critical_dialog_on_stderr_match=("not mounted", "未挂载") # 修改这里,添加中文错误信息
# 这里的 suppress_critical_dialog_on_stderr_match 应该与 DiskOperations 中的定义保持一致 )
self._execute_shell_command(["umount", array_path], f"尝试卸载 {array_path} 失败",
suppress_critical_dialog_on_stderr_match="not mounted") # 仅抑制英文,因为这里没有访问 DiskOperations 的 already_unmounted_errors
if not self._execute_shell_command(["mdadm", "--stop", array_path], f"停止 RAID 阵列 {array_path} 失败")[0]: if not self._execute_shell_command(["mdadm", "--stop", array_path], f"停止 RAID 阵列 {array_path} 失败")[0]:
return False return False
@@ -269,10 +427,7 @@ class RaidOperations:
# 2. 清除成员设备上的超级块 # 2. 清除成员设备上的超级块
success_all_cleared = True success_all_cleared = True
for dev in member_devices: for dev in member_devices:
# 使用 --force 选项,避免交互式提示
clear_cmd = ["mdadm", "--zero-superblock", "--force", dev] clear_cmd = ["mdadm", "--zero-superblock", "--force", dev]
# 定义表示设备上没有超级块的错误信息,根据日志调整
no_superblock_error_match = "Unrecognised md component device" no_superblock_error_match = "Unrecognised md component device"
success, _, stderr = self._execute_shell_command( success, _, stderr = self._execute_shell_command(
@@ -287,7 +442,6 @@ class RaidOperations:
logger.info(f"已清除设备 {dev} 上的旧 RAID 超级块。") logger.info(f"已清除设备 {dev} 上的旧 RAID 超级块。")
else: else:
success_all_cleared = False success_all_cleared = False
# 错误对话框已由 _execute_shell_command 弹出,这里只记录警告
logger.warning(f"未能清除设备 {dev} 上的 RAID 超级块,请手动检查。") logger.warning(f"未能清除设备 {dev} 上的 RAID 超级块,请手动检查。")
if success_all_cleared: if success_all_cleared: