Files
diskmanager/disk_operations.py
2026-02-02 22:31:02 +08:00

512 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import logging
import re
import os
from PySide6.QtWidgets import QMessageBox, QInputDialog
from system_info import SystemInfoManager
logger = logging.getLogger(__name__)
class DiskOperations:
def __init__(self, system_manager: SystemInfoManager): # NEW: 接收 system_manager 实例
self.system_manager = system_manager
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
suppress_critical_dialog_on_stderr_match=None, input_data=None):
"""
通用地运行一个 shell 命令,并处理错误。
:param command_list: 命令及其参数的列表。
:param error_message: 命令失败时显示给用户的错误消息。
:param root_privilege: 如果为 True则使用 sudo 执行命令。
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
可以是字符串或字符串元组。
:param input_data: 传递给命令stdin的数据 (str)。
:return: (True/False, stdout_str, stderr_str)
"""
if not all(isinstance(arg, str) for arg in command_list):
logger.error(f"命令列表包含非字符串元素: {command_list}")
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
return False, "", "内部错误:命令参数类型不正确。"
if root_privilege:
command_list = ["sudo"] + command_list
full_cmd_str = ' '.join(command_list)
logger.debug(f"执行命令: {full_cmd_str}")
try:
result = subprocess.run(
command_list,
capture_output=True,
text=True,
check=True,
encoding='utf-8',
input=input_data
)
logger.info(f"命令成功: {full_cmd_str}")
return True, result.stdout.strip(), result.stderr.strip()
except subprocess.CalledProcessError as e:
stderr_output = e.stderr.strip()
logger.error(f"命令失败: {full_cmd_str}")
logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}")
# Determine if the error dialog should be suppressed by this function
should_suppress_dialog_here = False
if suppress_critical_dialog_on_stderr_match:
if isinstance(suppress_critical_dialog_on_stderr_match, str):
if suppress_critical_dialog_on_stderr_match in stderr_output:
should_suppress_dialog_here = True
elif isinstance(suppress_critical_dialog_on_stderr_match, tuple):
if any(s in stderr_output for s in suppress_critical_dialog_on_stderr_match):
should_suppress_dialog_here = True
if should_suppress_dialog_here:
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
else:
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
return False, e.stdout.strip(), stderr_output
except FileNotFoundError:
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
logger.error(f"命令 '{command_list[0]}' 未找到。")
return False, "", f"命令 '{command_list[0]}' 未找到。"
except Exception as e:
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
return False, "", str(e)
def _add_to_fstab(self, device_path, mount_point, fstype, uuid):
"""
将设备的挂载信息添加到 /etc/fstab。
使用 UUID 识别设备以提高稳定性。
"""
if not uuid or not mount_point or not fstype:
logger.error(f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
QMessageBox.warning(None, "警告", f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
return False
fstab_entry = f"UUID={uuid} {mount_point} {fstype} defaults 0 2"
fstab_path = "/etc/fstab"
try:
# 检查 fstab 中是否已存在相同 UUID 的条目
# 使用 _execute_shell_command 来读取 fstab尽管通常不需要 sudo
# 这里为了简化,直接读取文件,但写入时使用 _execute_shell_command
with open(fstab_path, 'r') as f:
fstab_content = f.readlines()
for line in fstab_content:
if f"UUID={uuid}" in line:
logger.warning(f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。跳过添加。")
QMessageBox.information(None, "信息", f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。")
return True # 认为成功,因为目标已达成
# 如果不存在,则追加到 fstab
# 使用 _execute_shell_command for sudo write
success, _, stderr = self._execute_shell_command(
["sh", "-c", f"echo '{fstab_entry}' >> {fstab_path}"],
f"{device_path} 添加到 {fstab_path} 失败",
root_privilege=True
)
if success:
logger.info(f"已将 {fstab_entry} 添加到 {fstab_path}")
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功添加到 /etc/fstab。")
return True
else:
return False # Error handled by _execute_shell_command
except Exception as e:
logger.error(f"处理 {fstab_path} 失败: {e}")
QMessageBox.critical(None, "错误", f"处理 {fstab_path} 失败: {e}")
return False
def _remove_fstab_entry(self, device_path):
"""
从 /etc/fstab 中移除指定设备的条目。
此方法需要 SystemInfoManager 来获取设备的 UUID。
"""
# NEW: 使用 system_manager 获取 UUID它会处理路径解析
device_details = self.system_manager.get_device_details_by_path(device_path)
if not device_details or not device_details.get('uuid'):
logger.warning(f"无法获取设备 {device_path} 的 UUID无法从 fstab 中移除。")
return False
uuid = device_details.get('uuid')
fstab_path = "/etc/fstab"
# Use sed for robust removal with sudo
# suppress_critical_dialog_on_stderr_match is added to handle cases where fstab might not exist
# or sed reports no changes, which is not a critical error for removal.
command = ["sed", "-i", f"/UUID={uuid}/d", fstab_path]
success, _, stderr = self._execute_shell_command(
command,
f"{fstab_path} 中删除 UUID={uuid} 的条目失败",
root_privilege=True,
suppress_critical_dialog_on_stderr_match=(
f"sed: {fstab_path}: No such file or directory", # English
f"sed: {fstab_path}: 没有那个文件或目录", # Chinese
"no changes were made" # if sed finds nothing to delete
)
)
if success:
logger.info(f"已从 {fstab_path} 中删除 UUID={uuid} 的条目。")
return True
else:
# If sed failed, it might be because the entry wasn't found, which is fine.
# _execute_shell_command would have suppressed the dialog if it matched the suppress_critical_dialog_on_stderr_match.
# So, if we reach here, it's either a real error (dialog shown by _execute_shell_command)
# or a suppressed "no changes" type of error. In both cases, if no real error, we return True.
if any(s in stderr for s in (
f"sed: {fstab_path}: No such file or directory",
f"sed: {fstab_path}: 没有那个文件或目录",
"no changes were made",
"No such file or directory" # more general check
)):
logger.info(f"fstab 中未找到 UUID={uuid} 的条目,无需删除。")
return True # Consider it a success if the entry is not there
return False # Other errors are already handled by _execute_shell_command
def mount_partition(self, device_path, mount_point, add_to_fstab=False):
"""
挂载指定设备到指定挂载点。
:param device_path: 要挂载的设备路径。
:param mount_point: 挂载点。
:param add_to_fstab: 是否添加到 /etc/fstab。
:return: True 如果成功,否则 False。
"""
if not os.path.exists(mount_point):
try:
os.makedirs(mount_point)
logger.info(f"创建挂载点目录: {mount_point}")
except OSError as e:
QMessageBox.critical(None, "错误", f"创建挂载点目录 {mount_point} 失败: {e}")
logger.error(f"创建挂载点目录 {mount_point} 失败: {e}")
return False
logger.info(f"尝试挂载设备 {device_path}{mount_point}")
success, _, stderr = self._execute_shell_command(
["mount", device_path, mount_point],
f"挂载设备 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功挂载到 {mount_point}")
if add_to_fstab:
# NEW: 使用 self.system_manager 获取 fstype 和 UUID
device_details = self.system_manager.get_device_details_by_path(device_path)
if device_details:
fstype = device_details.get('fstype')
uuid = device_details.get('uuid')
if fstype and uuid:
self._add_to_fstab(device_path, mount_point, fstype, uuid)
else:
logger.error(f"无法获取设备 {device_path} 的文件系统类型或 UUID 以添加到 fstab。")
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取文件系统类型或 UUID 以添加到 fstab。")
else:
logger.error(f"无法获取设备 {device_path} 的详细信息以添加到 fstab。")
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取详细信息以添加到 fstab。")
return True
else:
return False
def unmount_partition(self, device_path, show_dialog_on_error=True):
"""
卸载指定设备。
:param device_path: 要卸载的设备路径。
:param show_dialog_on_error: 是否在发生错误时显示对话框。
:return: True 如果成功,否则 False。
"""
logger.info(f"尝试卸载设备 {device_path}")
# 定义表示设备已未挂载的错误信息(中英文)
# 增加了 "未指定挂载点"
already_unmounted_errors = ("not mounted", "未挂载", "未指定挂载点")
# 调用 _execute_shell_command并告诉它在遇到“已未挂载”错误时不要弹出其自身的关键错误对话框。
success, _, stderr = self._execute_shell_command(
["umount", device_path],
f"卸载设备 {device_path} 失败",
suppress_critical_dialog_on_stderr_match=already_unmounted_errors
)
if success:
# 如果命令成功执行,则卸载成功。
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。")
return True
else:
# 如果命令失败,检查是否是因为设备已经未挂载或挂载点未指定。
is_already_unmounted_error = any(s in stderr for s in already_unmounted_errors)
if is_already_unmounted_error:
logger.info(f"设备 {device_path} 已经处于未挂载状态(或挂载点未指定),无需重复卸载。")
# 这种情况下,操作结果符合预期(设备已是未挂载),不弹出对话框,并返回 True。
return True
else:
# 对于其他类型的卸载失败(例如设备忙、权限不足等),
# _execute_shell_command 应该已经弹出了关键错误对话框
# (因为它没有匹配到 `already_unmounted_errors` 进行抑制)。
# 所以,这里我们不需要再次弹出对话框,直接返回 False。
return False
def get_disk_free_space_info_mib(self, disk_path, total_disk_mib):
"""
获取磁盘上最大的空闲空间块的起始位置 (MiB) 和大小 (MiB)。
:param disk_path: 磁盘路径。
:param total_disk_mib: 磁盘的总大小 (MiB),用于全新磁盘的计算。
:return: (start_mib, size_mib) 元组。如果磁盘是全新的,返回 (0.0, total_disk_mib)。
如果磁盘有分区表但没有空闲空间,返回 (None, None)。
"""
logger.debug(f"尝试获取磁盘 {disk_path} 的空闲空间信息 (MiB)。")
# suppress_critical_dialog_on_stderr_match is added to handle cases where parted might complain about
# an unrecognized disk label, which is expected for a fresh disk.
success, stdout, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "unit", "MiB", "print", "free"],
f"获取磁盘 {disk_path} 分区信息失败",
root_privilege=True,
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label")
)
if not success:
# If parted failed and it wasn't due to an unrecognized label (handled by suppress_critical_dialog_on_stderr_match),
# then _execute_shell_command would have shown a dialog.
# We just log and return None.
logger.error(f"获取磁盘 {disk_path} 空闲空间信息失败: {stderr}")
return None, None
logger.debug(f"parted print free 命令原始输出:\n{stdout}")
free_spaces = []
lines = stdout.splitlines()
# Regex to capture StartMiB and SizeMiB from a "Free Space" line
# It's made more flexible to match "Free Space", "空闲空间", and "可用空间"
# Example: " 0.02MiB 8192MiB 8192MiB 可用空间"
# We need to capture the first numeric value (Start) and the third numeric value (Size).
free_space_line_pattern = re.compile(r'^\s*(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(?:Free Space|空闲空间|可用空间)')
for line in lines:
match = free_space_line_pattern.match(line)
if match:
try:
start_mib = float(match.group(1)) # Capture StartMiB
size_mib = float(match.group(3)) # Capture SizeMiB (the third numeric value)
free_spaces.append({'start_mib': start_mib, 'size_mib': size_mib})
except ValueError as ve:
logger.warning(f"解析 parted free space 行 '{line}' 失败: {ve}")
continue
if not free_spaces:
logger.warning(f"在磁盘 {disk_path} 上未找到空闲空间。")
return None, None
# 找到最大的空闲空间块
largest_free_space = max(free_spaces, key=lambda x: x['size_mib'])
start_mib = largest_free_space['start_mib']
size_mib = largest_free_space['size_mib']
logger.debug(f"磁盘 {disk_path} 的最大空闲空间块起始于 {start_mib:.2f} MiB大小为 {size_mib:.2f} MiB。")
return start_mib, size_mib
def create_partition(self, disk_path, partition_table_type, size_gb, total_disk_mib, use_max_space):
"""
在指定磁盘上创建分区。
:param disk_path: 磁盘路径 (例如 /dev/sdb)。
:param partition_table_type: 分区表类型 ('gpt''msdos')。
:param size_gb: 分区大小 (GB)。
:param total_disk_mib: 磁盘总大小 (MiB)。
:param use_max_space: 是否使用最大可用空间。
:return: True 如果成功,否则 False。
"""
if not isinstance(disk_path, str) or not isinstance(partition_table_type, str):
logger.error(f"尝试创建分区时传入无效参数。磁盘路径: {disk_path}, 分区表类型: {partition_table_type}")
QMessageBox.critical(None, "错误", "无效的磁盘路径或分区表类型。")
return False
# 1. 检查磁盘是否有分区表
has_partition_table = False
# Use suppress_critical_dialog_on_stderr_match for "unrecognized disk label" when checking
success_check, stdout_check, stderr_check = self._execute_shell_command(
["parted", "-s", disk_path, "print"],
f"检查磁盘 {disk_path} 分区表失败",
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label"),
root_privilege=True
)
# If success_check is False, it means _execute_shell_command encountered an error.
# If that error was "unrecognized disk label", it was suppressed, and we treat it as no partition table.
# If it was another error, a dialog was shown by _execute_shell_command, and we should stop.
if not success_check:
# Check if the failure was due to "unrecognized disk label" (which means no partition table)
if any(s in stderr_check for s in ("无法辨识的磁盘卷标", "unrecognized disk label")):
logger.info(f"磁盘 {disk_path} 没有可识别的分区表。")
has_partition_table = False # Explicitly set to False
else:
logger.error(f"检查磁盘 {disk_path} 分区表时发生非预期的错误: {stderr_check}")
return False # Other critical error, stop operation.
else: # success_check is True
if "Partition Table: unknown" not in stdout_check and "分区表unknown" not in stdout_check:
has_partition_table = True
else:
logger.info(f"parted print 报告磁盘 {disk_path} 的分区表为 'unknown'")
has_partition_table = False
actual_start_mib_for_parted = 0.0
# 2. 如果没有分区表,则创建分区表
if not has_partition_table:
reply = QMessageBox.question(None, "确认创建分区表",
f"磁盘 {disk_path} 没有分区表。您确定要创建 {partition_table_type} 分区表吗?"
f"此操作将擦除磁盘上的所有数据。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了在 {disk_path} 上创建分区表的操作。")
return False
logger.info(f"尝试在 {disk_path} 上创建 {partition_table_type} 分区表。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "mklabel", partition_table_type],
f"创建 {partition_table_type} 分区表失败"
)
if not success:
return False
# 对于新创建分区表的磁盘,第一个分区通常从 1MiB 开始以确保兼容性和对齐
actual_start_mib_for_parted = 1.0
else:
# 如果有分区表,获取下一个可用分区的起始位置
start_mib_from_parted, _ = self.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if start_mib_from_parted is None:
QMessageBox.critical(None, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置或没有空闲空间。")
return False
# 如果 parted 报告的起始位置非常接近 0 MiB (例如 0.0 MiB 或 0.02 MiB)
# 为了安全和兼容性,也将其调整为 1.0 MiB。
# 否则,使用 parted 报告的精确起始位置。
if start_mib_from_parted < 1.0: # covers 0.0MiB and values like 0.017MiB (17.4kB)
actual_start_mib_for_parted = 1.0
else:
actual_start_mib_for_parted = start_mib_from_parted
# 3. 确定分区结束位置
if use_max_space:
end_pos = "100%"
size_for_log = "最大可用空间"
else:
# 计算结束 MiB
# 注意:这里计算的 end_mib 是基于用户请求的大小,
# parted 会根据实际可用空间和对齐进行微调。
end_mib = actual_start_mib_for_parted + size_gb * 1024
# 简单检查,如果请求的大小导致结束位置超出磁盘总大小,则使用最大可用
if end_mib > total_disk_mib:
QMessageBox.warning(None, "警告", f"请求的分区大小 ({size_gb}GB) 超出了可用空间。将调整为最大可用空间。")
end_pos = "100%"
size_for_log = "最大可用空间"
else:
end_pos = f"{end_mib}MiB"
size_for_log = f"{size_gb}GB"
# 4. 创建分区
create_cmd = ["parted", "-s", disk_path, "mkpart", "primary", f"{actual_start_mib_for_parted}MiB", end_pos]
logger.info(f"尝试在 {disk_path} 上创建 {size_for_log} 的主分区。命令: {' '.join(create_cmd)}")
success, _, stderr = self._execute_shell_command(
create_cmd,
f"{disk_path} 上创建分区失败"
)
if success:
QMessageBox.information(None, "成功", f"{disk_path} 上成功创建了 {size_for_log} 的分区。")
return True
else:
return False
def delete_partition(self, device_path):
"""
删除指定分区。
:param device_path: 要删除的分区路径。
:return: True 如果成功,否则 False。
"""
reply = QMessageBox.question(None, "确认删除分区",
f"您确定要删除分区 {device_path} 吗?此操作将擦除分区上的所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了删除分区 {device_path} 的操作。")
return False
# 尝试卸载分区
# The unmount_partition method now handles "not mounted" gracefully without dialog and returns True.
# So, we just call it.
self.unmount_partition(device_path, show_dialog_on_error=False) # show_dialog_on_error=False means no dialog for other errors too.
# 从 fstab 中移除条目
# _remove_fstab_entry also handles "not found" gracefully.
self._remove_fstab_entry(device_path)
# 获取父磁盘和分区号
# 例如 /dev/sdb1 -> /dev/sdb, 1
match = re.match(r'(/dev/[a-z]+)(\d+)', device_path)
if not match:
QMessageBox.critical(None, "错误", f"无法解析设备路径 {device_path}")
logger.error(f"无法解析设备路径 {device_path}")
return False
disk_path = match.group(1)
partition_number = match.group(2)
logger.info(f"尝试删除分区 {device_path} (磁盘: {disk_path}, 分区号: {partition_number})。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "rm", partition_number],
f"删除分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功删除。")
return True
else:
return False
def format_partition(self, device_path, fstype=None):
"""
格式化指定分区。
:param device_path: 要格式化的分区路径。
:param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None则弹出对话框让用户选择。
:return: True 如果成功,否则 False。
"""
if fstype is None:
items = ("ext4", "xfs", "fat32", "ntfs")
fstype, ok = QInputDialog.getItem(None, "选择文件系统", "请选择要使用的文件系统类型:", items, 0, False)
if not ok or not fstype:
logger.info("用户取消了文件系统选择。")
return False
reply = QMessageBox.question(None, "确认格式化分区",
f"您确定要将分区 {device_path} 格式化为 {fstype} 吗?此操作将擦除分区上的所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了格式化分区 {device_path} 的操作。")
return False
# 尝试卸载分区
self.unmount_partition(device_path, show_dialog_on_error=False) # 静默卸载
# 从 fstab 中移除条目 (因为格式化会改变 UUID)
self._remove_fstab_entry(device_path)
logger.info(f"尝试将分区 {device_path} 格式化为 {fstype}")
format_cmd = []
if fstype == "ext4":
format_cmd = ["mkfs.ext4", "-F", device_path] # -F 强制执行
elif fstype == "xfs":
format_cmd = ["mkfs.xfs", "-f", device_path] # -f 强制执行
elif fstype == "fat32":
format_cmd = ["mkfs.fat", "-F", "32", device_path]
elif fstype == "ntfs":
format_cmd = ["mkfs.ntfs", "-f", device_path]
else:
QMessageBox.critical(None, "错误", f"不支持的文件系统类型: {fstype}")
logger.error(f"不支持的文件系统类型: {fstype}")
return False
success, _, stderr = self._execute_shell_command(
format_cmd,
f"格式化分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功格式化为 {fstype}")
return True
else:
return False