Files
diskmanager/mainwindow.py
2026-02-03 15:18:43 +08:00

854 lines
40 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# mainwindow.py
import sys
import logging
import re
import os
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
from PySide6.QtCore import Qt, QPoint, QThread # <--- 确保导入 QThread
# 导入自动生成的 UI 文件
from ui_form import Ui_MainWindow
# 导入我们自己编写的系统信息管理模块
from system_info import SystemInfoManager
# 导入日志配置
from logger_config import setup_logging, logger
# 导入磁盘操作模块
from disk_operations import DiskOperations
# 导入 RAID 操作模块
from raid_operations import RaidOperations
# 导入 LVM 操作模块
from lvm_operations import LvmOperations
# 导入自定义对话框
from dialogs import (CreatePartitionDialog, MountDialog, CreateRaidDialog,
CreatePvDialog, CreateVgDialog, CreateLvDialog)
class MainWindow(QMainWindow):
def __init__(self, parent=None):
    """Build the UI, create the backend helpers and load the initial data.

    Args:
        parent: Optional parent object forwarded to the base-class
            initializer.
    """
    super().__init__(parent)

    ui = Ui_MainWindow()
    self.ui = ui
    ui.setupUi(self)

    # Send log output to the window's log widget before anything is logged.
    setup_logging(ui.logOutputTextEdit)
    logger.info("应用程序启动。")

    # Backend helpers.  LvmOperations is created first because the
    # DiskOperations constructor receives that instance.
    self.system_manager = SystemInfoManager()
    self.lvm_ops = LvmOperations()
    self.disk_ops = DiskOperations(self.system_manager, self.lvm_ops)
    self.raid_ops = RaidOperations()

    # The refresh button is optional: warn instead of failing when the
    # generated UI class does not provide it.
    if not hasattr(ui, 'refreshButton'):
        logger.warning("Warning: refreshButton not found in UI. Please add it in form.ui and regenerate ui_form.py.")
    else:
        ui.refreshButton.clicked.connect(self.refresh_all_info)

    # Every tree gets a custom context menu served by its own handler.
    context_menu_handlers = (
        (ui.treeWidget_block_devices, self.show_block_device_context_menu),
        (ui.treeWidget_raid, self.show_raid_context_menu),
        (ui.treeWidget_lvm, self.show_lvm_context_menu),
    )
    for tree, handler in context_menu_handlers:
        tree.setContextMenuPolicy(Qt.CustomContextMenu)
        tree.customContextMenuRequested.connect(handler)

    # Refresh the views once DiskOperations reports a finished format run.
    self.disk_ops.formatting_finished.connect(self.on_disk_formatting_finished)

    # Initial population of all three views.
    self.refresh_all_info()
    logger.info("所有设备信息已初始化加载。")
def refresh_all_info(self):
    """Reload the block-device, RAID and LVM views, in that order."""
    logger.info("开始刷新所有设备信息...")
    refreshers = (
        self.refresh_block_devices_info,
        self.refresh_raid_info,
        self.refresh_lvm_info,
    )
    for refresh in refreshers:
        refresh()
    logger.info("所有设备信息刷新完成。")
# --- 块设备概览 Tab ---
def refresh_block_devices_info(self):
    """Rebuild the block-device tree from ``get_block_devices()``.

    The column layout is redefined on every call and the data keys of the
    columns are stored in ``self.field_keys`` for ``_add_device_to_tree``.
    Any exception raised while loading is shown in an error dialog and
    logged; it is not propagated.
    """
    tree = self.ui.treeWidget_block_devices
    tree.clear()

    # (header label, key in the device dict) for each column, in display order.
    column_specs = (
        ("设备名", 'name'), ("类型", 'type'), ("大小", 'size'), ("挂载点", 'mountpoint'),
        ("文件系统", 'fstype'), ("只读", 'ro'), ("UUID", 'uuid'), ("PARTUUID", 'partuuid'),
        ("厂商", 'vendor'), ("型号", 'model'), ("序列号", 'serial'),
        ("主次号", 'maj:min'), ("父设备名", 'pkname'),
    )
    headers = [label for label, _ in column_specs]
    self.field_keys = [key for _, key in column_specs]
    column_indexes = range(len(headers))

    tree.setColumnCount(len(headers))
    tree.setHeaderLabels(headers)
    for column in column_indexes:
        tree.header().setSectionResizeMode(column, QHeaderView.ResizeToContents)

    try:
        for device in self.system_manager.get_block_devices():
            self._add_device_to_tree(tree, device)
        for column in column_indexes:
            tree.resizeColumnToContents(column)
        logger.info("块设备信息刷新成功。")
    except Exception as e:
        QMessageBox.critical(self, "错误", f"刷新块设备信息失败: {e}")
        logger.error(f"刷新块设备信息失败: {e}")
def _add_device_to_tree(self, parent_item, dev_data):
    """Add a row for ``dev_data`` below ``parent_item``, then recurse into its children.

    Args:
        parent_item: Tree widget or tree item that becomes the parent of the
            new row.
        dev_data: Device dict.  One column is filled per key in
            ``self.field_keys``; an optional ``'children'`` entry holds
            nested device dicts.  The dict is stored on the row under
            ``Qt.UserRole`` for the context menu.
    """
    item = QTreeWidgetItem(parent_item)
    for column, key in enumerate(self.field_keys):
        value = dev_data.get(key)
        if key == 'ro':
            # Both branches used to render an empty string, so the read-only
            # column never showed anything.  The flag is compared against
            # explicit "true" values because a plain truthiness test would
            # treat the string "0" as read-only (presumably the value can be
            # a bool or a "0"/"1" string depending on the data source --
            # verify against SystemInfoManager).
            read_only = value in (True, 1, "1", "true", "True")
            text = "是" if read_only else "否"
        elif value is None:
            text = ""
        else:
            text = str(value)
        item.setText(column, text)

    item.setData(0, Qt.UserRole, dev_data)

    # `children` may be missing or None for leaf devices.
    for child in dev_data.get('children') or []:
        self._add_device_to_tree(item, child)
    item.setExpanded(True)
