Files
diskmanager/system_info.py
2026-02-03 15:18:43 +08:00

759 lines
39 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# system_info.py
import subprocess
import json
import logging
import re
import os
logger = logging.getLogger(__name__)
class SystemInfoManager:
def __init__(self):
pass
def _run_command(self, command_list, root_privilege=False, check_output=True, input_data=None):
"""
通用地运行一个 shell 命令。
:param command_list: 命令及其参数的列表。
:param root_privilege: 如果为 True则使用 sudo 执行命令。
:param check_output: 如果为 True则捕获 stdout 和 stderr。如果为 False则不捕获用于需要交互或不关心输出的命令。
:param input_data: 传递给命令stdin的数据 (str)。
:return: (stdout_str, stderr_str)
:raises subprocess.CalledProcessError: 如果命令返回非零退出码。
"""
if root_privilege:
command_list = ["sudo"] + command_list
logger.debug(f"运行命令: {' '.join(command_list)}")
try:
result = subprocess.run(
command_list,
capture_output=check_output,
text=True,
check=True,
encoding='utf-8',
input=input_data
)
return result.stdout if check_output else "", result.stderr if check_output else ""
except subprocess.CalledProcessError as e:
logger.error(f"命令执行失败: {' '.join(command_list)}")
logger.error(f"退出码: {e.returncode}")
logger.error(f"标准输出: {e.stdout.strip()}")
logger.error(f"标准错误: {e.stderr.strip()}")
raise
except FileNotFoundError:
logger.error(f"命令 '{command_list[0]}' 未找到。请确保已安装相关工具。")
raise
except Exception as e:
logger.error(f"运行命令 {' '.join(command_list)} 时发生未知错误: {e}")
raise
def get_block_devices(self):
"""
使用 lsblk 获取块设备信息。
返回一个字典列表,每个字典代表一个设备。
"""
cmd = [
"lsblk", "-J", "-o",
"NAME,FSTYPE,SIZE,MOUNTPOINT,RO,TYPE,UUID,PARTUUID,VENDOR,MODEL,SERIAL,MAJ:MIN,PKNAME,PATH"
]
try:
stdout, _ = self._run_command(cmd)
data = json.loads(stdout)
devices = data.get('blockdevices', [])
def add_path_recursive(dev_list):
for dev in dev_list:
if 'PATH' not in dev and 'NAME' in dev:
dev['PATH'] = f"/dev/{dev['NAME']}"
# Rename PATH to path (lowercase) for consistency
if 'PATH' in dev:
dev['path'] = dev.pop('PATH')
if 'children' in dev:
add_path_recursive(dev['children'])
add_path_recursive(devices)
return devices
except subprocess.CalledProcessError as e:
logger.error(f"获取块设备信息失败: {e.stderr.strip()}")
return []
except json.JSONDecodeError as e:
logger.error(f"解析 lsblk JSON 输出失败: {e}")
return []
except Exception as e:
logger.error(f"获取块设备信息失败 (未知错误): {e}")
return []
def _find_device_by_path_recursive(self, dev_list, target_path):
"""
Helper to find device data by its path recursively.
Added type check for target_path.
"""
if not isinstance(target_path, str):
logger.warning(f"传入 _find_device_by_path_recursive 的 target_path 不是字符串: {target_path} (类型: {type(target_path)})")
return None
for dev in dev_list:
if dev.get('path') == target_path:
return dev
if 'children' in dev:
found = self._find_device_by_path_recursive(dev['children'], target_path)
if found:
return found
return None
def _find_device_by_maj_min_recursive(self, dev_list, target_maj_min):
"""
Helper to find device data by its major:minor number recursively.
"""
if not isinstance(target_maj_min, str):
logger.warning(f"传入 _find_device_by_maj_min_recursive 的 target_maj_min 不是字符串: {target_maj_min} (类型: {type(target_maj_min)})")
return None
for dev in dev_list:
if dev.get('maj:min') == target_maj_min:
return dev
if 'children' in dev:
found = self._find_device_by_maj_min_recursive(dev['children'], target_maj_min)
if found:
return found
return None
def get_mountpoint_for_device(self, device_path):
"""
根据设备路径获取其挂载点。
此方法现在可以正确处理 /dev/md/new_raid 这样的 RAID 阵列别名,以及 LVM 逻辑卷的符号链接。
"""
if not isinstance(device_path, str):
logger.warning(f"传入 get_mountpoint_for_device 的 device_path 不是字符串: {device_path} (类型: {type(device_path)})")
return None
original_device_path = device_path
logger.debug(f"get_mountpoint_for_device: 正在处理原始设备路径: {original_device_path}")
devices = self.get_block_devices()
logger.debug(f"get_mountpoint_for_device: 从 lsblk 获取到 {len(devices)} 个块设备信息。")
target_maj_min = None
current_search_path = original_device_path
# 1. 解析符号链接以获取实际路径,并尝试获取 maj:min
if original_device_path.startswith('/dev/') and os.path.exists(original_device_path):
try:
# 先尝试解析真实路径,因为 stat 可能对符号链接返回链接本身的 maj:min
resolved_path = os.path.realpath(original_device_path)
logger.debug(f"get_mountpoint_for_device: 原始设备路径 {original_device_path} 解析为 {resolved_path}")
current_search_path = resolved_path # 使用解析后的路径进行 stat 和后续查找
# 获取解析后路径的 maj:min
stat_output, _ = self._run_command(["stat", "-c", "%t:%T", resolved_path], check_output=True)
raw_maj_min = stat_output.strip() # e.g., "fc:0"
# MODIFIED: Convert hex major to decimal
if ':' in raw_maj_min:
major_hex, minor_dec = raw_maj_min.split(':')
try:
major_dec = str(int(major_hex, 16)) # Convert hex to decimal string
target_maj_min = f"{major_dec}:{minor_dec}" # e.g., "252:0"
logger.debug(f"get_mountpoint_for_device: 解析后设备 {resolved_path} 的 maj:min (hex) 为 {raw_maj_min},转换为 decimal 为 {target_maj_min}")
except ValueError:
logger.warning(f"get_mountpoint_for_device: 无法将 maj:min 的主要部分 '{major_hex}' 从十六进制转换为十进制。")
target_maj_min = None # Fallback if conversion fails
else:
logger.warning(f"get_mountpoint_for_device: stat 输出 '{raw_maj_min}' 格式不符合预期。")
target_maj_min = None
except subprocess.CalledProcessError as e:
logger.debug(f"get_mountpoint_for_device: 无法获取 {resolved_path} 的 maj:min (命令失败): {e.stderr.strip()}")
except Exception as e:
logger.debug(f"get_mountpoint_for_device: 无法获取 {resolved_path} 的 maj:min (未知错误): {e}")
elif original_device_path.startswith('/dev/') and not os.path.exists(original_device_path):
logger.warning(f"get_mountpoint_for_device: 设备路径 {original_device_path} 不存在,无法获取挂载点。")
return None
# 2. 尝试使用 (可能已解析的) current_search_path 在 lsblk 输出中查找
dev_info = self._find_device_by_path_recursive(devices, current_search_path)
logger.debug(f"get_mountpoint_for_device: _find_device_by_path_recursive 查找结果 (使用 {current_search_path}): {dev_info}")
if dev_info:
mountpoint = dev_info.get('mountpoint')
logger.debug(f"get_mountpoint_for_device: 直接从 lsblk 获取到 {original_device_path} (解析为 {current_search_path}) 的挂载点: {mountpoint}")
return mountpoint
# 3. 如果直接路径查找失败,并且我们有 maj:min尝试通过 maj:min 查找
if target_maj_min:
logger.debug(f"get_mountpoint_for_device: 直接路径查找失败,尝试通过 maj:min ({target_maj_min}) 查找。")
dev_info_by_maj_min = self._find_device_by_maj_min_recursive(devices, target_maj_min)
logger.debug(f"get_mountpoint_for_device: 通过 maj:min 查找结果: {dev_info_by_maj_min}")
if dev_info_by_maj_min:
mountpoint = dev_info_by_maj_min.get('mountpoint')
logger.debug(f"get_mountpoint_for_device: 通过 maj:min 找到 {original_device_path} 的挂载点: {mountpoint}")
return mountpoint
# 4. 如果仍然查找失败,并且是 RAID 阵列(例如 /dev/md/new_raid
if original_device_path.startswith('/dev/md'):
logger.debug(f"get_mountpoint_for_device: 正在处理 RAID 阵列 {original_device_path} 以获取挂载点...")
actual_md_device_path = self._get_actual_md_device_path(original_device_path)
logger.debug(f"get_mountpoint_for_device: RAID 阵列 {original_device_path} 的实际设备路径: {actual_md_device_path}")
if actual_md_device_path:
actual_dev_info = self._find_device_by_path_recursive(devices, actual_md_device_path)
logger.debug(f"get_mountpoint_for_device: 实际设备路径 {actual_md_device_path} 的 lsblk 详情: {actual_dev_info}")
if actual_dev_info:
mountpoint = actual_dev_info.get('mountpoint')
logger.debug(f"get_mountpoint_for_device: 在实际设备 {actual_md_device_path} 上找到了挂载点: {mountpoint}")
return mountpoint
logger.debug(f"get_mountpoint_for_device: 未能获取到 {original_device_path} 的挂载点。")
return None
def _parse_mdadm_detail_output(self, output, device_path):
"""
解析 mdadm --detail 命令的输出。
"""
array_info = {
'device': device_path, # Default to input path, will be updated by canonical_device_path
'level': 'N/A',
'state': 'N/A',
'array_size': 'N/A',
'active_devices': 'N/A',
'failed_devices': 'N/A',
'spare_devices': 'N/A',
'total_devices': 'N/A',
'uuid': 'N/A',
'name': os.path.basename(device_path), # Default name, will be overridden
'chunk_size': 'N/A',
'member_devices': []
}
# 首先,尝试从 mdadm --detail 输出的第一行获取规范的设备路径
device_path_header_match = re.match(r'^(?P<canonical_path>/dev/md\d+):', output)
if device_path_header_match:
array_info['device'] = device_path_header_match.group('canonical_path')
array_info['name'] = os.path.basename(array_info['device']) # 根据规范路径更新默认名称
# 逐行解析键值对
for line in output.splitlines():
line = line.strip()
if not line:
continue
# 尝试匹配 "Key : Value" 格式
kv_match = re.match(r'^(?P<key>[A-Za-z ]+)\s*:\s*(?P<value>.+)', line)
if kv_match:
key = kv_match.group('key').strip().lower().replace(' ', '_')
value = kv_match.group('value').strip()
if key == 'raid_level':
array_info['level'] = value
elif key == 'state':
array_info['state'] = value
elif key == 'array_size':
array_info['array_size'] = value
elif key == 'uuid':
array_info['uuid'] = value
elif key == 'raid_devices':
array_info['total_devices'] = value
elif key == 'active_devices':
array_info['active_devices'] = value
elif key == 'failed_devices':
array_info['failed_devices'] = value
elif key == 'spare_devices':
array_info['spare_devices'] = value
elif key == 'name':
# 只取名字的第一部分,忽略括号内的额外信息
array_info['name'] = value.split(' ')[0]
elif key == 'chunk_size':
array_info['chunk_size'] = value
# 成员设备解析 (独立于键值对,因为它有固定格式)
member_pattern = re.match(r'^\s*(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s+([^\s]+(?:\s+[^\s]+)*)\s+(/dev/\S+)$', line)
if member_pattern:
array_info['member_devices'].append({
'number': member_pattern.group(1),
'major': member_pattern.group(2),
'minor': member_pattern.group(3),
'raid_device': member_pattern.group(4),
'state': member_pattern.group(5).strip(),
'device_path': member_pattern.group(6)
})
# 最终状态标准化
if array_info['state'] and 'active' in array_info['state'].lower():
array_info['state'] = array_info['state'].replace('active', 'Active')
elif array_info['state'] == 'clean':
array_info['state'] = 'Active, Clean'
elif array_info['state'] == 'N/A': # 如果未明确找到状态,则回退到推断
if "clean" in output.lower() and "active" in output.lower():
array_info['state'] = "Active, Clean"
elif "degraded" in output.lower():
array_info['state'] = "Degraded"
elif "inactive" in output.lower() or "stopped" in output.lower():
array_info['state'] = "Stopped"
return array_info
def get_mdadm_arrays(self):
"""
获取所有 RAID 阵列的信息,包括活动的和已停止的。
"""
all_arrays_info = {} # 使用 UUID 作为键,存储阵列的详细信息
# 1. 获取活动的 RAID 阵列信息 (通过 mdadm --detail --scan 和 /proc/mdstat)
active_md_devices = []
# 从 mdadm --detail --scan 获取
try:
stdout_scan, stderr_scan = self._run_command(["mdadm", "--detail", "--scan"], check_output=True)
for line in stdout_scan.splitlines():
if line.startswith("ARRAY"):
match = re.match(r'ARRAY\s+(\S+)(?:\s+\S+=\S+)*\s+UUID=([0-9a-f:]+)', line)
if match:
dev_path = match.group(1)
if dev_path not in active_md_devices:
active_md_devices.append(dev_path)
except subprocess.CalledProcessError as e:
if "No arrays found" not in e.stderr and "No arrays found" not in e.stdout:
logger.warning(f"执行 mdadm --detail --scan 失败: {e.stderr.strip()}")
except Exception as e:
logger.warning(f"处理 mdadm --detail --scan 输出时发生错误: {e}")
# 从 /proc/mdstat 获取 (补充可能未被 --scan 报告的活动阵列)
try:
with open("/proc/mdstat", "r") as f:
mdstat_content = f.read()
for line in mdstat_content.splitlines():
if line.startswith("md") and "active" in line:
device_name_match = re.match(r'^(md\d+)\s+:', line)
if device_name_match:
dev_path = f"/dev/{device_name_match.group(1)}"
if dev_path not in active_md_devices:
active_md_devices.append(dev_path)
except FileNotFoundError:
logger.debug("/proc/mdstat not found. Cannot check active arrays from /proc/mdstat.")
except Exception as e:
logger.error(f"Error reading /proc/mdstat: {e}")
# 现在,对每个找到的活动 MD 设备获取其详细信息
for device_path in active_md_devices:
try:
# 解析符号链接以获取规范路径,例如 /dev/md/0 -> /dev/md0
canonical_device_path = self._get_actual_md_device_path(device_path)
if not canonical_device_path:
canonical_device_path = device_path # 如果解析失败,使用原始路径作为回退
detail_stdout, detail_stderr = self._run_command(["mdadm", "--detail", canonical_device_path], check_output=True)
array_info = self._parse_mdadm_detail_output(detail_stdout, canonical_device_path)
if array_info and array_info.get('uuid') and array_info.get('device'):
# _parse_mdadm_detail_output 现在会更新 array_info['device'] 为规范路径
# 使用 UUID 作为键,如果同一个阵列有多个表示,只保留一个(通常是活动的)
all_arrays_info[array_info['uuid']] = array_info
else:
logger.warning(f"无法从 mdadm --detail {canonical_device_path} 输出中解析 RAID 阵列信息: {detail_stdout[:100]}...")
except subprocess.CalledProcessError as e:
logger.warning(f"获取 RAID 阵列 {device_path} 详细信息失败: {e.stderr.strip()}")
except Exception as e:
logger.warning(f"处理 RAID 阵列 {device_path} 详细信息时发生错误: {e}")
# 2. 从 /etc/mdadm.conf 获取配置的 RAID 阵列信息 (可能包含已停止的)
mdadm_conf_paths = ["/etc/mdadm/mdadm.conf", "/etc/mdadm.conf"]
found_conf = False
for mdadm_conf_path in mdadm_conf_paths:
if os.path.exists(mdadm_conf_path):
found_conf = True
try:
with open(mdadm_conf_path, 'r') as f:
for line in f:
line = line.strip()
if line.startswith("ARRAY"):
match_base = re.match(r'ARRAY\s+(\S+)\s*(.*)', line)
if match_base:
device_path = match_base.group(1)
rest_of_line = match_base.group(2)
uuid = 'N/A'
name = os.path.basename(device_path) # Default name
uuid_match_conf = re.search(r'UUID=([0-9a-f:]+)', rest_of_line)
if uuid_match_conf:
uuid = uuid_match_conf.group(1)
name_match_conf = re.search(r'NAME=(\S+)', rest_of_line)
if name_match_conf:
name = name_match_conf.group(1)
if uuid != 'N/A': # 只有成功提取到 UUID 才添加
# 只有当此 UUID 对应的阵列尚未被识别为活动状态时,才添加为停止状态
if uuid not in all_arrays_info:
all_arrays_info[uuid] = {
'device': device_path,
'uuid': uuid,
'name': name,
'level': 'Unknown',
'state': 'Stopped (Configured)',
'array_size': 'N/A',
'active_devices': 'N/A',
'failed_devices': 'N/A',
'spare_devices': 'N/A',
'total_devices': 'N/A',
'chunk_size': 'N/A',
'member_devices': []
}
else:
logger.warning(f"无法从 mdadm.conf 行 '{line}' 中提取 UUID。")
else:
logger.warning(f"无法解析 mdadm.conf 中的 ARRAY 行: '{line}'")
except Exception as e:
logger.warning(f"读取或解析 {mdadm_conf_path} 失败: {e}")
break # 找到并处理了一个配置文件就退出循环
if not found_conf:
logger.info(f"未找到 mdadm 配置文件。")
# 返回所有阵列的列表
# 排序:活动阵列在前,然后是停止的
sorted_arrays = sorted(all_arrays_info.values(), key=lambda x: (x.get('state') != 'Active', x.get('device')))
return sorted_arrays
def get_lvm_info(self):
"""
获取 LVM 物理卷 (PVs)、卷组 (VGs) 和逻辑卷 (LVs) 的信息。
"""
lvm_info = {'pvs': [], 'vgs': [], 'lvs': []}
# Get PVs
try:
stdout, stderr = self._run_command(["pvs", "--reportformat", "json"])
data = json.loads(stdout)
if 'report' in data and data['report']:
for pv_data in data['report'][0].get('pv', []):
lvm_info['pvs'].append({
'pv_name': pv_data.get('pv_name'),
'vg_name': pv_data.get('vg_name'),
'pv_uuid': pv_data.get('pv_uuid'),
'pv_size': pv_data.get('pv_size'),
'pv_free': pv_data.get('pv_free'),
'pv_attr': pv_data.get('pv_attr'),
'pv_fmt': pv_data.get('pv_fmt')
})
except subprocess.CalledProcessError as e:
if "No physical volume found" in e.stderr or "No physical volumes found" in e.stdout:
logger.info("未找到任何LVM物理卷。")
else:
logger.error(f"获取LVM物理卷信息失败: {e.stderr.strip()}")
except json.JSONDecodeError as e:
logger.error(f"解析LVM物理卷JSON输出失败: {e}")
except Exception as e:
logger.error(f"获取LVM物理卷信息失败 (未知错误): {e}")
# Get VGs
try:
stdout, stderr = self._run_command(["vgs", "--reportformat", "json"])
data = json.loads(stdout)
if 'report' in data and data['report']:
for vg_data in data['report'][0].get('vg', []):
lvm_info['vgs'].append({
'vg_name': vg_data.get('vg_name'),
'vg_uuid': vg_data.get('vg_uuid'),
'vg_size': vg_data.get('vg_size'),
'vg_free': vg_data.get('vg_free'),
'vg_attr': vg_data.get('vg_attr'),
'pv_count': vg_data.get('pv_count'),
'lv_count': vg_data.get('lv_count'),
'vg_alloc_percent': vg_data.get('vg_alloc_percent'),
'vg_fmt': vg_data.get('vg_fmt')
})
except subprocess.CalledProcessError as e:
if "No volume group found" in e.stderr or "No volume groups found" in e.stdout:
logger.info("未找到任何LVM卷组。")
else:
logger.error(f"获取LVM卷组信息失败: {e.stderr.strip()}")
except json.JSONDecodeError as e:
logger.error(f"解析LVM卷组JSON输出失败: {e}")
except Exception as e:
logger.error(f"获取LVM卷组信息失败 (未知错误): {e}")
# Get LVs (MODIFIED: added -o lv_path)
try:
# 明确请求 lv_path因为默认的 --reportformat json 不包含它
stdout, stderr = self._run_command(["lvs", "-o", "lv_name,vg_name,lv_uuid,lv_size,lv_attr,origin,snap_percent,lv_path", "--reportformat", "json"])
data = json.loads(stdout)
if 'report' in data and data['report']:
for lv_data in data['report'][0].get('lv', []):
lvm_info['lvs'].append({
'lv_name': lv_data.get('lv_name'),
'vg_name': lv_data.get('vg_name'),
'lv_uuid': lv_data.get('lv_uuid'),
'lv_size': lv_data.get('lv_size'),
'lv_attr': lv_data.get('lv_attr'),
'origin': lv_data.get('origin'),
'snap_percent': lv_data.get('snap_percent'),
'lv_path': lv_data.get('lv_path') # 现在应该能正确获取到路径了
})
except subprocess.CalledProcessError as e:
if "No logical volume found" in e.stderr or "No logical volumes found" in e.stdout:
logger.info("未找到任何LVM逻辑卷。")
else:
logger.error(f"获取LVM逻辑卷信息失败: {e.stderr.strip()}")
except json.JSONDecodeError as e:
logger.error(f"解析LVM逻辑卷JSON输出失败: {e}")
except Exception as e:
logger.error(f"获取LVM逻辑卷信息失败 (未知错误): {e}")
return lvm_info
def get_unallocated_partitions(self):
"""
获取所有可用于创建RAID阵列或LVM物理卷的设备。
这些设备包括:
1. 没有分区表的整个磁盘。
2. 未挂载的分区包括有文件系统但未挂载的如ext4/xfs等
3. 未被LVM或RAID使用的分区或磁盘。
返回一个设备路径列表,例如 ['/dev/sdb1', '/dev/sdc']。
"""
block_devices = self.get_block_devices()
candidates = []
def process_device(dev):
dev_path = dev.get('path')
dev_type = dev.get('type')
mountpoint = dev.get('mountpoint')
fstype = dev.get('fstype')
# 1. 检查是否已挂载 (排除SWAP因为SWAP可以被覆盖但如果活跃则不应动)
# 如果是活跃挂载点非SWAP则不是候选
if mountpoint and mountpoint != '[SWAP]':
# 如果设备已挂载,则它不是候选。
# 但仍需处理其子设备,因为父设备挂载不代表子设备不能用(虽然不常见)
# 或者子设备挂载不代表父设备不能用(例如使用整个磁盘)
if 'children' in dev:
for child in dev['children']:
process_device(child)
return
# 2. 检查是否是LVM物理卷或RAID成员
# 如果是LVM PV或RAID成员则它不是新RAID/PV的候选
if fstype in ['LVM2_member', 'linux_raid_member']:
if 'children' in dev: # 即使是LVM/RAID成员也可能存在子设备例如LVM上的分区虽然不常见
for child in dev['children']:
process_device(child)
return
# 3. 如果是整个磁盘且没有分区,则是一个候选
if dev_type == 'disk' and not dev.get('children'):
candidates.append(dev_path)
# 4. 如果是分区且通过了上述挂载和LVM/RAID检查则是一个候选
elif dev_type == 'part':
candidates.append(dev_path)
# 5. 递归处理子设备 (对于有分区的磁盘)
# 这一步放在最后,确保先对当前设备进行评估
if dev_type == 'disk' and dev.get('children'): # 只有当是磁盘且有子设备时才需要递归
for child in dev['children']:
process_device(child)
for dev in block_devices:
process_device(dev)
# 去重并排序
return sorted(list(set(candidates)))
def _get_actual_md_device_path(self, array_path):
"""
解析 mdadm 阵列名称(例如 /dev/md/new_raid
对应的实际内核设备路径(例如 /dev/md127
通过检查 /dev/md/ 目录下的符号链接来获取。
Added type check for array_path.
"""
if not isinstance(array_path, str):
logger.warning(f"传入 _get_actual_md_device_path 的 array_path 不是字符串: {array_path} (类型: {type(array_path)})")
return None
# 如果已经是规范的 /dev/mdX 路径,直接返回
if re.match(r'^/dev/md\d+$', array_path):
return array_path
if os.path.exists(array_path):
try:
# os.path.realpath 会解析符号链接,例如 /dev/md/new_raid -> /dev/md127
actual_path = os.path.realpath(array_path)
# 确保解析后仍然是 md 设备(以 /dev/md 开头)
if actual_path.startswith('/dev/md'):
logger.debug(f"已将 RAID 阵列 {array_path} 解析为实际设备路径: {actual_path}")
return actual_path
except Exception as e:
logger.warning(f"无法通过 os.path.realpath 获取 RAID 阵列 {array_path} 的实际设备路径: {e}")
logger.warning(f"RAID 阵列 {array_path} 不存在或无法解析其真实路径。")
return None
def get_device_details_by_path(self, device_path):
"""
根据设备路径获取设备的 UUID 和文件系统类型 (fstype)。
此方法现在可以正确处理 /dev/md/new_raid 这样的 RAID 阵列别名,以及 LVM 逻辑卷的符号链接。
Added type check for device_path.
:param device_path: 设备的路径,例如 '/dev/sdb1''/dev/md0''/dev/md/new_raid'
:return: 包含 'uuid''fstype' 的字典,如果未找到则返回 None。
"""
if not isinstance(device_path, str):
logger.warning(f"传入 get_device_details_by_path 的 device_path 不是字符串: {device_path} (类型: {type(device_path)})")
return None
original_device_path = device_path
logger.debug(f"get_device_details_by_path: 正在处理原始设备路径: {original_device_path}")
devices = self.get_block_devices() # 获取所有块设备信息
logger.debug(f"get_device_details_by_path: 从 lsblk 获取到 {len(devices)} 个块设备信息。")
target_maj_min = None
current_search_path = original_device_path
# 1. 解析符号链接以获取实际路径,并尝试获取 maj:min
if original_device_path.startswith('/dev/') and os.path.exists(original_device_path):
try:
resolved_path = os.path.realpath(original_device_path)
logger.debug(f"get_device_details_by_path: 原始设备路径 {original_device_path} 解析为 {resolved_path}")
current_search_path = resolved_path
stat_output, _ = self._run_command(["stat", "-c", "%t:%T", resolved_path], check_output=True)
raw_maj_min = stat_output.strip() # e.g., "fc:0"
# MODIFIED: Convert hex major to decimal
if ':' in raw_maj_min:
major_hex, minor_dec = raw_maj_min.split(':')
try:
major_dec = str(int(major_hex, 16)) # Convert hex to decimal string
target_maj_min = f"{major_dec}:{minor_dec}" # e.g., "252:0"
logger.debug(f"get_device_details_by_path: 解析后设备 {resolved_path} 的 maj:min (hex) 为 {raw_maj_min},转换为 decimal 为 {target_maj_min}")
except ValueError:
logger.warning(f"get_device_details_by_path: 无法将 maj:min 的主要部分 '{major_hex}' 从十六进制转换为十进制。")
target_maj_min = None
else:
logger.warning(f"get_device_details_by_path: stat 输出 '{raw_maj_min}' 格式不符合预期。")
target_maj_min = None
except subprocess.CalledProcessError as e:
logger.debug(f"get_device_details_by_path: 无法获取 {resolved_path} 的 maj:min (命令失败): {e.stderr.strip()}")
except Exception as e:
logger.debug(f"get_device_details_by_path: 无法获取 {resolved_path} 的 maj:min (未知错误): {e}")
elif original_device_path.startswith('/dev/') and not os.path.exists(original_device_path):
logger.warning(f"get_device_details_by_path: 设备路径 {original_device_path} 不存在,无法获取详情。")
return None
# 2. 首先,尝试使用 (可能已解析的) current_search_path 在 lsblk 输出中查找
lsblk_details = self._find_device_by_path_recursive(devices, current_search_path)
logger.debug(f"get_device_details_by_path: _find_device_by_path_recursive 查找结果 (使用 {current_search_path}, 直接查找): {lsblk_details}")
if lsblk_details and lsblk_details.get('fstype'):
logger.debug(f"get_device_details_by_path: 直接从 lsblk 获取到 {original_device_path} (解析为 {current_search_path}) 的详情: {lsblk_details}")
return {
'uuid': lsblk_details.get('uuid'),
'fstype': lsblk_details.get('fstype')
}
# 3. 如果直接路径查找失败,并且我们有 maj:min尝试通过 maj:min 查找
if target_maj_min:
logger.debug(f"get_device_details_by_path: 直接路径查找失败,尝试通过 maj:min ({target_maj_min}) 查找。")
dev_info_by_maj_min = self._find_device_by_maj_min_recursive(devices, target_maj_min)
logger.debug(f"get_device_details_by_path: 通过 maj:min 查找结果: {dev_info_by_maj_min}")
if dev_info_by_maj_min and dev_info_by_maj_min.get('fstype'):
logger.debug(f"get_device_details_by_path: 通过 maj:min 找到 {original_device_path} 的文件系统详情: {dev_info_by_maj_min}")
return {
'uuid': dev_info_by_maj_min.get('uuid'),
'fstype': dev_info_by_maj_min.get('fstype')
}
# 4. 如果仍然没有找到,并且是 RAID 阵列(例如 /dev/md/new_raid
if original_device_path.startswith('/dev/md'):
logger.debug(f"get_device_details_by_path: 正在处理 RAID 阵列 {original_device_path}...")
actual_md_device_path = self._get_actual_md_device_path(original_device_path)
logger.debug(f"get_device_details_by_path: RAID 阵列 {original_device_path} 的实际设备路径: {actual_md_device_path}")
if actual_md_device_path:
actual_device_lsblk_details = self._find_device_by_path_recursive(devices, actual_md_device_path)
logger.debug(f"get_device_details_by_path: 实际设备路径 {actual_md_device_path} 的 lsblk 详情: {actual_device_lsblk_details}")
if actual_device_lsblk_details and actual_device_lsblk_details.get('fstype'):
logger.debug(f"get_device_details_by_path: 在实际设备 {actual_md_device_path} 上找到了文件系统详情: {actual_device_lsblk_details}")
return {
'uuid': actual_device_lsblk_details.get('uuid'),
'fstype': actual_device_lsblk_details.get('fstype')
}
else:
logger.warning(f"get_device_details_by_path: RAID 阵列 {original_device_path} (实际设备 {actual_md_device_path}) 未找到文件系统类型。")
return None
else:
logger.warning(f"get_device_details_by_path: 无法确定 RAID 阵列 {original_device_path} 的实际内核设备路径。")
return None
# 5. 如果仍然没有找到,返回 None
logger.debug(f"get_device_details_by_path: 未能获取到 {original_device_path} 的任何详情。")
return None
def delete_raid_array_config(self, uuid):
"""
从 mdadm.conf 文件中删除指定 UUID 的 RAID 阵列配置条目。
:param uuid: 要删除的 RAID 阵列的 UUID。
:return: True 如果成功删除或未找到条目False 如果发生错误。
:raises Exception: 如果删除失败。
"""
logger.info(f"尝试从 mdadm.conf 中删除 UUID 为 {uuid} 的 RAID 阵列配置。")
mdadm_conf_paths = ["/etc/mdadm/mdadm.conf", "/etc/mdadm.conf"]
target_conf_path = None
# 找到存在的 mdadm.conf 文件
for path in mdadm_conf_paths:
if os.path.exists(path):
target_conf_path = path
break
if not target_conf_path:
logger.warning("未找到 mdadm 配置文件,无法删除配置条目。")
raise FileNotFoundError("mdadm 配置文件未找到。")
original_lines = []
try:
with open(target_conf_path, 'r') as f:
original_lines = f.readlines()
except Exception as e:
logger.error(f"读取 mdadm 配置文件 {target_conf_path} 失败: {e}")
raise Exception(f"读取 mdadm 配置文件失败: {e}")
new_lines = []
entry_found = False
for line in original_lines:
stripped_line = line.strip()
if stripped_line.startswith("ARRAY"):
# 尝试匹配 UUID
uuid_match = re.search(r'UUID=([0-9a-f:]+)', stripped_line)
if uuid_match and uuid_match.group(1) == uuid:
logger.debug(f"找到并跳过 UUID 为 {uuid} 的配置行: {stripped_line}")
entry_found = True
continue # 跳过此行,即删除
new_lines.append(line)
if not entry_found:
logger.warning(f"在 mdadm.conf 中未找到 UUID 为 {uuid} 的 RAID 阵列配置条目。")
# 即使未找到,也不视为错误,因为可能已经被手动删除或从未写入
return True
# 将修改后的内容写回文件 (需要 root 权限)
try:
# 使用临时文件进行原子写入,防止数据损坏
temp_file_path = f"{target_conf_path}.tmp"
with open(temp_file_path, 'w') as f:
f.writelines(new_lines)
# 替换原文件
self._run_command(["mv", temp_file_path, target_conf_path], root_privilege=True)
logger.info(f"成功从 {target_conf_path} 中删除 UUID 为 {uuid} 的 RAID 阵列配置。")
return True
except subprocess.CalledProcessError as e:
logger.error(f"删除 mdadm 配置条目失败 (mv 命令错误): {e.stderr.strip()}")
raise Exception(f"删除 mdadm 配置条目失败: {e.stderr.strip()}")
except Exception as e:
logger.error(f"写入 mdadm 配置文件 {target_conf_path} 失败: {e}")
raise Exception(f"写入 mdadm 配置文件失败: {e}")