Files
diskmanager/system_info.py
2026-02-02 04:20:00 +08:00

265 lines
11 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

# system_info.py
import subprocess
import json
import os
import logging
logger = logging.getLogger(__name__)
class SystemInfoManager:
    """Collect storage information by running lsblk, mdadm and the LVM tools.

    Every public ``get_*`` method is best-effort: failures are logged through
    the module-level ``logger`` and an empty result is returned instead of
    raising.
    """

    # Columns requested from ``lsblk -o``.
    _LSBLK_COLUMNS = (
        "NAME,FSTYPE,SIZE,MOUNTPOINT,RO,TYPE,UUID,PARTUUID,"
        "VENDOR,MODEL,SERIAL,MAJ:MIN,PKNAME"
    )

    # Normalised ``mdadm --detail`` field label -> key used in the parsed dict.
    # Insertion order is also the key order of the parsed result.
    _MDADM_FIELD_MAP = {
        'raid_level': 'level',
        'array_size': 'array_size',
        'raid_devices': 'raid_devices',
        'total_devices': 'total_devices',
        'state': 'state',
        'active_devices': 'active_devices',
        'working_devices': 'working_devices',
        'failed_devices': 'failed_devices',
        'spare_devices': 'spare_devices',
        'chunk_size': 'chunk_size',
        'uuid': 'uuid',
        'name': 'name',
    }

    # Column titles that open the member-device table in ``mdadm --detail``.
    _MDADM_MEMBER_HEADER = ["Number", "Major", "Minor", "RaidDevice", "State"]

    def __init__(self):
        """Create a manager; no state is kept on the instance."""

    def _run_command(self, cmd, root_privilege=False, check_output=True):
        """Run an external command and return its output.

        Args:
            cmd: Sequence of strings: the executable followed by its arguments.
            root_privilege: When true and the effective UID is not 0, the
                command is prefixed with ``sudo``.
            check_output: Forwarded as ``check=`` to ``subprocess.run``; when
                true a non-zero exit status is treated as an error.

        Returns:
            A ``(stdout, stderr)`` tuple of stripped strings.

        Raises:
            ValueError: The command exited non-zero while ``check_output`` is
                true.
            FileNotFoundError: The executable could not be found.
            RuntimeError: Any other exception raised while running the command.
        """
        full_cmd = []
        # sudo is only needed when we are not already root.
        if root_privilege and os.geteuid() != 0:
            full_cmd.append("sudo")
        full_cmd.extend(cmd)
        cmd_str = ' '.join(full_cmd)
        logger.info(f"执行命令: {cmd_str}")
        try:
            result = subprocess.run(
                full_cmd,
                capture_output=True,
                text=True,
                check=check_output,
                encoding='utf-8',
                # LANG is overridden for the child process only.
                env=dict(os.environ, LANG="en_US.UTF-8"),
            )
        except subprocess.CalledProcessError as e:
            error_msg = (
                f"命令执行失败: {cmd_str}\n"
                f"退出码: {e.returncode}\n"
                f"标准输出: {e.stdout}\n"
                f"标准错误: {e.stderr}"
            )
            logger.error(error_msg)
            # Chain the cause so the original traceback is not lost.
            raise ValueError(error_msg) from e
        except FileNotFoundError as e:
            error_msg = f"命令未找到: {full_cmd[0]}。请确保已安装。"
            logger.error(error_msg)
            raise FileNotFoundError(error_msg) from e
        except Exception as e:
            error_msg = f"执行命令时发生未知错误: {e}"
            logger.error(error_msg)
            raise RuntimeError(error_msg) from e

        stdout = result.stdout.strip()
        stderr = result.stderr.strip()
        if result.stdout:
            logger.debug(f"命令输出 (stdout):\n{stdout}")
        if result.stderr:
            logger.warning(f"命令输出 (stderr):\n{stderr}")
        return stdout, stderr

    def get_block_devices(self):
        """Return the ``blockdevices`` list reported by ``lsblk -J``.

        Returns:
            The list stored under ``'blockdevices'`` in lsblk's JSON output,
            or ``[]`` when the key is missing or the command/parsing fails
            (the failure is logged).
        """
        try:
            stdout, _ = self._run_command(
                ["lsblk", "-J", "-o", self._LSBLK_COLUMNS],
                root_privilege=False,
            )
            data = json.loads(stdout)
            logger.info("成功获取块设备信息。")
            return data.get('blockdevices', [])
        except Exception as e:
            logger.error(f"获取块设备信息失败: {e}")
            return []

    def get_mdadm_arrays(self):
        """Return parsed details for every RAID array mdadm reports.

        ``mdadm --detail --scan`` supplies the array device paths; each path
        is then queried with ``mdadm --detail`` and parsed by
        ``_parse_mdadm_detail_output``. The device path is added to each
        result under ``'device'``.

        Returns:
            A list of detail dicts. Arrays whose detail query fails are logged
            and skipped; ``[]`` is returned when no array is found or the scan
            itself fails.
        """
        all_raid_details = []
        try:
            # Step 1: collect the device path (second token) of each ARRAY line.
            scan_stdout, _ = self._run_command(
                ["mdadm", "--detail", "--scan"], root_privilege=False
            )
            array_paths = [
                fields[1]  # e.g. /dev/md126
                for fields in (
                    line.split()
                    for line in scan_stdout.splitlines()
                    if line.startswith("ARRAY")
                )
                if len(fields) > 1
            ]
            if not array_paths:
                logger.info("未找到任何RAID阵列。")
                return []

            # Step 2: query every array; one failure must not hide the others.
            for path in array_paths:
                try:
                    detail_stdout, _ = self._run_command(
                        ["mdadm", "--detail", path], root_privilege=False
                    )
                    parsed_detail = self._parse_mdadm_detail_output(detail_stdout)
                    parsed_detail['device'] = path
                    all_raid_details.append(parsed_detail)
                except Exception as e:
                    logger.error(f"获取RAID阵列 {path} 的详细信息失败: {e}")
            logger.info("成功获取RAID阵列信息。")
            return all_raid_details
        except Exception as e:
            logger.error(f"获取RAID阵列列表失败: {e}")
            return []

    def _parse_mdadm_detail_output(self, output_string):
        """Parse the text printed by ``mdadm --detail <device>``.

        Args:
            output_string: Raw command output.

        Returns:
            A dict with the keys ``level``, ``array_size``, ``raid_devices``,
            ``total_devices``, ``state``, ``active_devices``,
            ``working_devices``, ``failed_devices``, ``spare_devices``,
            ``chunk_size``, ``uuid`` and ``name`` (each ``'N/A'`` when the
            field is absent), plus ``member_devices``: a list of dicts with
            ``number``, ``major``, ``minor``, ``raid_device``, ``state`` and
            ``device_path``, all as strings.
        """
        details = {field: 'N/A' for field in self._MDADM_FIELD_MAP.values()}
        details['member_devices'] = []

        in_member_table = False
        for raw_line in output_string.splitlines():
            tokens = raw_line.split()
            if not tokens:
                continue

            # Compare tokens rather than the raw text: mdadm pads the header
            # columns with a variable number of spaces.
            if tokens[:5] == self._MDADM_MEMBER_HEADER:
                in_member_table = True
                continue

            if in_member_table:
                # Rows with fewer than six tokens carry no device path
                # (e.g. a "removed" slot) and are skipped.
                if len(tokens) >= 6:
                    details['member_devices'].append({
                        'number': tokens[0],
                        'major': tokens[1],
                        'minor': tokens[2],
                        'raid_device': tokens[3],
                        # The state may span several words, e.g. "active sync".
                        'state': ' '.join(tokens[4:-1]),
                        'device_path': tokens[-1],
                    })
                continue

            # "Label : value" lines; split on the first colon only because
            # values may themselves contain colons.
            label, sep, value = raw_line.partition(':')
            if not sep:
                continue
            field = self._MDADM_FIELD_MAP.get(
                label.strip().lower().replace(' ', '_')
            )
            if field is not None:
                details[field] = value.strip()
        return details

    @staticmethod
    def _extract_lvm_section(report_json, section):
        """Return the ``section`` list of the first report in LVM JSON output.

        Returns ``[]`` when the report list is missing or empty, or when the
        first report has no ``section`` key.
        """
        reports = json.loads(report_json).get('report') or []
        if not reports:
            return []
        return reports[0].get(section, [])

    def _query_lvm_report(self, tool, section, label):
        """Run one LVM reporting tool with JSON output and return its entries.

        Args:
            tool: Executable name (``pvs``, ``vgs`` or ``lvs``).
            section: Key of the entry list inside the report (``pv``/``vg``/``lv``).
            label: Human-readable object name inserted into the log messages.

        Returns:
            The list of entries, or ``[]`` on any failure (which is logged).
        """
        try:
            stdout, _ = self._run_command(
                [tool, "--reportformat", "json"], root_privilege=False
            )
            entries = self._extract_lvm_section(stdout, section)
        except Exception as e:
            logger.error(f"获取{label}信息失败: {e}")
            return []
        logger.info(f"成功获取{label}信息。")
        return entries

    def get_lvm_info(self):
        """Return LVM physical volumes, volume groups and logical volumes.

        Returns:
            A dict with the keys ``'pvs'``, ``'vgs'`` and ``'lvs'``. Each value
            is the entry list from the matching tool's JSON report, or ``[]``
            when that query failed.
        """
        return {
            'pvs': self._query_lvm_report("pvs", "pv", "物理卷"),
            'vgs': self._query_lvm_report("vgs", "vg", "卷组"),
            'lvs': self._query_lvm_report("lvs", "lv", "逻辑卷"),
        }