def show_block_device_context_menu(self, pos: QPoint):
    """Show the context menu of the block-device tree.

    A "create" submenu (RAID array / physical volume) is always offered.
    When ``pos`` hits a device row, entries for that device are appended:
    create-partition and wipe-partition-table for ``disk`` rows; mount or
    unmount, delete and format for ``part`` rows.  A row without stored
    device data only produces a log warning and no menu.

    Args:
        pos: Position delivered by ``customContextMenuRequested``.  For
            scroll-area based widgets such as the tree this is relative to
            the viewport.
    """
    tree = self.ui.treeWidget_block_devices
    item = tree.itemAt(pos)
    menu = QMenu(self)
    try:
        # Parent the submenu to `menu` so it is released together with it.
        create_menu = QMenu("创建...", menu)
        create_raid_action = create_menu.addAction("创建 RAID 阵列...")
        create_raid_action.triggered.connect(self._handle_create_raid_array)
        create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
        create_pv_action.triggered.connect(self._handle_create_pv)
        menu.addMenu(create_menu)
        menu.addSeparator()

        if item is not None:
            dev_data = item.data(0, Qt.UserRole)
            if not dev_data:
                logger.warning(f"无法获取设备 {item.text(0)} 的详细数据。")
                return

            device_name = dev_data.get('name')
            device_type = dev_data.get('type')
            mount_point = dev_data.get('mountpoint')
            # Fall back to /dev/<name> when the data carries no explicit path.
            device_path = dev_data.get('path') or f"/dev/{device_name}"

            if device_type == 'disk':
                create_partition_action = menu.addAction(f"创建分区 {device_path}...")
                create_partition_action.triggered.connect(
                    lambda: self._handle_create_partition(device_path, dev_data))
                wipe_action = menu.addAction(f"擦除分区表 {device_path}...")
                wipe_action.triggered.connect(
                    lambda: self._handle_wipe_partition_table(device_path))
                menu.addSeparator()

            if device_type == 'part':
                if not mount_point or mount_point == 'N/A':
                    mount_action = menu.addAction(f"挂载 {device_path}...")
                    mount_action.triggered.connect(lambda: self._handle_mount(device_path))
                elif mount_point != '[SWAP]':
                    # Swap partitions get neither a mount nor an unmount entry.
                    unmount_action = menu.addAction(f"卸载 {device_path}")
                    unmount_action.triggered.connect(
                        lambda: self._unmount_and_refresh(device_path))
                menu.addSeparator()
                delete_action = menu.addAction(f"删除分区 {device_path}")
                delete_action.triggered.connect(
                    lambda: self._handle_delete_partition(device_path))
                format_action = menu.addAction(f"格式化分区 {device_path}...")
                format_action.triggered.connect(
                    lambda: self._handle_format_partition(device_path))

        # The menu always contains the "create" submenu, so it is never empty.
        # `pos` is viewport-relative, hence the viewport does the mapping;
        # mapping through the tree itself shifts the menu by the header height.
        menu.exec(tree.viewport().mapToGlobal(pos))
    finally:
        # The menu is a child of the window; without this every right click
        # would leave another QMenu alive until the window is destroyed.
        menu.deleteLater()
# --- 新增方法:处理擦除分区表 ---
def _handle_wipe_partition_table(self, device_path):
    """Ask for confirmation, then write a fresh GPT label to ``device_path``.

    The command ``parted -s <device_path> mklabel gpt`` is run through
    ``self.lvm_ops._execute_shell_command``.  On success an information
    dialog is shown and all views are refreshed.  On failure nothing more is
    done here (the original author notes that the command helper already
    reports the error).

    Args:
        device_path: Path of the disk whose partition table is replaced.
    """
    prompt = (
        f"您确定要擦除设备 {device_path} 上的所有分区表吗?\n"
        f"**此操作将永久删除设备上的所有分区和数据,且不可恢复!**\n"
        f"请谨慎操作!"
    )
    answer = QMessageBox.question(
        self,
        "确认擦除分区表",
        prompt,
        QMessageBox.Yes | QMessageBox.No,
        QMessageBox.No,
    )
    if answer == QMessageBox.No:
        logger.info(f"用户取消了擦除 {device_path} 分区表的操作。")
        return

    logger.warning(f"尝试擦除 {device_path} 上的分区表。")
    command = ["parted", "-s", device_path, "mklabel", "gpt"]
    succeeded, _stdout, _stderr = self.lvm_ops._execute_shell_command(
        command,
        f"擦除 {device_path} 上的分区表失败",
    )
    if not succeeded:
        return

    QMessageBox.information(self, "成功", f"设备 {device_path} 上的分区表已成功擦除。")
    self.refresh_all_info()
