180 lines
8.1 KiB
Python
180 lines
8.1 KiB
Python
# occupation_resolver_tkinter.py
|
|
import logging
|
|
import subprocess
|
|
import os
|
|
from tkinter import messagebox
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
|
|
class OccupationResolver:
|
|
"""设备占用解除类 - Tkinter 版本"""
|
|
|
|
def __init__(self, shell_executor_func, mount_info_getter_func):
|
|
self.shell_executor_func = shell_executor_func
|
|
self.mount_info_getter_func = mount_info_getter_func
|
|
|
|
def resolve_occupation(self, device_path: str) -> bool:
|
|
"""处理设备占用问题"""
|
|
logger.info(f"开始处理设备 {device_path} 的占用问题。")
|
|
|
|
# 1. 获取挂载点
|
|
mount_point = self.mount_info_getter_func(device_path)
|
|
|
|
# 2. 如果设备已挂载,尝试卸载
|
|
if mount_point:
|
|
logger.info(f"设备 {device_path} 已挂载在 {mount_point}。检查是否有嵌套挂载...")
|
|
|
|
# 卸载嵌套挂载点
|
|
if not self._unmount_nested_mounts(mount_point):
|
|
messagebox.showerror("错误", f"未能卸载设备 {device_path} 下的所有嵌套挂载点。")
|
|
# 嵌套挂载卸载失败,可能是被进程占用,继续检查进程
|
|
else:
|
|
# 嵌套挂载卸载成功,尝试卸载设备本身
|
|
if self._unmount_device(device_path):
|
|
messagebox.showinfo("成功", f"设备 {device_path} 及其所有挂载点已成功卸载。")
|
|
return True
|
|
# 设备卸载失败,可能是被进程占用,继续检查进程
|
|
logger.warning(f"设备 {device_path} 卸载失败,可能存在进程占用。")
|
|
else:
|
|
logger.info(f"设备 {device_path} 当前未挂载。")
|
|
|
|
# 3. 检查是否有进程占用设备(无论是否已挂载,都检查进程占用)
|
|
if self._is_device_busy(device_path):
|
|
return self._kill_processes_using_device(device_path)
|
|
|
|
# 4. 如果之前有挂载点但卸载失败,且没有进程占用,说明是其他原因
|
|
if mount_point:
|
|
messagebox.showwarning("警告",
|
|
f"设备 {device_path} 卸载失败,但未检测到进程占用。\n"
|
|
f"可能是其他原因导致,请检查日志。")
|
|
return False
|
|
|
|
# 5. 无任何占用
|
|
messagebox.showinfo("信息", f"设备 {device_path} 未被任何进程占用。")
|
|
return True
|
|
|
|
def _unmount_nested_mounts(self, mount_point: str) -> bool:
|
|
"""卸载指定挂载点下的所有嵌套挂载"""
|
|
logger.info(f"尝试卸载 {mount_point} 下的所有嵌套挂载点。")
|
|
|
|
try:
|
|
with open('/proc/mounts', 'r') as f:
|
|
mounts = f.readlines()
|
|
|
|
nested_mounts = []
|
|
for line in mounts:
|
|
parts = line.split()
|
|
if len(parts) >= 2:
|
|
mp = parts[1]
|
|
if mp.startswith(mount_point + '/') or mp == mount_point:
|
|
nested_mounts.append(mp)
|
|
|
|
# 按深度排序,先卸载深层挂载
|
|
nested_mounts.sort(key=lambda x: x.count('/'), reverse=True)
|
|
|
|
for nm in nested_mounts:
|
|
logger.info(f"尝试卸载嵌套挂载点: {nm}")
|
|
success, _, _ = self.shell_executor_func(
|
|
["umount", nm],
|
|
f"卸载嵌套挂载点 {nm} 失败",
|
|
show_dialog=False
|
|
)
|
|
if not success:
|
|
logger.warning(f"卸载嵌套挂载点 {nm} 失败,但将继续尝试其他挂载点。")
|
|
|
|
return True
|
|
except Exception as e:
|
|
logger.error(f"卸载嵌套挂载点时发生错误: {e}")
|
|
return False
|
|
|
|
def _unmount_device(self, device_path: str) -> bool:
|
|
"""卸载设备"""
|
|
logger.info(f"尝试卸载设备 {device_path}。")
|
|
success, _, stderr = self.shell_executor_func(
|
|
["umount", device_path],
|
|
f"卸载设备 {device_path} 失败",
|
|
show_dialog=False
|
|
)
|
|
if success:
|
|
logger.info(f"设备 {device_path} 已成功卸载。")
|
|
return True
|
|
else:
|
|
# 检查是否是因为设备已经未挂载
|
|
if "未挂载" in stderr or "not mounted" in stderr.lower():
|
|
logger.info(f"设备 {device_path} 已经处于未挂载状态。")
|
|
return True
|
|
logger.error(f"卸载设备 {device_path} 失败: {stderr}")
|
|
return False
|
|
|
|
def _is_device_busy(self, device_path: str) -> bool:
|
|
"""检查设备是否被进程占用"""
|
|
logger.info(f"检查设备 {device_path} 是否被进程占用。")
|
|
|
|
# 尝试使用 lsof
|
|
try:
|
|
result = subprocess.run(
|
|
["sudo", "lsof", device_path],
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', timeout=10
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
logger.info(f"设备 {device_path} 被以下进程占用:\n{result.stdout}")
|
|
return True
|
|
except Exception as e:
|
|
logger.debug(f"lsof 检查失败: {e}")
|
|
|
|
# 尝试使用 fuser
|
|
try:
|
|
result = subprocess.run(
|
|
["sudo", "fuser", device_path],
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', timeout=10
|
|
)
|
|
if result.returncode == 0 and result.stdout.strip():
|
|
logger.info(f"设备 {device_path} 被以下进程占用: {result.stdout.strip()}")
|
|
return True
|
|
except Exception as e:
|
|
logger.debug(f"fuser 检查失败: {e}")
|
|
|
|
return False
|
|
|
|
def _kill_processes_using_device(self, device_path: str) -> bool:
|
|
"""终止占用设备的进程"""
|
|
logger.info(f"尝试终止占用设备 {device_path} 的进程。")
|
|
|
|
if messagebox.askyesno("确认终止进程",
|
|
f"设备 {device_path} 当前被进程占用。\n"
|
|
"是否终止这些进程?",
|
|
default=messagebox.NO):
|
|
try:
|
|
# 使用 fuser -k 终止进程
|
|
result = subprocess.run(
|
|
["sudo", "fuser", "-k", device_path],
|
|
stdout=subprocess.PIPE, stderr=subprocess.PIPE, encoding='utf-8', timeout=10
|
|
)
|
|
|
|
if result.returncode == 0 or result.returncode == 1: # 1 表示没有进程被终止
|
|
messagebox.showinfo("成功", f"已尝试终止占用设备 {device_path} 的所有进程。")
|
|
|
|
# 再次尝试卸载
|
|
if self._unmount_device(device_path):
|
|
messagebox.showinfo("成功", f"设备 {device_path} 已成功卸载。")
|
|
return True
|
|
else:
|
|
# 设备可能已经没有挂载
|
|
mount_point = self.mount_info_getter_func(device_path)
|
|
if not mount_point:
|
|
messagebox.showinfo("成功", f"设备 {device_path} 已成功卸载。")
|
|
return True
|
|
messagebox.showwarning("警告", f"进程已终止,但未能成功卸载设备 {device_path}。可能仍有其他问题。")
|
|
return False
|
|
else:
|
|
messagebox.showerror("错误", f"未能终止占用设备 {device_path} 的进程。")
|
|
return False
|
|
except Exception as e:
|
|
logger.error(f"终止进程时发生错误: {e}")
|
|
messagebox.showerror("错误", f"终止进程时发生错误: {e}")
|
|
return False
|
|
else:
|
|
messagebox.showinfo("信息", f"已取消终止占用设备 {device_path} 的进程。")
|
|
return False
|