""" ServerGuard - 系统信息概览模块 收集服务器的硬件和操作系统基本信息。 """ import os import re import platform from typing import Dict, Any, List, Optional import sys sys.path.insert(0, os.path.dirname(os.path.dirname(os.path.abspath(__file__)))) from utils import ( execute_command, parse_key_value_output, check_command_exists, safe_int, safe_float, format_bytes ) def get_system_info() -> Dict[str, Any]: """ 获取系统硬件和操作系统信息。 Returns: Dict[str, Any]: 系统信息字典 """ result = { "status": "success", "os": {}, "cpu": {}, "memory": {}, "motherboard": {}, "storage": [], "network": [], "gpu": [] } try: result["os"] = get_os_info() result["cpu"] = get_cpu_info() result["memory"] = get_memory_info() result["motherboard"] = get_motherboard_info() result["storage"] = get_storage_list() result["network"] = get_network_info() result["gpu"] = get_gpu_list() except Exception as e: result["status"] = "error" result["error"] = str(e) return result def get_os_info() -> Dict[str, str]: """获取操作系统信息。""" info = { "platform": platform.system(), "release": platform.release(), "version": platform.version(), "machine": platform.machine(), "processor": platform.processor() } # 尝试获取 Linux 发行版信息 if os.path.exists('/etc/os-release'): try: with open('/etc/os-release', 'r') as f: for line in f: if line.startswith('PRETTY_NAME='): info["distribution"] = line.split('=', 1)[1].strip().strip('"') break except: pass # 获取主机名 try: _, hostname, _ = execute_command(['hostname'], check_returncode=False) info["hostname"] = hostname.strip() except: info["hostname"] = "unknown" # 获取 uptime try: with open('/proc/uptime', 'r') as f: uptime_seconds = float(f.readline().split()[0]) days = int(uptime_seconds // 86400) hours = int((uptime_seconds % 86400) // 3600) minutes = int((uptime_seconds % 3600) // 60) info["uptime"] = f"{days}天 {hours}小时 {minutes}分钟" except: info["uptime"] = "unknown" return info def get_cpu_info() -> Dict[str, Any]: """获取 CPU 信息。""" info = { "model": "Unknown", "vendor": "Unknown", "architecture": "Unknown", "cores": 0, "threads": 0, "frequency_mhz": 0, "cache_size_kb": {} } # 从 /proc/cpuinfo 获取 try: cpu_data = {} with open('/proc/cpuinfo', 'r') as f: for line in f: if ':' in line: key, value = line.split(':', 1) cpu_data[key.strip()] = value.strip() info["model"] = cpu_data.get('model name', 'Unknown') info["vendor"] = cpu_data.get('vendor_id', 'Unknown') info["architecture"] = cpu_data.get('cpu architecture', platform.machine()) info["cores"] = safe_int(cpu_data.get('cpu cores', 0)) info["threads"] = safe_int(cpu_data.get('siblings', 0)) info["frequency_mhz"] = safe_int(cpu_data.get('cpu MHz', 0)) # 缓存信息 if 'cache size' in cpu_data: cache = cpu_data['cache size'] info["cache_size_kb"] = {"general": cache} except Exception as e: pass # 使用 lscpu 获取更详细的信息 if check_command_exists('lscpu'): try: _, stdout, _ = execute_command(['lscpu'], check_returncode=False, timeout=10) lscpu_data = parse_key_value_output(stdout) if 'Model name' in lscpu_data: info["model"] = lscpu_data['Model name'] if 'Architecture' in lscpu_data: info["architecture"] = lscpu_data['Architecture'] if 'CPU(s)' in lscpu_data: info["threads"] = safe_int(lscpu_data['CPU(s)']) if 'Core(s) per socket' in lscpu_data and 'Socket(s)' in lscpu_data: cores_per_socket = safe_int(lscpu_data['Core(s) per socket']) sockets = safe_int(lscpu_data['Socket(s)']) info["cores"] = cores_per_socket * sockets if 'CPU max MHz' in lscpu_data: info["max_frequency_mhz"] = safe_float(lscpu_data['CPU max MHz']) if 'CPU min MHz' in lscpu_data: info["min_frequency_mhz"] = safe_float(lscpu_data['CPU min MHz']) if 'Virtualization' in lscpu_data: info["virtualization"] = lscpu_data['Virtualization'] except: pass return info def get_memory_info() -> Dict[str, Any]: """获取内存信息。""" info = { "total_gb": 0, "available_gb": 0, "slots_total": 0, "slots_used": 0, "slots": [], "type": "Unknown", "speed_mhz": 0, "ecc_supported": False } # 从 /proc/meminfo 获取总内存 try: with open('/proc/meminfo', 'r') as f: for line in f: if line.startswith('MemTotal:'): kb = safe_int(line.split()[1]) info["total_gb"] = round(kb / 1024 / 1024, 2) elif line.startswith('MemAvailable:'): kb = safe_int(line.split()[1]) info["available_gb"] = round(kb / 1024 / 1024, 2) except: pass # 使用 dmidecode 获取详细内存信息 if check_command_exists('dmidecode'): try: _, stdout, _ = execute_command( ['dmidecode', '-t', 'memory'], check_returncode=False, timeout=15 ) memory_devices = stdout.split('Memory Device') slots = [] for device in memory_devices[1:]: # 第一个是标题,跳过 slot = {} # 解析各项属性 size_match = re.search(r'Size:\s*(\d+)\s*MB', device) if size_match: slot["size_gb"] = round(safe_int(size_match.group(1)) / 1024, 2) type_match = re.search(r'Type:\s*(DDR\d+)', device) if type_match: slot["type"] = type_match.group(1) info["type"] = type_match.group(1) speed_match = re.search(r'Speed:\s*(\d+)\s*MT/s', device) if speed_match: slot["speed_mhz"] = safe_int(speed_match.group(1)) manufacturer_match = re.search(r'Manufacturer:\s*(\S+)', device) if manufacturer_match: slot["manufacturer"] = manufacturer_match.group(1) locator_match = re.search(r'Locator:\s*(.+)', device) if locator_match: slot["locator"] = locator_match.group(1).strip() if slot and slot.get("size_gb", 0) > 0: slots.append(slot) info["slots"] = slots info["slots_used"] = len(slots) # 计算总插槽数 array_match = re.search(r'Number Of Devices:\s*(\d+)', stdout) if array_match: info["slots_total"] = safe_int(array_match.group(1)) else: info["slots_total"] = len(slots) except: pass # 使用 free 命令作为备用 if info["total_gb"] == 0 and check_command_exists('free'): try: _, stdout, _ = execute_command(['free', '-m'], check_returncode=False) lines = stdout.strip().split('\n') if len(lines) > 1: parts = lines[1].split() if len(parts) >= 2: info["total_gb"] = round(safe_int(parts[1]) / 1024, 2) except: pass # 检查 ECC 支持 try: with open('/proc/meminfo', 'r') as f: content = f.read() if 'HardwareCorrupted' in content: info["ecc_supported"] = True except: pass return info def get_motherboard_info() -> Dict[str, str]: """获取主板信息。""" info = { "manufacturer": "Unknown", "product_name": "Unknown", "version": "Unknown", "serial_number": "Unknown", "bios_vendor": "Unknown", "bios_version": "Unknown", "bios_date": "Unknown" } if check_command_exists('dmidecode'): try: # 获取主板信息 _, stdout, _ = execute_command( ['dmidecode', '-t', 'baseboard'], check_returncode=False, timeout=10 ) patterns = { "manufacturer": r'Manufacturer:\s*(.+)', "product_name": r'Product Name:\s*(.+)', "version": r'Version:\s*(.+)', "serial_number": r'Serial Number:\s*(.+)' } for key, pattern in patterns.items(): match = re.search(pattern, stdout) if match: value = match.group(1).strip() if value not in ['Not Specified', 'To be filled by O.E.M.', 'None']: info[key] = value # 获取 BIOS 信息 _, stdout, _ = execute_command( ['dmidecode', '-t', 'bios'], check_returncode=False, timeout=10 ) bios_patterns = { "bios_vendor": r'Vendor:\s*(.+)', "bios_version": r'Version:\s*(.+)', "bios_date": r'Release Date:\s*(.+)' } for key, pattern in bios_patterns.items(): match = re.search(pattern, stdout) if match: info[key] = match.group(1).strip() except: pass return info def get_storage_list() -> List[Dict[str, Any]]: """获取存储设备列表。""" devices = [] # 使用 lsblk 获取块设备列表 if check_command_exists('lsblk'): try: _, stdout, _ = execute_command( ['lsblk', '-d', '-o', 'NAME,SIZE,TYPE,MODEL,VENDOR,ROTA', '-n', '-J'], check_returncode=False, timeout=10 ) import json data = json.loads(stdout) for device in data.get('blockdevices', []): dev_info = { "name": device.get('name', 'unknown'), "path": f"/dev/{device.get('name', 'unknown')}", "size": device.get('size', 'unknown'), "type": device.get('type', 'unknown'), "model": device.get('model', 'unknown'), "vendor": device.get('vendor', 'unknown'), "is_rotational": device.get('rota', True) } devices.append(dev_info) except: pass # 备用方法:直接读取 /sys/block if not devices: try: for name in os.listdir('/sys/block'): if name.startswith(('sd', 'hd', 'nvme', 'vd')): dev_info = {"name": name, "path": f"/dev/{name}"} # 尝试读取大小 try: with open(f'/sys/block/{name}/size', 'r') as f: sectors = safe_int(f.read().strip()) size_bytes = sectors * 512 dev_info["size"] = format_bytes(size_bytes) except: dev_info["size"] = "unknown" # 判断是否为 SSD try: with open(f'/sys/block/{name}/queue/rotational', 'r') as f: dev_info["is_rotational"] = f.read().strip() == '1' dev_info["type"] = 'hdd' if dev_info["is_rotational"] else 'ssd' except: dev_info["type"] = 'unknown' devices.append(dev_info) except: pass return devices def get_network_info() -> List[Dict[str, Any]]: """获取网络接口信息。""" interfaces = [] # 使用 ip 命令 if check_command_exists('ip'): try: _, stdout, _ = execute_command( ['ip', '-j', 'link', 'show'], check_returncode=False, timeout=10 ) import json data = json.loads(stdout) for iface in data: iface_info = { "name": iface.get('ifname', 'unknown'), "state": iface.get('operstate', 'unknown'), "mac_address": iface.get('address', 'unknown'), "type": iface.get('link_type', 'unknown') } # 获取 IP 地址 if 'addr_info' in iface: ips = [] for addr in iface['addr_info']: if addr.get('family') == 'inet': ips.append(f"{addr.get('local')}/{addr.get('prefixlen', '')}") if ips: iface_info["ip_addresses"] = ips interfaces.append(iface_info) except: pass return interfaces def get_gpu_list() -> List[Dict[str, Any]]: """获取显卡列表。""" gpus = [] # 使用 lspci 查找 VGA 和 3D 控制器 if check_command_exists('lspci'): try: _, stdout, _ = execute_command( ['lspci', '-nn'], check_returncode=False, timeout=10 ) for line in stdout.split('\n'): if 'VGA' in line or '3D controller' in line or 'Display controller' in line: # 提取设备信息 parts = line.split(': ', 1) if len(parts) == 2: bus_id = parts[0].split()[0] description = parts[1] gpu_info = { "bus_id": bus_id, "description": description, "type": "integrated" if "Intel" in description else "discrete" } # 尝试获取更详细的信息 try: _, detail, _ = execute_command( ['lspci', '-v', '-s', bus_id], check_returncode=False, timeout=5 ) # 提取驱动信息 driver_match = re.search(r'Kernel driver in use:\s*(\S+)', detail) if driver_match: gpu_info["driver"] = driver_match.group(1) # 提取模块信息 modules_match = re.search(r'Kernel modules:\s*(.+)', detail) if modules_match: gpu_info["modules"] = modules_match.group(1).strip() except: pass gpus.append(gpu_info) except: pass return gpus if __name__ == '__main__': # 测试模块 import json print(json.dumps(get_system_info(), indent=2, ensure_ascii=False))