# occupation_resolver.py import logging import re from typing import List, Tuple, Callable, Optional # 假设 QApplication 和 QMessageBox 在主 UI 线程中可用, # 并且我们在这里直接使用它们进行用户交互。 # 在更严格的架构中,UI交互应该通过回调或事件机制与核心逻辑分离。 # from PyQt5.QtWidgets import QMessageBox from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem, QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog) logger = logging.getLogger(__name__) class OccupationResolver: """ 负责检查和解除设备占用的类。 处理包括内核挂载、嵌套挂载和用户进程占用等情况。 """ def __init__(self, shell_executor_func: Callable, mount_info_getter_func: Callable): """ 初始化设备占用解决器。 Args: shell_executor_func: 一个可调用对象,用于执行 shell 命令。 期望签名: (cmd: List[str], error_msg: str, show_dialog: bool, suppress_critical_dialog_on_stderr_match: Tuple, expected_non_zero_exit_codes: List[int]) -> Tuple[bool, str, str] mount_info_getter_func: 一个可调用对象,用于获取设备的挂载点。 期望签名: (device_path: str) -> Optional[str] """ self._execute_shell_command = shell_executor_func self._get_mountpoint_for_device = mount_info_getter_func def _run_command(self, cmd: List[str], error_msg: str, **kwargs) -> Tuple[bool, str, str]: """ 执行 shell 命令的包装器。 """ return self._execute_shell_command(cmd, error_msg, **kwargs) def get_all_mounts_under_path(self, base_path: str) -> List[str]: """ 获取指定路径下所有嵌套的挂载点。 返回一个挂载点列表,从最深层到最浅层排序。 """ mounts = [] # 尝试使用 findmnt 命令更可靠地获取嵌套挂载信息 # -l: 列出所有挂载 # -o TARGET: 只输出目标路径 # -n: 不显示标题行 # -r: 原始输出,方便解析 # --target : 限制只查找 base_path 下的挂载 # -t noautofs,nosnapfs,nofuse,nocifs,nonfs,notmpfs,nobind: 排除一些不相关的或虚拟的挂载 findmnt_cmd = ["findmnt", "-l", "-o", "TARGET", "-n", "-r", "--target", base_path, "-t", "noautofs,nosnapfs,nofuse,nocifs,nonfs,notmpfs,nobind"] success, stdout, stderr = self._run_command(findmnt_cmd, f"获取 {base_path} 下的挂载信息失败", show_dialog=False) if not success: logger.warning(f"findmnt 命令失败,尝试解析 'mount' 命令输出: {stderr}") # 如果 findmnt 失败或不可用,则回退到解析 'mount' 命令输出 success, stdout, stderr = self._run_command(["mount"], "获取挂载信息失败", show_dialog=False) if not success: logger.error(f"获取挂载信息失败: {stderr}") return [] for line in stdout.splitlines(): match = re.search(r' on (\S+) type ', line) if match: mount_point = match.group(1) # 确保是嵌套挂载点而不是基础挂载点本身 if mount_point.startswith(base_path) and mount_point != base_path: mounts.append(mount_point) else: for line in stdout.splitlines(): mount_point = line.strip() # findmnt --target 已经过滤了,这里只需确保不是空行 if mount_point and mount_point.startswith(base_path) and mount_point != base_path: mounts.append(mount_point) # 排序:从最深层到最浅层,以便正确卸载。路径越长,通常越深层。 mounts.sort(key=lambda x: len(x.split('/')), reverse=True) logger.debug(f"在 {base_path} 下找到的嵌套挂载点(从深到浅): {mounts}") return mounts def _unmount_nested_mounts(self, base_mount_point: str) -> bool: """ 尝试卸载指定基础挂载点下的所有嵌套挂载。 """ nested_mounts = self.get_all_mounts_under_path(base_mount_point) if not nested_mounts: logger.debug(f"在 {base_mount_point} 下没有找到嵌套挂载点。") return True # 没有需要卸载的嵌套挂载 logger.info(f"开始卸载 {base_mount_point} 下的嵌套挂载点: {nested_mounts}") all_nested_unmounted = True for mount_point in nested_mounts: if not self._unmount_target(mount_point): logger.error(f"未能卸载嵌套挂载点 {mount_point}。") all_nested_unmounted = False # 即使一个失败,也尝试卸载其他,但标记整体失败 return all_nested_unmounted def _unmount_target(self, target_path: str) -> bool: """ 尝试卸载一个目标(设备或挂载点)。 先尝试普通卸载,如果失败则尝试强制卸载,再失败则尝试懒惰卸载。 """ logger.info(f"尝试卸载 {target_path}。") success, stdout, stderr = self._run_command( ["umount", target_path], f"卸载 {target_path} 失败", show_dialog=False, expected_non_zero_exit_codes=[1] # umount 返回 1 表示未挂载或忙碌 ) if success: logger.info(f"成功卸载 {target_path}。") return True else: # 检查是否因为未挂载而失败 if "not mounted" in stderr.lower() or "未挂载" in stderr: logger.info(f"目标 {target_path} 未挂载,无需卸载。") return True logger.warning(f"卸载 {target_path} 失败: {stderr}") # 尝试强制卸载 -f logger.info(f"尝试强制卸载 {target_path} (umount -f)。") success_force, stdout_force, stderr_force = self._run_command( ["umount", "-f", target_path], f"强制卸载 {target_path} 失败", show_dialog=False, expected_non_zero_exit_codes=[1] ) if success_force: logger.info(f"成功强制卸载 {target_path}。") return True else: logger.warning(f"强制卸载 {target_path} 失败: {stderr_force}") # 尝试懒惰卸载 -l logger.info(f"尝试懒惰卸载 {target_path} (umount -l)。") success_lazy, stdout_lazy, stderr_lazy = self._run_command( ["umount", "-l", target_path], f"懒惰卸载 {target_path} 失败", show_dialog=False, expected_non_zero_exit_codes=[1] ) if success_lazy: logger.info(f"成功懒惰卸载 {target_path}。") return True else: logger.error(f"懒惰卸载 {target_path} 失败: {stderr_lazy}") return False def _kill_pids(self, pids: List[str]) -> bool: """ 终止指定PID的进程。 """ all_killed = True for pid in pids: logger.info(f"尝试终止进程 {pid}。") kill_success, _, kill_stderr = self._run_command( ["kill", "-9", pid], f"终止进程 {pid} 失败", show_dialog=False # UI 交互在更高层处理 ) if not kill_success: logger.error(f"终止进程 {pid} 失败: {kill_stderr}") all_killed = False else: logger.info(f"成功终止进程 {pid}。") return all_killed def resolve_occupation(self, device_path: str) -> bool: """ 尝试解除设备占用,包括处理嵌套挂载和终止进程。 返回 True 如果成功解除占用,否则返回 False。 """ logger.info(f"开始尝试解除设备 {device_path} 的占用。") # 1. 获取设备的当前主挂载点 main_mount_point = self._get_mountpoint_for_device(device_path) # 2. 如果设备有主挂载点,先尝试卸载所有嵌套挂载 if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]': logger.debug(f"设备 {device_path} 的主挂载点是 {main_mount_point}。") # 尝试卸载嵌套挂载,并循环几次,因为某些挂载可能需要多次尝试 max_unmount_attempts = 3 for attempt in range(max_unmount_attempts): logger.info(f"尝试卸载嵌套挂载点 (第 {attempt + 1} 次尝试)。") if self._unmount_nested_mounts(main_mount_point): logger.info(f"所有嵌套挂载点已成功卸载或已不存在。") break else: logger.warning(f"卸载嵌套挂载点失败 (第 {attempt + 1} 次尝试),可能需要重试。") if attempt == max_unmount_attempts - 1: QMessageBox.critical(None, "错误", f"未能卸载设备 {device_path} 下的所有嵌套挂载点。") return False # 尝试卸载主挂载点本身 logger.info(f"尝试卸载主挂载点 {main_mount_point}。") if self._unmount_target(main_mount_point): QMessageBox.information(None, "成功", f"设备 {device_path} 及其所有挂载点已成功卸载。") return True # 主挂载点已卸载,设备应该已空闲 # 3. 检查设备本身或其(可能已不存在的)主挂载点是否仍被占用 (fuser) # 即使主挂载点已卸载,也可能存在进程直接占用设备文件的情况,或者之前卸载失败。 fuser_targets = [device_path] if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]' and main_mount_point not in fuser_targets: fuser_targets.append(main_mount_point) pids_to_kill = set() kernel_mounts_found = False occupation_info_lines = [] for target in fuser_targets: logger.debug(f"执行 fuser -vm {target} 检查占用。") success_fuser, stdout_fuser, stderr_fuser = self._run_command( ["fuser", "-vm", target], f"检查 {target} 占用失败", show_dialog=False, suppress_critical_dialog_on_stderr_match=( "No such file or directory", "not found", "Usage:", "not mounted" ), expected_non_zero_exit_codes=[1] ) if success_fuser and stdout_fuser: logger.debug(f"fuser -vm {target} 输出:\n{stdout_fuser}") for line in stdout_fuser.splitlines(): occupation_info_lines.append(line.strip()) if "kernel" in line: kernel_mounts_found = True match = re.match(r'^\S+:\s+\S+\s+(?P\d+)\s+.*', line) if match: pids_to_kill.add(match.group('id')) pids_to_kill_list = list(pids_to_kill) if not kernel_mounts_found and not pids_to_kill_list: QMessageBox.information(None, "信息", f"设备 {device_path} 未被任何进程占用。") logger.info(f"设备 {device_path} 未被任何进程占用。") return True # 4. 处理剩余的内核挂载(如果嵌套挂载和主挂载卸载失败,这里会再次捕捉到) if kernel_mounts_found: process_list_str = "\n".join(sorted(list(set(occupation_info_lines)))) reply = QMessageBox.question( None, "设备占用 - 内核挂载", f"设备 {device_path} 仍被内核挂载占用:\n{process_list_str}\n\n您确定要尝试卸载此设备吗?", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) if reply == QMessageBox.No: logger.info(f"用户取消了卸载设备 {device_path}。") QMessageBox.information(None, "信息", f"已取消卸载设备 {device_path}。") return False logger.info(f"再次尝试卸载设备 {device_path}。") if self._unmount_target(device_path): # 尝试直接卸载设备路径 QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。") return True else: QMessageBox.critical(None, "错误", f"未能卸载设备 {device_path}。") return False # 5. 处理用户进程占用 if pids_to_kill_list: process_list_str = "\n".join(sorted(list(set(occupation_info_lines)))) reply = QMessageBox.question( None, "设备占用 - 进程", f"设备 {device_path} 正在被以下进程占用:\n{process_list_str}\n\n您要强制终止这些进程吗?这可能会导致数据丢失或系统不稳定!", QMessageBox.Yes | QMessageBox.No, QMessageBox.No ) if reply == QMessageBox.No: logger.info(f"用户取消了终止占用设备 {device_path} 的进程。") QMessageBox.information(None, "信息", f"已取消终止占用设备 {device_path} 的进程。") return False if self._kill_pids(pids_to_kill_list): QMessageBox.information(None, "成功", f"已尝试终止占用设备 {device_path} 的所有进程。") # 进程终止后,设备可能仍然被挂载,因此再次尝试卸载。 if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]': logger.info(f"进程终止后,尝试再次卸载主挂载点 {main_mount_point}。") if self._unmount_target(main_mount_point): QMessageBox.information(None, "信息", f"设备 {device_path} 已成功卸载。") return True else: # 如果没有主挂载点,检查设备本身 logger.info(f"进程终止后,尝试再次卸载设备 {device_path}。") if self._unmount_target(device_path): QMessageBox.information(None, "信息", f"设备 {device_path} 已成功卸载。") return True # 如果进程终止后仍未能卸载 QMessageBox.warning(None, "警告", f"进程已终止,但未能成功卸载设备 {device_path}。可能仍有其他问题。") return False # 表示部分成功或需要进一步手动干预 else: QMessageBox.critical(None, "错误", f"未能终止所有占用设备 {device_path} 的进程。") return False logger.warning(f"设备 {device_path} 占用解决逻辑未能完全处理。") return False