# --- 新增方法结束 ---
def _handle_create_partition(self, disk_path, dev_data):
    """Collect partition parameters in a dialog and create the partition.

    The disk size is parsed from ``dev_data['size']``.  For a disk that
    already has partitions the largest free region is requested from
    ``self.disk_ops.get_disk_free_space_info_mib``; for an empty disk the
    whole disk minus 1 MiB is offered.  All views are refreshed when
    ``create_partition`` reports success.

    Args:
        disk_path: Path of the disk to partition.
        dev_data: Device dict of that disk; ``'size'`` and ``'children'``
            are read.
    """
    total_size_str = dev_data.get('size')
    logger.debug(f"尝试为磁盘 {disk_path} 创建分区。原始大小字符串: '{total_size_str}'")

    total_disk_mib = self._size_text_to_mib(disk_path, total_size_str)
    if total_disk_mib <= 0.0:
        QMessageBox.critical(self, "错误", f"无法获取磁盘 {disk_path} 的有效总大小。")
        return

    if dev_data.get('children'):
        logger.debug(f"磁盘 {disk_path} 存在现有分区,尝试计算下一个分区起始位置。")
        calculated_start_mib, largest_free_space_mib = self.disk_ops.get_disk_free_space_info_mib(
            disk_path, total_disk_mib)
        if calculated_start_mib is None or largest_free_space_mib is None:
            QMessageBox.critical(self, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置。")
            return
        max_available_mib = max(0.0, largest_free_space_mib)
    else:
        # Empty disk: keep 1 MiB free; the start offset is only reported in
        # the log, it is not passed on to the dialog or to create_partition.
        start_position_mib = 1.0
        max_available_mib = max(0.0, total_disk_mib - 1.0)
        logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 {start_position_mib} MiB 开始,最大可用空间为 {max_available_mib} MiB。")

    dialog = CreatePartitionDialog(self, disk_path, total_disk_mib, max_available_mib)
    if dialog.exec() != QDialog.Accepted:
        return
    info = dialog.get_partition_info()
    if not info:
        return

    created = self.disk_ops.create_partition(
        info['disk_path'],
        info['partition_table_type'],
        info['size_gb'],
        info['total_disk_mib'],
        info['use_max_space'],
    )
    if created:
        self.refresh_all_info()


def _size_text_to_mib(self, disk_path, size_text):
    """Convert a size such as ``"931.5G"`` to MiB; return 0.0 when unusable.

    Accepted form: a number (``.`` or ``,`` as decimal separator) followed by
    an optional K/M/G/T/P prefix and an optional ``B``/``iB`` suffix, case
    insensitive.  Prefixes are read as powers of 1024.  A value with only a
    ``B`` suffix, or a bare number, is taken as a byte count.

    Args:
        disk_path: Device path, used only in log messages.
        size_text: Size value from the device data; may be ``None``.
    """
    if not size_text:
        logger.warning(f"获取磁盘 {disk_path} 的大小字符串为空或None。")
        return 0.0

    # str() keeps a numeric (non-string) size from crashing re.match.
    text = str(size_text).strip()
    match = re.match(r'(\d+(?:[.,]\d+)?)\s*([KMGTP]?)(I?B)?', text, re.IGNORECASE)
    if not match:
        logger.warning(f"无法解析磁盘 {disk_path} 的大小字符串 '{size_text}'。正则表达式不匹配。")
        return 0.0

    value = float(match.group(1).replace(',', '.'))
    prefix = match.group(2).upper()
    mib_per_unit = {'K': 1 / 1024, 'M': 1, 'G': 1024, 'T': 1024 ** 2, 'P': 1024 ** 3}
    if prefix:
        size_mib = value * mib_per_unit[prefix]
    elif match.group(3) or match.end() == len(text):
        # Explicit "B" suffix, or nothing but the number: a byte count.
        size_mib = value / (1024 * 1024)
    else:
        logger.warning(f"无法识别磁盘 {disk_path} 的大小单位: '{prefix}' (原始: '{size_text}')")
        size_mib = 0.0
    logger.debug(f"解析后的磁盘总大小 (MiB): {size_mib}")
    return size_mib
def _handle_mount(self, device_path):
    """Ask for mount options via ``MountDialog`` and mount ``device_path``.

    All views are refreshed when ``mount_partition`` reports success.

    Args:
        device_path: Path of the device to mount.
    """
    dialog = MountDialog(self, device_path)
    if dialog.exec() != QDialog.Accepted:
        return
    info = dialog.get_mount_info()
    if not info:
        return
    mounted = self.disk_ops.mount_partition(device_path, info['mount_point'], info['add_to_fstab'])
    if mounted:
        self.refresh_all_info()
def _unmount_and_refresh(self, device_path):
    """Unmount ``device_path`` (with error dialogs enabled) and refresh all views on success."""
    unmounted = self.disk_ops.unmount_partition(device_path, show_dialog_on_error=True)
    if unmounted:
        self.refresh_all_info()
def _handle_delete_partition(self, device_path):
    """Delete the partition ``device_path`` and refresh all views on success."""
    deleted = self.disk_ops.delete_partition(device_path)
    if deleted:
        self.refresh_all_info()
def _handle_format_partition(self, device_path):
    """Hand ``device_path`` to ``DiskOperations.format_partition``.

    No refresh is triggered here: the views are reloaded by
    ``on_disk_formatting_finished``, which is connected to the
    ``formatting_finished`` signal.  According to the original author's
    note, user confirmation and the background work are handled inside
    ``format_partition``.

    Args:
        device_path: Path of the device to format.
    """
    disk_ops = self.disk_ops
    disk_ops.format_partition(device_path)
# <--- 新增: 格式化完成后的槽函数
def on_disk_formatting_finished(self, success, device_path, stdout, stderr):
    """Slot for ``DiskOperations.formatting_finished``: log the result and refresh.

    Args:
        success: Whether formatting succeeded (only logged here).
        device_path: Device that was formatted (only logged here).
        stdout: Unused in this slot.
        stderr: Unused in this slot.
    """
    # No dialog here; per the original author's note the result dialog is
    # shown by DiskOperations itself.
    logger.info(f"格式化完成信号接收: 设备 {device_path}, 成功: {success}")
    # Reload everything so the new filesystem type becomes visible.
    self.refresh_all_info()
# --- RAID 管理 Tab ---
def refresh_raid_info(self):
    """Rebuild the RAID tree from ``get_mdadm_arrays()``.

    Each array becomes a top-level row carrying its dict under
    ``Qt.UserRole``; member devices become child rows unless the array state
    is ``'Stopped (Configured)'``.  Arrays without a usable device path are
    skipped with a warning.  Any exception raised while loading is shown in
    an error dialog and logged; it is not propagated.
    """
    tree = self.ui.treeWidget_raid
    tree.clear()
    raid_headers = [
        "阵列设备", "级别", "状态", "大小", "活动设备", "失败设备", "备用设备",
        "总设备数", "UUID", "名称", "Chunk Size", "挂载点"
    ]
    # Array dict keys shown in columns 1..10 (column 0 is the device path,
    # column 11 the mount point).
    detail_keys = (
        'level', 'state', 'array_size', 'active_devices', 'failed_devices',
        'spare_devices', 'total_devices', 'uuid', 'name', 'chunk_size',
    )

    def as_text(value):
        """Return ``value`` as display text; missing values become 'N/A'."""
        return 'N/A' if value is None else str(value)

    tree.setColumnCount(len(raid_headers))
    tree.setHeaderLabels(raid_headers)
    for column in range(len(raid_headers)):
        tree.header().setSectionResizeMode(column, QHeaderView.ResizeToContents)

    try:
        raid_arrays = self.system_manager.get_mdadm_arrays()
        if not raid_arrays:
            placeholder = QTreeWidgetItem(tree)
            placeholder.setText(0, "未找到RAID阵列。")
            logger.info("未找到RAID阵列。")
            return

        for array in raid_arrays:
            # Validate before creating the row: creating it first left an
            # empty row in the tree for every skipped array.
            array_path = array.get('device', 'N/A')
            if not array_path or array_path == 'N/A':
                logger.warning(f"RAID阵列 '{array.get('name', '未知')}' 的设备路径无效,跳过。")
                continue

            # Stopped arrays get no mount-point lookup and no member rows.
            is_stopped = array.get('state') == 'Stopped (Configured)'
            current_mount_point = ""
            if not is_stopped:
                current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)

            array_item = QTreeWidgetItem(tree)
            array_item.setText(0, array_path)
            for column, key in enumerate(detail_keys, start=1):
                # as_text() keeps numeric or None fields from raising in
                # setText and aborting the whole refresh.
                array_item.setText(column, as_text(array.get(key)))
            array_item.setText(11, current_mount_point if current_mount_point else "")
            # The full dict (including the state) is read by the context menu.
            array_item.setData(0, Qt.UserRole, array)

            if not is_stopped:
                for member in array.get('member_devices') or []:
                    member_item = QTreeWidgetItem(array_item)
                    member_item.setText(0, f" {member.get('device_path', 'N/A')}")
                    member_item.setText(1, f"成员: {member.get('raid_device', 'N/A')}")
                    member_item.setText(2, as_text(member.get('state')))
            array_item.setExpanded(True)

        for column in range(len(raid_headers)):
            tree.resizeColumnToContents(column)
        logger.info("RAID阵列信息刷新成功。")
    except Exception as e:
        QMessageBox.critical(self, "错误", f"刷新RAID阵列信息失败: {e}")
        logger.error(f"刷新RAID阵列信息失败: {e}")
