# smart_monitor.py
"""SMART disk health monitoring module.

Wraps the ``smartctl`` command-line tool and converts its text report into
:class:`SmartInfo` records with a coarse :class:`HealthStatus` verdict.
"""

import json
import logging
import re
import shutil
import subprocess
import sys
from enum import Enum
from typing import Dict, List, Optional, Tuple

# Python 3.6 compatibility: ``dataclasses`` only exists on 3.7+.
if sys.version_info >= (3, 7):
    from dataclasses import dataclass
else:
    def dataclass(cls):
        """Minimal stand-in for ``dataclasses.dataclass`` on Python 3.6.

        Generates ``__init__`` and ``__repr__`` from the class annotations.
        Fields without a class-level default fall back to ``None``.
        """
        field_names = list(getattr(cls, '__annotations__', {}))

        def __init__(self, *args, **kwargs):
            if len(args) > len(field_names):
                raise TypeError(
                    f"{cls.__name__}() takes at most {len(field_names)} "
                    f"positional arguments ({len(args)} given)"
                )
            # Map positional arguments onto fields in declaration order,
            # mirroring what the real dataclass decorator accepts.
            for key, value in zip(field_names, args):
                if key in kwargs:
                    raise TypeError(
                        f"{cls.__name__}() got multiple values for "
                        f"argument {key!r}"
                    )
                kwargs[key] = value
            for key, value in kwargs.items():
                setattr(self, key, value)
            for key in field_names:
                if not hasattr(self, key):
                    setattr(self, key, None)
            # The real decorator runs __post_init__; without this call the
            # list fields of SmartInfo would stay None on 3.6.
            post_init = getattr(self, '__post_init__', None)
            if callable(post_init):
                post_init()

        cls.__init__ = __init__

        def __repr__(self):
            fields = ', '.join(
                f'{k}={getattr(self, k, None)!r}' for k in field_names
            )
            return f"{cls.__name__}({fields})"

        cls.__repr__ = __repr__
        return cls


logger = logging.getLogger(__name__)

# Temperature with a unit suffix, e.g. "35 Celsius" or "30 C".
_TEMPERATURE_RE = re.compile(r'(\d+)\s*(?:Celsius|°C|C)')
# Legacy counter format: a number immediately followed by "(".
_LEGACY_COUNTER_RE = re.compile(r'(\d+)\s*\(')
# Leading run of digits of a value such as "30 (Min/Max 20/45)".
_LEADING_INT_RE = re.compile(r'\d+')


class HealthStatus(Enum):
    """Coarse disk health verdict (values are the user-facing labels)."""

    GOOD = "良好"
    WARNING = "警告"
    DANGER = "危险"
    UNKNOWN = "未知"


@dataclass
class SmartAttribute:
    """One row of the SMART attribute table."""

    id: int            # numeric attribute ID (first table column)
    name: str          # attribute name column
    value: int         # normalized current value
    worst: int         # normalized worst value
    threshold: int     # failure threshold for the normalized value
    raw_value: str     # first token of the RAW_VALUE column, unparsed
    status: str        # "正常" when value > threshold, otherwise "预警"


@dataclass
class SmartInfo:
    """Parsed SMART report for a single device."""

    device: str
    model: str
    serial: str
    firmware: str
    health_status: HealthStatus
    temperature: Optional[int] = None
    power_on_hours: Optional[int] = None
    power_cycle_count: Optional[int] = None
    attributes: Optional[List[SmartAttribute]] = None
    overall_health: str = ""
    errors: Optional[List[str]] = None

    def __post_init__(self):
        # Give every instance its own lists instead of a shared default.
        if self.attributes is None:
            self.attributes = []
        if self.errors is None:
            self.errors = []


