Files
diskmanager/occupation_resolver.py
2026-02-05 03:12:45 +08:00

318 lines
16 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# occupation_resolver.py
import logging
import re
from typing import List, Tuple, Callable, Optional
# 假设 QApplication 和 QMessageBox 在主 UI 线程中可用,
# 并且我们在这里直接使用它们进行用户交互。
# 在更严格的架构中UI交互应该通过回调或事件机制与核心逻辑分离。
# from PyQt5.QtWidgets import QMessageBox
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
logger = logging.getLogger(__name__)
class OccupationResolver:
"""
负责检查和解除设备占用的类。
处理包括内核挂载、嵌套挂载和用户进程占用等情况。
"""
def __init__(self, shell_executor_func: Callable, mount_info_getter_func: Callable):
"""
初始化设备占用解决器。
Args:
shell_executor_func: 一个可调用对象,用于执行 shell 命令。
期望签名: (cmd: List[str], error_msg: str, show_dialog: bool,
suppress_critical_dialog_on_stderr_match: Tuple,
expected_non_zero_exit_codes: List[int]) -> Tuple[bool, str, str]
mount_info_getter_func: 一个可调用对象,用于获取设备的挂载点。
期望签名: (device_path: str) -> Optional[str]
"""
self._execute_shell_command = shell_executor_func
self._get_mountpoint_for_device = mount_info_getter_func
def _run_command(self, cmd: List[str], error_msg: str, **kwargs) -> Tuple[bool, str, str]:
"""
执行 shell 命令的包装器。
"""
return self._execute_shell_command(cmd, error_msg, **kwargs)
def get_all_mounts_under_path(self, base_path: str) -> List[str]:
"""
获取指定路径下所有嵌套的挂载点。
返回一个挂载点列表,从最深层到最浅层排序。
"""
mounts = []
# 尝试使用 findmnt 命令更可靠地获取嵌套挂载信息
# -l: 列出所有挂载
# -o TARGET: 只输出目标路径
# -n: 不显示标题行
# -r: 原始输出,方便解析
# --target <base_path>: 限制只查找 base_path 下的挂载
# -t noautofs,nosnapfs,nofuse,nocifs,nonfs,notmpfs,nobind: 排除一些不相关的或虚拟的挂载
findmnt_cmd = ["findmnt", "-l", "-o", "TARGET", "-n", "-r", "--target", base_path,
"-t", "noautofs,nosnapfs,nofuse,nocifs,nonfs,notmpfs,nobind"]
success, stdout, stderr = self._run_command(findmnt_cmd, f"获取 {base_path} 下的挂载信息失败", show_dialog=False)
if not success:
logger.warning(f"findmnt 命令失败,尝试解析 'mount' 命令输出: {stderr}")
# 如果 findmnt 失败或不可用,则回退到解析 'mount' 命令输出
success, stdout, stderr = self._run_command(["mount"], "获取挂载信息失败", show_dialog=False)
if not success:
logger.error(f"获取挂载信息失败: {stderr}")
return []
for line in stdout.splitlines():
match = re.search(r' on (\S+) type ', line)
if match:
mount_point = match.group(1)
# 确保是嵌套挂载点而不是基础挂载点本身
if mount_point.startswith(base_path) and mount_point != base_path:
mounts.append(mount_point)
else:
for line in stdout.splitlines():
mount_point = line.strip()
# findmnt --target 已经过滤了,这里只需确保不是空行
if mount_point and mount_point.startswith(base_path) and mount_point != base_path:
mounts.append(mount_point)
# 排序:从最深层到最浅层,以便正确卸载。路径越长,通常越深层。
mounts.sort(key=lambda x: len(x.split('/')), reverse=True)
logger.debug(f"{base_path} 下找到的嵌套挂载点(从深到浅): {mounts}")
return mounts
def _unmount_nested_mounts(self, base_mount_point: str) -> bool:
"""
尝试卸载指定基础挂载点下的所有嵌套挂载。
"""
nested_mounts = self.get_all_mounts_under_path(base_mount_point)
if not nested_mounts:
logger.debug(f"{base_mount_point} 下没有找到嵌套挂载点。")
return True # 没有需要卸载的嵌套挂载
logger.info(f"开始卸载 {base_mount_point} 下的嵌套挂载点: {nested_mounts}")
all_nested_unmounted = True
for mount_point in nested_mounts:
if not self._unmount_target(mount_point):
logger.error(f"未能卸载嵌套挂载点 {mount_point}")
all_nested_unmounted = False
# 即使一个失败,也尝试卸载其他,但标记整体失败
return all_nested_unmounted
def _unmount_target(self, target_path: str) -> bool:
"""
尝试卸载一个目标(设备或挂载点)。
先尝试普通卸载,如果失败则尝试强制卸载,再失败则尝试懒惰卸载。
"""
logger.info(f"尝试卸载 {target_path}")
success, stdout, stderr = self._run_command(
["umount", target_path],
f"卸载 {target_path} 失败",
show_dialog=False,
expected_non_zero_exit_codes=[1] # umount 返回 1 表示未挂载或忙碌
)
if success:
logger.info(f"成功卸载 {target_path}")
return True
else:
# 检查是否因为未挂载而失败
if "not mounted" in stderr.lower() or "未挂载" in stderr:
logger.info(f"目标 {target_path} 未挂载,无需卸载。")
return True
logger.warning(f"卸载 {target_path} 失败: {stderr}")
# 尝试强制卸载 -f
logger.info(f"尝试强制卸载 {target_path} (umount -f)。")
success_force, stdout_force, stderr_force = self._run_command(
["umount", "-f", target_path],
f"强制卸载 {target_path} 失败",
show_dialog=False,
expected_non_zero_exit_codes=[1]
)
if success_force:
logger.info(f"成功强制卸载 {target_path}")
return True
else:
logger.warning(f"强制卸载 {target_path} 失败: {stderr_force}")
# 尝试懒惰卸载 -l
logger.info(f"尝试懒惰卸载 {target_path} (umount -l)。")
success_lazy, stdout_lazy, stderr_lazy = self._run_command(
["umount", "-l", target_path],
f"懒惰卸载 {target_path} 失败",
show_dialog=False,
expected_non_zero_exit_codes=[1]
)
if success_lazy:
logger.info(f"成功懒惰卸载 {target_path}")
return True
else:
logger.error(f"懒惰卸载 {target_path} 失败: {stderr_lazy}")
return False
def _kill_pids(self, pids: List[str]) -> bool:
"""
终止指定PID的进程。
"""
all_killed = True
for pid in pids:
logger.info(f"尝试终止进程 {pid}")
kill_success, _, kill_stderr = self._run_command(
["kill", "-9", pid],
f"终止进程 {pid} 失败",
show_dialog=False # UI 交互在更高层处理
)
if not kill_success:
logger.error(f"终止进程 {pid} 失败: {kill_stderr}")
all_killed = False
else:
logger.info(f"成功终止进程 {pid}")
return all_killed
def resolve_occupation(self, device_path: str) -> bool:
"""
尝试解除设备占用,包括处理嵌套挂载和终止进程。
返回 True 如果成功解除占用,否则返回 False。
"""
logger.info(f"开始尝试解除设备 {device_path} 的占用。")
# 1. 获取设备的当前主挂载点
main_mount_point = self._get_mountpoint_for_device(device_path)
# 2. 如果设备有主挂载点,先尝试卸载所有嵌套挂载
if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]':
logger.debug(f"设备 {device_path} 的主挂载点是 {main_mount_point}")
max_unmount_attempts = 3
for attempt in range(max_unmount_attempts):
logger.info(f"尝试卸载嵌套挂载点 (第 {attempt + 1} 次尝试)。")
if self._unmount_nested_mounts(main_mount_point):
logger.info(f"所有嵌套挂载点已成功卸载或已不存在。")
break
else:
logger.warning(f"卸载嵌套挂载点失败 (第 {attempt + 1} 次尝试),可能需要重试。")
if attempt == max_unmount_attempts - 1:
QMessageBox.critical(None, "错误", f"未能卸载设备 {device_path} 下的所有嵌套挂载点。")
return False
# 尝试卸载主挂载点本身
logger.info(f"尝试卸载主挂载点 {main_mount_point}")
if self._unmount_target(main_mount_point):
QMessageBox.information(None, "成功", f"设备 {device_path} 及其所有挂载点已成功卸载。")
return True # 主挂载点已卸载,设备应该已空闲
# 3. 检查设备本身或其(可能已不存在的)主挂载点是否仍被占用 (fuser)
fuser_targets = [device_path]
if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]' and main_mount_point not in fuser_targets:
fuser_targets.append(main_mount_point)
pids_to_kill = set()
kernel_mounts_found = False
occupation_info_lines = []
for target in fuser_targets:
logger.debug(f"执行 fuser -vm {target} 检查占用。")
success_fuser, stdout_fuser, stderr_fuser = self._run_command(
["fuser", "-vm", target],
f"检查 {target} 占用失败",
show_dialog=False,
suppress_critical_dialog_on_stderr_match=(
"No such file or directory", "not found", "Usage:", "not mounted"
),
expected_non_zero_exit_codes=[1]
)
if success_fuser and stdout_fuser:
logger.debug(f"fuser -vm {target} 输出:\n{stdout_fuser}")
for line in stdout_fuser.splitlines():
line = line.strip() # 清除行首尾空白
if not line: # 跳过空行
continue
# 收集所有非空行用于最终显示
occupation_info_lines.append(line)
# 检查是否包含 "kernel" 关键字
if "kernel" in line:
kernel_mounts_found = True
# 尝试从详细格式的行中提取 PID (例如: /dev/sda2: root 42032 .rce. gpg-agent)
match_verbose = re.match(r'^\S+:\s+\S+\s+(?P<id>\d+)\s+.*', line)
if match_verbose:
pids_to_kill.add(match_verbose.group('id'))
else:
# 如果不是详细格式,检查是否是纯数字 PID (例如: 42032)
# 这种情况下fuser -vm 可能只输出了 PID
if line.isdigit():
pids_to_kill.add(line)
pids_to_kill_list = list(pids_to_kill)
if not kernel_mounts_found and not pids_to_kill_list:
QMessageBox.information(None, "信息", f"设备 {device_path} 未被任何进程占用。")
logger.info(f"设备 {device_path} 未被任何进程占用。")
return True
# 4. 处理剩余的内核挂载(如果嵌套挂载和主挂载卸载失败,这里会再次捕捉到)
if kernel_mounts_found:
process_list_str = "\n".join(sorted(list(set(occupation_info_lines))))
reply = QMessageBox.question(
None,
"设备占用 - 内核挂载",
f"设备 {device_path} 仍被内核挂载占用:\n{process_list_str}\n\n您确定要尝试卸载此设备吗?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了卸载设备 {device_path}")
QMessageBox.information(None, "信息", f"已取消卸载设备 {device_path}")
return False
logger.info(f"再次尝试卸载设备 {device_path}")
if self._unmount_target(device_path): # 尝试直接卸载设备路径
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。")
return True
else:
QMessageBox.critical(None, "错误", f"未能卸载设备 {device_path}")
return False
# 5. 处理用户进程占用
if pids_to_kill_list:
process_list_str = "\n".join(sorted(list(set(occupation_info_lines))))
reply = QMessageBox.question(
None,
"设备占用 - 进程",
f"设备 {device_path} 正在被以下进程占用:\n{process_list_str}\n\n您要强制终止这些进程吗?这可能会导致数据丢失或系统不稳定!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了终止占用设备 {device_path} 的进程。")
QMessageBox.information(None, "信息", f"已取消终止占用设备 {device_path} 的进程。")
return False
if self._kill_pids(pids_to_kill_list):
QMessageBox.information(None, "成功", f"已尝试终止占用设备 {device_path} 的所有进程。")
# 进程终止后,设备可能仍然被挂载,因此再次尝试卸载。
if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]':
logger.info(f"进程终止后,尝试再次卸载主挂载点 {main_mount_point}")
if self._unmount_target(main_mount_point):
QMessageBox.information(None, "信息", f"设备 {device_path} 已成功卸载。")
return True
else: # 如果没有主挂载点,检查设备本身
logger.info(f"进程终止后,尝试再次卸载设备 {device_path}")
if self._unmount_target(device_path):
QMessageBox.information(None, "信息", f"设备 {device_path} 已成功卸载。")
return True
# 如果进程终止后仍未能卸载
QMessageBox.warning(None, "警告", f"进程已终止,但未能成功卸载设备 {device_path}。可能仍有其他问题。")
return False # 表示部分成功或需要进一步手动干预
else:
QMessageBox.critical(None, "错误", f"未能终止所有占用设备 {device_path} 的进程。")
return False
logger.warning(f"设备 {device_path} 占用解决逻辑未能完全处理。")
return False