def show_raid_context_menu(self, pos: QPoint):
    """Show the context menu of the RAID tree.

    "Create RAID array" is always offered.  For a top-level array row the
    menu additionally offers: activate and delete-config-entry when the
    state is ``'Stopped (Configured)'``; otherwise mount or unmount, stop,
    delete and format.  Member rows get no extra entries.  A top-level row
    without stored data or without a usable device path only produces a log
    warning and no menu.

    Args:
        pos: Position delivered by ``customContextMenuRequested``.  For
            scroll-area based widgets such as the tree this is relative to
            the viewport.
    """
    tree = self.ui.treeWidget_raid
    item = tree.itemAt(pos)
    menu = QMenu(self)
    try:
        create_raid_action = menu.addAction("创建 RAID 阵列...")
        create_raid_action.triggered.connect(self._handle_create_raid_array)
        menu.addSeparator()

        # Only top-level (array) rows carry array operations.
        if item is not None and item.parent() is None:
            array_data = item.data(0, Qt.UserRole)
            if not array_data:
                logger.warning(f"无法获取 RAID 阵列 {item.text(0)} 的详细数据。")
                return

            array_path = array_data.get('device')
            array_state = array_data.get('state', 'N/A')
            member_devices = [m.get('device_path') for m in array_data.get('member_devices') or []]
            array_uuid = array_data.get('uuid')

            if not array_path or array_path == 'N/A':
                logger.warning(f"RAID阵列 '{array_data.get('name', '未知')}' 的设备路径无效,无法显示操作。")
                return

            if array_state == 'Stopped (Configured)':
                activate_action = menu.addAction(f"激活阵列 {array_path}")
                activate_action.triggered.connect(
                    lambda: self._handle_activate_raid_array(array_path))
                delete_config_action = menu.addAction(f"删除配置文件条目 (UUID: {array_uuid})")
                delete_config_action.triggered.connect(
                    lambda: self._handle_delete_configured_raid_array(array_uuid))
                # The entry is removed by UUID, so the action is unusable
                # without one (same validity test as in
                # _handle_activate_raid_array).
                delete_config_action.setEnabled(bool(array_uuid) and array_uuid != 'N/A')
            else:
                current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
                if current_mount_point and current_mount_point != '[SWAP]':
                    unmount_action = menu.addAction(f"卸载 {array_path} ({current_mount_point})")
                    unmount_action.triggered.connect(
                        lambda: self._unmount_and_refresh(array_path))
                else:
                    mount_action = menu.addAction(f"挂载 {array_path}...")
                    mount_action.triggered.connect(lambda: self._handle_mount(array_path))
                menu.addSeparator()
                stop_action = menu.addAction(f"停止阵列 {array_path}")
                stop_action.triggered.connect(lambda: self._handle_stop_raid_array(array_path))
                delete_action = menu.addAction(f"删除阵列 {array_path}")
                delete_action.triggered.connect(
                    lambda: self._handle_delete_active_raid_array(array_path, member_devices, array_uuid))
                format_action = menu.addAction(f"格式化阵列 {array_path}...")
                format_action.triggered.connect(
                    lambda: self._handle_format_raid_array(array_path))

        # The menu always contains the "create" action, so it is never empty.
        # `pos` is viewport-relative, hence the viewport does the mapping;
        # mapping through the tree itself shifts the menu by the header height.
        menu.exec(tree.viewport().mapToGlobal(pos))
    finally:
        # The menu is a child of the window; without this every right click
        # would leave another QMenu alive until the window is destroyed.
        menu.deleteLater()