class SmartMonitor:
    """Query ``smartctl`` and interpret its output."""

    # Attribute IDs whose non-zero raw value is reported as a problem.
    CRITICAL_ATTRIBUTES = {
        5: "Reallocated_Sector_Ct",       # reallocated sector count
        10: "Spin_Retry_Count",           # spin retry count
        184: "End-to-End_Error",          # end-to-end error
        187: "Reported_Uncorrect",        # reported uncorrectable errors
        188: "Command_Timeout",           # command timeout
        196: "Reallocation_Event_Count",  # reallocation event count
        197: "Current_Pending_Sector",    # current pending sectors
        198: "Offline_Uncorrectable",     # offline uncorrectable sectors
    }

    _POWER_ON_HOURS_ID = 9
    _POWER_CYCLE_COUNT_ID = 12
    _TEMPERATURE_ID = 194
    _AIRFLOW_TEMPERATURE_ID = 190
    # Heuristic bound: some drives pack extra data into the temperature raw
    # value, so implausibly large numbers are not treated as degrees.
    _MAX_PLAUSIBLE_TEMPERATURE = 150

    # smartctl's exit status is a bitmask. Bit 0 (command line did not
    # parse) and bit 1 (device open failed) mean no usable report exists;
    # the higher bits only describe the disk's condition.
    _FATAL_EXIT_MASK = 0b11
    _HEALTHY_VERDICTS = ("PASSED", "OK")

    def __init__(self):
        self._check_smartctl()

    def _check_smartctl(self) -> bool:
        """Return True when a ``smartctl`` executable is found on PATH."""
        return shutil.which("smartctl") is not None

    def is_available(self) -> bool:
        """Return True when SMART monitoring can be used."""
        return self._check_smartctl()

    @classmethod
    def _is_fatal_exit_status(cls, returncode: int) -> bool:
        """Return True when the smartctl exit status means "no report".

        Negative codes (process killed by a signal) are always fatal.
        Otherwise only the two low bits are; statuses such as 8, 24 or 64
        flag disk problems but still come with a full report to parse.
        """
        return returncode < 0 or bool(returncode & cls._FATAL_EXIT_MASK)

    @staticmethod
    def _leading_int(text: str) -> Optional[int]:
        """Return the integer at the start of *text*, or None.

        Thousands separators are ignored ("1,234" -> 1234); anything after
        the first run of digits is discarded ("30 (Min/Max 20/45)" -> 30).
        """
        match = _LEADING_INT_RE.match(text.strip().replace(",", ""))
        return int(match.group()) if match else None

    @staticmethod
    def _field_value(line: str) -> str:
        """Return the text after the first colon of a ``Key: value`` line."""
        return line.split(":", 1)[1].strip()

    def get_disk_smart_info(self, device_path: str) -> Optional[SmartInfo]:
        """Run ``sudo smartctl -a`` on *device_path* and parse the report.

        Returns:
            The parsed :class:`SmartInfo`, or ``None`` when smartctl is
            missing, could not open the device, timed out (30 s), or any
            other error occurred. Errors are logged, never raised.
        """
        if not self.is_available():
            logger.warning("smartctl 不可用,无法获取 SMART 信息")
            return None

        try:
            result = subprocess.run(
                ["sudo", "smartctl", "-a", device_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                check=False,
                timeout=30,
            )
            if self._is_fatal_exit_status(result.returncode):
                # Fall back to stdout when stderr carries no diagnostic.
                detail = (result.stderr or result.stdout or "").strip()
                logger.error("获取 SMART 信息失败: %s", detail)
                return None
            return self._parse_smart_output(device_path, result.stdout)
        except subprocess.TimeoutExpired:
            logger.error("获取 %s SMART 信息超时", device_path)
            return None
        except Exception as e:  # best-effort boundary: log and give up
            logger.error("获取 %s SMART 信息出错: %s", device_path, e)
            return None

    def _parse_smart_output(self, device: str, output: str) -> SmartInfo:
        """Parse the text report produced by ``smartctl -a``.

        Recognises the ATA identity/health lines and attribute table, plus
        the NVMe and SCSI spellings of the same fields. The final status is
        DANGER when the overall verdict is not PASSED/OK, WARNING when a
        critical attribute has a non-zero raw value, GOOD when there is a
        passing verdict or at least one parsed attribute, else UNKNOWN.
        """
        info = SmartInfo(
            device=device,
            model="",
            serial="",
            firmware="",
            health_status=HealthStatus.UNKNOWN,
        )
        overall_passed = None  # None until a verdict line is seen
        in_attributes = False

        for raw_line in output.splitlines():
            line = raw_line.strip()

            if in_attributes:
                if not line:
                    # The blank line after the rows ends the table; later
                    # sections contain digit-leading lines that are not
                    # attributes.
                    if info.attributes:
                        in_attributes = False
                    continue
                attr = self._parse_attribute_line(line)
                if attr is not None:
                    self._record_attribute(info, attr)
                    continue

            # Device identity (ATA / NVMe / SCSI spellings).
            if line.startswith(("Device Model:", "Model Number:", "Product:")):
                info.model = self._field_value(line)
            elif line.startswith(("Serial Number:", "Serial number:")):
                info.serial = self._field_value(line)
            elif line.startswith(("Firmware Version:", "Revision:")):
                info.firmware = self._field_value(line)

            # Overall health verdict.
            elif ("SMART overall-health self-assessment test result:" in line
                  or line.startswith("SMART Health Status:")):
                verdict = line.split(":")[-1].strip().upper()
                info.overall_health = verdict
                overall_passed = verdict in self._HEALTHY_VERDICTS

            # Temperature; the SCSI "Drive Trip Temperature" line is a
            # limit, not a reading, so it must not overwrite the value.
            elif "Temperature:" in line and "Trip" not in line:
                match = _TEMPERATURE_RE.search(line)
                if match:
                    info.temperature = int(match.group(1))

            # Power-on hours / power cycles outside the attribute table.
            elif line.startswith("Power_On_Hours"):
                match = _LEGACY_COUNTER_RE.search(line)
                if match:
                    info.power_on_hours = int(match.group(1))
            elif line.startswith("Power_Cycle_Count"):
                match = _LEGACY_COUNTER_RE.search(line)
                if match:
                    info.power_cycle_count = int(match.group(1))
            elif line.startswith("Power On Hours:"):
                hours = self._leading_int(self._field_value(line))
                if hours is not None:
                    info.power_on_hours = hours
            elif line.startswith("Power Cycles:"):
                cycles = self._leading_int(self._field_value(line))
                if cycles is not None:
                    info.power_cycle_count = cycles

            # Start of the SMART attribute table.
            elif "Vendor Specific SMART Attributes" in line:
                in_attributes = True

        # Decide the status once, so it does not depend on line order.
        if overall_passed is False:
            info.health_status = HealthStatus.DANGER
        elif info.errors:
            info.health_status = HealthStatus.WARNING
        elif overall_passed or info.attributes:
            info.health_status = HealthStatus.GOOD
        return info

    def _record_attribute(self, info: SmartInfo, attr: SmartAttribute) -> None:
        """Store *attr* on *info* and derive counters/errors from it."""
        info.attributes.append(attr)
        raw = self._leading_int(attr.raw_value)
        if raw is None:
            # Non-numeric raw value: keep the row, derive nothing from it.
            return

        if attr.id == self._POWER_ON_HOURS_ID:
            info.power_on_hours = raw
        elif attr.id == self._POWER_CYCLE_COUNT_ID:
            info.power_cycle_count = raw
        elif attr.id == self._TEMPERATURE_ID:
            if raw <= self._MAX_PLAUSIBLE_TEMPERATURE:
                info.temperature = raw
        elif attr.id == self._AIRFLOW_TEMPERATURE_ID:
            # Only a fallback for drives that lack attribute 194.
            if info.temperature is None and raw <= self._MAX_PLAUSIBLE_TEMPERATURE:
                info.temperature = raw

        if attr.id in self.CRITICAL_ATTRIBUTES and raw > 0:
            info.errors.append(f"{attr.name}: {attr.raw_value}")

    def _parse_attribute_line(self, line: str) -> Optional[SmartAttribute]:
        """Parse one row of the SMART attribute table.

        Expected columns:
        ``ID# ATTRIBUTE_NAME FLAG VALUE WORST THRESH TYPE UPDATED
        WHEN_FAILED RAW_VALUE``. The ID is decimal; a ``0x`` prefixed ID is
        still accepted and read as hexadecimal.

        Returns:
            The attribute, or ``None`` when the line has fewer than ten
            columns or a numeric column does not parse.
        """
        parts = line.split()
        if len(parts) < 10:
            return None

        id_token = parts[0]
        base = 16 if id_token.lower().startswith("0x") else 10
        try:
            attr_id = int(id_token, base)
            value = int(parts[3])
            worst = int(parts[4])
            threshold = int(parts[5])
        except ValueError:
            return None

        return SmartAttribute(
            id=attr_id,
            name=parts[1],
            value=value,
            worst=worst,
            threshold=threshold,
            raw_value=parts[9],
            status="正常" if value > threshold else "预警",
        )

    @staticmethod
    def _select_smart_devices(lsblk_output: str,
                              rotational_only: bool = True) -> List[str]:
        """Return the ``/dev/...`` paths to probe from ``lsblk`` output.

        *lsblk_output* holds ``NAME TYPE ROTA`` rows. Only ``disk`` and
        ``rom`` devices are kept; with *rotational_only* the ROTA column
        must also be ``1``.
        """
        devices = []
        for row in lsblk_output.splitlines():
            parts = row.split()
            if len(parts) < 3:
                continue
            name, device_type, rota = parts[0], parts[1], parts[2]
            if device_type not in ("disk", "rom"):
                continue
            if rotational_only and rota != "1":
                continue
            devices.append(f"/dev/{name}")
        return devices

    def get_all_disks_smart(self, *,
                            rotational_only: bool = True) -> Dict[str, SmartInfo]:
        """Collect SMART info for the block devices reported by ``lsblk``.

        Args:
            rotational_only: When True (the default, matching the previous
                behaviour) only rotational devices are probed. Pass False
                to include non-rotational disks such as SSDs as well.

        Returns:
            Mapping of device path to :class:`SmartInfo`; devices whose
            report could not be obtained are omitted. Empty when ``lsblk``
            fails (the failure is logged).
        """
        results = {}
        try:
            listing = subprocess.run(
                ["lsblk", "-d", "-n", "-o", "NAME,TYPE,ROTA"],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
                encoding='utf-8',
                check=True,
                timeout=30,
            )
            for device_path in self._select_smart_devices(
                    listing.stdout, rotational_only):
                smart_info = self.get_disk_smart_info(device_path)
                if smart_info:
                    results[device_path] = smart_info
        except Exception as e:  # best-effort boundary: log and return partial
            logger.error("获取磁盘列表失败: %s", e)

        return results

    def get_health_score(self, smart_info: SmartInfo) -> int:
        """Return a health score from 0 to 100 for *smart_info*.

        Starts at 100 (30 for DANGER, 70 for WARNING), subtracts 10 per
        recorded error and clamps to the 0-100 range. A falsy *smart_info*
        scores 0.
        """
        if not smart_info:
            return 0

        if smart_info.health_status == HealthStatus.DANGER:
            score = 30
        elif smart_info.health_status == HealthStatus.WARNING:
            score = 70
        else:
            score = 100

        error_count = len(smart_info.errors) if smart_info.errors else 0
        score -= error_count * 10

        return max(0, min(100, score))

    def get_temperature_status(self, temp: Optional[int]) -> Tuple[str, str]:
        """Return a ``(label, color)`` pair describing *temp*.

        ``None`` maps to ("未知", "gray"); otherwise the bands are
        below 35, below 45, below 55, and 55 or above.
        """
        if temp is None:
            return "未知", "gray"
        if temp < 35:
            return "正常", "green"
        if temp < 45:
            return "偏高", "orange"
        if temp < 55:
            return "高温", "red"
        return "危险", "red"