Files
diskmanager2/disk_operations_tkinter.py
2026-02-09 03:14:35 +08:00

540 lines
24 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# disk_operations_tkinter.py
import subprocess
import logging
import re
import os
import threading
import queue
from tkinter import messagebox, simpledialog
from system_info import SystemInfoManager
logger = logging.getLogger(__name__)
# --- 后台格式化工作线程 ---
class FormatWorker:
"""后台格式化工作线程"""
def __init__(self, device_path, fs_type, execute_shell_command_func, result_queue):
self.device_path = device_path
self.fs_type = fs_type
self._execute_shell_command_func = execute_shell_command_func
self.result_queue = result_queue
def run(self):
"""在单独线程中执行格式化命令。"""
logger.info(f"后台格式化开始: 设备 {self.device_path}, 文件系统 {self.fs_type}")
# 发送开始信号
self.result_queue.put(('started', self.device_path))
command_list = []
if self.fs_type == "ext4":
command_list = ["mkfs.ext4", "-F", self.device_path]
elif self.fs_type == "xfs":
command_list = ["mkfs.xfs", "-f", self.device_path]
elif self.fs_type == "ntfs":
command_list = ["mkfs.ntfs", "-f", self.device_path]
elif self.fs_type == "fat32":
command_list = ["mkfs.vfat", "-F", "32", self.device_path]
else:
logger.error(f"不支持的文件系统类型: {self.fs_type}")
self.result_queue.put(('finished', False, self.device_path, "", f"不支持的文件系统类型: {self.fs_type}"))
return
# 调用 shell 命令执行函数
success, stdout, stderr = self._execute_shell_command_func(
command_list,
f"格式化设备 {self.device_path}{self.fs_type} 失败",
root_privilege=True,
show_dialog=False
)
self.result_queue.put(('finished', success, self.device_path, stdout, stderr))
logger.info(f"后台格式化完成: 设备 {self.device_path}, 成功: {success}")
class DiskOperations:
"""磁盘操作类 - Tkinter 版本"""
def __init__(self, system_manager: SystemInfoManager, lvm_ops):
self.system_manager = system_manager
self._lvm_ops = lvm_ops
self.active_format_workers = {}
self.result_queue = queue.Queue()
self.formatting_callback = None
self.root = None
def set_formatting_callback(self, callback):
"""设置格式化完成回调函数"""
self.formatting_callback = callback
def start_queue_check(self, root):
"""启动队列检查(应在主线程中调用)"""
self.root = root
self._check_queue()
def _check_queue(self):
"""定期检查队列"""
try:
while True:
msg = self.result_queue.get_nowait()
msg_type = msg[0]
if msg_type == 'started':
device_path = msg[1]
logger.info(f"格式化开始: {device_path}")
elif msg_type == 'finished':
_, success, device_path, stdout, stderr = msg
self._on_formatting_finished(success, device_path, stdout, stderr)
except queue.Empty:
pass
# 继续定时检查
if self.root:
self.root.after(100, self._check_queue)
def _execute_shell_command(self, command_list, error_message, root_privilege=True,
suppress_critical_dialog_on_stderr_match=None, input_data=None,
show_dialog=True):
"""执行 shell 命令"""
return self._lvm_ops._execute_shell_command(
command_list, error_message, root_privilege, suppress_critical_dialog_on_stderr_match, input_data, show_dialog
)
def _add_to_fstab(self, device_path, mount_point, fstype, uuid):
"""将设备的挂载信息添加到 /etc/fstab"""
if not uuid or not mount_point or not fstype:
logger.error(f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
messagebox.showwarning("警告", f"无法将设备 {device_path} 添加到 fstab缺少 UUID/挂载点/文件系统类型。")
return False
fstab_entry = f"UUID={uuid} {mount_point} {fstype} defaults 0 2"
fstab_path = "/etc/fstab"
try:
# 检查 fstab 中是否已存在相同 UUID 的条目
with open(fstab_path, 'r') as f:
fstab_content = f.readlines()
for line in fstab_content:
if f"UUID={uuid}" in line:
logger.warning(f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。跳过添加。")
messagebox.showinfo("信息", f"设备 {device_path} (UUID={uuid}) 已存在于 fstab 中。")
return True
# 追加到 fstab
success, _, stderr = self._execute_shell_command(
["sh", "-c", f"echo '{fstab_entry}' >> {fstab_path}"],
f"{device_path} 添加到 {fstab_path} 失败",
root_privilege=True
)
if success:
logger.info(f"已将 {fstab_entry} 添加到 {fstab_path}")
messagebox.showinfo("成功", f"设备 {device_path} 已成功添加到 /etc/fstab。")
return True
else:
return False
except Exception as e:
logger.error(f"处理 {fstab_path} 失败: {e}")
messagebox.showerror("错误", f"处理 {fstab_path} 失败: {e}")
return False
def _remove_fstab_entry(self, device_path):
"""从 /etc/fstab 中移除指定设备的条目"""
device_details = self.system_manager.get_device_details_by_path(device_path)
if not device_details or not device_details.get('uuid'):
logger.warning(f"无法获取设备 {device_path} 的 UUID无法从 fstab 中移除。")
return False
uuid = device_details.get('uuid')
fstab_path = "/etc/fstab"
command = ["sed", "-i", f"/{re.escape(uuid)}/d", fstab_path]
success, _, stderr = self._execute_shell_command(
command,
f"{fstab_path} 中删除 UUID={uuid} 的条目失败",
root_privilege=True,
suppress_critical_dialog_on_stderr_match=(
f"sed: {fstab_path}: No such file or directory",
f"sed: {fstab_path}: 没有那个文件或目录",
"no changes were made"
)
)
if success:
logger.info(f"已从 {fstab_path} 中删除 UUID={uuid} 的条目。")
return True
else:
if any(s in stderr for s in (
f"sed: {fstab_path}: No such file or directory",
f"sed: {fstab_path}: 没有那个文件或目录",
"no changes were made",
"No such file or directory"
)):
logger.info(f"fstab 中未找到 UUID={uuid} 的条目,无需删除。")
return True
return False
def mount_partition(self, device_path, mount_point, add_to_fstab=False):
"""挂载指定设备到指定挂载点"""
if not os.path.exists(mount_point):
try:
os.makedirs(mount_point)
logger.info(f"创建挂载点目录: {mount_point}")
except OSError as e:
messagebox.showerror("错误", f"创建挂载点目录 {mount_point} 失败: {e}")
logger.error(f"创建挂载点目录 {mount_point} 失败: {e}")
return False
logger.info(f"尝试挂载设备 {device_path}{mount_point}")
success, _, stderr = self._execute_shell_command(
["mount", device_path, mount_point],
f"挂载设备 {device_path} 失败"
)
if success:
messagebox.showinfo("成功", f"设备 {device_path} 已成功挂载到 {mount_point}")
if add_to_fstab:
device_details = self.system_manager.get_device_details_by_path(device_path)
if device_details:
fstype = device_details.get('fstype')
uuid = device_details.get('uuid')
if fstype and uuid:
self._add_to_fstab(device_path, mount_point, fstype, uuid)
else:
logger.error(f"无法获取设备 {device_path} 的文件系统类型或 UUID 以添加到 fstab。")
messagebox.showwarning("警告", f"设备 {device_path} 已挂载,但无法获取文件系统类型或 UUID 以添加到 fstab。")
else:
logger.error(f"无法获取设备 {device_path} 的详细信息以添加到 fstab。")
messagebox.showwarning("警告", f"设备 {device_path} 已挂载,但无法获取详细信息以添加到 fstab。")
return True
else:
return False
def unmount_partition(self, device_path, show_dialog_on_error=True):
"""卸载指定设备"""
logger.info(f"尝试卸载设备 {device_path}")
already_unmounted_errors = ("not mounted", "未挂载", "未指定挂载点")
success, _, stderr = self._execute_shell_command(
["umount", device_path],
f"卸载设备 {device_path} 失败",
suppress_critical_dialog_on_stderr_match=already_unmounted_errors,
show_dialog=show_dialog_on_error
)
if success:
messagebox.showinfo("成功", f"设备 {device_path} 已成功卸载。")
return True
else:
is_already_unmounted_error = any(s in stderr for s in already_unmounted_errors)
if is_already_unmounted_error:
logger.info(f"设备 {device_path} 已经处于未挂载状态。")
return True
else:
return False
def get_disk_free_space_info_mib(self, disk_path, total_disk_mib):
"""获取磁盘上最大的空闲空间块的起始位置和大小"""
logger.debug(f"尝试获取磁盘 {disk_path} 的空闲空间信息 (MiB)。")
success, stdout, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "unit", "MiB", "print", "free"],
f"获取磁盘 {disk_path} 分区信息失败",
root_privilege=True,
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label")
)
if not success:
logger.error(f"获取磁盘 {disk_path} 空闲空间信息失败: {stderr}")
return None, None
free_spaces = []
lines = stdout.splitlines()
free_space_line_pattern = re.compile(r'^\s*(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(\d+\.?\d*)MiB\s+(?:Free Space|空闲空间|可用空间)')
for line in lines:
match = free_space_line_pattern.match(line)
if match:
try:
start_mib = float(match.group(1))
size_mib = float(match.group(3))
free_spaces.append({'start_mib': start_mib, 'size_mib': size_mib})
except ValueError as ve:
logger.warning(f"解析 parted free space 行 '{line}' 失败: {ve}")
continue
if not free_spaces:
logger.warning(f"在磁盘 {disk_path} 上未找到空闲空间。")
return None, None
largest_free_space = max(free_spaces, key=lambda x: x['size_mib'])
start_mib = largest_free_space['start_mib']
size_mib = largest_free_space['size_mib']
logger.debug(f"磁盘 {disk_path} 的最大空闲空间块起始于 {start_mib:.2f} MiB大小为 {size_mib:.2f} MiB。")
return start_mib, size_mib
def create_partition(self, disk_path, partition_table_type, size_gb, total_disk_mib, use_max_space):
"""在指定磁盘上创建分区"""
if not isinstance(disk_path, str) or not isinstance(partition_table_type, str):
logger.error(f"尝试创建分区时传入无效参数。磁盘路径: {disk_path}, 分区表类型: {partition_table_type}")
messagebox.showerror("错误", "无效的磁盘路径或分区表类型。")
return False
# 1. 检查磁盘是否有分区表
has_partition_table = False
success_check, stdout_check, stderr_check = self._execute_shell_command(
["parted", "-s", disk_path, "print"],
f"检查磁盘 {disk_path} 分区表失败",
suppress_critical_dialog_on_stderr_match=("无法辨识的磁盘卷标", "unrecognized disk label"),
root_privilege=True
)
if not success_check:
if any(s in stderr_check for s in ("无法辨识的磁盘卷标", "unrecognized disk label")):
logger.info(f"磁盘 {disk_path} 没有可识别的分区表。")
has_partition_table = False
else:
logger.error(f"检查磁盘 {disk_path} 分区表时发生非预期的错误: {stderr_check}")
return False
else:
if "Partition Table: unknown" not in stdout_check and "分区表unknown" not in stdout_check:
has_partition_table = True
else:
logger.info(f"parted print 报告磁盘 {disk_path} 的分区表为 'unknown'")
has_partition_table = False
actual_start_mib_for_parted = 0.0
# 2. 如果没有分区表,则创建分区表
if not has_partition_table:
if not messagebox.askyesno("确认创建分区表",
f"磁盘 {disk_path} 没有分区表。您确定要创建 {partition_table_type} 分区表吗?\n"
f"此操作将擦除磁盘上的所有数据。",
default=messagebox.NO):
logger.info(f"用户取消了在 {disk_path} 上创建分区表的操作。")
return False
logger.info(f"尝试在 {disk_path} 上创建 {partition_table_type} 分区表。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "mklabel", partition_table_type],
f"创建 {partition_table_type} 分区表失败"
)
if not success:
return False
actual_start_mib_for_parted = 1.0
else:
start_mib_from_parted, _ = self.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if start_mib_from_parted is None:
messagebox.showerror("错误", f"无法确定磁盘 {disk_path} 的分区起始位置或没有空闲空间。")
return False
if start_mib_from_parted < 1.0:
actual_start_mib_for_parted = 1.0
else:
actual_start_mib_for_parted = start_mib_from_parted
# 3. 确定分区结束位置
if use_max_space:
end_pos = "100%"
size_for_log = "最大可用空间"
else:
end_mib = actual_start_mib_for_parted + size_gb * 1024
if end_mib > total_disk_mib:
messagebox.showwarning("警告", f"请求的分区大小 ({size_gb}GB) 超出了可用空间。将调整为最大可用空间。")
end_pos = "100%"
size_for_log = "最大可用空间"
else:
end_pos = f"{end_mib}MiB"
size_for_log = f"{size_gb}GB"
# 4. 创建分区
create_cmd = ["parted", "-s", disk_path, "mkpart", "primary", f"{actual_start_mib_for_parted}MiB", end_pos]
logger.info(f"尝试在 {disk_path} 上创建 {size_for_log} 的主分区。命令: {' '.join(create_cmd)}")
success, _, stderr = self._execute_shell_command(
create_cmd,
f"{disk_path} 上创建分区失败"
)
if success:
messagebox.showinfo("成功", f"{disk_path} 上成功创建了 {size_for_log} 的分区。")
return True
else:
return False
def delete_partition(self, device_path):
"""删除指定分区"""
if not messagebox.askyesno("确认删除分区",
f"您确定要删除分区 {device_path} 吗?此操作将擦除分区上的所有数据!",
default=messagebox.NO):
logger.info(f"用户取消了删除分区 {device_path} 的操作。")
return False
# 尝试卸载分区
self.unmount_partition(device_path, show_dialog_on_error=False)
# 从 fstab 中移除条目
self._remove_fstab_entry(device_path)
# 获取父磁盘和分区号
match = re.match(r'(/dev/[a-z]+)(\d+)', device_path)
if not match:
messagebox.showerror("错误", f"无法解析设备路径 {device_path}")
logger.error(f"无法解析设备路径 {device_path}")
return False
disk_path = match.group(1)
partition_number = match.group(2)
logger.info(f"尝试删除分区 {device_path} (磁盘: {disk_path}, 分区号: {partition_number})。")
success, _, stderr = self._execute_shell_command(
["parted", "-s", disk_path, "rm", partition_number],
f"删除分区 {device_path} 失败"
)
if success:
messagebox.showinfo("成功", f"分区 {device_path} 已成功删除。")
return True
else:
return False
def format_partition(self, device_path, fstype=None):
"""启动后台格式化线程"""
if fstype is None:
# 使用简单的输入对话框
fstype = simpledialog.askstring("选择文件系统",
f"请选择要使用的文件系统类型 for {device_path}:\n\next4, xfs, fat32, ntfs",
initialvalue="ext4")
if not fstype:
logger.info("用户取消了文件系统选择。")
return False
if fstype not in ["ext4", "xfs", "fat32", "ntfs"]:
messagebox.showerror("错误", f"不支持的文件系统类型: {fstype}")
return False
if not messagebox.askyesno("确认格式化",
f"您确定要格式化设备 {device_path}{fstype} 吗?此操作将擦除所有数据!",
default=messagebox.NO):
logger.info(f"用户取消了格式化 {device_path} 的操作。")
return False
# 检查是否已有格式化任务正在进行
if device_path in self.active_format_workers:
messagebox.showwarning("警告", f"设备 {device_path} 正在格式化中,请勿重复操作。")
return False
# 尝试卸载分区
self.unmount_partition(device_path, show_dialog_on_error=False)
# 从 fstab 中移除条目
self._remove_fstab_entry(device_path)
# 创建并启动后台线程
worker = FormatWorker(device_path, fstype, self._execute_shell_command, self.result_queue)
thread = threading.Thread(target=worker.run, daemon=True)
self.active_format_workers[device_path] = thread
thread.start()
messagebox.showinfo("开始格式化", f"设备 {device_path} 正在后台格式化为 {fstype}。完成后将通知您。")
return True
def _on_formatting_finished(self, success, device_path, stdout, stderr):
"""处理格式化工作线程完成后的结果"""
if device_path in self.active_format_workers:
del self.active_format_workers[device_path]
if success:
messagebox.showinfo("格式化成功", f"设备 {device_path} 已成功格式化。")
else:
messagebox.showerror("格式化失败", f"格式化设备 {device_path} 失败。\n错误详情: {stderr}")
# 调用回调函数通知主窗口
if self.formatting_callback:
self.formatting_callback(success, device_path, stdout, stderr)
# === Loop 设备挂载/卸载功能 ===
def mount_loop_device(self, device_path):
"""挂载 ROM 设备为 loop 设备"""
# 获取默认挂载点
default_mount = f"/mnt/{os.path.basename(device_path)}"
mount_point = simpledialog.askstring(
"挂载 ROM 设备",
f"请输入挂载点路径 for {device_path}:",
initialvalue=default_mount
)
if not mount_point:
logger.info("用户取消了挂载操作。")
return False
# 创建挂载点目录
if not os.path.exists(mount_point):
try:
os.makedirs(mount_point)
logger.info(f"创建挂载点目录: {mount_point}")
except OSError as e:
messagebox.showerror("错误", f"创建挂载点目录 {mount_point} 失败: {e}")
return False
# 使用 mount -o loop 挂载
logger.info(f"尝试以 loop 方式挂载 ROM 设备 {device_path}{mount_point}")
success, _, stderr = self._execute_shell_command(
["mount", "-o", "loop,ro", device_path, mount_point],
f"挂载 ROM 设备 {device_path} 失败"
)
if success:
messagebox.showinfo("成功", f"ROM 设备 {device_path} 已成功挂载到 {mount_point}")
return True
else:
# 检查是否是因为设备不是有效的文件系统
if "wrong fs type" in stderr.lower() or "bad option" in stderr.lower():
messagebox.showerror("错误",
f"无法挂载 {device_path}\n"
f"可能的原因:\n"
f"1. 设备中没有可读取的介质(如光盘未插入)\n"
f"2. 设备不支持 loop 挂载\n\n"
f"错误详情: {stderr}")
return False
def unmount_loop_device(self, device_path):
"""卸载 loop 设备"""
logger.info(f"尝试卸载 loop 设备 {device_path}")
# 首先尝试普通卸载
success, _, stderr = self._execute_shell_command(
["umount", device_path],
f"卸载设备 {device_path} 失败",
show_dialog=False
)
if success:
messagebox.showinfo("成功", f"设备 {device_path} 已成功卸载。")
return True
# 检查是否是因为设备未挂载
if "not mounted" in stderr.lower() or "未挂载" in stderr:
# 尝试查找关联的 loop 设备并卸载
try:
result = subprocess.run(
["losetup", "-a"],
capture_output=True, text=True, check=False
)
if result.returncode == 0:
for line in result.stdout.splitlines():
if device_path in line:
loop_dev = line.split(':')[0]
logger.info(f"找到关联的 loop 设备: {loop_dev}")
# 卸载 loop 设备
success, _, _ = self._execute_shell_command(
["losetup", "-d", loop_dev],
f"删除 loop 设备 {loop_dev} 失败",
show_dialog=False
)
if success:
messagebox.showinfo("成功", f"Loop 设备 {loop_dev} 已卸载。")
return True
except Exception as e:
logger.error(f"查找 loop 设备时出错: {e}")
messagebox.showerror("错误", f"卸载设备 {device_path} 失败:\n{stderr}")
return False