def _handle_activate_raid_array(self, array_path):
    """Re-activate the stopped RAID array shown in the currently selected row.

    The array data (in particular its UUID) is read from the current item of
    the RAID tree, which must be a top-level row whose stored device path
    equals ``array_path``.  After the user confirms,
    ``raid_ops.activate_raid_array`` is called and all views are refreshed
    on success.

    Args:
        array_path: Device path of the array to activate.
    """
    selected = self.ui.treeWidget_raid.currentItem()
    top_level_selected = selected and selected.parent() is None
    if not top_level_selected:
        logger.warning("未选择有效的 RAID 阵列进行激活。")
        return

    array_data = selected.data(0, Qt.UserRole)
    if not array_data or array_data.get('device') != array_path:
        logger.error(f"获取 RAID 阵列 {array_path} 的数据失败或数据不匹配。")
        return

    array_uuid = array_data.get('uuid')
    if not array_uuid or array_uuid == 'N/A':
        QMessageBox.critical(self, "错误", f"无法激活 RAID 阵列 {array_path}:缺少 UUID 信息。")
        logger.error(f"激活 RAID 阵列 {array_path} 失败:缺少 UUID。")
        return

    prompt = (
        f"您确定要激活 RAID 阵列 {array_path} (UUID: {array_uuid}) 吗?\n"
        "这将尝试重新组装阵列。"
    )
    answer = QMessageBox.question(
        self,
        "确认激活 RAID 阵列",
        prompt,
        QMessageBox.Yes | QMessageBox.No,
        QMessageBox.No,
    )
    if answer == QMessageBox.No:
        logger.info(f"用户取消了激活 RAID 阵列 {array_path} 的操作。")
        return

    activated = self.raid_ops.activate_raid_array(array_path, array_uuid)
    if activated:
        self.refresh_all_info()
def _handle_create_raid_array(self):
    """Let the user pick devices and options in ``CreateRaidDialog`` and create the array.

    A warning is shown when ``get_unallocated_partitions()`` returns nothing.
    All views are refreshed when ``create_raid_array`` reports success.
    """
    candidates = self.system_manager.get_unallocated_partitions()
    if not candidates:
        QMessageBox.warning(self, "警告", "没有可用于创建 RAID 阵列的设备。请确保有未挂载、未被LVM或RAID使用的磁盘或分区。")
        return

    dialog = CreateRaidDialog(self, candidates)
    if dialog.exec() != QDialog.Accepted:
        return
    info = dialog.get_raid_info()
    if not info:
        return
    created = self.raid_ops.create_raid_array(info['devices'], info['level'], info['chunk_size'])
    if created:
        self.refresh_all_info()
def _handle_stop_raid_array(self, array_path):
    """Stop the RAID array ``array_path`` and refresh all views on success."""
    stopped = self.raid_ops.stop_raid_array(array_path)
    if stopped:
        self.refresh_all_info()
def _handle_delete_active_raid_array(self, array_path, member_devices, uuid):
    """Delete a running RAID array and refresh all views on success.

    Args:
        array_path: Device path of the array.
        member_devices: Device paths of the array members.
        uuid: UUID of the array.
    """
    deleted = self.raid_ops.delete_active_raid_array(array_path, member_devices, uuid)
    if deleted:
        self.refresh_all_info()
def _handle_delete_configured_raid_array(self, uuid):
    """Remove the configuration entry of a stopped RAID array and refresh on success.

    Args:
        uuid: UUID identifying the array's configuration entry.
    """
    removed = self.raid_ops.delete_configured_raid_array(uuid)
    if removed:
        self.refresh_all_info()
def _handle_format_raid_array(self, array_path):
    """Format the RAID array ``array_path`` via ``DiskOperations.format_partition``.

    No refresh is triggered here: the views are reloaded by
    ``on_disk_formatting_finished``, which is connected to the
    ``formatting_finished`` signal.

    Args:
        array_path: Device path of the array to format.
    """
    disk_ops = self.disk_ops
    disk_ops.format_partition(array_path)
