Files
diskmanager/disk_operations.py
2026-02-02 19:48:21 +08:00

476 lines
23 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import subprocess
import logging
import re
import os
from PySide6.QtWidgets import QMessageBox, QInputDialog
logger = logging.getLogger(__name__)
class DiskOperations:
def __init__(self):
pass
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
suppress_critical_dialog_on_stderr_match=None, input_data=None):
"""
通用地运行一个 shell 命令,并处理错误。
:param command_list: 命令及其参数的列表。
:param error_message: 命令失败时显示给用户的错误消息。
:param root_privilege: 如果为 True则使用 sudo 执行命令。
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
:param input_data: 传递给命令stdin的数据 (str)。
:return: (True/False, stdout_str, stderr_str)
"""
if not all(isinstance(arg, str) for arg in command_list):
logger.error(f"命令列表包含非字符串元素: {command_list}")
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
return False, "", "内部错误:命令参数类型不正确。"
if root_privilege:
command_list = ["sudo"] + command_list
full_cmd_str = ' '.join(command_list)
logger.debug(f"执行命令: {full_cmd_str}")
try:
result = subprocess.run(
command_list,
capture_output=True,
text=True,
check=True,
encoding='utf-8',
input=input_data
)
logger.info(f"命令成功: {full_cmd_str}")
return True, result.stdout.strip(), result.stderr.strip()
except subprocess.CalledProcessError as e:
stderr_output = e.stderr.strip()
logger.error(f"命令失败: {full_cmd_str}")
logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}")
if suppress_critical_dialog_on_stderr_match and \
(suppress_critical_dialog_on_stderr_match in stderr_output or \
(isinstance(suppress_critical_dialog_on_stderr_match, tuple) and \
any(s in stderr_output for s in suppress_critical_dialog_on_stderr_match))):
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
else:
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
return False, e.stdout.strip(), stderr_output
except FileNotFoundError:
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
logger.error(f"命令 '{command_list[0]}' 未找到。")
return False, "", f"命令 '{command_list[0]}' 未找到。"
except Exception as e:
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
return False, "", str(e)
def _add_to_fstab(self, device_path, mount_point, fstype, uuid):
"""
将设备的挂载信息添加到 /etc/fstab。
使用 UUID 识别设备以提高稳定性。
"""
if not uuid or not mount_point or not fstype:
logger.error(f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
QMessageBox.warning(None, "警告", f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
return False
fstab_entry = f"UUID={uuid} {mount_point} {fstype} defaults 0 2"
fstab_path = "/etc/fstab"
try:
# 检查 fstab 中是否已存在相同 UUID 或挂载点的条目
with open(fstab_path, 'r') as f:
fstab_content = f.readlines()
for line in fstab_content:
if f"UUID={uuid}" in line:
logger.warning(f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。跳过添加。")
QMessageBox.information(None, "信息", f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。")
return True # 认为成功,因为目标已达成
# 检查挂载点是否已存在 (可能被其他设备使用)
if mount_point in line.split():
logger.warning(f"挂载点 {mount_point} 已存在于 fstab 中。跳过添加。")
QMessageBox.information(None, "信息", f"挂载点 {mount_point} 已存在于 fstab 中。")
return True # 认为成功,因为目标已达成
# 如果不存在,则追加到 fstab
with open(fstab_path, 'a') as f:
f.write(fstab_entry + '\n')
logger.info(f"已将 {fstab_entry} 添加到 {fstab_path}")
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功添加到 /etc/fstab。")
return True
except Exception as e:
logger.error(f"写入 {fstab_path} 失败: {e}")
QMessageBox.critical(None, "错误", f"写入 {fstab_path} 失败: {e}")
return False
def _remove_fstab_entry(self, device_path):
"""
从 /etc/fstab 中移除指定设备的条目。
此方法需要 SystemInfoManager 来获取设备的 UUID。
"""
success, stdout, stderr = self._execute_shell_command(
["lsblk", "-no", "UUID", device_path],
f"获取设备 {device_path} 的 UUID 失败",
root_privilege=False, # lsblk 通常不需要 sudo
suppress_critical_dialog_on_stderr_match=("找不到或无法访问", "No such device or address") # 匹配中英文错误
)
if not success:
logger.warning(f"无法获取设备 {device_path} 的 UUID无法从 fstab 中移除。错误: {stderr}")
return False
uuid = stdout.strip()
if not uuid:
logger.warning(f"设备 {device_path} 没有 UUID无法从 fstab 中移除。")
return False
fstab_path = "/etc/fstab"
try:
with open(fstab_path, 'r') as f:
lines = f.readlines()
new_lines = []
removed = False
for line in lines:
if f"UUID={uuid}" in line:
logger.info(f"{fstab_path} 中移除了条目: {line.strip()}")
removed = True
else:
new_lines.append(line)
if removed:
# 写入临时文件,然后替换原文件
with open(fstab_path + ".tmp", 'w') as f_tmp:
f_tmp.writelines(new_lines)
os.rename(fstab_path + ".tmp", fstab_path)
logger.info(f"已从 {fstab_path} 中移除 UUID={uuid} 的条目。")
return True
else:
logger.info(f"fstab 中未找到 UUID={uuid} 的条目。")
return False
except Exception as e:
logger.error(f"修改 {fstab_path} 失败: {e}")
QMessageBox.critical(None, "错误", f"修改 {fstab_path} 失败: {e}")
return False
def mount_partition(self, device_path, mount_point, add_to_fstab=False):
"""
挂载指定设备到指定挂载点。
:param device_path: 要挂载的设备路径。
:param mount_point: 挂载点。
:param add_to_fstab: 是否添加到 /etc/fstab。
:return: True 如果成功,否则 False。
"""
if not os.path.exists(mount_point):
try:
os.makedirs(mount_point)
logger.info(f"创建挂载点目录: {mount_point}")
except OSError as e:
QMessageBox.critical(None, "错误", f"创建挂载点目录 {mount_point} 失败: {e}")
logger.error(f"创建挂载点目录 {mount_point} 失败: {e}")
return False
logger.info(f"尝试挂载设备 {device_path}{mount_point}")
success, _, stderr = self._execute_shell_command(
["mount", device_path, mount_point],
f"挂载设备 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功挂载到 {mount_point}")
if add_to_fstab:
# 为了获取 fstype 和 UUID需要再次调用 lsblk
success_details, stdout_details, stderr_details = self._execute_shell_command(
["lsblk", "-no", "FSTYPE,UUID", device_path],
f"获取设备 {device_path} 的文件系统类型和 UUID 失败",
root_privilege=False,
suppress_critical_dialog_on_stderr_match=("找不到或无法访问", "No such device or address")
)
if success_details:
fstype, uuid = stdout_details.strip().split()
self._add_to_fstab(device_path, mount_point, fstype, uuid)
else:
logger.error(f"无法获取设备 {device_path} 的详细信息以添加到 fstab: {stderr_details}")
QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取详细信息以添加到 fstab。")
return True
else:
return False
def unmount_partition(self, device_path, show_dialog_on_error=True):
"""
卸载指定设备。
:param device_path: 要卸载的设备路径。
:param show_dialog_on_error: 是否在发生错误时显示对话框。
:return: True 如果成功,否则 False。
"""
logger.info(f"尝试卸载设备 {device_path}")
try:
success, _, stderr = self._execute_shell_command(
["umount", device_path],
f"卸载设备 {device_path} 失败",
suppress_critical_dialog_on_stderr_match="not mounted" if not show_dialog_on_error else None
)
if success:
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。")
return True
else:
if show_dialog_on_error:
QMessageBox.critical(None, "错误", f"卸载设备 {device_path} 失败。\n错误详情: {stderr}")
return False
except Exception as e:
logger.error(f"卸载设备 {device_path} 时发生异常: {e}")
if show_dialog_on_error:
QMessageBox.critical(None, "错误", f"卸载设备 {device_path} 时发生异常: {e}")
return False
def get_disk_free_space_info_mib(self, disk_path, total_disk_mib):
"""
获取磁盘上最大的空闲空间块的起始位置 (MiB) 和大小 (MiB)。
:param disk_path: 磁盘路径。
:param total_disk_mib: 磁盘的总大小 (MiB),用于全新磁盘的计算。
:return: (start_mib, size_mib) 元组。如果磁盘是全新的,返回 (0.0, total_disk_mib)。
如果磁盘有分区表但没有空闲空间,返回 (None, None)。
"""
logger.debug(f"尝试获取磁盘 {disk_path} 的空闲空间信息 (MiB)。")
success, stdout, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "unit", "MiB", "print", "free"],
f"获取磁盘 {disk_path} 分区信息失败",
root_privilege=True
)
if not success:
logger.error(f"获取磁盘 {disk_path} 空闲空间信息失败: {stderr}")
return None, None
logger.debug(f"parted print free 命令原始输出:\n{stdout}")
free_spaces = []
lines = stdout.splitlines()
# Regex to capture StartMiB and SizeMiB from a "Free Space" line
# It's made more flexible to match "Free Space", "空闲空间", and "可用空间"
# Example: " 0.02MiB 8192MiB 8192MiB 可用空间"
# We need to capture the first numeric value (Start) and the third numeric value (Size).
free_space_line_pattern = re.compile(r'^\s*(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(?:Free Space|空闲空间|可用空间)')
for line in lines:
match = free_space_line_pattern.match(line)
if match:
try:
start_mib = float(match.group(1)) # Capture StartMiB
size_mib = float(match.group(3)) # Capture SizeMiB (the third numeric value)
free_spaces.append({'start_mib': start_mib, 'size_mib': size_mib})
except ValueError as ve:
logger.warning(f"解析 parted free space 行 '{line}' 失败: {ve}")
continue
if not free_spaces:
logger.warning(f"在磁盘 {disk_path} 上未找到空闲空间。")
return None, None
# 找到最大的空闲空间块
largest_free_space = max(free_spaces, key=lambda x: x['size_mib'])
start_mib = largest_free_space['start_mib']
size_mib = largest_free_space['size_mib']
logger.debug(f"磁盘 {disk_path} 的最大空闲空间块起始于 {start_mib:.2f} MiB大小为 {size_mib:.2f} MiB。")
return start_mib, size_mib
def create_partition(self, disk_path, partition_table_type, size_gb, total_disk_mib, use_max_space):
"""
在指定磁盘上创建分区。
:param disk_path: 磁盘路径 (例如 /dev/sdb)。
:param partition_table_type: 分区表类型 ('gpt''msdos')。
:param size_gb: 分区大小 (GB)。
:param total_disk_mib: 磁盘总大小 (MiB)。
:param use_max_space: 是否使用最大可用空间。
:return: True 如果成功,否则 False。
"""
if not isinstance(disk_path, str) or not isinstance(partition_table_type, str):
logger.error(f"尝试创建分区时传入无效参数。磁盘路径: {disk_path}, 分区表类型: {partition_table_type}")
QMessageBox.critical(None, "错误", "无效的磁盘路径或分区表类型。")
return False
# 1. 检查磁盘是否有分区表
has_partition_table = False
try:
success_check, stdout_check, stderr_check = self._execute_shell_command(
["parted", "-s", disk_path, "print"],
f"检查磁盘 {disk_path} 分区表失败",
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label"),
root_privilege=True
)
if success_check:
if "Partition Table: unknown" not in stdout_check and "分区表unknown" not in stdout_check:
has_partition_table = True
else:
logger.info(f"parted print 报告磁盘 {disk_path} 的分区表为 'unknown'")
else:
logger.info(f"parted print 命令失败,但可能不是因为没有分区表,错误信息: {stderr_check}")
except Exception as e:
logger.error(f"检查磁盘 {disk_path} 分区表时发生异常: {e}")
pass
actual_start_mib_for_parted = 0.0
# 2. 如果没有分区表,则创建分区表
if not has_partition_table:
reply = QMessageBox.question(None, "确认创建分区表",
f"磁盘 {disk_path} 没有分区表。您确定要创建 {partition_table_type} 分区表吗?"
f"此操作将擦除磁盘上的所有数据。",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了在 {disk_path} 上创建分区表的操作。")
return False
logger.info(f"尝试在 {disk_path} 上创建 {partition_table_type} 分区表。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "mklabel", partition_table_type],
f"创建 {partition_table_type} 分区表失败"
)
if not success:
return False
# 对于新创建分区表的磁盘,第一个分区通常从 1MiB 开始以确保兼容性和对齐
actual_start_mib_for_parted = 1.0
else:
# 如果有分区表,获取下一个可用分区的起始位置
start_mib_from_parted, _ = self.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if start_mib_from_parted is None:
QMessageBox.critical(None, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置或没有空闲空间。")
return False
# 如果 parted 报告的起始位置非常接近 0 MiB (例如 0.0 MiB 或 0.02 MiB)
# 为了安全和兼容性,也将其调整为 1.0 MiB。
# 否则,使用 parted 报告的精确起始位置。
if start_mib_from_parted < 1.0: # covers 0.0MiB and values like 0.017MiB (17.4kB)
actual_start_mib_for_parted = 1.0
else:
actual_start_mib_for_parted = start_mib_from_parted
# 3. 确定分区结束位置
if use_max_space:
end_pos = "100%"
size_for_log = "最大可用空间"
else:
# 计算结束 MiB
# 注意:这里计算的 end_mib 是基于用户请求的大小,
# parted 会根据实际可用空间和对齐进行微调。
end_mib = actual_start_mib_for_parted + size_gb * 1024
# 简单检查,如果请求的大小导致结束位置超出磁盘总大小,则使用最大可用
if end_mib > total_disk_mib:
QMessageBox.warning(None, "警告", f"请求的分区大小 ({size_gb}GB) 超出了可用空间。将调整为最大可用空间。")
end_pos = "100%"
size_for_log = "最大可用空间"
else:
end_pos = f"{end_mib}MiB"
size_for_log = f"{size_gb}GB"
# 4. 创建分区
create_cmd = ["parted", "-s", disk_path, "mkpart", "primary", f"{actual_start_mib_for_parted}MiB", end_pos]
logger.info(f"尝试在 {disk_path} 上创建 {size_for_log} 的主分区。命令: {' '.join(create_cmd)}")
success, _, stderr = self._execute_shell_command(
create_cmd,
f"{disk_path} 上创建分区失败"
)
if not success:
return False
QMessageBox.information(None, "成功", f"{disk_path} 上成功创建了 {size_for_log} 的分区。")
return True
def delete_partition(self, device_path):
"""
删除指定分区。
:param device_path: 要删除的分区路径。
:return: True 如果成功,否则 False。
"""
reply = QMessageBox.question(None, "确认删除分区",
f"您确定要删除分区 {device_path} 吗?此操作将擦除分区上的所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了删除分区 {device_path} 的操作。")
return False
# 尝试卸载分区
self.unmount_partition(device_path, show_dialog_on_error=False) # 静默卸载
# 从 fstab 中移除条目
self._remove_fstab_entry(device_path)
# 获取父磁盘和分区号
# 例如 /dev/sdb1 -> /dev/sdb, 1
match = re.match(r'(/dev/[a-z]+)(\d+)', device_path)
if not match:
QMessageBox.critical(None, "错误", f"无法解析设备路径 {device_path}")
logger.error(f"无法解析设备路径 {device_path}")
return False
disk_path = match.group(1)
partition_number = match.group(2)
logger.info(f"尝试删除分区 {device_path} (磁盘: {disk_path}, 分区号: {partition_number})。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "rm", partition_number],
f"删除分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功删除。")
return True
else:
return False
def format_partition(self, device_path, fstype=None):
"""
格式化指定分区。
:param device_path: 要格式化的分区路径。
:param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None则弹出对话框让用户选择。
:return: True 如果成功,否则 False。
"""
if fstype is None:
items = ("ext4", "xfs", "fat32", "ntfs")
fstype, ok = QInputDialog.getItem(None, "选择文件系统", "请选择要使用的文件系统类型:", items, 0, False)
if not ok or not fstype:
logger.info("用户取消了文件系统选择。")
return False
reply = QMessageBox.question(None, "确认格式化分区",
f"您确定要将分区 {device_path} 格式化为 {fstype} 吗?此操作将擦除分区上的所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了格式化分区 {device_path} 的操作。")
return False
# 尝试卸载分区
self.unmount_partition(device_path, show_dialog_on_error=False) # 静默卸载
# 从 fstab 中移除条目 (因为格式化会改变 UUID)
self._remove_fstab_entry(device_path)
logger.info(f"尝试将分区 {device_path} 格式化为 {fstype}")
format_cmd = []
if fstype == "ext4":
format_cmd = ["mkfs.ext4", "-F", device_path] # -F 强制执行
elif fstype == "xfs":
format_cmd = ["mkfs.xfs", "-f", device_path] # -f 强制执行
elif fstype == "fat32":
format_cmd = ["mkfs.fat", "-F", "32", device_path]
elif fstype == "ntfs":
format_cmd = ["mkfs.ntfs", "-f", device_path]
else:
QMessageBox.critical(None, "错误", f"不支持的文件系统类型: {fstype}")
logger.error(f"不支持的文件系统类型: {fstype}")
return False
success, _, stderr = self._execute_shell_command(
format_cmd,
f"格式化分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功格式化为 {fstype}")
return True
else:
return False