238 lines
11 KiB
Python
238 lines
11 KiB
Python
# disk_operations.py
|
||
import os
|
||
import logging
|
||
from PySide6.QtWidgets import QMessageBox, QInputDialog
|
||
from system_info import SystemInfoManager # 引入 SystemInfoManager 来复用 _run_command
|
||
|
||
logger = logging.getLogger(__name__) # 获取当前模块的 logger 实例
|
||
|
||
class DiskOperations:
|
||
def __init__(self):
|
||
self.system_manager = SystemInfoManager() # 复用 SystemInfoManager 的命令执行器
|
||
|
||
def _get_device_path(self, device_name):
|
||
"""
|
||
辅助函数:将设备名(如 'sda1')转换为完整路径(如 '/dev/sda1')。
|
||
"""
|
||
if not device_name.startswith('/dev/'):
|
||
return f'/dev/{device_name}'
|
||
return device_name
|
||
|
||
def mount_partition(self, device_name, mount_point=None):
|
||
"""
|
||
挂载指定的分区。
|
||
如果未提供挂载点,会尝试查找现有挂载点或提示用户输入。
|
||
"""
|
||
dev_path = self._get_device_path(device_name)
|
||
logger.info(f"尝试挂载设备: {dev_path}")
|
||
|
||
if not mount_point:
|
||
# 尝试从系统信息中获取当前挂载点
|
||
devices = self.system_manager.get_block_devices()
|
||
found_mount_point = None
|
||
for dev in devices:
|
||
if dev.get('name') == device_name:
|
||
found_mount_point = dev.get('mountpoint')
|
||
break
|
||
if 'children' in dev:
|
||
for child in dev['children']:
|
||
if child.get('name') == device_name:
|
||
found_mount_point = child.get('mountpoint')
|
||
break
|
||
if found_mount_point:
|
||
break
|
||
|
||
if found_mount_point and found_mount_point != '[SWAP]' and found_mount_point != '':
|
||
# 如果设备已经有挂载点,并且不是SWAP,则直接使用
|
||
mount_point = found_mount_point
|
||
logger.info(f"设备 {dev_path} 已经挂载到 {mount_point}。")
|
||
QMessageBox.information(None, "信息", f"设备 {dev_path} 已经挂载到 {mount_point}。")
|
||
return True # 已经挂载,视为成功
|
||
else:
|
||
# 如果没有挂载点,或者挂载点是SWAP,则提示用户输入
|
||
mount_point, ok = QInputDialog.getText(None, "挂载分区",
|
||
f"请输入 {dev_path} 的挂载点 (例如: /mnt/data):",
|
||
text=f"/mnt/{device_name}")
|
||
if not ok or not mount_point:
|
||
logger.info("用户取消了挂载操作或未提供挂载点。")
|
||
return False
|
||
|
||
# 确保挂载点目录存在
|
||
if not os.path.exists(mount_point):
|
||
try:
|
||
os.makedirs(mount_point, exist_ok=True)
|
||
logger.info(f"创建挂载点目录: {mount_point}")
|
||
except OSError as e:
|
||
logger.error(f"创建挂载点目录失败 {mount_point}: {e}")
|
||
QMessageBox.critical(None, "错误", f"创建挂载点目录失败: {e}")
|
||
return False
|
||
|
||
try:
|
||
stdout, stderr = self.system_manager._run_command(["mount", dev_path, mount_point], root_privilege=True)
|
||
logger.info(f"成功挂载 {dev_path} 到 {mount_point}")
|
||
QMessageBox.information(None, "成功", f"成功挂载 {dev_path} 到 {mount_point}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"挂载 {dev_path} 失败: {e}")
|
||
QMessageBox.critical(None, "错误", f"挂载 {dev_path} 失败: {e}")
|
||
return False
|
||
|
||
def unmount_partition(self, device_name):
|
||
"""
|
||
卸载指定的分区。
|
||
"""
|
||
dev_path = self._get_device_path(device_name)
|
||
logger.info(f"尝试卸载设备: {dev_path}")
|
||
|
||
# 尝试从系统信息中获取当前挂载点
|
||
current_mount_point = None
|
||
devices = self.system_manager.get_block_devices()
|
||
for dev in devices:
|
||
if dev.get('name') == device_name:
|
||
current_mount_point = dev.get('mountpoint')
|
||
break
|
||
if 'children' in dev:
|
||
for child in dev['children']:
|
||
if child.get('name') == device_name:
|
||
current_mount_point = child.get('mountpoint')
|
||
break
|
||
if current_mount_point:
|
||
break
|
||
|
||
if not current_mount_point or current_mount_point == '[SWAP]' or current_mount_point == '':
|
||
logger.warning(f"设备 {dev_path} 未挂载或无法确定挂载点,无法卸载。")
|
||
QMessageBox.warning(None, "警告", f"设备 {dev_path} 未挂载或无法确定挂载点,无法卸载。")
|
||
return False
|
||
|
||
try:
|
||
# 尝试通过挂载点卸载,通常更可靠
|
||
stdout, stderr = self.system_manager._run_command(["umount", current_mount_point], root_privilege=True)
|
||
logger.info(f"成功卸载 {current_mount_point} ({dev_path})")
|
||
QMessageBox.information(None, "成功", f"成功卸载 {current_mount_point} ({dev_path})")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"卸载 {current_mount_point} ({dev_path}) 失败: {e}")
|
||
QMessageBox.critical(None, "错误", f"卸载 {current_mount_point} ({dev_path}) 失败: {e}")
|
||
return False
|
||
|
||
def delete_partition(self, device_name):
|
||
"""
|
||
删除指定的分区。
|
||
此操作不可逆,会丢失数据。
|
||
"""
|
||
dev_path = self._get_device_path(device_name)
|
||
logger.info(f"尝试删除分区: {dev_path}")
|
||
|
||
# 安全检查: 确保是分区,而不是整个磁盘
|
||
# 简单的检查方法是看设备名是否包含数字 (如 sda1, nvme0n1p1)
|
||
if not any(char.isdigit() for char in device_name):
|
||
QMessageBox.warning(None, "警告", f"{dev_path} 看起来不是一个分区。为安全起见,不执行删除操作。")
|
||
logger.warning(f"尝试删除整个磁盘 {dev_path},已阻止。")
|
||
return False
|
||
|
||
reply = QMessageBox.question(None, "确认删除分区",
|
||
f"你确定要删除分区 {dev_path} 吗?\n"
|
||
"此操作不可逆,将导致数据丢失!",
|
||
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
||
if reply == QMessageBox.No:
|
||
logger.info(f"用户取消了删除分区 {dev_path} 的操作。")
|
||
return False
|
||
|
||
# 首先尝试卸载分区(如果已挂载)
|
||
try:
|
||
# umount 如果设备未挂载会返回非零退出码,所以这里 check_output=False
|
||
self.system_manager._run_command(["umount", dev_path], root_privilege=True, check_output=False)
|
||
logger.info(f"尝试卸载 {dev_path} (如果已挂载)。")
|
||
except Exception as e:
|
||
logger.warning(f"卸载 {dev_path} 失败或未挂载: {e}")
|
||
# 继续执行,因为即使卸载失败也可能能删除
|
||
|
||
# 使用 parted 来删除分区
|
||
# parted 需要父磁盘设备名和分区号
|
||
# 例如,对于 /dev/sda1,需要 /dev/sda 和分区号 1
|
||
partition_number_str = ''.join(filter(str.isdigit, device_name)) # 从 sda1 提取 "1"
|
||
if not partition_number_str:
|
||
QMessageBox.critical(None, "错误", f"无法从 {device_name} 解析分区号。")
|
||
logger.error(f"无法从 {device_name} 解析分区号。")
|
||
return False
|
||
|
||
parent_disk_name = device_name.rstrip(partition_number_str) # 从 sda1 提取 "sda"
|
||
parent_disk_path = self._get_device_path(parent_disk_name)
|
||
|
||
try:
|
||
# parted -s /dev/sda rm 1
|
||
stdout, stderr = self.system_manager._run_command(
|
||
["parted", "-s", parent_disk_path, "rm", partition_number_str],
|
||
root_privilege=True
|
||
)
|
||
logger.info(f"成功删除分区 {dev_path}")
|
||
QMessageBox.information(None, "成功", f"成功删除分区 {dev_path}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"删除分区 {dev_path} 失败: {e}")
|
||
QMessageBox.critical(None, "错误", f"删除分区 {dev_path} 失败: {e}")
|
||
return False
|
||
|
||
def format_partition(self, device_name, fstype=None):
|
||
"""
|
||
格式化指定的分区。
|
||
此操作不可逆,会丢失数据。
|
||
"""
|
||
dev_path = self._get_device_path(device_name)
|
||
logger.info(f"尝试格式化分区: {dev_path}")
|
||
|
||
# 安全检查: 确保是分区
|
||
if not any(char.isdigit() for char in device_name):
|
||
QMessageBox.warning(None, "警告", f"{dev_path} 看起来不是一个分区。为安全起见,不执行格式化操作。")
|
||
logger.warning(f"尝试格式化整个磁盘 {dev_path},已阻止。")
|
||
return False
|
||
|
||
if not fstype:
|
||
# 提示用户选择文件系统类型
|
||
fstypes = ["ext4", "xfs", "fat32", "ntfs"] # 常用文件系统
|
||
fstype, ok = QInputDialog.getItem(None, "格式化分区",
|
||
f"请选择 {dev_path} 的文件系统类型:",
|
||
fstypes, 0, False) # 默认选择 ext4
|
||
if not ok or not fstype:
|
||
logger.info("用户取消了格式化操作或未选择文件系统。")
|
||
return False
|
||
|
||
reply = QMessageBox.question(None, "确认格式化分区",
|
||
f"你确定要格式化分区 {dev_path} 为 {fstype} 吗?\n"
|
||
"此操作不可逆,将导致数据丢失!",
|
||
QMessageBox.Yes | QMessageBox.No, QMessageBox.No)
|
||
if reply == QMessageBox.No:
|
||
logger.info(f"用户取消了格式化分区 {dev_path} 的操作。")
|
||
return False
|
||
|
||
# 首先尝试卸载分区(如果已挂载)
|
||
try:
|
||
self.system_manager._run_command(["umount", dev_path], root_privilege=True, check_output=False)
|
||
logger.info(f"尝试卸载 {dev_path} (如果已挂载)。")
|
||
except Exception as e:
|
||
logger.warning(f"卸载 {dev_path} 失败或未挂载: {e}")
|
||
|
||
# 执行 mkfs 命令
|
||
try:
|
||
mkfs_cmd = []
|
||
if fstype == "ext4":
|
||
mkfs_cmd = ["mkfs.ext4", "-F", dev_path] # -F 强制执行
|
||
elif fstype == "xfs":
|
||
mkfs_cmd = ["mkfs.xfs", "-f", dev_path] # -f 强制执行
|
||
elif fstype == "fat32":
|
||
mkfs_cmd = ["mkfs.fat", "-F", "32", dev_path]
|
||
elif fstype == "ntfs":
|
||
mkfs_cmd = ["mkfs.ntfs", "-f", dev_path] # -f 强制执行
|
||
else:
|
||
raise ValueError(f"不支持的文件系统类型: {fstype}")
|
||
|
||
stdout, stderr = self.system_manager._run_command(mkfs_cmd, root_privilege=True)
|
||
logger.info(f"成功格式化分区 {dev_path} 为 {fstype}")
|
||
QMessageBox.information(None, "成功", f"成功格式化分区 {dev_path} 为 {fstype}")
|
||
return True
|
||
except Exception as e:
|
||
logger.error(f"格式化分区 {dev_path} 失败: {e}")
|
||
QMessageBox.critical(None, "错误", f"格式化分区 {dev_path} 失败: {e}")
|
||
return False
|
||
|