# --- LVM 管理 Tab ---
def refresh_lvm_info(self):
    """Rebuild the LVM tree from ``get_lvm_info()``.

    Three expanded group rows are created (physical volumes, volume groups,
    logical volumes).  Every data row stores ``{'type': ..., 'data': ...}``
    under ``Qt.UserRole`` for the context menu; group rows store only their
    type.  Any exception raised while loading is shown in an error dialog
    and logged; it is not propagated.
    """
    tree = self.ui.treeWidget_lvm
    tree.clear()
    lvm_headers = ["名称", "大小", "属性", "UUID", "关联", "空闲/已用", "路径/格式", "挂载点"]
    column_count = len(lvm_headers)
    tree.setColumnCount(column_count)
    tree.setHeaderLabels(lvm_headers)
    for column in range(column_count):
        tree.header().setSectionResizeMode(column, QHeaderView.ResizeToContents)

    def add_section(title, section_type):
        """Create an expanded top-level group row tagged with ``section_type``."""
        root = QTreeWidgetItem(tree)
        root.setText(0, title)
        root.setExpanded(True)
        root.setData(0, Qt.UserRole, {'type': section_type})
        return root

    def add_placeholder(parent, text):
        """Add a single informational child row below ``parent``."""
        placeholder = QTreeWidgetItem(parent)
        placeholder.setText(0, text)

    def fill_row(row, texts):
        """Write ``texts`` into consecutive columns of ``row``, starting at 0."""
        for column, text in enumerate(texts):
            row.setText(column, text)

    try:
        lvm_data = self.system_manager.get_lvm_info()
        if not (lvm_data.get('pvs') or lvm_data.get('vgs') or lvm_data.get('lvs')):
            empty_row = QTreeWidgetItem(tree)
            empty_row.setText(0, "未找到LVM信息。")
            logger.info("未找到LVM信息。")
            return

        # --- Physical volumes ---
        pv_root = add_section("物理卷 (PVs)", 'pv_root')
        if lvm_data.get('pvs'):
            for pv in lvm_data['pvs']:
                pv_row = QTreeWidgetItem(pv_root)
                pv_name = pv.get('pv_name', 'N/A')
                # The context data always carries a /dev/... path (or 'N/A').
                if pv_name.startswith('/dev/'):
                    pv_path = pv_name
                elif pv_name != 'N/A':
                    pv_path = f"/dev/{pv_name}"
                else:
                    pv_path = 'N/A'
                fill_row(pv_row, (
                    pv_name,
                    pv.get('pv_size', 'N/A'),
                    pv.get('pv_attr', 'N/A'),
                    pv.get('pv_uuid', 'N/A'),
                    f"VG: {pv.get('vg_name', 'N/A')}",
                    f"空闲: {pv.get('pv_free', 'N/A')}",
                    pv.get('pv_fmt', 'N/A'),
                    "",
                ))
                pv_row.setData(0, Qt.UserRole, {'type': 'pv', 'data': {**pv, 'pv_name': pv_path}})
        else:
            add_placeholder(pv_root, "未找到物理卷。")

        # --- Volume groups ---
        vg_root = add_section("卷组 (VGs)", 'vg_root')
        if lvm_data.get('vgs'):
            for vg in lvm_data['vgs']:
                vg_row = QTreeWidgetItem(vg_root)
                vg_name = vg.get('vg_name', 'N/A')
                fill_row(vg_row, (
                    vg_name,
                    vg.get('vg_size', 'N/A'),
                    vg.get('vg_attr', 'N/A'),
                    vg.get('vg_uuid', 'N/A'),
                    f"PVs: {vg.get('pv_count', 'N/A')}, LVs: {vg.get('lv_count', 'N/A')}",
                    f"空闲: {vg.get('vg_free', 'N/A')}, 已分配: {vg.get('vg_alloc_percent', 'N/A')}%",
                    vg.get('vg_fmt', 'N/A'),
                    "",
                ))
                vg_row.setData(0, Qt.UserRole, {'type': 'vg', 'data': {**vg, 'vg_name': vg_name}})
        else:
            add_placeholder(vg_root, "未找到卷组。")

        # --- Logical volumes ---
        lv_root = add_section("逻辑卷 (LVs)", 'lv_root')
        if lvm_data.get('lvs'):
            for lv in lvm_data['lvs']:
                lv_row = QTreeWidgetItem(lv_root)
                lv_name = lv.get('lv_name', 'N/A')
                vg_name = lv.get('vg_name', 'N/A')
                lv_attr = lv.get('lv_attr', '')
                lv_path = lv.get('lv_path')
                if not lv_path or lv_path == 'N/A':
                    # Derive /dev/<vg>/<lv> when both names are known.
                    names_known = 'N/A' not in (vg_name, lv_name)
                    lv_path = f"/dev/{vg_name}/{lv_name}" if names_known else 'N/A'
                current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
                fill_row(lv_row, (
                    lv_name,
                    lv.get('lv_size', 'N/A'),
                    lv_attr,
                    lv.get('lv_uuid', 'N/A'),
                    f"VG: {vg_name}, Origin: {lv.get('origin', 'N/A')}",
                    f"快照: {lv.get('snap_percent', 'N/A')}%",
                    lv_path,
                    current_mount_point if current_mount_point else "",
                ))
                lv_context = {
                    **lv,
                    'lv_path': lv_path,
                    'lv_name': lv_name,
                    'vg_name': vg_name,
                    'lv_attr': lv_attr,
                }
                lv_row.setData(0, Qt.UserRole, {'type': 'lv', 'data': lv_context})
        else:
            add_placeholder(lv_root, "未找到逻辑卷。")

        for column in range(column_count):
            tree.resizeColumnToContents(column)
        logger.info("LVM信息刷新成功。")
    except Exception as e:
        QMessageBox.critical(self, "错误", f"刷新LVM信息失败: {e}")
        logger.error(f"刷新LVM信息失败: {e}")
