# disk_operations.py import subprocess import logging import re import os from PySide6.QtWidgets import QMessageBox, QInputDialog from PySide6.QtCore import QObject, Signal, QThread from system_info import SystemInfoManager logger = logging.getLogger(__name__) # --- 新增: 后台格式化工作线程 --- class FormatWorker(QObject): # 定义信号,用于向主线程发送格式化结果 finished = Signal(bool, str, str, str) # 成功状态, 设备路径, 标准输出, 标准错误 started = Signal(str) # 设备路径 def __init__(self, device_path, fs_type, execute_shell_command_func, parent=None): super().__init__(parent) self.device_path = device_path self.fs_type = fs_type # 接收一个可调用的函数,用于执行shell命令 (这里是 DiskOperations._execute_shell_command) self._execute_shell_command_func = execute_shell_command_func def run(self): """ 在单独线程中执行格式化命令。 """ self.started.emit(self.device_path) logger.info(f"后台格式化开始: 设备 {self.device_path}, 文件系统 {self.fs_type}") command_list = [] if self.fs_type == "ext4": command_list = ["mkfs.ext4", "-F", self.device_path] # -F 强制执行 elif self.fs_type == "xfs": command_list = ["mkfs.xfs", "-f", self.device_path] # -f 强制执行 elif self.fs_type == "ntfs": command_list = ["mkfs.ntfs", "-f", self.device_path] # -f 强制执行 elif self.fs_type == "fat32": command_list = ["mkfs.vfat", "-F", "32", self.device_path] # -F 32 指定FAT32 else: logger.error(f"不支持的文件系统类型: {self.fs_type}") self.finished.emit(False, self.device_path, "", f"不支持的文件系统类型: {self.fs_type}") return # 调用传入的shell命令执行函数,并禁用其内部的QMessageBox显示 success, stdout, stderr = self._execute_shell_command_func( command_list, f"格式化设备 {self.device_path} 为 {self.fs_type} 失败", root_privilege=True, show_dialog=False # <--- 关键:阻止工作线程弹出 QMessageBox ) self.finished.emit(success, self.device_path, stdout, stderr) logger.info(f"后台格式化完成: 设备 {self.device_path}, 成功: {success}") class DiskOperations(QObject): # <--- 继承 QObject 以便发出信号 # 定义信号,用于通知主线程格式化操作的开始和结束 formatting_finished = Signal(bool, str, str, str) # 成功状态, 设备路径, 标准输出, 标准错误 formatting_started = Signal(str) # 设备路径 def __init__(self, system_manager: SystemInfoManager, lvm_ops, parent=None): # <--- 构造函数现在接收 lvm_ops 实例 super().__init__(parent) self.system_manager = system_manager self._lvm_ops = lvm_ops # 保存 LvmOperations 实例,以便调用其 _execute_shell_command self.active_format_workers = {} # 用于跟踪正在运行的格式化线程 # DiskOperations 内部的 _execute_shell_command 只是对 LvmOperations._execute_shell_command 的包装 # 这样所有 shell 命令都通过 LvmOperations 的统一入口,可以控制 show_dialog def _execute_shell_command(self, command_list, error_message, root_privilege=True, suppress_critical_dialog_on_stderr_match=None, input_data=None, show_dialog=True): return self._lvm_ops._execute_shell_command( command_list, error_message, root_privilege, suppress_critical_dialog_on_stderr_match, input_data, show_dialog ) def _add_to_fstab(self, device_path, mount_point, fstype, uuid): """ 将设备的挂载信息添加到 /etc/fstab。 使用 UUID 识别设备以提高稳定性。 """ if not uuid or not mount_point or not fstype: logger.error(f"无法将设备 {device_path} 添加到 fstab:缺少 UUID/挂载点/文件系统类型。") QMessageBox.warning(None, "警告", f"无法将设备 {device_path} 添加到 fstab:缺少 UUID/挂载点/文件系统类型。") return False fstab_entry = f"UUID={uuid} {mount_point} {fstype} defaults 0 2" fstab_path = "/etc/fstab" try: # 检查 fstab 中是否已存在相同 UUID 的条目 # 使用 _execute_shell_command 来读取 fstab,尽管通常不需要 sudo # 这里为了简化,直接读取文件,但写入时使用 _execute_shell_command with open(fstab_path, 'r') as f: fstab_content = f.readlines() for line in fstab_content: if f"UUID={uuid}" in line: logger.warning(f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。跳过添加。") QMessageBox.information(None, "信息", f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。") return True # 认为成功,因为目标已达成 # 如果不存在,则追加到 fstab # 使用 _execute_shell_command for sudo write success, _, stderr = self._execute_shell_command( ["sh", "-c", f"echo '{fstab_entry}' >> {fstab_path}"], f"将 {device_path} 添加到 {fstab_path} 失败", root_privilege=True ) if success: logger.info(f"已将 {fstab_entry} 添加到 {fstab_path}。") QMessageBox.information(None, "成功", f"设备 {device_path} 已成功添加到 /etc/fstab。") return True else: return False # Error handled by _execute_shell_command except Exception as e: logger.error(f"处理 {fstab_path} 失败: {e}") QMessageBox.critical(None, "错误", f"处理 {fstab_path} 失败: {e}") return False def _remove_fstab_entry(self, device_path): """ 从 /etc/fstab 中移除指定设备的条目。 此方法需要 SystemInfoManager 来获取设备的 UUID。 """ # NEW: 使用 system_manager 获取 UUID,它会处理路径解析 device_details = self.system_manager.get_device_details_by_path(device_path) if not device_details or not device_details.get('uuid'): logger.warning(f"无法获取设备 {device_path} 的 UUID,无法从 fstab 中移除。") return False uuid = device_details.get('uuid') fstab_path = "/etc/fstab" # Use sed for robust removal with sudo # suppress_critical_dialog_on_stderr_match is added to handle cases where fstab might not exist # or sed reports no changes, which is not a critical error for removal. command = ["sed", "-i", f"/{re.escape(uuid)}/d", fstab_path] # Use re.escape for UUID success, _, stderr = self._execute_shell_command( command, f"从 {fstab_path} 中删除 UUID={uuid} 的条目失败", root_privilege=True, suppress_critical_dialog_on_stderr_match=( f"sed: {fstab_path}: No such file or directory", # English f"sed: {fstab_path}: 没有那个文件或目录", # Chinese "no changes were made" # if sed finds nothing to delete ) ) if success: logger.info(f"已从 {fstab_path} 中删除 UUID={uuid} 的条目。") return True else: # If sed failed, it might be because the entry wasn't found, which is fine. # _execute_shell_command would have suppressed the dialog if it matched the suppress_critical_dialog_on_stderr_match. # So, if we reach here, it's either a real error (dialog shown by _execute_shell_command) # or a suppressed "no changes" type of error. In both cases, if no real error, we return True. if any(s in stderr for s in ( f"sed: {fstab_path}: No such file or directory", f"sed: {fstab_path}: 没有那个文件或目录", "no changes were made", "No such file or directory" # more general check )): logger.info(f"fstab 中未找到 UUID={uuid} 的条目,无需删除。") return True # Consider it a success if the entry is not there return False # Other errors are already handled by _execute_shell_command def _resolve_device_occupation(self, device_path, action_description="操作"): """ 尝试解决设备占用问题。 检查: 1. 是否是交换分区,如果是则提示用户关闭。 2. 是否有进程占用,如果有则提示用户终止。 :param device_path: 要检查的设备路径。 :param action_description: 正在尝试的操作描述,用于用户提示。 :return: True 如果占用已解决或没有占用,False 如果用户取消或解决失败。 """ logger.info(f"尝试解决设备 {device_path} 的占用问题,以便进行 {action_description}。") # 1. 检查是否是交换分区 block_devices = self.system_manager.get_block_devices() device_info = self.system_manager._find_device_by_path_recursive(block_devices, device_path) # Check if it's a swap partition and currently active if device_info and device_info.get('fstype') == 'swap' and device_info.get('mountpoint') == '[SWAP]': reply = QMessageBox.question( None, "设备占用 - 交换分区", f"设备 {device_path} 是一个活跃的交换分区。为了进行 {action_description},需要关闭它。您要继续吗?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) if reply == QMessageBox.No: logger.info(f"用户取消了关闭交换分区 {device_path}。") return False logger.info(f"尝试关闭交换分区 {device_path}。") success, _, stderr = self._execute_shell_command( ["swapoff", device_path], f"关闭交换分区 {device_path} 失败", show_dialog=True # Show dialog for swapoff failure ) if not success: logger.error(f"关闭交换分区 {device_path} 失败: {stderr}") QMessageBox.critical(None, "错误", f"关闭交换分区 {device_path} 失败。无法进行 {action_description}。") return False QMessageBox.information(None, "信息", f"交换分区 {device_path} 已成功关闭。") # After swapoff, it might still be reported by fuser, so continue to fuser check. # 2. 检查是否有进程占用 (使用 fuser) success_fuser, stdout_fuser, stderr_fuser = self._execute_shell_command( ["fuser", "-vm", device_path], f"检查设备 {device_path} 占用失败", show_dialog=False, # Don't show dialog if fuser fails (e.g., device not busy) suppress_critical_dialog_on_stderr_match=( "No such file or directory", # if device doesn't exist "not found", # if fuser itself isn't found "Usage:" # if fuser gets invalid args, though unlikely here ) ) pids = [] process_info_lines = [] # Parse fuser output only if it was successful and has stdout if success_fuser and stdout_fuser: for line in stdout_fuser.splitlines(): # Example: "/dev/sdb1: root 12078 .rce. gpg-agent" match = re.match(r'^\S+:\s+\S+\s+(\d+)\s+.*', line) if match: pid = match.group(1) pids.append(pid) process_info_lines.append(line.strip()) if pids: process_list_str = "\n".join(process_info_lines) reply = QMessageBox.question( None, "设备占用 - 进程", f"设备 {device_path} 正在被以下进程占用:\n{process_list_str}\n\n您要强制终止这些进程吗?这可能会导致数据丢失或系统不稳定!", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) if reply == QMessageBox.No: logger.info(f"用户取消了终止占用设备 {device_path} 的进程。") return False logger.info(f"尝试终止占用设备 {device_path} 的进程: {', '.join(pids)}。") all_killed = True for pid in pids: kill_success, _, kill_stderr = self._execute_shell_command( ["kill", "-9", pid], f"终止进程 {pid} 失败", show_dialog=True # Show dialog for kill failure ) if not kill_success: logger.error(f"终止进程 {pid} 失败: {kill_stderr}") all_killed = False if not all_killed: QMessageBox.critical(None, "错误", f"未能终止所有占用设备 {device_path} 的进程。无法进行 {action_description}。") return False QMessageBox.information(None, "信息", f"已尝试终止占用设备 {device_path} 的所有进程。") else: logger.info(f"设备 {device_path} 未被任何进程占用。") logger.info(f"设备 {device_path} 的占用问题已尝试解决。") return True def mount_partition(self, device_path, mount_point, add_to_fstab=False): """ 挂载指定设备到指定挂载点。 :param device_path: 要挂载的设备路径。 :param mount_point: 挂载点。 :param add_to_fstab: 是否添加到 /etc/fstab。 :return: True 如果成功,否则 False。 """ if not os.path.exists(mount_point): try: os.makedirs(mount_point) logger.info(f"创建挂载点目录: {mount_point}") except OSError as e: QMessageBox.critical(None, "错误", f"创建挂载点目录 {mount_point} 失败: {e}") logger.error(f"创建挂载点目录 {mount_point} 失败: {e}") return False logger.info(f"尝试挂载设备 {device_path} 到 {mount_point}。") success, _, stderr = self._execute_shell_command( ["mount", device_path, mount_point], f"挂载设备 {device_path} 失败" ) if success: QMessageBox.information(None, "成功", f"设备 {device_path} 已成功挂载到 {mount_point}。") if add_to_fstab: # NEW: 使用 self.system_manager 获取 fstype 和 UUID device_details = self.system_manager.get_device_details_by_path(device_path) if device_details: fstype = device_details.get('fstype') uuid = device_details.get('uuid') if fstype and uuid: self._add_to_fstab(device_path, mount_point, fstype, uuid) else: logger.error(f"无法获取设备 {device_path} 的文件系统类型或 UUID 以添加到 fstab。") QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取文件系统类型或 UUID 以添加到 fstab。") else: logger.error(f"无法获取设备 {device_path} 的详细信息以添加到 fstab。") QMessageBox.warning(None, "警告", f"设备 {device_path} 已挂载,但无法获取详细信息以添加到 fstab。") return True else: return False def unmount_partition(self, device_path, show_dialog_on_error=True): """ 卸载指定设备。 :param device_path: 要卸载的设备路径。 :param show_dialog_on_error: 是否在发生错误时显示对话框。 :return: True 如果成功,否则 False。 """ logger.info(f"尝试卸载设备 {device_path}。") # 定义表示设备已未挂载的错误信息(中英文) already_unmounted_errors = ("not mounted", "未挂载", "未指定挂载点") device_busy_errors = ("device is busy", "设备或资源忙", "Device or resource busy") # Common "device busy" messages # First attempt to unmount success, _, stderr = self._execute_shell_command( ["umount", device_path], f"卸载设备 {device_path} 失败", suppress_critical_dialog_on_stderr_match=already_unmounted_errors + device_busy_errors, # Suppress both types of errors for initial attempt show_dialog=False # Don't show dialog on first attempt, we'll handle it ) if success: QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。") return True else: # Check if it failed because it was already unmounted if any(s in stderr for s in already_unmounted_errors): logger.info(f"设备 {device_path} 已经处于未挂载状态(或挂载点未指定),无需重复卸载。") return True # Consider it a success # Check if it failed because the device was busy if any(s in stderr for s in device_busy_errors): logger.warning(f"卸载设备 {device_path} 失败,设备忙。尝试解决占用问题。") # Call the new helper to resolve occupation if self._resolve_device_occupation(device_path, action_description=f"卸载 {device_path}"): logger.info(f"设备 {device_path} 占用问题已解决,重试卸载。") # Retry unmount after resolving occupation retry_success, _, retry_stderr = self._execute_shell_command( ["umount", device_path], f"重试卸载设备 {device_path} 失败", suppress_critical_dialog_on_stderr_match=already_unmounted_errors, show_dialog=show_dialog_on_error # Show dialog if retry fails for other reasons ) if retry_success: QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。") return True else: logger.error(f"重试卸载设备 {device_path} 失败: {retry_stderr}") if show_dialog_on_error and not any(s in retry_stderr for s in already_unmounted_errors): QMessageBox.critical(None, "错误", f"重试卸载设备 {device_path} 失败。\n错误详情: {retry_stderr}") return False else: logger.info(f"用户取消或未能解决设备 {device_path} 的占用问题。") if show_dialog_on_error: QMessageBox.critical(None, "错误", f"未能解决设备 {device_path} 的占用问题,无法卸载。") return False else: # Other types of unmount failures logger.error(f"卸载设备 {device_path} 失败: {stderr}") if show_dialog_on_error: QMessageBox.critical(None, "错误", f"卸载设备 {device_path} 失败。\n错误详情: {stderr}") return False def get_disk_free_space_info_mib(self, disk_path, total_disk_mib): """ 获取磁盘上最大的空闲空间块的起始位置 (MiB) 和大小 (MiB)。 :param disk_path: 磁盘路径。 :param total_disk_mib: 磁盘的总大小 (MiB),用于全新磁盘的计算。 :return: (start_mib, size_mib) 元组。如果磁盘是全新的,返回 (0.0, total_disk_mib)。 如果磁盘有分区表但没有空闲空间,返回 (None, None)。 """ logger.debug(f"尝试获取磁盘 {disk_path} 的空闲空间信息 (MiB)。") # suppress_critical_dialog_on_stderr_match is added to handle cases where parted might complain about # an unrecognized disk label, which is expected for a fresh disk. success, stdout, stderr = self._execute_shell_command( ["parted", "-s", disk_path, "unit", "MiB", "print", "free"], f"获取磁盘 {disk_path} 分区信息失败", root_privilege=True, suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label") ) if not success: # If parted failed and it wasn't due to an unrecognized label (handled by suppress_critical_dialog_on_stderr_match), # then _execute_shell_command would have shown a dialog. # We just log and return None. logger.error(f"获取磁盘 {disk_path} 空闲空间信息失败: {stderr}") return None, None logger.debug(f"parted print free 命令原始输出:\n{stdout}") free_spaces = [] lines = stdout.splitlines() # Regex to capture StartMiB and SizeMiB from a "Free Space" line # It's made more flexible to match "Free Space", "空闲空间", and "可用空间" # Example: " 0.02MiB 8192MiB 8192MiB 可用空间" # We need to capture the first numeric value (Start) and the third numeric value (Size). free_space_line_pattern = re.compile(r'^\s*(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(?:Free Space|空闲空间|可用空间)') for line in lines: match = free_space_line_pattern.match(line) if match: try: start_mib = float(match.group(1)) # Capture StartMiB size_mib = float(match.group(3)) # Capture SizeMiB (the third numeric value) free_spaces.append({'start_mib': start_mib, 'size_mib': size_mib}) except ValueError as ve: logger.warning(f"解析 parted free space 行 '{line}' 失败: {ve}") continue if not free_spaces: logger.warning(f"在磁盘 {disk_path} 上未找到空闲空间。") return None, None # 找到最大的空闲空间块 largest_free_space = max(free_spaces, key=lambda x: x['size_mib']) start_mib = largest_free_space['start_mib'] size_mib = largest_free_space['size_mib'] logger.debug(f"磁盘 {disk_path} 的最大空闲空间块起始于 {start_mib:.2f} MiB,大小为 {size_mib:.2f} MiB。") return start_mib, size_mib def create_partition(self, disk_path, partition_table_type, size_gb, total_disk_mib, use_max_space): """ 在指定磁盘上创建分区。 :param disk_path: 磁盘路径 (例如 /dev/sdb)。 :param partition_table_type: 分区表类型 ('gpt' 或 'msdos')。 :param size_gb: 分区大小 (GB)。 :param total_disk_mib: 磁盘总大小 (MiB)。 :param use_max_space: 是否使用最大可用空间。 :return: True 如果成功,否则 False。 """ if not isinstance(disk_path, str) or not isinstance(partition_table_type, str): logger.error(f"尝试创建分区时传入无效参数。磁盘路径: {disk_path}, 分区表类型: {partition_table_type}") QMessageBox.critical(None, "错误", "无效的磁盘路径或分区表类型。") return False # NEW: 尝试解决磁盘占用问题 if not self._resolve_device_occupation(disk_path, action_description=f"在 {disk_path} 上创建分区"): logger.info(f"用户取消或未能解决磁盘 {disk_path} 的占用问题,取消创建分区。") return False # 1. 检查磁盘是否有分区表 has_partition_table = False # Use suppress_critical_dialog_on_stderr_match for "unrecognized disk label" when checking success_check, stdout_check, stderr_check = self._execute_shell_command( ["parted", "-s", disk_path, "print"], f"检查磁盘 {disk_path} 分区表失败", suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label"), root_privilege=True ) # If success_check is False, it means _execute_shell_command encountered an error. # If that error was "unrecognized disk label", it was suppressed, and we treat it as no partition table. # If it was another error, a dialog was shown by _execute_shell_command, and we should stop. if not success_check: # Check if the failure was due to "unrecognized disk label" (which means no partition table) if any(s in stderr_check for s in ("无法辨识的磁盘卷标", "unrecognized disk label")): logger.info(f"磁盘 {disk_path} 没有可识别的分区表。") has_partition_table = False # Explicitly set to False else: logger.error(f"检查磁盘 {disk_path} 分区表时发生非预期的错误: {stderr_check}") return False # Other critical error, stop operation. else: # success_check is True if "Partition Table: unknown" not in stdout_check and "分区表:unknown" not in stdout_check: has_partition_table = True else: logger.info(f"parted print 报告磁盘 {disk_path} 的分区表为 'unknown'。") has_partition_table = False actual_start_mib_for_parted = 0.0 # 2. 如果没有分区表,则创建分区表 if not has_partition_table: reply = QMessageBox.question(None, "确认创建分区表", f"磁盘 {disk_path} 没有分区表。您确定要创建 {partition_table_type} 分区表吗?" f"此操作将擦除磁盘上的所有数据。", QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.No: logger.info(f"用户取消了在 {disk_path} 上创建分区表的操作。") return False logger.info(f"尝试在 {disk_path} 上创建 {partition_table_type} 分区表。") success, _, stderr = self._execute_shell_command( ["parted", "-s", disk_path, "mklabel", partition_table_type], f"创建 {partition_table_type} 分区表失败" ) if not success: return False # 对于新创建分区表的磁盘,第一个分区通常从 1MiB 开始以确保兼容性和对齐 actual_start_mib_for_parted = 1.0 else: # 如果有分区表,获取下一个可用分区的起始位置 start_mib_from_parted, _ = self.get_disk_free_space_info_mib(disk_path, total_disk_mib) if start_mib_from_parted is None: QMessageBox.critical(None, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置或没有空闲空间。") return False # 如果 parted 报告的起始位置非常接近 0 MiB (例如 0.0 MiB 或 0.02 MiB), # 为了安全和兼容性,也将其调整为 1.0 MiB。 # 否则,使用 parted 报告的精确起始位置。 if start_mib_from_parted < 1.0: # covers 0.0MiB and values like 0.017MiB (17.4kB) actual_start_mib_for_parted = 1.0 else: actual_start_mib_for_parted = start_mib_from_parted # 3. 确定分区结束位置 if use_max_space: end_pos = "100%" size_for_log = "最大可用空间" else: # 计算结束 MiB # 注意:这里计算的 end_mib 是基于用户请求的大小, # parted 会根据实际可用空间和对齐进行微调。 end_mib = actual_start_mib_for_parted + size_gb * 1024 # 简单检查,如果请求的大小导致结束位置超出磁盘总大小,则使用最大可用 if end_mib > total_disk_mib: QMessageBox.warning(None, "警告", f"请求的分区大小 ({size_gb}GB) 超出了可用空间。将调整为最大可用空间。") end_pos = "100%" size_for_log = "最大可用空间" else: end_pos = f"{end_mib}MiB" size_for_log = f"{size_gb}GB" # 4. 创建分区 create_cmd = ["parted", "-s", disk_path, "mkpart", "primary", f"{actual_start_mib_for_parted}MiB", end_pos] logger.info(f"尝试在 {disk_path} 上创建 {size_for_log} 的主分区。命令: {' '.join(create_cmd)}") success, _, stderr = self._execute_shell_command( create_cmd, f"在 {disk_path} 上创建分区失败" ) if success: QMessageBox.information(None, "成功", f"在 {disk_path} 上成功创建了 {size_for_log} 的分区。") return True else: return False def delete_partition(self, device_path): """ 删除指定分区。 :param device_path: 要删除的分区路径。 :return: True 如果成功,否则 False。 """ # NEW: 尝试解决分区占用问题 if not self._resolve_device_occupation(device_path, action_description=f"删除分区 {device_path}"): logger.info(f"用户取消或未能解决分区 {device_path} 的占用问题,取消删除分区。") return False reply = QMessageBox.question(None, "确认删除分区", f"您确定要删除分区 {device_path} 吗?此操作将擦除分区上的所有数据!", QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.No: logger.info(f"用户取消了删除分区 {device_path} 的操作。") return False # 尝试卸载分区 (此方法内部会尝试解决占用问题) self.unmount_partition(device_path, show_dialog_on_error=False) # 从 fstab 中移除条目 # _remove_fstab_entry also handles "not found" gracefully. self._remove_fstab_entry(device_path) # 获取父磁盘和分区号 # 例如 /dev/sdb1 -> /dev/sdb, 1 match = re.match(r'(/dev/[a-z]+)(\d+)', device_path) if not match: QMessageBox.critical(None, "错误", f"无法解析设备路径 {device_path}。") logger.error(f"无法解析设备路径 {device_path}。") return False disk_path = match.group(1) partition_number = match.group(2) logger.info(f"尝试删除分区 {device_path} (磁盘: {disk_path}, 分区号: {partition_number})。") success, _, stderr = self._execute_shell_command( ["parted", "-s", disk_path, "rm", partition_number], f"删除分区 {device_path} 失败" ) if success: QMessageBox.information(None, "成功", f"分区 {device_path} 已成功删除。") return True else: return False def format_partition(self, device_path, fstype=None): """ 启动一个 QInputDialog 让用户选择文件系统类型,然后将格式化操作提交到后台线程。 :param device_path: 要格式化的分区路径。 :param fstype: 文件系统类型 (例如 'ext4', 'xfs', 'fat32', 'ntfs')。如果为 None,则弹出对话框让用户选择。 :return: True 如果成功启动后台任务,否则 False。 """ if fstype is None: items = ("ext4", "xfs", "fat32", "ntfs") fstype, ok = QInputDialog.getItem(None, "选择文件系统", f"请选择要使用的文件系统类型 for {device_path}:", items, 0, False) if not ok or not fstype: logger.info("用户取消了文件系统选择。") return False # NEW: 尝试解决设备占用问题 if not self._resolve_device_occupation(device_path, action_description=f"格式化设备 {device_path}"): logger.info(f"用户取消或未能解决设备 {device_path} 的占用问题,取消格式化。") return False reply = QMessageBox.question(None, "确认格式化", f"您确定要格式化设备 {device_path} 为 {fstype} 吗?此操作将擦除所有数据!", QMessageBox.Yes | QMessageBox.No, QMessageBox.No) if reply == QMessageBox.No: logger.info(f"用户取消了格式化 {device_path} 的操作。") return False # 检查是否已有格式化任务正在进行 if device_path in self.active_format_workers: QMessageBox.warning(None, "警告", f"设备 {device_path} 正在格式化中,请勿重复操作。") return False # 尝试卸载分区(静默,不弹对话框) self.unmount_partition(device_path, show_dialog_on_error=False) # 从 fstab 中移除条目 (因为格式化会改变 UUID) self._remove_fstab_entry(device_path) # 创建 QThread 和 FormatWorker 实例 thread = QThread() # 将 self._execute_shell_command (它内部调用 LvmOperations._execute_shell_command) 传递给工作线程 worker = FormatWorker(device_path, fstype, self._execute_shell_command) worker.moveToThread(thread) # 连接信号和槽 thread.started.connect(worker.run) # 线程启动时执行 worker 的 run 方法 worker.finished.connect(lambda s, dp, o, e: self._on_formatting_finished(s, dp, o, e)) # worker 完成时调用处理函数 worker.finished.connect(thread.quit) # worker 完成时退出线程 worker.finished.connect(worker.deleteLater) # worker 完成时自动删除 worker 对象 thread.finished.connect(thread.deleteLater) # 线程退出时自动删除线程对象 self.active_format_workers[device_path] = thread # 存储线程以便管理 self.formatting_started.emit(device_path) # 发出信号通知主界面格式化已开始 thread.start() # 启动线程 QMessageBox.information(None, "开始格式化", f"设备 {device_path} 正在后台格式化为 {fstype}。完成后将通知您。") return True def _on_formatting_finished(self, success, device_path, stdout, stderr): """ 处理格式化工作线程完成后的结果。此槽函数在主线程中执行。 """ if device_path in self.active_format_workers: del self.active_format_workers[device_path] # 从跟踪列表中移除 # 在主线程中显示最终结果的消息框 if success: QMessageBox.information(None, "格式化成功", f"设备 {device_path} 已成功格式化。") else: QMessageBox.critical(None, "格式化失败", f"格式化设备 {device_path} 失败。\n错误详情: {stderr}") self.formatting_finished.emit(success, device_path, stdout, stderr) # 发出信号通知 MainWindow 刷新界面