Files
diskmanager2/occupation_resolver_tkinter.py
2026-02-10 02:54:13 +08:00

180 lines
8.1 KiB
Python

# occupation_resolver_tkinter.py
import logging
import subprocess
import os
from tkinter import messagebox
logger = logging.getLogger(__name__)
class OccupationResolver:
"""设备占用解除类 - Tkinter 版本"""
def __init__(self, shell_executor_func, mount_info_getter_func):
self.shell_executor_func = shell_executor_func
self.mount_info_getter_func = mount_info_getter_func
def resolve_occupation(self, device_path: str) -> bool:
"""处理设备占用问题"""
logger.info(f"开始处理设备 {device_path} 的占用问题。")
# 1. 获取挂载点
mount_point = self.mount_info_getter_func(device_path)
# 2. 如果设备已挂载,尝试卸载
if mount_point:
logger.info(f"设备 {device_path} 已挂载在 {mount_point}。检查是否有嵌套挂载...")
# 卸载嵌套挂载点
if not self._unmount_nested_mounts(mount_point):
messagebox.showerror("错误", f"未能卸载设备 {device_path} 下的所有嵌套挂载点。")
# 嵌套挂载卸载失败,可能是被进程占用,继续检查进程
else:
# 嵌套挂载卸载成功,尝试卸载设备本身
if self._unmount_device(device_path):
messagebox.showinfo("成功", f"设备 {device_path} 及其所有挂载点已成功卸载。")
return True
# 设备卸载失败,可能是被进程占用,继续检查进程
logger.warning(f"设备 {device_path} 卸载失败,可能存在进程占用。")
else:
logger.info(f"设备 {device_path} 当前未挂载。")
# 3. 检查是否有进程占用设备(无论是否已挂载,都检查进程占用)
if self._is_device_busy(device_path):
return self._kill_processes_using_device(device_path)
# 4. 如果之前有挂载点但卸载失败,且没有进程占用,说明是其他原因
if mount_point:
messagebox.showwarning("警告",
f"设备 {device_path} 卸载失败,但未检测到进程占用。\n"
f"可能是其他原因导致,请检查日志。")
return False
# 5. 无任何占用
messagebox.showinfo("信息", f"设备 {device_path} 未被任何进程占用。")
return True
def _unmount_nested_mounts(self, mount_point: str) -> bool:
"""卸载指定挂载点下的所有嵌套挂载"""
logger.info(f"尝试卸载 {mount_point} 下的所有嵌套挂载点。")
try:
with open('/proc/mounts', 'r') as f:
mounts = f.readlines()
nested_mounts = []
for line in mounts:
parts = line.split()
if len(parts) >= 2:
mp = parts[1]
if mp.startswith(mount_point + '/') or mp == mount_point:
nested_mounts.append(mp)
# 按深度排序,先卸载深层挂载
nested_mounts.sort(key=lambda x: x.count('/'), reverse=True)
for nm in nested_mounts:
logger.info(f"尝试卸载嵌套挂载点: {nm}")
success, _, _ = self.shell_executor_func(
["umount", nm],
f"卸载嵌套挂载点 {nm} 失败",
show_dialog=False
)
if not success:
logger.warning(f"卸载嵌套挂载点 {nm} 失败,但将继续尝试其他挂载点。")
return True
except Exception as e:
logger.error(f"卸载嵌套挂载点时发生错误: {e}")
return False
def _unmount_device(self, device_path: str) -> bool:
"""卸载设备"""
logger.info(f"尝试卸载设备 {device_path}")
success, _, stderr = self.shell_executor_func(
["umount", device_path],
f"卸载设备 {device_path} 失败",
show_dialog=False
)
if success:
logger.info(f"设备 {device_path} 已成功卸载。")
return True
else:
# 检查是否是因为设备已经未挂载
if "未挂载" in stderr or "not mounted" in stderr.lower():
logger.info(f"设备 {device_path} 已经处于未挂载状态。")
return True
logger.error(f"卸载设备 {device_path} 失败: {stderr}")
return False
def _is_device_busy(self, device_path: str) -> bool:
"""检查设备是否被进程占用"""
logger.info(f"检查设备 {device_path} 是否被进程占用。")
# 尝试使用 lsof
try:
result = subprocess.run(
["sudo", "lsof", device_path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', timeout=10
)
if result.returncode == 0 and result.stdout.strip():
logger.info(f"设备 {device_path} 被以下进程占用:\n{result.stdout}")
return True
except Exception as e:
logger.debug(f"lsof 检查失败: {e}")
# 尝试使用 fuser
try:
result = subprocess.run(
["sudo", "fuser", device_path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', timeout=10
)
if result.returncode == 0 and result.stdout.strip():
logger.info(f"设备 {device_path} 被以下进程占用: {result.stdout.strip()}")
return True
except Exception as e:
logger.debug(f"fuser 检查失败: {e}")
return False
def _kill_processes_using_device(self, device_path: str) -> bool:
"""终止占用设备的进程"""
logger.info(f"尝试终止占用设备 {device_path} 的进程。")
if messagebox.askyesno("确认终止进程",
f"设备 {device_path} 当前被进程占用。\n"
"是否终止这些进程?",
default=messagebox.NO):
try:
# 使用 fuser -k 终止进程
result = subprocess.run(
["sudo", "fuser", "-k", device_path],
stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', timeout=10
)
if result.returncode == 0 or result.returncode == 1: # 1 表示没有进程被终止
messagebox.showinfo("成功", f"已尝试终止占用设备 {device_path} 的所有进程。")
# 再次尝试卸载
if self._unmount_device(device_path):
messagebox.showinfo("成功", f"设备 {device_path} 已成功卸载。")
return True
else:
# 设备可能已经没有挂载
mount_point = self.mount_info_getter_func(device_path)
if not mount_point:
messagebox.showinfo("成功", f"设备 {device_path} 已成功卸载。")
return True
messagebox.showwarning("警告", f"进程已终止,但未能成功卸载设备 {device_path}。可能仍有其他问题。")
return False
else:
messagebox.showerror("错误", f"未能终止占用设备 {device_path} 的进程。")
return False
except Exception as e:
logger.error(f"终止进程时发生错误: {e}")
messagebox.showerror("错误", f"终止进程时发生错误: {e}")
return False
else:
messagebox.showinfo("信息", f"已取消终止占用设备 {device_path} 的进程。")
return False