def show_lvm_context_menu(self, pos: QPoint):
    """Show the context menu of the LVM tree.

    A "create" submenu (PV / VG / LV) is always offered.  Depending on the
    type stored on the clicked row the menu additionally offers: delete for
    a PV or VG; activate or deactivate, mount or unmount, delete and format
    for an LV.  A row without stored data only produces a log warning and
    no menu.

    Args:
        pos: Position delivered by ``customContextMenuRequested``.  For
            scroll-area based widgets such as the tree this is relative to
            the viewport.
    """
    tree = self.ui.treeWidget_lvm
    item = tree.itemAt(pos)
    menu = QMenu(self)
    try:
        # Parent the submenu to `menu` so it is released together with it.
        create_menu = QMenu("创建...", menu)
        create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
        create_pv_action.triggered.connect(self._handle_create_pv)
        create_vg_action = create_menu.addAction("创建卷组 (VG)...")
        create_vg_action.triggered.connect(self._handle_create_vg)
        create_lv_action = create_menu.addAction("创建逻辑卷 (LV)...")
        create_lv_action.triggered.connect(self._handle_create_lv)
        menu.addMenu(create_menu)
        menu.addSeparator()

        if item is not None:
            item_data = item.data(0, Qt.UserRole)
            if not item_data:
                logger.warning(f"无法获取 LVM 项 {item.text(0)} 的详细数据。")
                return

            item_type = item_data.get('type')
            data = item_data.get('data', {})

            if item_type == 'pv':
                pv_name = data.get('pv_name')
                if pv_name and pv_name != 'N/A':
                    delete_pv_action = menu.addAction(f"删除物理卷 {pv_name}")
                    delete_pv_action.triggered.connect(lambda: self._handle_delete_pv(pv_name))
            elif item_type == 'vg':
                vg_name = data.get('vg_name')
                if vg_name and vg_name != 'N/A':
                    delete_vg_action = menu.addAction(f"删除卷组 {vg_name}")
                    delete_vg_action.triggered.connect(lambda: self._handle_delete_vg(vg_name))
            elif item_type == 'lv':
                lv_name = data.get('lv_name')
                vg_name = data.get('vg_name')
                lv_attr = data.get('lv_attr') or ''
                lv_path = data.get('lv_path')

                if lv_name and vg_name and lv_path and lv_path != 'N/A':
                    # The activation state is the 5th lv_attr character.  A
                    # plain "'a' in lv_attr" also matches the allocation
                    # policy "anywhere" (3rd character) and would report an
                    # inactive LV as active.
                    is_active = len(lv_attr) > 4 and lv_attr[4] == 'a'
                    if is_active:
                        deactivate_lv_action = menu.addAction(f"停用逻辑卷 {lv_name}")
                        deactivate_lv_action.triggered.connect(
                            lambda: self._handle_deactivate_lv(lv_name, vg_name))
                    else:
                        activate_lv_action = menu.addAction(f"激活逻辑卷 {lv_name}")
                        activate_lv_action.triggered.connect(
                            lambda: self._handle_activate_lv(lv_name, vg_name))

                    current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
                    if current_mount_point and current_mount_point != '[SWAP]':
                        unmount_lv_action = menu.addAction(f"卸载 {lv_name} ({current_mount_point})")
                        unmount_lv_action.triggered.connect(
                            lambda: self._unmount_and_refresh(lv_path))
                    else:
                        mount_lv_action = menu.addAction(f"挂载 {lv_name}...")
                        mount_lv_action.triggered.connect(lambda: self._handle_mount(lv_path))

                    menu.addSeparator()
                    delete_lv_action = menu.addAction(f"删除逻辑卷 {lv_name}")
                    delete_lv_action.triggered.connect(
                        lambda: self._handle_delete_lv(lv_name, vg_name))
                    format_lv_action = menu.addAction(f"格式化逻辑卷 {lv_name}...")
                    format_lv_action.triggered.connect(
                        lambda: self._handle_format_partition(lv_path))
                else:
                    logger.warning(f"逻辑卷 '{lv_name}' (VG: {vg_name}) 的路径无效无法显示操作。Lv Path: {lv_path}")

        # The menu always contains the "create" submenu, so it is never empty.
        # `pos` is viewport-relative, hence the viewport does the mapping;
        # mapping through the tree itself shifts the menu by the header height.
        menu.exec(tree.viewport().mapToGlobal(pos))
    finally:
        # The menu is a child of the window; without this every right click
        # would leave another QMenu alive until the window is destroyed.
        menu.deleteLater()
def _handle_create_pv(self):
    """Let the user pick a device in ``CreatePvDialog`` and create a physical volume on it.

    A warning is shown when ``get_unallocated_partitions()`` returns nothing.
    All views are refreshed when ``create_pv`` reports success.
    """
    candidates = self.system_manager.get_unallocated_partitions()
    if not candidates:
        QMessageBox.warning(self, "警告", "没有可用于创建物理卷的未分配分区。")
        return

    dialog = CreatePvDialog(self, candidates)
    if dialog.exec() != QDialog.Accepted:
        return
    info = dialog.get_pv_info()
    if not info:
        return
    if self.lvm_ops.create_pv(info['device_path']):
        self.refresh_all_info()
def _handle_delete_pv(self, device_path):
    """Delete the physical volume on ``device_path`` and refresh all views on success."""
    deleted = self.lvm_ops.delete_pv(device_path)
    if deleted:
        self.refresh_all_info()
def _handle_create_vg(self):
    """Let the user build a volume group from physical volumes that have no VG yet.

    Candidate PVs are those with a usable name and an empty ``vg_name``;
    their names are normalised to ``/dev/...`` paths.  A warning is shown
    when there are none.  All views are refreshed when ``create_vg``
    reports success.
    """
    available_pvs = []
    for pv in self.system_manager.get_lvm_info().get('pvs', []):
        name = pv.get('pv_name')
        # Skip PVs without a usable name and PVs already assigned to a VG.
        if not name or name == 'N/A' or pv.get('vg_name'):
            continue
        available_pvs.append(name if name.startswith('/dev/') else f"/dev/{name}")

    if not available_pvs:
        QMessageBox.warning(self, "警告", "没有可用于创建卷组的物理卷。请确保有未分配给任何卷组的物理卷。")
        return

    dialog = CreateVgDialog(self, available_pvs)
    if dialog.exec() != QDialog.Accepted:
        return
    info = dialog.get_vg_info()
    if not info:
        return
    if self.lvm_ops.create_vg(info['vg_name'], info['pvs']):
        self.refresh_all_info()
