Files
diskmanager/disk_operations.py
2026-02-02 18:38:41 +08:00

537 lines
27 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# disk_operations.py
import subprocess
import logging
import os
import re
from PySide6.QtWidgets import QMessageBox, QInputDialog
# 导入我们自己编写的系统信息管理模块
from system_info import SystemInfoManager
logger = logging.getLogger(__name__)
class DiskOperations:
def __init__(self):
self.system_manager = SystemInfoManager() # 实例化 SystemInfoManager
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
suppress_critical_dialog_on_stderr_match=None, input_data=None):
"""
通用地运行一个 shell 命令,并处理错误。
:param command_list: 命令及其参数的列表。
:param error_message: 命令失败时显示给用户的错误消息。
:param root_privilege: 如果为 True则使用 sudo 执行命令。
:param suppress_critical_dialog_on_stderr_match: 如果 stderr 包含此字符串,则不显示关键错误对话框。
:param input_data: 传递给命令stdin的数据 (str)。
:return: (True/False, stdout_str, stderr_str)
"""
# 确保 command_list 中的所有项都是字符串
if not all(isinstance(arg, str) for arg in command_list):
logger.error(f"命令列表包含非字符串元素: {command_list}")
QMessageBox.critical(None, "错误", f"内部错误:尝试执行的命令包含无效参数。\n命令详情: {command_list}")
return False, "", "内部错误:命令参数类型不正确。"
if root_privilege:
command_list = ["sudo"] + command_list
full_cmd_str = ' '.join(command_list)
logger.debug(f"执行命令: {full_cmd_str}")
try:
result = subprocess.run(
command_list,
capture_output=True,
text=True,
check=True,
encoding='utf-8',
input=input_data
)
logger.info(f"命令成功: {full_cmd_str}")
return True, result.stdout.strip(), result.stderr.strip()
except subprocess.CalledProcessError as e:
stderr_output = e.stderr.strip()
logger.error(f"命令失败: {full_cmd_str}")
logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {stderr_output}")
if suppress_critical_dialog_on_stderr_match and \
suppress_critical_dialog_on_stderr_match in stderr_output:
logger.info(f"错误信息 '{stderr_output}' 匹配抑制条件,不显示关键错误对话框。")
else:
QMessageBox.critical(None, "错误", f"{error_message}\n错误详情: {stderr_output}")
return False, e.stdout.strip(), stderr_output
except FileNotFoundError:
QMessageBox.critical(None, "错误", f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
logger.error(f"命令 '{command_list[0]}' 未找到。")
return False, "", f"命令 '{command_list[0]}' 未找到。"
except Exception as e:
QMessageBox.critical(None, "错误", f"执行命令时发生未知错误: {e}")
logger.error(f"执行命令 {full_cmd_str} 时发生未知错误: {e}")
return False, "", str(e)
def _get_fstab_path(self):
return "/etc/fstab"
def create_partition(self, disk_path, partition_table_type, size_gb, total_disk_mib, use_max_space):
"""
在指定磁盘上创建分区。
:param disk_path: 磁盘设备路径,例如 /dev/sdb。
:param partition_table_type: 分区表类型,'gpt''msdos'
:param size_gb: 分区大小GB
:param total_disk_mib: 磁盘总大小 (MiB)。
:param use_max_space: 是否使用最大可用空间创建分区。
:return: 新创建的分区路径,如果失败则为 None。
"""
if not isinstance(disk_path, str) or not isinstance(partition_table_type, str):
logger.error(f"尝试创建分区时传入无效的磁盘路径或分区表类型。磁盘: {disk_path} (类型: {type(disk_path)}), 类型: {partition_table_type} (类型: {type(partition_table_type)})")
QMessageBox.critical(None, "错误", "无效的磁盘路径或分区表类型。")
return None
logger.info(f"尝试在 {disk_path} 上创建 {size_gb}GB 的分区,分区表类型为 {partition_table_type}")
# 1. 检查磁盘是否已经有分区表,如果没有则创建
# parted -s /dev/sdb print
try:
stdout, _ = self.system_manager._run_command(["parted", "-s", disk_path, "print"], root_privilege=True)
if "unrecognised disk label" in stdout:
logger.info(f"磁盘 {disk_path} 没有分区表,将创建 {partition_table_type} 分区表。")
success, _, _ = self._execute_shell_command(
["parted", "-s", disk_path, "mklabel", partition_table_type],
f"创建分区表 {partition_table_type} 失败"
)
if not success:
return None
else:
logger.info(f"磁盘 {disk_path} 已有分区表。")
except Exception as e:
logger.error(f"检查或创建分区表失败: {e}")
QMessageBox.critical(None, "错误", f"检查或创建分区表失败: {e}")
return None
# 2. 获取下一个分区的起始扇区
start_sector = self.get_disk_next_partition_start_sector(disk_path)
if start_sector is None:
QMessageBox.critical(None, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置。")
return None
# 3. 构建 parted 命令
parted_cmd = ["parted", "-s", disk_path, "mkpart", "primary"]
# 对于 GPT默认文件系统是 ext2。对于 msdos默认是 ext2。
# parted mkpart primary [fstype] start end
# 我们可以省略 fstype让用户稍后格式化。
# 计算结束位置
start_mib = start_sector * 512 / (1024 * 1024) # 转换为 MiB
if use_max_space:
end_mib = total_disk_mib # 使用磁盘总大小作为结束位置
parted_cmd.extend([f"{start_mib}MiB", f"{end_mib}MiB"])
else:
end_mib = start_mib + size_gb * 1024 # 将 GB 转换为 MiB
parted_cmd.extend([f"{start_mib}MiB", f"{end_mib}MiB"])
# 执行创建分区命令
success, stdout, stderr = self._execute_shell_command(
parted_cmd,
f"{disk_path} 上创建分区失败"
)
if not success:
return None
# 4. 刷新内核分区表
self._execute_shell_command(["partprobe", disk_path], f"刷新 {disk_path} 分区表失败", root_privilege=True)
# 5. 获取新创建的分区路径
# 通常新分区是磁盘的最后一个分区,例如 /dev/sdb1, /dev/sdb2 等
# 我们可以通过 lsblk 再次获取信息,找到最新的分区
try:
# 重新获取设备信息,找到最新的分区
devices = self.system_manager.get_block_devices()
new_partition_path = None
for dev in devices:
if dev.get('path') == disk_path and dev.get('children'):
# 找到最新的子分区
latest_partition = None
for child in dev['children']:
if child.get('type') == 'part':
if latest_partition is None or int(child.get('maj:min').split(':')[1]) > int(latest_partition.get('maj:min').split(':')[1]):
latest_partition = child
if latest_partition:
new_partition_path = latest_partition.get('path')
break
if new_partition_path:
logger.info(f"成功在 {disk_path} 上创建分区: {new_partition_path}")
return new_partition_path
else:
logger.error(f"{disk_path} 上创建分区成功,但未能确定新分区的设备路径。")
QMessageBox.warning(None, "警告", f"分区创建成功,但未能确定新分区的设备路径。请手动检查。")
return None
except Exception as e:
logger.error(f"获取新创建分区路径失败: {e}")
QMessageBox.critical(None, "错误", f"获取新创建分区路径失败: {e}")
return None
def get_disk_next_partition_start_sector(self, disk_path):
"""
获取磁盘上下一个可用的分区起始扇区。
:param disk_path: 磁盘设备路径,例如 /dev/sdb。
:return: 下一个分区起始扇区(整数),如果失败则为 None。
"""
if not isinstance(disk_path, str):
logger.error(f"传入 get_disk_next_partition_start_sector 的 disk_path 不是字符串: {disk_path} (类型: {type(disk_path)})")
return None
try:
stdout, _ = self.system_manager._run_command(["parted", "-s", disk_path, "unit", "s", "print", "free"], root_privilege=True)
# 查找最大的空闲空间
free_space_pattern = re.compile(r'^\s*\d+\s+([\d.]+s)\s+([\d.]+s)\s+([\d.]+s)\s+Free Space', re.MULTILINE)
matches = free_space_pattern.findall(stdout)
if matches:
# 找到所有空闲空间的起始扇区,并取最小的作为下一个分区的起始
# 或者,更准确地说,找到最后一个分区的结束扇区 + 1
# 简化处理:如果没有分区,从 2048s 开始 (常见对齐)
# 如果有分区,从最后一个分区的结束扇区 + 1 开始
# 先尝试获取所有分区信息
stdout_partitions, _ = self.system_manager._run_command(["parted", "-s", disk_path, "unit", "s", "print"], root_privilege=True)
partition_end_pattern = re.compile(r'^\s*\d+\s+[\d.]+s\s+([\d.]+s)', re.MULTILINE)
partition_end_sectors = [int(float(s.replace('s', ''))) for s in partition_end_pattern.findall(stdout_partitions)]
if partition_end_sectors:
# 最后一个分区的结束扇区 + 1
last_end_sector = max(partition_end_sectors)
# 确保对齐,通常是 1MiB (2048扇区) 或 4MiB (8192扇区)
# 这里我们简单地取下一个 1MiB 对齐的扇区
start_sector = (last_end_sector // 2048 + 1) * 2048
logger.debug(f"磁盘 {disk_path} 最后一个分区结束于 {last_end_sector}s下一个分区起始扇区建议为 {start_sector}s。")
return start_sector
else:
# 没有分区,从 2048s 开始
logger.debug(f"磁盘 {disk_path} 没有现有分区,建议起始扇区为 2048s。")
return 2048 # 2048扇区是 1MiB常见的起始对齐位置
else:
logger.warning(f"无法在磁盘 {disk_path} 上找到空闲空间信息。")
return None
except Exception as e:
logger.error(f"获取磁盘 {disk_path} 下一个分区起始扇区失败: {e}")
return None
def get_disk_next_partition_start_mib(self, disk_path):
"""
获取磁盘上下一个可用的分区起始位置 (MiB)。
:param disk_path: 磁盘设备路径,例如 /dev/sdb。
:return: 下一个分区起始位置 (MiB),如果失败则为 None。
"""
start_sector = self.get_disk_next_partition_start_sector(disk_path)
if start_sector is not None:
return start_sector * 512 / (1024 * 1024) # 转换为 MiB
return None
def delete_partition(self, device_path):
"""
删除指定的分区。
:param device_path: 要删除的分区路径,例如 /dev/sdb1。
:return: True 如果删除成功,否则 False。
"""
if not isinstance(device_path, str):
logger.error(f"尝试删除非字符串设备路径: {device_path} (类型: {type(device_path)})")
QMessageBox.critical(None, "错误", "无效的设备路径。")
return False
reply = QMessageBox.question(None, "确认删除分区",
f"您确定要删除分区 {device_path} 吗?此操作不可逆!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了删除分区 {device_path} 的操作。")
return False
logger.info(f"尝试删除分区: {device_path}")
# 1. 尝试卸载分区 (静默处理,如果未挂载则不报错)
self.unmount_partition(device_path, show_dialog_on_error=False)
# 2. 从 fstab 中移除条目
self._remove_fstab_entry(device_path)
# 3. 获取父磁盘和分区号
match = re.match(r'(/dev/\w+)(\d+)', device_path)
if not match:
QMessageBox.critical(None, "错误", f"无法解析分区路径 {device_path}")
logger.error(f"无法解析分区路径 {device_path}")
return False
disk_path = match.group(1)
partition_number = match.group(2)
# 4. 执行 parted 命令删除分区
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "rm", partition_number],
f"删除分区 {device_path} 失败"
)
if success:
QMessageBox.information(None, "成功", f"分区 {device_path} 已成功删除。")
# 刷新内核分区表
self._execute_shell_command(["partprobe", disk_path], f"刷新 {disk_path} 分区表失败", root_privilege=True)
return True
else:
return False
def format_partition(self, device_path, fstype=None):
"""
格式化指定分区。
:param device_path: 要格式化的设备路径,例如 /dev/sdb1。
:param fstype: 文件系统类型,例如 'ext4', 'xfs'。如果为 None则弹出对话框让用户选择。
:return: True 如果格式化成功,否则 False。
"""
if not isinstance(device_path, str):
logger.error(f"尝试格式化非字符串设备路径: {device_path} (类型: {type(device_path)})")
QMessageBox.critical(None, "错误", "无效的设备路径。")
return False
# 1. 尝试卸载分区 (静默处理,如果未挂载则不报错)
self.unmount_partition(device_path, show_dialog_on_error=False)
# 2. 从 fstab 中移除旧条目 (因为格式化后 UUID 会变)
self._remove_fstab_entry(device_path)
if fstype is None:
# 弹出对话框让用户选择文件系统类型
items = ["ext4", "xfs", "fat32", "ntfs"] # 常用文件系统
selected_fstype, ok = QInputDialog.getItem(None, "选择文件系统",
f"请为 {device_path} 选择文件系统类型:",
items, 0, False)
if not ok or not selected_fstype:
logger.info(f"用户取消了格式化 {device_path} 的操作。")
return False
fstype = selected_fstype
reply = QMessageBox.question(None, "确认格式化分区",
f"您确定要格式化分区 {device_path}{fstype} 吗?此操作将擦除所有数据!",
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
if reply == QMessageBox.No:
logger.info(f"用户取消了格式化分区 {device_path} 的操作。")
return False
logger.info(f"尝试格式化设备 {device_path}{fstype}")
format_cmd = []
if fstype == "ext4":
format_cmd = ["mkfs.ext4", "-F", device_path]
elif fstype == "xfs":
format_cmd = ["mkfs.xfs", "-f", device_path]
elif fstype == "fat32":
format_cmd = ["mkfs.vfat", "-F", "32", device_path]
elif fstype == "ntfs":
format_cmd = ["mkfs.ntfs", "-f", device_path]
else:
QMessageBox.critical(None, "错误", f"不支持的文件系统类型: {fstype}")
logger.error(f"不支持的文件系统类型: {fstype}")
return False
success, _, stderr = self._execute_shell_command(
format_cmd,
f"格式化设备 {device_path}{fstype} 失败"
)
if success:
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功格式化为 {fstype}")
return True
else:
return False
def mount_partition(self, device_path, mount_point, add_to_fstab=False):
"""
挂载一个分区。
:param device_path: 要挂载的设备路径,例如 /dev/sdb1。
:param mount_point: 挂载点,例如 /mnt/data。
:param add_to_fstab: 是否将挂载信息添加到 /etc/fstab。
:return: True 如果挂载成功,否则 False。
"""
if not isinstance(device_path, str) or not isinstance(mount_point, str):
logger.error(f"尝试挂载时传入无效的设备路径或挂载点。设备: {device_path} (类型: {type(device_path)}), 挂载点: {mount_point} (类型: {type(mount_point)})")
QMessageBox.critical(None, "错误", "无效的设备路径或挂载点。")
return False
logger.info(f"尝试挂载设备 {device_path}{mount_point}")
try:
# 确保挂载点存在
if not os.path.exists(mount_point):
logger.debug(f"创建挂载点目录: {mount_point}")
os.makedirs(mount_point, exist_ok=True)
# 执行挂载命令
success, _, stderr = self._execute_shell_command(
["mount", device_path, mount_point],
f"挂载设备 {device_path}{mount_point} 失败"
)
if success:
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功挂载到 {mount_point}")
if add_to_fstab:
self._add_to_fstab(device_path, mount_point)
return True
else:
return False
except Exception as e:
logger.error(f"挂载设备 {device_path} 时发生错误: {e}")
QMessageBox.critical(None, "错误", f"挂载设备 {device_path} 失败。\n错误详情: {e}")
return False
def unmount_partition(self, device_path, show_dialog_on_error=True):
"""
卸载一个分区。
:param device_path: 要卸载的设备路径,例如 /dev/sdb1 或 /dev/md0。
:param show_dialog_on_error: 是否在卸载失败时显示错误对话框。
:return: True 如果卸载成功,否则 False。
"""
if not isinstance(device_path, str):
logger.error(f"尝试卸载非字符串设备路径: {device_path} (类型: {type(device_path)})")
if show_dialog_on_error:
QMessageBox.critical(None, "错误", f"无效的设备路径: {device_path}")
return False
logger.info(f"尝试卸载设备: {device_path}")
try:
# 尝试从 fstab 中移除条目,如果存在的话
# 注意:这里先移除 fstab 条目,是为了防止卸载失败后 fstab 中仍然有无效条目
# 但如果卸载成功fstab 条目也需要移除。
# 如果卸载失败,且 fstab 移除成功,用户需要手动处理。
# 更好的做法可能是:卸载成功后,再移除 fstab 条目。
# 但为了简化逻辑,这里先移除,并假设卸载会成功。
# 实际上_remove_fstab_entry 应该在卸载成功后才调用,或者在删除分区/格式化时调用。
# 对于单纯的卸载操作,不应该自动移除 fstab 条目,除非用户明确要求。
# 考虑到当前场景是“卸载后刷新”通常用户只是想临时卸载fstab条目不应被删除。
# 所以这里暂时注释掉,或者只在删除分区/格式化时调用。
# self._remove_fstab_entry(device_path)
# 执行卸载命令
success, _, stderr = self._execute_shell_command(
["umount", device_path],
f"卸载设备 {device_path} 失败",
suppress_critical_dialog_on_stderr_match="not mounted" if not show_dialog_on_error else None
)
if success:
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。")
return success
except Exception as e:
logger.error(f"卸载设备 {device_path} 时发生错误: {e}")
if show_dialog_on_error:
QMessageBox.critical(None, "错误", f"卸载设备 {device_path} 失败。\n错误详情: {e}")
return False
def _add_to_fstab(self, device_path, mount_point):
"""
将设备的挂载信息添加到 /etc/fstab。
使用 UUID 方式添加。
"""
if not isinstance(device_path, str) or not isinstance(mount_point, str):
logger.error(f"尝试添加到 fstab 时传入无效的设备路径或挂载点。设备: {device_path} (类型: {type(device_path)}), 挂载点: {mount_point} (类型: {type(mount_point)})")
QMessageBox.critical(None, "错误", "无效的设备路径或挂载点,无法添加到 fstab。")
return False
logger.info(f"尝试将设备 {device_path} (挂载点: {mount_point}) 添加到 /etc/fstab。")
device_details = self.system_manager.get_device_details_by_path(device_path)
uuid = device_details.get('uuid') if device_details else None
fstype = device_details.get('fstype') if device_details else None
if not uuid or not fstype:
logger.error(f"无法获取设备 {device_path} 的 UUID 或文件系统类型,无法添加到 fstab。UUID: {uuid}, FSTYPE: {fstype}")
QMessageBox.critical(None, "错误", f"无法获取设备 {device_path} 的 UUID 或文件系统类型,无法添加到 /etc/fstab。")
return False
fstab_entry = f"UUID={uuid} {mount_point} {fstype} defaults 0 2"
fstab_path = self._get_fstab_path()
try:
# 检查是否已存在相同的挂载点或 UUID 条目
with open(fstab_path, 'r') as f:
lines = f.readlines()
for line in lines:
if f"UUID={uuid}" in line or mount_point in line.split():
logger.warning(f"fstab 中已存在设备 {device_path} (UUID: {uuid}) 或挂载点 {mount_point} 的条目,跳过添加。")
return True # 已经存在,认为成功
# 添加新条目
with open(fstab_path, 'a') as f:
f.write(fstab_entry + '\n')
logger.info(f"成功将 '{fstab_entry}' 添加到 {fstab_path}")
return True
except Exception as e:
logger.error(f"将设备 {device_path} 添加到 /etc/fstab 失败: {e}")
QMessageBox.critical(None, "错误", f"将设备 {device_path} 添加到 /etc/fstab 失败。\n错误详情: {e}")
return False
def _remove_fstab_entry(self, device_path):
"""
从 /etc/fstab 中移除指定设备的条目。
:param device_path: 设备路径,例如 /dev/sdb1 或 /dev/md0。
:return: True 如果成功移除或没有找到条目,否则 False。
"""
if not isinstance(device_path, str):
logger.warning(f"尝试移除 fstab 条目时传入非字符串设备路径: {device_path} (类型: {type(device_path)})")
return False
logger.info(f"尝试从 /etc/fstab 移除与 {device_path} 相关的条目。")
fstab_path = self._get_fstab_path()
try:
device_details = self.system_manager.get_device_details_by_path(device_path)
uuid = device_details.get('uuid') if device_details else None
logger.debug(f"尝试移除 fstab 条目,获取到的设备 {device_path} 的 UUID: {uuid}")
# 如果没有 UUID 且不是 /dev/ 路径,则无法可靠地从 fstab 中移除
if not uuid and not device_path.startswith('/dev/'):
logger.warning(f"无法获取设备 {device_path} 的 UUID也无法识别为 /dev/ 路径,跳过 fstab 移除。")
return False
with open(fstab_path, 'r') as f:
lines = f.readlines()
new_lines = []
removed_count = 0
for line in lines:
# 检查是否是注释或空行
if line.strip().startswith('#') or not line.strip():
new_lines.append(line)
continue
is_match = False
# 尝试匹配 UUID
if uuid:
if f"UUID={uuid}" in line:
is_match = True
# 尝试匹配设备路径 (如果 UUID 不存在或不匹配)
if not is_match and device_path:
# 分割行以避免部分匹配,例如 /dev/sda 匹配 /dev/sda1
parts = line.split()
if len(parts) > 0 and parts[0] == device_path:
is_match = True
if is_match:
logger.info(f"{fstab_path} 移除了条目: {line.strip()}")
removed_count += 1
else:
new_lines.append(line)
if removed_count > 0:
with open(fstab_path, 'w') as f:
f.writelines(new_lines)
logger.info(f"成功从 {fstab_path} 移除了 {removed_count} 个条目。")
return True
else:
logger.info(f"{fstab_path} 中未找到与 {device_path} 相关的条目。")
return True # 没有找到也算成功,因为目标是确保没有该条目
except FileNotFoundError:
logger.warning(f"fstab 文件 {fstab_path} 不存在。")
return True
except Exception as e:
logger.error(f"从 /etc/fstab 移除条目时发生错误: {e}")
QMessageBox.critical(None, "错误", f"从 /etc/fstab 移除条目失败。\n错误详情: {e}")
return False