Files
diskmanager/system_info.py
2026-02-02 22:31:02 +08:00

549 lines
28 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# system_info.py
import subprocess
import json
import logging
import re
import os
logger = logging.getLogger(__name__)
class SystemInfoManager:
def __init__(self):
pass
def _run_command(self, command_list, root_privilege=False, check_output=True, input_data=None):
"""
通用地运行一个 shell 命令。
:param command_list: 命令及其参数的列表。
:param root_privilege: 如果为 True则使用 sudo 执行命令。
:param check_output: 如果为 True则捕获 stdout 和 stderr。如果为 False则不捕获用于需要交互或不关心输出的命令。
:param input_data: 传递给命令stdin的数据 (str)。
:return: (stdout_str, stderr_str)
:raises subprocess.CalledProcessError: 如果命令返回非零退出码。
"""
if root_privilege:
command_list = ["sudo"] + command_list
logger.debug(f"运行命令: {' '.join(command_list)}")
try:
result = subprocess.run(
command_list,
capture_output=check_output,
text=True,
check=True,
encoding='utf-8',
input=input_data
)
return result.stdout if check_output else "", result.stderr if check_output else ""
except subprocess.CalledProcessError as e:
logger.error(f"命令执行失败: {' '.join(command_list)}")
logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {e.stderr.strip()}")
raise
except FileNotFoundError:
logger.error(f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
raise
except Exception as e:
logger.error(f"运行命令 {' '.join(command_list)} 时发生未知错误: {e}")
raise
def get_block_devices(self):
"""
使用 lsblk 获取块设备信息。
返回一个字典列表,每个字典代表一个设备。
"""
cmd = [
"lsblk", "-J", "-o",
"NAME,FSTYPE,SIZE,MOUNTPOINT,RO,TYPE,UUID,PARTUUID,VENDOR,MODEL,SERIAL,MAJ:MIN,PKNAME,PATH"
]
try:
stdout, _ = self._run_command(cmd)
data = json.loads(stdout)
devices = data.get('blockdevices', [])
def add_path_recursive(dev_list):
for dev in dev_list:
if 'PATH' not in dev and 'NAME' in dev:
dev['PATH'] = f"/dev/{dev['NAME']}"
# Rename PATH to path (lowercase) for consistency
if 'PATH' in dev:
dev['path'] = dev.pop('PATH')
if 'children' in dev:
add_path_recursive(dev['children'])
add_path_recursive(devices)
return devices
except Exception as e:
logger.error(f"获取块设备信息失败: {e}")
return []
def _find_device_by_path_recursive(self, dev_list, target_path):
"""
Helper to find device data by its path recursively.
Added type check for target_path.
"""
if not isinstance(target_path, str):
logger.warning(f"传入 _find_device_by_path_recursive 的 target_path 不是字符串: {target_path} (类型: {type(target_path)})")
return None
for dev in dev_list:
if dev.get('path') == target_path:
return dev
if 'children' in dev:
found = self._find_device_by_path_recursive(dev['children'], target_path)
if found:
return found
return None
def _find_device_by_maj_min_recursive(self, dev_list, target_maj_min):
"""
Helper to find device data by its major:minor number recursively.
"""
if not isinstance(target_maj_min, str):
logger.warning(f"传入 _find_device_by_maj_min_recursive 的 target_maj_min 不是字符串: {target_maj_min} (类型: {type(target_maj_min)})")
return None
for dev in dev_list:
if dev.get('maj:min') == target_maj_min:
return dev
if 'children' in dev:
found = self._find_device_by_maj_min_recursive(dev['children'], target_maj_min)
if found:
return found
return None
def get_mountpoint_for_device(self, device_path):
"""
根据设备路径获取其挂载点。
此方法现在可以正确处理 /dev/md/new_raid 这样的 RAID 阵列别名,以及 LVM 逻辑卷的符号链接。
"""
if not isinstance(device_path, str):
logger.warning(f"传入 get_mountpoint_for_device 的 device_path 不是字符串: {device_path} (类型: {type(device_path)})")
return None
original_device_path = device_path
logger.debug(f"get_mountpoint_for_device: 正在处理原始设备路径: {original_device_path}")
devices = self.get_block_devices()
logger.debug(f"get_mountpoint_for_device: 从 lsblk 获取到 {len(devices)} 个块设备信息。")
target_maj_min = None
current_search_path = original_device_path
# 1. 解析符号链接以获取实际路径,并尝试获取 maj:min
if original_device_path.startswith('/dev/') and os.path.exists(original_device_path):
try:
# 先尝试解析真实路径,因为 stat 可能对符号链接返回链接本身的 maj:min
resolved_path = os.path.realpath(original_device_path)
logger.debug(f"get_mountpoint_for_device: 原始设备路径 {original_device_path} 解析为 {resolved_path}")
current_search_path = resolved_path # 使用解析后的路径进行 stat 和后续查找
# 获取解析后路径的 maj:min
stat_output, _ = self._run_command(["stat", "-c", "%t:%T", resolved_path], check_output=True)
raw_maj_min = stat_output.strip() # e.g., "fc:0"
# MODIFIED: Convert hex major to decimal
if ':' in raw_maj_min:
major_hex, minor_dec = raw_maj_min.split(':')
try:
major_dec = str(int(major_hex, 16)) # Convert hex to decimal string
target_maj_min = f"{major_dec}:{minor_dec}" # e.g., "252:0"
logger.debug(f"get_mountpoint_for_device: 解析后设备 {resolved_path} 的 maj:min (hex) 为 {raw_maj_min},转换为 decimal 为 {target_maj_min}")
except ValueError:
logger.warning(f"get_mountpoint_for_device: 无法将 maj:min 的主要部分 '{major_hex}' 从十六进制转换为十进制。")
target_maj_min = None # Fallback if conversion fails
else:
logger.warning(f"get_mountpoint_for_device: stat 输出 '{raw_maj_min}' 格式不符合预期。")
target_maj_min = None
except subprocess.CalledProcessError as e:
logger.debug(f"get_mountpoint_for_device: 无法获取 {resolved_path} 的 maj:min (命令失败): {e.stderr.strip()}")
except Exception as e:
logger.debug(f"get_mountpoint_for_device: 无法获取 {resolved_path} 的 maj:min (未知错误): {e}")
elif original_device_path.startswith('/dev/') and not os.path.exists(original_device_path):
logger.warning(f"get_mountpoint_for_device: 设备路径 {original_device_path} 不存在,无法获取挂载点。")
return None
# 2. 尝试使用 (可能已解析的) current_search_path 在 lsblk 输出中查找
dev_info = self._find_device_by_path_recursive(devices, current_search_path)
logger.debug(f"get_mountpoint_for_device: _find_device_by_path_recursive 查找结果 (使用 {current_search_path}): {dev_info}")
if dev_info:
mountpoint = dev_info.get('mountpoint')
logger.debug(f"get_mountpoint_for_device: 直接从 lsblk 获取到 {original_device_path} (解析为 {current_search_path}) 的挂载点: {mountpoint}")
return mountpoint
# 3. 如果直接路径查找失败,并且我们有 maj:min尝试通过 maj:min 查找
if target_maj_min:
logger.debug(f"get_mountpoint_for_device: 直接路径查找失败,尝试通过 maj:min ({target_maj_min}) 查找。")
dev_info_by_maj_min = self._find_device_by_maj_min_recursive(devices, target_maj_min)
logger.debug(f"get_mountpoint_for_device: 通过 maj:min 查找结果: {dev_info_by_maj_min}")
if dev_info_by_maj_min:
mountpoint = dev_info_by_maj_min.get('mountpoint')
logger.debug(f"get_mountpoint_for_device: 通过 maj:min 找到 {original_device_path} 的挂载点: {mountpoint}")
return mountpoint
# 4. 如果仍然查找失败,并且是 RAID 阵列(例如 /dev/md/new_raid
if original_device_path.startswith('/dev/md'):
logger.debug(f"get_mountpoint_for_device: 正在处理 RAID 阵列 {original_device_path} 以获取挂载点...")
actual_md_device_path = self._get_actual_md_device_path(original_device_path)
logger.debug(f"get_mountpoint_for_device: RAID 阵列 {original_device_path} 的实际设备路径: {actual_md_device_path}")
if actual_md_device_path:
actual_dev_info = self._find_device_by_path_recursive(devices, actual_md_device_path)
logger.debug(f"get_mountpoint_for_device: 实际设备路径 {actual_md_device_path} 的 lsblk 详情: {actual_dev_info}")
if actual_dev_info:
mountpoint = actual_dev_info.get('mountpoint')
logger.debug(f"get_mountpoint_for_device: 在实际设备 {actual_md_device_path} 上找到了挂载点: {mountpoint}")
return mountpoint
logger.debug(f"get_mountpoint_for_device: 未能获取到 {original_device_path} 的挂载点。")
return None
def get_mdadm_arrays(self):
"""
获取 mdadm RAID 阵列信息。
"""
cmd = ["mdadm", "--detail", "--scan"]
try:
stdout, _ = self._run_command(cmd)
arrays = []
for line in stdout.splitlines():
if line.startswith("ARRAY"):
parts = line.split(' ')
array_path = parts[1] # e.g., /dev/md0 or /dev/md/new_raid
# Use mdadm --detail for more specific info
detail_cmd = ["mdadm", "--detail", array_path]
detail_stdout, _ = self._run_command(detail_cmd)
array_info = self._parse_mdadm_detail(detail_stdout, array_path)
arrays.append(array_info)
return arrays
except subprocess.CalledProcessError as e:
if "No arrays found" in e.stderr or "No arrays found" in e.stdout: # mdadm --detail --scan might exit 1 if no arrays
logger.info("未找到任何RAID阵列。")
return []
logger.error(f"获取RAID阵列信息失败: {e}")
return []
except Exception as e:
logger.error(f"获取RAID阵列信息失败: {e}")
return []
def _parse_mdadm_detail(self, detail_output, array_path):
"""
解析 mdadm --detail 的输出。
"""
info = {
'device': array_path,
'level': 'N/A',
'state': 'N/A',
'array_size': 'N/A',
'active_devices': 'N/A',
'failed_devices': 'N/A',
'spare_devices': 'N/A',
'total_devices': 'N/A',
'uuid': 'N/A',
'name': 'N/A',
'chunk_size': 'N/A',
'member_devices': []
}
member_pattern = re.compile(r'^\s*(\d+)\s+(\d+)\s+(\d+)\s+(.+?)\s+(/dev/.+)$')
for line in detail_output.splitlines():
if "Raid Level :" in line:
info['level'] = line.split(':')[-1].strip()
elif "Array Size :" in line:
info['array_size'] = line.split(':')[-1].strip()
elif "State :" in line:
info['state'] = line.split(':')[-1].strip()
elif "Active Devices :" in line:
info['active_devices'] = line.split(':')[-1].strip()
elif "Failed Devices :" in line:
info['failed_devices'] = line.split(':')[-1].strip()
elif "Spare Devices :" in line:
info['spare_devices'] = line.split(':')[-1].strip()
elif "Total Devices :" in line:
info['total_devices'] = line.split(':')[-1].strip()
elif "UUID :" in line:
info['uuid'] = line.split(':')[-1].strip()
elif "Name :" in line:
info['name'] = line.split(':')[-1].strip()
elif "Chunk Size :" in line:
info['chunk_size'] = line.split(':')[-1].strip()
# Member devices
match = member_pattern.match(line)
if match:
member_info = {
'number': match.group(1),
'major': match.group(2),
'minor': match.group(3),
'raid_device': match.group(4),
'device_path': match.group(5)
}
info['member_devices'].append(member_info)
return info
def get_lvm_info(self):
"""
获取 LVM 物理卷 (PVs)、卷组 (VGs) 和逻辑卷 (LVs) 的信息。
"""
lvm_info = {'pvs': [], 'vgs': [], 'lvs': []}
# Get PVs
try:
stdout, _ = self._run_command(["pvs", "--reportformat", "json"])
data = json.loads(stdout)
if 'report' in data and data['report']:
for pv_data in data['report'][0].get('pv', []):
lvm_info['pvs'].append({
'pv_name': pv_data.get('pv_name'),
'vg_name': pv_data.get('vg_name'),
'pv_uuid': pv_data.get('pv_uuid'),
'pv_size': pv_data.get('pv_size'),
'pv_free': pv_data.get('pv_free'),
'pv_attr': pv_data.get('pv_attr'),
'pv_fmt': pv_data.get('pv_fmt')
})
except subprocess.CalledProcessError as e:
if "No physical volume found" in e.stderr or "No physical volumes found" in e.stdout:
logger.info("未找到任何LVM物理卷。")
else:
logger.error(f"获取LVM物理卷信息失败: {e}")
except Exception as e:
logger.error(f"获取LVM物理卷信息失败: {e}")
# Get VGs
try:
stdout, _ = self._run_command(["vgs", "--reportformat", "json"])
data = json.loads(stdout)
if 'report' in data and data['report']:
for vg_data in data['report'][0].get('vg', []):
lvm_info['vgs'].append({
'vg_name': vg_data.get('vg_name'),
'vg_uuid': vg_data.get('vg_uuid'),
'vg_size': vg_data.get('vg_size'),
'vg_free': vg_data.get('vg_free'),
'vg_attr': vg_data.get('vg_attr'),
'pv_count': vg_data.get('pv_count'),
'lv_count': vg_data.get('lv_count'),
'vg_alloc_percent': vg_data.get('vg_alloc_percent'),
'vg_fmt': vg_data.get('vg_fmt')
})
except subprocess.CalledProcessError as e:
if "No volume group found" in e.stderr or "No volume groups found" in e.stdout:
logger.info("未找到任何LVM卷组。")
else:
logger.error(f"获取LVM卷组信息失败: {e}")
except Exception as e:
logger.error(f"获取LVM卷组信息失败: {e}")
# Get LVs (MODIFIED: added -o lv_path)
try:
# 明确请求 lv_path因为默认的 --reportformat json 不包含它
stdout, _ = self._run_command(["lvs", "-o", "lv_name,vg_name,lv_uuid,lv_size,lv_attr,origin,snap_percent,lv_path", "--reportformat", "json"])
data = json.loads(stdout)
if 'report' in data and data['report']:
for lv_data in data['report'][0].get('lv', []):
lvm_info['lvs'].append({
'lv_name': lv_data.get('lv_name'),
'vg_name': lv_data.get('vg_name'),
'lv_uuid': lv_data.get('lv_uuid'),
'lv_size': lv_data.get('lv_size'),
'lv_attr': lv_data.get('lv_attr'),
'origin': lv_data.get('origin'),
'snap_percent': lv_data.get('snap_percent'),
'lv_path': lv_data.get('lv_path') # 现在应该能正确获取到路径了
})
except subprocess.CalledProcessError as e:
if "No logical volume found" in e.stderr or "No logical volumes found" in e.stdout:
logger.info("未找到任何LVM逻辑卷。")
else:
logger.error(f"获取LVM逻辑卷信息失败: {e}")
except Exception as e:
logger.error(f"获取LVM逻辑卷信息失败: {e}")
return lvm_info
def get_unallocated_partitions(self):
"""
获取所有可用于创建RAID阵列或LVM物理卷的设备。
这些设备包括:
1. 没有分区表的整个磁盘。
2. 未挂载的分区包括有文件系统但未挂载的如ext4/xfs等
3. 未被LVM或RAID使用的分区或磁盘。
返回一个设备路径列表,例如 ['/dev/sdb1', '/dev/sdc']。
"""
block_devices = self.get_block_devices()
candidates = []
def process_device(dev):
dev_path = dev.get('path')
dev_type = dev.get('type')
mountpoint = dev.get('mountpoint')
fstype = dev.get('fstype')
# 1. 检查是否已挂载 (排除SWAP因为SWAP可以被覆盖但如果活跃则不应动)
# 如果是活跃挂载点非SWAP则不是候选
if mountpoint and mountpoint != '[SWAP]':
# 如果设备已挂载,则它不是候选。
# 但仍需处理其子设备,因为父设备挂载不代表子设备不能用(虽然不常见)
# 或者子设备挂载不代表父设备不能用(例如使用整个磁盘)
if 'children' in dev:
for child in dev['children']:
process_device(child)
return
# 2. 检查是否是LVM物理卷或RAID成员
# 如果是LVM PV或RAID成员则它不是新RAID/PV的候选
if fstype in ['LVM2_member', 'linux_raid_member']:
if 'children' in dev: # 即使是LVM/RAID成员也可能存在子设备例如LVM上的分区虽然不常见
for child in dev['children']:
process_device(child)
return
# 3. 如果是整个磁盘且没有分区,则是一个候选
if dev_type == 'disk' and not dev.get('children'):
candidates.append(dev_path)
# 4. 如果是分区且通过了上述挂载和LVM/RAID检查则是一个候选
elif dev_type == 'part':
candidates.append(dev_path)
# 5. 递归处理子设备 (对于有分区的磁盘)
# 这一步放在最后,确保先对当前设备进行评估
if dev_type == 'disk' and dev.get('children'): # 只有当是磁盘且有子设备时才需要递归
for child in dev['children']:
process_device(child)
for dev in block_devices:
process_device(dev)
# 去重并排序
return sorted(list(set(candidates)))
def _get_actual_md_device_path(self, array_path):
"""
解析 mdadm 阵列名称(例如 /dev/md/new_raid
对应的实际内核设备路径(例如 /dev/md127
通过检查 /dev/md/ 目录下的符号链接来获取。
Added type check for array_path.
"""
if not isinstance(array_path, str):
logger.warning(f"传入 _get_actual_md_device_path 的 array_path 不是字符串: {array_path} (类型: {type(array_path)})")
return None
if os.path.exists(array_path):
try:
# os.path.realpath 会解析符号链接,例如 /dev/md/new_raid -> /dev/md127
actual_path = os.path.realpath(array_path)
# 确保解析后仍然是 md 设备(以 /dev/md 开头)
if actual_path.startswith('/dev/md'):
logger.debug(f"已将 RAID 阵列 {array_path} 解析为实际设备路径: {actual_path}")
return actual_path
except Exception as e:
logger.warning(f"无法通过 os.path.realpath 获取 RAID 阵列 {array_path} 的实际设备路径: {e}")
logger.warning(f"RAID 阵列 {array_path} 不存在或无法解析其真实路径。")
return None
def get_device_details_by_path(self, device_path):
"""
根据设备路径获取设备的 UUID 和文件系统类型 (fstype)。
此方法现在可以正确处理 /dev/md/new_raid 这样的 RAID 阵列别名,以及 LVM 逻辑卷的符号链接。
Added type check for device_path.
:param device_path: 设备的路径,例如 '/dev/sdb1''/dev/md0''/dev/md/new_raid'
:return: 包含 'uuid''fstype' 的字典,如果未找到则返回 None。
"""
if not isinstance(device_path, str):
logger.warning(f"传入 get_device_details_by_path 的 device_path 不是字符串: {device_path} (类型: {type(device_path)})")
return None
original_device_path = device_path
logger.debug(f"get_device_details_by_path: 正在处理原始设备路径: {original_device_path}")
devices = self.get_block_devices() # 获取所有块设备信息
logger.debug(f"get_device_details_by_path: 从 lsblk 获取到 {len(devices)} 个块设备信息。")
target_maj_min = None
current_search_path = original_device_path
# 1. 解析符号链接以获取实际路径,并尝试获取 maj:min
if original_device_path.startswith('/dev/') and os.path.exists(original_device_path):
try:
resolved_path = os.path.realpath(original_device_path)
logger.debug(f"get_device_details_by_path: 原始设备路径 {original_device_path} 解析为 {resolved_path}")
current_search_path = resolved_path
stat_output, _ = self._run_command(["stat", "-c", "%t:%T", resolved_path], check_output=True)
raw_maj_min = stat_output.strip() # e.g., "fc:0"
# MODIFIED: Convert hex major to decimal
if ':' in raw_maj_min:
major_hex, minor_dec = raw_maj_min.split(':')
try:
major_dec = str(int(major_hex, 16)) # Convert hex to decimal string
target_maj_min = f"{major_dec}:{minor_dec}" # e.g., "252:0"
logger.debug(f"get_device_details_by_path: 解析后设备 {resolved_path} 的 maj:min (hex) 为 {raw_maj_min},转换为 decimal 为 {target_maj_min}")
except ValueError:
logger.warning(f"get_device_details_by_path: 无法将 maj:min 的主要部分 '{major_hex}' 从十六进制转换为十进制。")
target_maj_min = None
else:
logger.warning(f"get_device_details_by_path: stat 输出 '{raw_maj_min}' 格式不符合预期。")
target_maj_min = None
except subprocess.CalledProcessError as e:
logger.debug(f"get_device_details_by_path: 无法获取 {resolved_path} 的 maj:min (命令失败): {e.stderr.strip()}")
except Exception as e:
logger.debug(f"get_device_details_by_path: 无法获取 {resolved_path} 的 maj:min (未知错误): {e}")
elif original_device_path.startswith('/dev/') and not os.path.exists(original_device_path):
logger.warning(f"get_device_details_by_path: 设备路径 {original_device_path} 不存在,无法获取详情。")
return None
# 2. 首先,尝试使用 (可能已解析的) current_search_path 在 lsblk 输出中查找
lsblk_details = self._find_device_by_path_recursive(devices, current_search_path)
logger.debug(f"get_device_details_by_path: _find_device_by_path_recursive 查找结果 (使用 {current_search_path}, 直接查找): {lsblk_details}")
if lsblk_details and lsblk_details.get('fstype'):
logger.debug(f"get_device_details_by_path: 直接从 lsblk 获取到 {original_device_path} (解析为 {current_search_path}) 的详情: {lsblk_details}")
return {
'uuid': lsblk_details.get('uuid'),
'fstype': lsblk_details.get('fstype')
}
# 3. 如果直接路径查找失败,并且我们有 maj:min尝试通过 maj:min 查找
if target_maj_min:
logger.debug(f"get_device_details_by_path: 直接路径查找失败,尝试通过 maj:min ({target_maj_min}) 查找。")
dev_info_by_maj_min = self._find_device_by_maj_min_recursive(devices, target_maj_min)
logger.debug(f"get_device_details_by_path: 通过 maj:min 查找结果: {dev_info_by_maj_min}")
if dev_info_by_maj_min and dev_info_by_maj_min.get('fstype'):
logger.debug(f"get_device_details_by_path: 通过 maj:min 找到 {original_device_path} 的文件系统详情: {dev_info_by_maj_min}")
return {
'uuid': dev_info_by_maj_min.get('uuid'),
'fstype': dev_info_by_maj_min.get('fstype')
}
# 4. 如果仍然没有找到,并且是 RAID 阵列(例如 /dev/md/new_raid
if original_device_path.startswith('/dev/md'):
logger.debug(f"get_device_details_by_path: 正在处理 RAID 阵列 {original_device_path}...")
actual_md_device_path = self._get_actual_md_device_path(original_device_path)
logger.debug(f"get_device_details_by_path: RAID 阵列 {original_device_path} 的实际设备路径: {actual_md_device_path}")
if actual_md_device_path:
actual_device_lsblk_details = self._find_device_by_path_recursive(devices, actual_md_device_path)
logger.debug(f"get_device_details_by_path: 实际设备路径 {actual_md_device_path} 的 lsblk 详情: {actual_device_lsblk_details}")
if actual_device_lsblk_details and actual_device_lsblk_details.get('fstype'):
logger.debug(f"get_device_details_by_path: 在实际设备 {actual_md_device_path} 上找到了文件系统详情: {actual_device_lsblk_details}")
return {
'uuid': actual_device_lsblk_details.get('uuid'),
'fstype': actual_device_lsblk_details.get('fstype')
}
else:
logger.warning(f"get_device_details_by_path: RAID 阵列 {original_device_path} (实际设备 {actual_md_device_path}) 未找到文件系统类型。")
return None
else:
logger.warning(f"get_device_details_by_path: 无法确定 RAID 阵列 {original_device_path} 的实际内核设备路径。")
return None
# 5. 如果仍然没有找到,返回 None
logger.debug(f"get_device_details_by_path: 未能获取到 {original_device_path} 的任何详情。")
return None