def _handle_create_lv(self):
    """Let the user create a logical volume in one of the existing volume groups.

    The free space of every named VG is converted to GB and handed to
    ``CreateLvDialog`` together with the VG names.  A warning is shown when
    no VG exists.  All views are refreshed when ``create_lv`` reports
    success.
    """
    available_vgs = []
    vg_sizes = {}
    for vg in self.system_manager.get_lvm_info().get('vgs', []):
        vg_name = vg.get('vg_name')
        if not vg_name or vg_name == 'N/A':
            continue
        available_vgs.append(vg_name)
        vg_sizes[vg_name] = self._vg_free_to_gb(vg.get('vg_free'))

    if not available_vgs:
        QMessageBox.warning(self, "警告", "没有可用于创建逻辑卷的卷组。")
        return

    dialog = CreateLvDialog(self, available_vgs, vg_sizes)
    if dialog.exec() != QDialog.Accepted:
        return
    info = dialog.get_lv_info()
    if not info:
        return
    created = self.lvm_ops.create_lv(
        info['lv_name'], info['vg_name'], info['size_gb'], info['use_max_space'])
    if created:
        self.refresh_all_info()


def _vg_free_to_gb(self, free_size):
    """Convert an LVM size string such as ``"<10.00g"`` to GB; 0.0 when unparsable.

    Accepted form: an optional leading ``<``, a number (``.`` or ``,`` as
    decimal separator) and an optional unit letter b/k/m/g/t/p/e, case
    insensitive, read as powers of 1024.  A number without unit is taken as
    GB.  A missing value (``None``) counts as zero.

    Args:
        free_size: The ``vg_free`` value of a volume group.
    """
    # None used to crash on .strip(); treat it like the "0B" default.
    free_size_str = ('0B' if free_size is None else str(free_size)).strip().lower()
    match = re.match(r'<?\s*(\d+(?:[.,]\d*)?)\s*([bkmgtpe])?', free_size_str)
    if not match:
        logger.warning(f"无法解析LVM空闲大小字符串: '{free_size_str}'")
        return 0.0

    value = float(match.group(1).replace(',', '.'))
    gb_per_unit = {
        'b': 1 / 1024 ** 3,
        'k': 1 / 1024 ** 2,
        'm': 1 / 1024,
        'g': 1,
        't': 1024,
        'p': 1024 ** 2,
        'e': 1024 ** 3,
    }
    # No unit letter: the value is taken as GB.
    return value * gb_per_unit.get(match.group(2), 1)
def _handle_delete_lv(self, lv_name, vg_name):
    """Unmount a logical volume, drop its fstab entry, then delete it.

    The device path is assumed to be ``/dev/<vg_name>/<lv_name>``.  The
    unmount is attempted without error dialogs and its result is ignored.
    All views are refreshed when ``delete_lv`` reports success.

    NOTE(review): the unmount and the fstab cleanup run before ``delete_lv``;
    if that call fails (or can still be cancelled by the user -- not visible
    from here) these changes stay in effect and the views are not refreshed.
    Confirm this is intended.

    Args:
        lv_name: Name of the logical volume.
        vg_name: Name of the volume group containing it.
    """
    device = f"/dev/{vg_name}/{lv_name}"
    disk_ops = self.disk_ops
    disk_ops.unmount_partition(device, show_dialog_on_error=False)
    disk_ops._remove_fstab_entry(device)
    if not self.lvm_ops.delete_lv(lv_name, vg_name):
        return
    self.refresh_all_info()
def _handle_activate_lv(self, lv_name, vg_name):
    """Activate logical volume ``lv_name`` of ``vg_name`` and refresh all views on success."""
    activated = self.lvm_ops.activate_lv(lv_name, vg_name)
    if activated:
        self.refresh_all_info()
def _handle_deactivate_lv(self, lv_name, vg_name):
    """Deactivate logical volume ``lv_name`` of ``vg_name`` and refresh all views on success."""
    deactivated = self.lvm_ops.deactivate_lv(lv_name, vg_name)
    if deactivated:
        self.refresh_all_info()
def _handle_delete_vg(self, vg_name):
    """Delete the volume group ``vg_name`` after confirmation.

    After the user confirms, the current LVM data is re-read; the deletion
    is refused with an error dialog when the group still reports logical
    volumes.  An unreadable or missing ``lv_count`` counts as zero.  All
    views are refreshed on success; a failure is reported in a dialog.

    Args:
        vg_name: Name of the volume group to delete.
    """
    logger.info(f"尝试删除卷组 (VG) {vg_name}")
    answer = QMessageBox.question(
        self,
        "确认删除卷组",
        f"您确定要删除卷组 {vg_name} 吗?此操作将永久删除该卷组及其所有逻辑卷和数据!",
        QMessageBox.Yes | QMessageBox.No,
        QMessageBox.No,
    )
    if answer == QMessageBox.No:
        logger.info(f"用户取消了删除卷组 {vg_name} 的操作。")
        return

    # Locate the group's entry in freshly read LVM data.
    vg_info = None
    for candidate in self.system_manager.get_lvm_info().get('vgs', []):
        if candidate.get('vg_name') == vg_name:
            vg_info = candidate
            break

    lv_count_raw = vg_info.get('lv_count', 0) if vg_info else 0
    try:
        # Via float() so that values like "2.0" are accepted as well.
        lv_count = int(float(lv_count_raw))
    except (ValueError, TypeError):
        lv_count = 0

    if lv_count > 0:
        QMessageBox.critical(self, "删除失败", f"卷组 {vg_name} 中仍包含逻辑卷。请先删除所有逻辑卷。")
        logger.error(f"尝试删除包含逻辑卷的卷组 {vg_name}。操作被阻止。")
        return

    if self.lvm_ops.delete_vg(vg_name):
        self.refresh_all_info()
    else:
        QMessageBox.critical(self, "删除卷组失败", f"删除卷组 {vg_name} 失败,请检查日志。")
if __name__ == "__main__":
    # Script entry point: create the Qt application, show the main window and
    # run the event loop; the value returned by exec() becomes the process
    # exit status.
    app = QApplication(sys.argv)
    widget = MainWindow()
    widget.show()
    sys.exit(app.exec())