first commit

This commit is contained in:
zj
2026-02-07 14:58:56 +08:00
commit 8c53b9c0a0
19 changed files with 4540 additions and 0 deletions

317
occupation_resolver.py Normal file
View File

@@ -0,0 +1,317 @@
# occupation_resolver.py
import logging
import re
from typing import List, Tuple, Callable, Optional
# 假设 QApplication 和 QMessageBox 在主 UI 线程中可用,
# 并且我们在这里直接使用它们进行用户交互。
# 在更严格的架构中UI交互应该通过回调或事件机制与核心逻辑分离。
# from PyQt5.QtWidgets import QMessageBox
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
logger = logging.getLogger(__name__)
class OccupationResolver:
"""
负责检查和解除设备占用的类。
处理包括内核挂载、嵌套挂载和用户进程占用等情况。
"""
def __init__(self, shell_executor_func: Callable, mount_info_getter_func: Callable):
"""
初始化设备占用解决器。
Args:
shell_executor_func: 一个可调用对象,用于执行 shell 命令。
期望签名: (cmd: List[str], error_msg: str, show_dialog: bool,
suppress_critical_dialog_on_stderr_match: Tuple,
expected_non_zero_exit_codes: List[int]) -> Tuple[bool, str, str]
mount_info_getter_func: 一个可调用对象,用于获取设备的挂载点。
期望签名: (device_path: str) -> Optional[str]
"""
self._execute_shell_command = shell_executor_func
self._get_mountpoint_for_device = mount_info_getter_func
def _run_command(self, cmd: List[str], error_msg: str, **kwargs) -> Tuple[bool, str, str]:
"""
执行 shell 命令的包装器。
"""
return self._execute_shell_command(cmd, error_msg, **kwargs)
def get_all_mounts_under_path(self, base_path: str) -> List[str]:
"""
获取指定路径下所有嵌套的挂载点。
返回一个挂载点列表,从最深层到最浅层排序。
"""
mounts = []
# 尝试使用 findmnt 命令更可靠地获取嵌套挂载信息
# -l: 列出所有挂载
# -o TARGET: 只输出目标路径
# -n: 不显示标题行
# -r: 原始输出,方便解析
# --target <base_path>: 限制只查找 base_path 下的挂载
# -t noautofs,nosnapfs,nofuse,nocifs,nonfs,notmpfs,nobind: 排除一些不相关的或虚拟的挂载
findmnt_cmd = ["findmnt", "-l", "-o", "TARGET", "-n", "-r", "--target", base_path,
"-t", "noautofs,nosnapfs,nofuse,nocifs,nonfs,notmpfs,nobind"]
success, stdout, stderr = self._run_command(findmnt_cmd, f"获取 {base_path} 下的挂载信息失败", show_dialog=False)
if not success:
logger.warning(f"findmnt 命令失败,尝试解析 'mount' 命令输出: {stderr}")
# 如果 findmnt 失败或不可用,则回退到解析 'mount' 命令输出
success, stdout, stderr = self._run_command(["mount"], "获取挂载信息失败", show_dialog=False)
if not success:
logger.error(f"获取挂载信息失败: {stderr}")
return []
for line in stdout.splitlines():
match = re.search(r' on (\S+) type ', line)
if match:
mount_point = match.group(1)
# 确保是嵌套挂载点而不是基础挂载点本身
if mount_point.startswith(base_path) and mount_point != base_path:
mounts.append(mount_point)
else:
for line in stdout.splitlines():
mount_point = line.strip()
# findmnt --target 已经过滤了,这里只需确保不是空行
if mount_point and mount_point.startswith(base_path) and mount_point != base_path:
mounts.append(mount_point)
# 排序:从最深层到最浅层,以便正确卸载。路径越长,通常越深层。
mounts.sort(key=lambda x: len(x.split('/')), reverse=True)
logger.debug(f"{base_path} 下找到的嵌套挂载点(从深到浅): {mounts}")
return mounts
def _unmount_nested_mounts(self, base_mount_point: str) -> bool:
"""
尝试卸载指定基础挂载点下的所有嵌套挂载。
"""
nested_mounts = self.get_all_mounts_under_path(base_mount_point)
if not nested_mounts:
logger.debug(f"{base_mount_point} 下没有找到嵌套挂载点。")
return True # 没有需要卸载的嵌套挂载
logger.info(f"开始卸载 {base_mount_point} 下的嵌套挂载点: {nested_mounts}")
all_nested_unmounted = True
for mount_point in nested_mounts:
if not self._unmount_target(mount_point):
logger.error(f"未能卸载嵌套挂载点 {mount_point}")
all_nested_unmounted = False
# 即使一个失败,也尝试卸载其他,但标记整体失败
return all_nested_unmounted
def _unmount_target(self, target_path: str) -> bool:
"""
尝试卸载一个目标(设备或挂载点)。
先尝试普通卸载,如果失败则尝试强制卸载,再失败则尝试懒惰卸载。
"""
logger.info(f"尝试卸载 {target_path}")
success, stdout, stderr = self._run_command(
["umount", target_path],
f"卸载 {target_path} 失败",
show_dialog=False,
expected_non_zero_exit_codes=[1] # umount 返回 1 表示未挂载或忙碌
)
if success:
logger.info(f"成功卸载 {target_path}")
return True
else:
# 检查是否因为未挂载而失败
if "not mounted" in stderr.lower() or "未挂载" in stderr:
logger.info(f"目标 {target_path} 未挂载,无需卸载。")
return True
logger.warning(f"卸载 {target_path} 失败: {stderr}")
# 尝试强制卸载 -f
logger.info(f"尝试强制卸载 {target_path} (umount -f)。")
success_force, stdout_force, stderr_force = self._run_command(
["umount", "-f", target_path],
f"强制卸载 {target_path} 失败",
show_dialog=False,
expected_non_zero_exit_codes=[1]
)
if success_force:
logger.info(f"成功强制卸载 {target_path}")
return True
else:
logger.warning(f"强制卸载 {target_path} 失败: {stderr_force}")
# 尝试懒惰卸载 -l
logger.info(f"尝试懒惰卸载 {target_path} (umount -l)。")
success_lazy, stdout_lazy, stderr_lazy = self._run_command(
["umount", "-l", target_path],
f"懒惰卸载 {target_path} 失败",
show_dialog=False,
expected_non_zero_exit_codes=[1]
)
if success_lazy:
logger.info(f"成功懒惰卸载 {target_path}")
return True
else:
logger.error(f"懒惰卸载 {target_path} 失败: {stderr_lazy}")
return False
def _kill_pids(self, pids: List[str]) -> bool:
"""
终止指定PID的进程。
"""
all_killed = True
for pid in pids:
logger.info(f"尝试终止进程 {pid}")
kill_success, _, kill_stderr = self._run_command(
["kill", "-9", pid],
f"终止进程 {pid} 失败",
show_dialog=False # UI 交互在更高层处理
)
if not kill_success:
logger.error(f"终止进程 {pid} 失败: {kill_stderr}")
all_killed = False
else:
logger.info(f"成功终止进程 {pid}")
return all_killed
def resolve_occupation(self, device_path: str) -> bool:
"""
尝试解除设备占用,包括处理嵌套挂载和终止进程。
返回 True 如果成功解除占用,否则返回 False。
"""
logger.info(f"开始尝试解除设备 {device_path} 的占用。")
# 1. 获取设备的当前主挂载点
main_mount_point = self._get_mountpoint_for_device(device_path)
# 2. 如果设备有主挂载点,先尝试卸载所有嵌套挂载
if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]':
logger.debug(f"设备 {device_path} 的主挂载点是 {main_mount_point}")
max_unmount_attempts = 3
for attempt in range(max_unmount_attempts):
logger.info(f"尝试卸载嵌套挂载点 (第 {attempt + 1} 次尝试)。")
if self._unmount_nested_mounts(main_mount_point):
logger.info(f"所有嵌套挂载点已成功卸载或已不存在。")
break
else:
logger.warning(f"卸载嵌套挂载点失败 (第 {attempt + 1} 次尝试),可能需要重试。")
if attempt == max_unmount_attempts - 1:
QMessageBox.critical(None, "错误", f"未能卸载设备 {device_path} 下的所有嵌套挂载点。")
return False
# 尝试卸载主挂载点本身
logger.info(f"尝试卸载主挂载点 {main_mount_point}")
if self._unmount_target(main_mount_point):
QMessageBox.information(None, "成功", f"设备 {device_path} 及其所有挂载点已成功卸载。")
return True # 主挂载点已卸载,设备应该已空闲
# 3. 检查设备本身或其(可能已不存在的)主挂载点是否仍被占用 (fuser)
fuser_targets = [device_path]
if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]' and main_mount_point not in fuser_targets:
fuser_targets.append(main_mount_point)
pids_to_kill = set()
kernel_mounts_found = False
occupation_info_lines = []
for target in fuser_targets:
logger.debug(f"执行 fuser -vm {target} 检查占用。")
success_fuser, stdout_fuser, stderr_fuser = self._run_command(
["fuser", "-vm", target],
f"检查 {target} 占用失败",
show_dialog=False,
suppress_critical_dialog_on_stderr_match=(
"No such file or directory", "not found", "Usage:", "not mounted"
),
expected_non_zero_exit_codes=[1]
)
if success_fuser and stdout_fuser:
logger.debug(f"fuser -vm {target} 输出:\n{stdout_fuser}")
for line in stdout_fuser.splitlines():
line = line.strip() # 清除行首尾空白
if not line: # 跳过空行
continue
# 收集所有非空行用于最终显示
occupation_info_lines.append(line)
# 检查是否包含 "kernel" 关键字
if "kernel" in line:
kernel_mounts_found = True
# 尝试从详细格式的行中提取 PID (例如: /dev/sda2: root 42032 .rce. gpg-agent)
match_verbose = re.match(r'^\S+:\s+\S+\s+(?P<id>\d+)\s+.*', line)
if match_verbose:
pids_to_kill.add(match_verbose.group('id'))
else:
# 如果不是详细格式,检查是否是纯数字 PID (例如: 42032)
# 这种情况下fuser -vm 可能只输出了 PID
if line.isdigit():
pids_to_kill.add(line)
pids_to_kill_list = list(pids_to_kill)
if not kernel_mounts_found and not pids_to_kill_list:
QMessageBox.information(None, "信息", f"设备 {device_path} 未被任何进程占用。")
logger.info(f"设备 {device_path} 未被任何进程占用。")
return True
# 4. 处理剩余的内核挂载(如果嵌套挂载和主挂载卸载失败,这里会再次捕捉到)
if kernel_mounts_found:
process_list_str = "\n".join(sorted(list(set(occupation_info_lines))))
reply = QMessageBox.question(
None,
"设备占用 - 内核挂载",
f"设备 {device_path} 仍被内核挂载占用:\n{process_list_str}\n\n您确定要尝试卸载此设备吗?",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了卸载设备 {device_path}")
QMessageBox.information(None, "信息", f"已取消卸载设备 {device_path}")
return False
logger.info(f"再次尝试卸载设备 {device_path}")
if self._unmount_target(device_path): # 尝试直接卸载设备路径
QMessageBox.information(None, "成功", f"设备 {device_path} 已成功卸载。")
return True
else:
QMessageBox.critical(None, "错误", f"未能卸载设备 {device_path}")
return False
# 5. 处理用户进程占用
if pids_to_kill_list:
process_list_str = "\n".join(sorted(list(set(occupation_info_lines))))
reply = QMessageBox.question(
None,
"设备占用 - 进程",
f"设备 {device_path} 正在被以下进程占用:\n{process_list_str}\n\n您要强制终止这些进程吗?这可能会导致数据丢失或系统不稳定!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了终止占用设备 {device_path} 的进程。")
QMessageBox.information(None, "信息", f"已取消终止占用设备 {device_path} 的进程。")
return False
if self._kill_pids(pids_to_kill_list):
QMessageBox.information(None, "成功", f"已尝试终止占用设备 {device_path} 的所有进程。")
# 进程终止后,设备可能仍然被挂载,因此再次尝试卸载。
if main_mount_point and main_mount_point != 'N/A' and main_mount_point != '[SWAP]':
logger.info(f"进程终止后,尝试再次卸载主挂载点 {main_mount_point}")
if self._unmount_target(main_mount_point):
QMessageBox.information(None, "信息", f"设备 {device_path} 已成功卸载。")
return True
else: # 如果没有主挂载点,检查设备本身
logger.info(f"进程终止后,尝试再次卸载设备 {device_path}")
if self._unmount_target(device_path):
QMessageBox.information(None, "信息", f"设备 {device_path} 已成功卸载。")
return True
# 如果进程终止后仍未能卸载
QMessageBox.warning(None, "警告", f"进程已终止,但未能成功卸载设备 {device_path}。可能仍有其他问题。")
return False # 表示部分成功或需要进一步手动干预
else:
QMessageBox.critical(None, "错误", f"未能终止所有占用设备 {device_path} 的进程。")
return False
logger.warning(f"设备 {device_path} 占用解决逻辑未能完全处理。")
return False