# Example usage: running this module directly logs a summary of the block
# devices, RAID arrays and LVM objects reported by SystemInfoManager.
if __name__ == "__main__":
    logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
    info_manager = SystemInfoManager()

    # Block devices.
    logger.info("--- 块设备信息 ---")
    for blk in info_manager.get_block_devices():
        logger.info(f" NAME: {blk.get('name')}, TYPE: {blk.get('type')}, SIZE: {blk.get('size')}, MOUNTPOINT: {blk.get('mountpoint')}")

    # RAID arrays and their member devices.
    logger.info("\n--- RAID 阵列信息 ---")
    md_arrays = info_manager.get_mdadm_arrays()
    if not md_arrays:
        logger.info(" 未找到RAID阵列。")
    for md in md_arrays:
        logger.info(f" Device: {md.get('device')}, Level: {md.get('level')}, State: {md.get('state')}, Size: {md.get('array_size')}")
        logger.info(f" Active: {md.get('active_devices')}, Failed: {md.get('failed_devices')}, Spare: {md.get('spare_devices')}")
        for disk in md.get('member_devices', []):
            logger.info(f" Member: {disk.get('device_path')} (State: {disk.get('state')})")

    # LVM physical volumes, volume groups and logical volumes.
    logger.info("\n--- LVM 信息 ---")
    lvm = info_manager.get_lvm_info()

    logger.info("物理卷 (PVs):")
    physical_volumes = lvm['pvs']
    if not physical_volumes:
        logger.info(" 未找到物理卷。")
    else:
        for entry in physical_volumes:
            logger.info(f" {entry.get('pv_name')} (VG: {entry.get('vg_name')}, Size: {entry.get('pv_size')})")

    logger.info("卷组 (VGs):")
    volume_groups = lvm['vgs']
    if not volume_groups:
        logger.info(" 未找到卷组。")
    else:
        for entry in volume_groups:
            logger.info(f" {entry.get('vg_name')} (Size: {entry.get('vg_size')}, PVs: {entry.get('pv_count')}, LVs: {entry.get('lv_count')})")

    logger.info("逻辑卷 (LVs):")
    logical_volumes = lvm['lvs']
    if not logical_volumes:
        logger.info(" 未找到逻辑卷。")
    else:
        for entry in logical_volumes:
            logger.info(f" {entry.get('lv_name')} (VG: {entry.get('vg_name')}, Size: {entry.get('lv_size')})")