Files
diskmanager/mainwindow.py
2026-02-03 04:49:37 +08:00

792 lines
37 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# mainwindow.py
import sys
import logging
import re
import os
from PySide6.QtWidgets import (QApplication, QMainWindow, QTreeWidgetItem,
QMessageBox, QHeaderView, QMenu, QInputDialog, QDialog)
from PySide6.QtCore import Qt, QPoint, QThread # <--- 确保导入 QThread
# 导入自动生成的 UI 文件
from ui_form import Ui_MainWindow
# 导入我们自己编写的系统信息管理模块
from system_info import SystemInfoManager
# 导入日志配置
from logger_config import setup_logging, logger
# 导入磁盘操作模块
from disk_operations import DiskOperations
# 导入 RAID 操作模块
from raid_operations import RaidOperations
# 导入 LVM 操作模块
from lvm_operations import LvmOperations
# 导入自定义对话框
from dialogs import (CreatePartitionDialog, MountDialog, CreateRaidDialog,
CreatePvDialog, CreateVgDialog, CreateLvDialog)
class MainWindow(QMainWindow):
def __init__(self, parent=None):
super().__init__(parent)
self.ui = Ui_MainWindow()
self.ui.setupUi(self)
setup_logging(self.ui.logOutputTextEdit)
logger.info("应用程序启动。")
# 初始化管理器和操作类
self.system_manager = SystemInfoManager()
self.lvm_ops = LvmOperations() # <--- 先初始化 LvmOperations
# <--- 将 lvm_ops 实例传递给 DiskOperations
self.disk_ops = DiskOperations(self.system_manager, self.lvm_ops)
self.raid_ops = RaidOperations()
# self.lvm_ops = LvmOperations() # 这一行是重复的,可以删除
# 连接刷新按钮的信号到槽函数
if hasattr(self.ui, 'refreshButton'):
self.ui.refreshButton.clicked.connect(self.refresh_all_info)
else:
logger.warning("Warning: refreshButton not found in UI. Please add it in form.ui and regenerate ui_form.py.")
# 启用 treeWidget 的自定义上下文菜单
self.ui.treeWidget_block_devices.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_block_devices.customContextMenuRequested.connect(self.show_block_device_context_menu)
self.ui.treeWidget_raid.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_raid.customContextMenuRequested.connect(self.show_raid_context_menu)
self.ui.treeWidget_lvm.setContextMenuPolicy(Qt.CustomContextMenu)
self.ui.treeWidget_lvm.customContextMenuRequested.connect(self.show_lvm_context_menu)
# <--- 新增: 连接 DiskOperations 的格式化完成信号
self.disk_ops.formatting_finished.connect(self.on_disk_formatting_finished)
# 可选:连接格式化开始信号,用于显示进度或禁用相关操作
# self.disk_ops.formatting_started.connect(self.on_disk_formatting_started)
# 初始化时刷新所有数据
self.refresh_all_info()
logger.info("所有设备信息已初始化加载。")
def refresh_all_info(self):
"""
刷新所有设备信息块设备、RAID和LVM。
"""
logger.info("开始刷新所有设备信息...")
self.refresh_block_devices_info()
self.refresh_raid_info()
self.refresh_lvm_info()
logger.info("所有设备信息刷新完成。")
# --- 块设备概览 Tab ---
def refresh_block_devices_info(self):
self.ui.treeWidget_block_devices.clear()
columns = [
("设备名", 'name'), ("类型", 'type'), ("大小", 'size'), ("挂载点", 'mountpoint'),
("文件系统", 'fstype'), ("只读", 'ro'), ("UUID", 'uuid'), ("PARTUUID", 'partuuid'),
("厂商", 'vendor'), ("型号", 'model'), ("序列号", 'serial'),
("主次号", 'maj:min'), ("父设备名", 'pkname'),
]
headers = [col[0] for col in columns]
self.field_keys = [col[1] for col in columns]
self.ui.treeWidget_block_devices.setColumnCount(len(headers))
self.ui.treeWidget_block_devices.setHeaderLabels(headers)
for i in range(len(headers)):
self.ui.treeWidget_block_devices.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
devices = self.system_manager.get_block_devices()
for dev in devices:
self._add_device_to_tree(self.ui.treeWidget_block_devices, dev)
for i in range(len(headers)):
self.ui.treeWidget_block_devices.resizeColumnToContents(i)
logger.info("块设备信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新块设备信息失败: {e}")
logger.error(f"刷新块设备信息失败: {e}")
def _add_device_to_tree(self, parent_item, dev_data):
item = QTreeWidgetItem(parent_item)
for i, key in enumerate(self.field_keys):
value = dev_data.get(key)
if key == 'ro':
item.setText(i, "" if value else "")
elif value is None:
item.setText(i, "")
else:
item.setText(i, str(value))
item.setData(0, Qt.UserRole, dev_data)
if 'children' in dev_data:
for child in dev_data['children']:
self._add_device_to_tree(item, child)
item.setExpanded(True)
def show_block_device_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_block_devices.itemAt(pos)
menu = QMenu(self)
create_menu = QMenu("创建...", self)
create_raid_action = create_menu.addAction("创建 RAID 阵列...")
create_raid_action.triggered.connect(self._handle_create_raid_array)
create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
create_pv_action.triggered.connect(self._handle_create_pv)
menu.addMenu(create_menu)
menu.addSeparator()
if item:
dev_data = item.data(0, Qt.UserRole)
if not dev_data:
logger.warning(f"无法获取设备 {item.text(0)} 的详细数据。")
return
device_name = dev_data.get('name')
device_type = dev_data.get('type')
mount_point = dev_data.get('mountpoint')
device_path = dev_data.get('path')
if not device_path:
device_path = f"/dev/{device_name}"
if device_type == 'disk':
create_partition_action = menu.addAction(f"创建分区 {device_path}...")
create_partition_action.triggered.connect(lambda: self._handle_create_partition(device_path, dev_data))
# --- 新增功能:擦除分区表 ---
wipe_action = menu.addAction(f"擦除分区表 {device_path}...")
wipe_action.triggered.connect(lambda: self._handle_wipe_partition_table(device_path))
# --- 新增功能结束 ---
menu.addSeparator()
if device_type == 'part':
if not mount_point or mount_point == '' or mount_point == 'N/A':
mount_action = menu.addAction(f"挂载 {device_path}...")
mount_action.triggered.connect(lambda: self._handle_mount(device_path))
elif mount_point != '[SWAP]':
unmount_action = menu.addAction(f"卸载 {device_path}")
unmount_action.triggered.connect(lambda: self._unmount_and_refresh(device_path))
menu.addSeparator()
delete_action = menu.addAction(f"删除分区 {device_path}")
delete_action.triggered.connect(lambda: self._handle_delete_partition(device_path))
format_action = menu.addAction(f"格式化分区 {device_path}...")
format_action.triggered.connect(lambda: self._handle_format_partition(device_path))
if menu.actions():
menu.exec(self.ui.treeWidget_block_devices.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或设备没有可用的操作。")
# --- 新增方法:处理擦除分区表 ---
def _handle_wipe_partition_table(self, device_path):
"""
处理擦除物理盘分区表的操作。
"""
reply = QMessageBox.question(
self,
"确认擦除分区表",
f"您确定要擦除设备 {device_path} 上的所有分区表吗?\n"
f"**此操作将永久删除设备上的所有分区和数据,且不可恢复!**\n"
f"请谨慎操作!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了擦除 {device_path} 分区表的操作。")
return
logger.warning(f"尝试擦除 {device_path} 上的分区表。")
# 使用 LvmOperations 中通用的 _execute_shell_command 来执行 parted 命令
# 这里不需要 show_dialog=False因为这个操作本身就是同步且需要用户确认的
success, _, stderr = self.lvm_ops._execute_shell_command(
["parted", "-s", device_path, "mklabel", "gpt"], # 默认使用 gpt 分区表类型
f"擦除 {device_path} 上的分区表失败"
)
if success:
QMessageBox.information(self, "成功", f"设备 {device_path} 上的分区表已成功擦除。")
self.refresh_all_info()
else:
# 错误信息已由 _execute_shell_command 处理并显示
pass
# --- 新增方法结束 ---
def _handle_create_partition(self, disk_path, dev_data):
total_disk_mib = 0.0
total_size_str = dev_data.get('size')
logger.debug(f"尝试为磁盘 {disk_path} 创建分区。原始大小字符串: '{total_size_str}'")
if total_size_str:
match = re.match(r'(\d+(\.\d+)?)\s*([KMGT]?B?)', total_size_str, re.IGNORECASE)
if match:
value = float(match.group(1))
unit = match.group(3).upper() if match.group(3) else ''
if unit == 'KB' or unit == 'K': total_disk_mib = value / 1024
elif unit == 'MB' or unit == 'M': total_disk_mib = value
elif unit == 'GB' or unit == 'G': total_disk_mib = value * 1024
elif unit == 'TB' or unit == 'T': total_disk_mib = value * 1024 * 1024
elif unit == 'B':
total_disk_mib = value / (1024 * 1024)
else:
logger.warning(f"无法识别磁盘 {disk_path} 的大小单位: '{unit}' (原始: '{total_size_str}')")
total_disk_mib = 0.0
logger.debug(f"解析后的磁盘总大小 (MiB): {total_disk_mib}")
else:
logger.warning(f"无法解析磁盘 {disk_path} 的大小字符串 '{total_size_str}'。正则表达式不匹配。")
total_disk_mib = 0.0
else:
logger.warning(f"获取磁盘 {disk_path} 的大小字符串为空或None。")
total_disk_mib = 0.0
if total_disk_mib <= 0.0:
QMessageBox.critical(self, "错误", f"无法获取磁盘 {disk_path} 的有效总大小。")
return
start_position_mib = 0.0
max_available_mib = total_disk_mib
if dev_data.get('children'):
logger.debug(f"磁盘 {disk_path} 存在现有分区,尝试计算下一个分区起始位置。")
# 假设 disk_ops.get_disk_free_space_info_mib 能够正确处理
calculated_start_mib, largest_free_space_mib = self.disk_ops.get_disk_free_space_info_mib(disk_path, total_disk_mib)
if calculated_start_mib is None or largest_free_space_mib is None:
QMessageBox.critical(self, "错误", f"无法确定磁盘 {disk_path} 的分区起始位置。")
return
start_position_mib = calculated_start_mib
max_available_mib = largest_free_space_mib
if max_available_mib < 0:
max_available_mib = 0.0
else:
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 0 MiB 开始,最大可用空间为整个磁盘。")
max_available_mib = max(0.0, total_disk_mib - 1.0) # 留一点空间,避免边界问题
start_position_mib = 1.0 # 现代分区表通常从1MB或更大偏移开始
logger.debug(f"磁盘 {disk_path} 没有现有分区,假定从 {start_position_mib} MiB 开始,最大可用空间为 {max_available_mib} MiB。")
dialog = CreatePartitionDialog(self, disk_path, total_disk_mib, max_available_mib)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_partition_info()
if info:
if self.disk_ops.create_partition(
info['disk_path'],
info['partition_table_type'],
info['size_gb'],
info['total_disk_mib'],
info['use_max_space']
):
self.refresh_all_info()
def _handle_mount(self, device_path):
dialog = MountDialog(self, device_path)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_mount_info()
if info:
if self.disk_ops.mount_partition(device_path, info['mount_point'], info['add_to_fstab']):
self.refresh_all_info()
def _unmount_and_refresh(self, device_path):
if self.disk_ops.unmount_partition(device_path, show_dialog_on_error=True):
self.refresh_all_info()
def _handle_delete_partition(self, device_path):
if self.disk_ops.delete_partition(device_path):
self.refresh_all_info()
def _handle_format_partition(self, device_path):
"""
调用 DiskOperations 的异步格式化方法。
"""
# DiskOperations.format_partition 会处理用户确认和启动后台线程
self.disk_ops.format_partition(device_path)
# 界面刷新将在格式化完成后由 on_disk_formatting_finished 槽函数触发,所以这里不需要 refresh_all_info()
# <--- 新增: 格式化完成后的槽函数
def on_disk_formatting_finished(self, success, device_path, stdout, stderr):
"""
接收 DiskOperations 发出的格式化完成信号,并刷新界面。
"""
logger.info(f"格式化完成信号接收: 设备 {device_path}, 成功: {success}")
# QMessageBox 已经在 DiskOperations 的 _on_formatting_finished 中处理
self.refresh_all_info() # 刷新所有信息以显示更新后的文件系统类型等
# --- RAID 管理 Tab ---
def refresh_raid_info(self):
self.ui.treeWidget_raid.clear()
raid_headers = [
"阵列设备", "级别", "状态", "大小", "活动设备", "失败设备", "备用设备",
"总设备数", "UUID", "名称", "Chunk Size", "挂载点"
]
self.ui.treeWidget_raid.setColumnCount(len(raid_headers))
self.ui.treeWidget_raid.setHeaderLabels(raid_headers)
for i in range(len(raid_headers)):
self.ui.treeWidget_raid.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
raid_arrays = self.system_manager.get_mdadm_arrays()
if not raid_arrays:
item = QTreeWidgetItem(self.ui.treeWidget_raid)
item.setText(0, "未找到RAID阵列。")
logger.info("未找到RAID阵列。")
return
for array in raid_arrays:
array_item = QTreeWidgetItem(self.ui.treeWidget_raid)
array_path = array.get('device', 'N/A')
if not array_path or array_path == 'N/A':
logger.warning(f"RAID阵列 '{array.get('name', '未知')}' 的设备路径无效,跳过。")
continue
current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
array_item.setText(0, array_path)
array_item.setText(1, array.get('level', 'N/A'))
array_item.setText(2, array.get('state', 'N/A'))
array_item.setText(3, array.get('array_size', 'N/A'))
array_item.setText(4, array.get('active_devices', 'N/A'))
array_item.setText(5, array.get('failed_devices', 'N/A'))
array_item.setText(6, array.get('spare_devices', 'N/A'))
array_item.setText(7, array.get('total_devices', 'N/A'))
array_item.setText(8, array.get('uuid', 'N/A'))
array_item.setText(9, array.get('name', 'N/A'))
array_item.setText(10, array.get('chunk_size', 'N/A'))
array_item.setText(11, current_mount_point if current_mount_point else "")
array_item.setExpanded(True)
array_data_for_context = array.copy()
array_data_for_context['device'] = array_path
array_item.setData(0, Qt.UserRole, array_data_for_context)
for member in array.get('member_devices', []):
member_item = QTreeWidgetItem(array_item)
member_item.setText(0, f" {member.get('device_path', 'N/A')}")
member_item.setText(1, f"成员: {member.get('raid_device', 'N/A')}")
member_item.setText(2, member.get('state', 'N/A'))
member_item.setText(3, f"Major: {member.get('major', 'N/A')}, Minor: {member.get('minor', 'N/A')}")
for i in range(len(raid_headers)):
self.ui.treeWidget_raid.resizeColumnToContents(i)
logger.info("RAID阵列信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新RAID阵列信息失败: {e}")
logger.error(f"刷新RAID阵列信息失败: {e}")
def show_raid_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_raid.itemAt(pos)
menu = QMenu(self)
create_raid_action = menu.addAction("创建 RAID 阵列...")
create_raid_action.triggered.connect(self._handle_create_raid_array)
menu.addSeparator()
if item and item.parent() is None:
array_data = item.data(0, Qt.UserRole)
if not array_data:
logger.warning(f"无法获取 RAID 阵列 {item.text(0)} 的详细数据。")
return
array_path = array_data.get('device')
member_devices = [m.get('device_path') for m in array_data.get('member_devices', [])]
if not array_path or array_path == 'N/A':
logger.warning(f"RAID阵列 '{array_data.get('name', '未知')}' 的设备路径无效,无法显示操作。")
return
current_mount_point = self.system_manager.get_mountpoint_for_device(array_path)
if current_mount_point and current_mount_point != '[SWAP]' and current_mount_point != '':
unmount_action = menu.addAction(f"卸载 {array_path} ({current_mount_point})")
unmount_action.triggered.connect(lambda: self._unmount_and_refresh(array_path))
else:
mount_action = menu.addAction(f"挂载 {array_path}...")
mount_action.triggered.connect(lambda: self._handle_mount(array_path))
menu.addSeparator()
stop_action = menu.addAction(f"停止阵列 {array_path}")
stop_action.triggered.connect(lambda: self._handle_stop_raid_array(array_path))
delete_action = menu.addAction(f"删除阵列 {array_path}")
delete_action.triggered.connect(lambda: self._handle_delete_raid_array(array_path, member_devices))
format_action = menu.addAction(f"格式化阵列 {array_path}...")
format_action.triggered.connect(lambda: self._handle_format_raid_array(array_path))
if menu.actions():
menu.exec(self.ui.treeWidget_raid.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或没有可用的RAID操作。")
def _handle_create_raid_array(self):
available_devices = self.system_manager.get_unallocated_partitions()
if not available_devices:
QMessageBox.warning(self, "警告", "没有可用于创建 RAID 阵列的设备。请确保有未挂载、未被LVM或RAID使用的磁盘或分区。")
return
dialog = CreateRaidDialog(self, available_devices)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_raid_info()
if info:
if self.raid_ops.create_raid_array(info['devices'], info['level'], info['chunk_size']):
self.refresh_all_info()
def _handle_stop_raid_array(self, array_path):
if self.raid_ops.stop_raid_array(array_path):
self.refresh_all_info()
def _handle_delete_raid_array(self, array_path, member_devices):
if self.raid_ops.delete_raid_array(array_path, member_devices):
self.refresh_all_info()
def _handle_format_raid_array(self, array_path):
"""
处理 RAID 阵列的格式化。
"""
# 这将调用 DiskOperations 的异步格式化方法
self.disk_ops.format_partition(array_path)
# 刷新操作会在 on_disk_formatting_finished 中触发,所以这里不需要 refresh_all_info()
# --- LVM 管理 Tab ---
def refresh_lvm_info(self):
self.ui.treeWidget_lvm.clear()
lvm_headers = ["名称", "大小", "属性", "UUID", "关联", "空闲/已用", "路径/格式", "挂载点"]
self.ui.treeWidget_lvm.setColumnCount(len(lvm_headers))
self.ui.treeWidget_lvm.setHeaderLabels(lvm_headers)
for i in range(len(lvm_headers)):
self.ui.treeWidget_lvm.header().setSectionResizeMode(i, QHeaderView.ResizeToContents)
try:
lvm_data = self.system_manager.get_lvm_info()
if not lvm_data.get('pvs') and not lvm_data.get('vgs') and not lvm_data.get('lvs'):
item = QTreeWidgetItem(self.ui.treeWidget_lvm)
item.setText(0, "未找到LVM信息。")
logger.info("未找到LVM信息。")
return
# 物理卷 (PVs)
pv_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
pv_root_item.setText(0, "物理卷 (PVs)")
pv_root_item.setExpanded(True)
pv_root_item.setData(0, Qt.UserRole, {'type': 'pv_root'})
if lvm_data.get('pvs'):
for pv in lvm_data['pvs']:
pv_item = QTreeWidgetItem(pv_root_item)
pv_name = pv.get('pv_name', 'N/A')
if pv_name.startswith('/dev/'):
pv_path = pv_name
else:
pv_path = f"/dev/{pv_name}" if pv_name != 'N/A' else 'N/A'
pv_item.setText(0, pv_name)
pv_item.setText(1, pv.get('pv_size', 'N/A'))
pv_item.setText(2, pv.get('pv_attr', 'N/A'))
pv_item.setText(3, pv.get('pv_uuid', 'N/A'))
pv_item.setText(4, f"VG: {pv.get('vg_name', 'N/A')}")
pv_item.setText(5, f"空闲: {pv.get('pv_free', 'N/A')}")
pv_item.setText(6, pv.get('pv_fmt', 'N/A'))
pv_item.setText(7, "")
pv_data_for_context = pv.copy()
pv_data_for_context['pv_name'] = pv_path
pv_item.setData(0, Qt.UserRole, {'type': 'pv', 'data': pv_data_for_context})
else:
item = QTreeWidgetItem(pv_root_item)
item.setText(0, "未找到物理卷。")
# 卷组 (VGs)
vg_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
vg_root_item.setText(0, "卷组 (VGs)")
vg_root_item.setExpanded(True)
vg_root_item.setData(0, Qt.UserRole, {'type': 'vg_root'})
if lvm_data.get('vgs'):
for vg in lvm_data['vgs']:
vg_item = QTreeWidgetItem(vg_root_item)
vg_name = vg.get('vg_name', 'N/A')
vg_item.setText(0, vg_name)
vg_item.setText(1, vg.get('vg_size', 'N/A'))
vg_item.setText(2, vg.get('vg_attr', 'N/A'))
vg_item.setText(3, vg.get('vg_uuid', 'N/A'))
vg_item.setText(4, f"PVs: {vg.get('pv_count', 'N/A')}, LVs: {vg.get('lv_count', 'N/A')}")
vg_item.setText(5, f"空闲: {vg.get('vg_free', 'N/A')}, 已分配: {vg.get('vg_alloc_percent', 'N/A')}%")
vg_item.setText(6, vg.get('vg_fmt', 'N/A'))
vg_item.setText(7, "")
vg_data_for_context = vg.copy()
vg_data_for_context['vg_name'] = vg_name
vg_item.setData(0, Qt.UserRole, {'type': 'vg', 'data': vg_data_for_context})
else:
item = QTreeWidgetItem(vg_root_item)
item.setText(0, "未找到卷组。")
# 逻辑卷 (LVs)
lv_root_item = QTreeWidgetItem(self.ui.treeWidget_lvm)
lv_root_item.setText(0, "逻辑卷 (LVs)")
lv_root_item.setExpanded(True)
lv_root_item.setData(0, Qt.UserRole, {'type': 'lv_root'})
if lvm_data.get('lvs'):
for lv in lvm_data['lvs']:
lv_item = QTreeWidgetItem(lv_root_item)
lv_name = lv.get('lv_name', 'N/A')
vg_name = lv.get('vg_name', 'N/A')
lv_attr = lv.get('lv_attr', '')
lv_path = lv.get('lv_path')
if not lv_path or lv_path == 'N/A':
if vg_name != 'N/A' and lv_name != 'N/A':
lv_path = f"/dev/{vg_name}/{lv_name}"
else:
lv_path = 'N/A'
current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
lv_item.setText(0, lv_name)
lv_item.setText(1, lv.get('lv_size', 'N/A'))
lv_item.setText(2, lv_attr)
lv_item.setText(3, lv.get('lv_uuid', 'N/A'))
lv_item.setText(4, f"VG: {vg_name}, Origin: {lv.get('origin', 'N/A')}")
lv_item.setText(5, f"快照: {lv.get('snap_percent', 'N/A')}%")
lv_item.setText(6, lv_path)
lv_item.setText(7, current_mount_point if current_mount_point else "")
lv_data_for_context = lv.copy()
lv_data_for_context['lv_path'] = lv_path
lv_data_for_context['lv_name'] = lv_name
lv_data_for_context['vg_name'] = vg_name
lv_data_for_context['lv_attr'] = lv_attr
lv_item.setData(0, Qt.UserRole, {'type': 'lv', 'data': lv_data_for_context})
else:
item = QTreeWidgetItem(lv_root_item)
item.setText(0, "未找到逻辑卷。")
for i in range(len(lvm_headers)):
self.ui.treeWidget_lvm.resizeColumnToContents(i)
logger.info("LVM信息刷新成功。")
except Exception as e:
QMessageBox.critical(self, "错误", f"刷新LVM信息失败: {e}")
logger.error(f"刷新LVM信息失败: {e}")
def show_lvm_context_menu(self, pos: QPoint):
item = self.ui.treeWidget_lvm.itemAt(pos)
menu = QMenu(self)
create_menu = QMenu("创建...", self)
create_pv_action = create_menu.addAction("创建物理卷 (PV)...")
create_pv_action.triggered.connect(self._handle_create_pv)
create_vg_action = create_menu.addAction("创建卷组 (VG)...")
create_vg_action.triggered.connect(self._handle_create_vg)
create_lv_action = create_menu.addAction("创建逻辑卷 (LV)...")
create_lv_action.triggered.connect(self._handle_create_lv)
menu.addMenu(create_menu)
menu.addSeparator()
if item:
item_data = item.data(0, Qt.UserRole)
if not item_data:
logger.warning(f"无法获取 LVM 项 {item.text(0)} 的详细数据。")
return
item_type = item_data.get('type')
data = item_data.get('data', {})
if item_type == 'pv':
pv_name = data.get('pv_name')
if pv_name and pv_name != 'N/A':
delete_pv_action = menu.addAction(f"删除物理卷 {pv_name}")
delete_pv_action.triggered.connect(lambda: self._handle_delete_pv(pv_name))
elif item_type == 'vg':
vg_name = data.get('vg_name')
if vg_name and vg_name != 'N/A':
delete_vg_action = menu.addAction(f"删除卷组 {vg_name}")
delete_vg_action.triggered.connect(lambda: self._handle_delete_vg(vg_name))
elif item_type == 'lv':
lv_name = data.get('lv_name')
vg_name = data.get('vg_name')
lv_attr = data.get('lv_attr', '')
lv_path = data.get('lv_path')
if lv_name and vg_name and lv_path and lv_path != 'N/A':
if 'a' in lv_attr:
deactivate_lv_action = menu.addAction(f"停用逻辑卷 {lv_name}")
deactivate_lv_action.triggered.connect(lambda: self._handle_deactivate_lv(lv_name, vg_name))
else:
activate_lv_action = menu.addAction(f"激活逻辑卷 {lv_name}")
activate_lv_action.triggered.connect(lambda: self._handle_activate_lv(lv_name, vg_name))
current_mount_point = self.system_manager.get_mountpoint_for_device(lv_path)
if current_mount_point and current_mount_point != '[SWAP]' and current_mount_point != '':
unmount_lv_action = menu.addAction(f"卸载 {lv_name} ({current_mount_point})")
unmount_lv_action.triggered.connect(lambda: self._unmount_and_refresh(lv_path))
else:
mount_lv_action = menu.addAction(f"挂载 {lv_name}...")
mount_lv_action.triggered.connect(lambda: self._handle_mount(lv_path))
menu.addSeparator()
delete_lv_action = menu.addAction(f"删除逻辑卷 {lv_name}")
delete_lv_action.triggered.connect(lambda: self._handle_delete_lv(lv_name, vg_name))
format_lv_action = menu.addAction(f"格式化逻辑卷 {lv_name}...")
format_lv_action.triggered.connect(lambda: self._handle_format_partition(lv_path))
else:
logger.warning(f"逻辑卷 '{lv_name}' (VG: {vg_name}) 的路径无效无法显示操作。Lv Path: {lv_path}")
if menu.actions():
menu.exec(self.ui.treeWidget_lvm.mapToGlobal(pos))
else:
logger.info("右键点击了空白区域或没有可用的LVM操作。")
def _handle_create_pv(self):
available_partitions = self.system_manager.get_unallocated_partitions()
if not available_partitions:
QMessageBox.warning(self, "警告", "没有可用于创建物理卷的未分配分区。")
return
dialog = CreatePvDialog(self, available_partitions)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_pv_info()
if info:
if self.lvm_ops.create_pv(info['device_path']):
self.refresh_all_info()
def _handle_delete_pv(self, device_path):
if self.lvm_ops.delete_pv(device_path):
self.refresh_all_info()
def _handle_create_vg(self):
lvm_info = self.system_manager.get_lvm_info()
available_pvs = []
for pv in lvm_info.get('pvs', []):
pv_name = pv.get('pv_name')
if pv_name and pv_name != 'N/A' and not pv.get('vg_name'):
if pv_name.startswith('/dev/'):
available_pvs.append(pv_name)
else:
available_pvs.append(f"/dev/{pv_name}")
if not available_pvs:
QMessageBox.warning(self, "警告", "没有可用于创建卷组的物理卷。请确保有未分配给任何卷组的物理卷。")
return
dialog = CreateVgDialog(self, available_pvs)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_vg_info()
if info:
if self.lvm_ops.create_vg(info['vg_name'], info['pvs']):
self.refresh_all_info()
def _handle_create_lv(self):
lvm_info = self.system_manager.get_lvm_info()
available_vgs = []
vg_sizes = {}
for vg in lvm_info.get('vgs', []):
vg_name = vg.get('vg_name')
if vg_name and vg_name != 'N/A':
available_vgs.append(vg_name)
# 将字符串转换为小写并去除可能存在的 '<' 符号,以便更准确地匹配
free_size_str = vg.get('vg_free', '0B').strip().lower()
current_vg_size_gb = 0.0
# 修正正则表达式:
# 1. 允许可选的 '<' 符号在数字前
# 2. 确保单位匹配正确,如果单位缺失,默认按 GB 处理 (LVM 的 vg_free 常常默认 GB)
match = re.match(r'<?(\d+\.?\d*)\s*([gmktb])?', free_size_str)
if match:
value = float(match.group(1))
unit = match.group(2) # unit will be 'g', 'm', 't', 'k', 'b' or None
if unit == 'k':
current_vg_size_gb = value / (1024 * 1024) # KB to GB
elif unit == 'm':
current_vg_size_gb = value / 1024 # MB to GB
elif unit == 'g' or unit is None: # 如果单位是 'g' 或没有指定单位,则认为是 GB
current_vg_size_gb = value
elif unit == 't':
current_vg_size_gb = value * 1024 # TB to GB
elif unit == 'b': # Bytes to GB
current_vg_size_gb = value / (1024 * 1024 * 1024)
else:
logger.warning(f"未知LVM单位: '{unit}' for '{free_size_str}'")
else:
logger.warning(f"无法解析LVM空闲大小字符串: '{free_size_str}'")
vg_sizes[vg_name] = current_vg_size_gb
if not available_vgs:
QMessageBox.warning(self, "警告", "没有可用于创建逻辑卷的卷组。")
return
dialog = CreateLvDialog(self, available_vgs, vg_sizes)
if dialog.exec() == QDialog.Accepted:
info = dialog.get_lv_info()
if info:
if self.lvm_ops.create_lv(info['lv_name'], info['vg_name'], info['size_gb'], info['use_max_space']):
self.refresh_all_info()
def _handle_delete_lv(self, lv_name, vg_name):
lv_path = f"/dev/{vg_name}/{lv_name}"
self.disk_ops.unmount_partition(lv_path, show_dialog_on_error=False)
self.disk_ops._remove_fstab_entry(lv_path)
if self.lvm_ops.delete_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_activate_lv(self, lv_name, vg_name):
if self.lvm_ops.activate_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_deactivate_lv(self, lv_name, vg_name):
if self.lvm_ops.deactivate_lv(lv_name, vg_name):
self.refresh_all_info()
def _handle_delete_vg(self, vg_name):
"""
处理删除卷组 (VG) 的操作。
"""
logger.info(f"尝试删除卷组 (VG) {vg_name}")
reply = QMessageBox.question(
self,
"确认删除卷组",
f"您确定要删除卷组 {vg_name} 吗?此操作将永久删除该卷组及其所有逻辑卷和数据!",
QMessageBox.Yes | QMessageBox.No,
QMessageBox.No
)
if reply == QMessageBox.No:
logger.info(f"用户取消了删除卷组 {vg_name} 的操作。")
return
lvm_info = self.system_manager.get_lvm_info()
vg_info = next((vg for vg in lvm_info.get('vgs', []) if vg.get('vg_name') == vg_name), None)
lv_count_raw = vg_info.get('lv_count', 0) if vg_info else 0
try:
lv_count = int(float(lv_count_raw))
except (ValueError, TypeError):
lv_count = 0
if lv_count > 0:
QMessageBox.critical(self, "删除失败", f"卷组 {vg_name} 中仍包含逻辑卷。请先删除所有逻辑卷。")
logger.error(f"尝试删除包含逻辑卷的卷组 {vg_name}。操作被阻止。")
return
success = self.lvm_ops.delete_vg(vg_name)
if success:
self.refresh_all_info()
else:
QMessageBox.critical(self, "删除卷组失败", f"删除卷组 {vg_name} 失败,请检查日志。")
if __name__ == "__main__":
app = QApplication(sys.argv)
widget = MainWindow()
widget.show()
sys.exit(app.exec())