def is_false_positive(line: str, error_type: str) -> bool:
    """
    Decide whether a pattern-matched log line is benign noise for its category.

    Args:
        line: Raw log line that matched one of the error patterns.
        error_type: Category key the line was matched under
            ("kernel_panics", "power_errors", "cpu_errors", ...).

    Returns:
        bool: True when the line should be discarded as a false positive,
        False when it should be counted as a real error.
    """
    text = line.lower()

    if error_type == "kernel_panics":
        # A bare "Call Trace:" header only marks a stack dump; it counts as
        # an error only when the same line also carries a real error keyword.
        if 'call trace:' not in text:
            return False
        for marker in ('oops', 'panic', 'bug:', 'warning:', 'error:'):
            if marker in text:
                return False
        return True

    if error_type == "power_errors":
        # Registration of the power-button input device is a normal ACPI
        # event, and "PWRF" is the ACPI power-button identifier.
        button_registration = 'power button' in text and 'input:' in text
        return button_registration or 'pwrf' in text

    if error_type == "cpu_errors":
        # Plain temperature readings are routine sensor output; only lines
        # that mention "above" or "critical" are kept as errors.
        alarming = 'above' in text or 'critical' in text
        return 'temperature' in text and not alarming

    # Categories without a dedicated rule are never filtered.
    return False
def collect_error_details(
    analysis_result: Dict[str, Any],
    max_per_type: int = 10,
    error_types=None,
) -> Dict[str, List[Dict[str, str]]]:
    """
    Collect detailed error entries from the analysis result, grouped by type.

    Entries are read from the "recent_errors" lists of "dmesg_analysis" and
    then "journal_analysis", so dmesg entries come first within a category.
    Within each category, entries are de-duplicated by message text and
    capped at ``max_per_type``.

    Args:
        analysis_result: Aggregated analysis dict; missing or empty sections
            are treated as containing no errors.
        max_per_type: Maximum number of unique entries kept per category.
            A value of 0 or less yields an empty result.
        error_types: Optional iterable of category names to report, in the
            desired output order. Defaults to the keys of
            HARDWARE_ERROR_PATTERNS.

    Returns:
        Dict[str, List[Dict]]: Category name -> list of error entries.
        Categories with no entries are omitted. Entries lacking a "type" or
        "message" key are skipped instead of raising KeyError.
    """
    if error_types is None:
        error_types = HARDWARE_ERROR_PATTERNS.keys()

    # dict.fromkeys drops duplicate category names while keeping their order.
    grouped: Dict[str, List[Dict[str, str]]] = {
        name: [] for name in dict.fromkeys(error_types)
    }
    seen_messages = {name: set() for name in grouped}

    # Single pass over both sources instead of rescanning once per category.
    for section in ("dmesg_analysis", "journal_analysis"):
        entries = (analysis_result.get(section) or {}).get("recent_errors") or []
        for entry in entries:
            category = entry.get("type")
            message = entry.get("message")
            bucket = grouped.get(category)
            if bucket is None or message is None:
                # Unknown category or malformed entry: nothing to report.
                continue
            if len(bucket) >= max_per_type or message in seen_messages[category]:
                continue
            seen_messages[category].add(message)
            bucket.append(entry)

    return {name: bucket for name, bucket in grouped.items() if bucket}
identify_critical_events(analysis_result: Dict[str, Any]) -> List[Dict[str, Any]]: """识别需要立即关注的关键事件。""" critical_events = [] @@ -341,34 +427,42 @@ def identify_critical_events(analysis_result: Dict[str, Any]) -> List[Dict[str, # 定义关键错误模式 critical_patterns = [ - (r'Kernel panic', 'kernel_panic', '内核崩溃'), - (r'hardlockup', 'hard_lockup', 'CPU 硬死锁'), - (r'softlockup', 'soft_lockup', 'CPU 软死锁'), - (r'thermal.*shutdown', 'thermal_shutdown', '过热关机'), - (r'Hardware Error', 'hardware_error', '硬件错误'), - (r'Fatal.*PCIe', 'pcie_fatal', 'PCIe 致命错误'), - (r'I/O error.*sector', 'disk_io_error', '磁盘 I/O 错误'), - (r'Uncorrectable.*error', 'uncorrectable_error', '不可纠正错误'), - (r'out of memory.*kill', 'oom_kill', 'OOM 进程杀死'), - (r'GPU.*fallen.*bus', 'gpu_disconnect', 'GPU 断开连接'), + (r'Kernel panic', 'kernel_panic', '内核崩溃', 'critical'), + (r'hardlockup', 'hard_lockup', 'CPU 硬死锁', 'critical'), + (r'softlockup', 'soft_lockup', 'CPU 软死锁', 'critical'), + (r'thermal.*shutdown', 'thermal_shutdown', '过热关机', 'critical'), + (r'Hardware Error', 'hardware_error', '硬件错误', 'critical'), + (r'Fatal.*PCIe', 'pcie_fatal', 'PCIe 致命错误', 'critical'), + (r'I/O error.*sector', 'disk_io_error', '磁盘 I/O 错误', 'warning'), + (r'Uncorrectable.*error', 'uncorrectable_error', '不可纠正错误', 'warning'), + (r'out of memory.*kill', 'oom_kill', 'OOM 进程杀死', 'warning'), + (r'GPU.*fallen.*bus', 'gpu_disconnect', 'GPU 断开连接', 'warning'), + (r'Machine check', 'mce_error', '机器检查错误(MCE)', 'critical'), + (r'EDAC.*error', 'edac_error', '内存 EDAC 错误', 'warning'), ] for error in all_errors: message = error.get("message", "") - for pattern, event_type, description in critical_patterns: + for pattern, event_type, description, severity in critical_patterns: if re.search(pattern, message, re.IGNORECASE): event = { "type": event_type, "description": description, - "message": message[:200], # 限制长度 - "source": "dmesg" if error in analysis_result.get("dmesg_analysis", {}).get("recent_errors", []) else "journal" + "severity": severity, + 
"message": message[:300], # 限制长度 + "source": error.get("source", "unknown"), + "error_category": error.get("type", "unknown") } # 避免重复 if event not in critical_events: critical_events.append(event) - return critical_events + # 按严重程度排序 + severity_order = {'critical': 0, 'warning': 1, 'info': 2} + critical_events.sort(key=lambda x: severity_order.get(x.get('severity', 'info'), 3)) + + return critical_events[:20] # 限制数量 def get_kernel_panic_logs() -> List[Dict[str, str]]: @@ -409,7 +503,7 @@ def get_kernel_panic_logs() -> List[Dict[str, str]]: return panics -def get_hardware_error_logs() -> Dict[str, List[str]]: +def get_hardware_error_logs() -> Dict[str, List[Dict[str, str]]]: """获取特定类型的硬件错误日志。""" result = { "mce_errors": [], @@ -425,25 +519,37 @@ def get_hardware_error_logs() -> Dict[str, List[str]]: for line in stdout.split('\n'): # MCE 错误 if re.search(r'Machine check|CMCI|hardware error', line, re.IGNORECASE): - result["mce_errors"].append(line.strip()) + result["mce_errors"].append({ + "message": line.strip(), + "type": "mce" + }) # ECC 错误 if re.search(r'ECC|EDAC|memory error', line, re.IGNORECASE): - result["ecc_errors"].append(line.strip()) + result["ecc_errors"].append({ + "message": line.strip(), + "type": "ecc" + }) # I/O 错误 if re.search(r'I/O error|ata.*error|blk_update', line, re.IGNORECASE): - result["io_errors"].append(line.strip()) + result["io_errors"].append({ + "message": line.strip(), + "type": "io" + }) # 热错误 if re.search(r'thermal|overheat|critical temp', line, re.IGNORECASE): - result["thermal_errors"].append(line.strip()) + result["thermal_errors"].append({ + "message": line.strip(), + "type": "thermal" + }) except: pass # 限制数量 for key in result: - result[key] = result[key][:20] + result[key] = result[key][:10] return result diff --git a/reporter.py b/reporter.py index 7ae2afd..6b7c9b0 100644 --- a/reporter.py +++ b/reporter.py @@ -204,6 +204,7 @@ class ReportGenerator: lines.append(f" {name}: {value}V") elif module_name == 'logs': + # 显示错误统计 if 
'hardware_errors' in data: errors = data['hardware_errors'] total = sum(errors.values()) @@ -211,6 +212,38 @@ class ReportGenerator: for error_type, count in errors.items(): if count > 0: lines.append(f" {error_type}: {count} 个") + + # 显示关键事件详情 + if 'critical_events' in data and data['critical_events']: + lines.append("") + lines.append("关键事件 (按严重程度排序):") + lines.append("-" * 60) + for idx, event in enumerate(data['critical_events'][:5], 1): # 只显示前5个 + severity = event.get('severity', 'unknown') + description = event.get('description', 'Unknown') + message = event.get('message', '')[:150] # 限制长度 + source = event.get('source', 'unknown') + + severity_symbol = '🔴' if severity == 'critical' else '🟡' + lines.append(f"{severity_symbol} [{idx}] {description} ({source})") + lines.append(f" {message}") + lines.append("") + + # 显示详细的错误信息 + if 'error_details' in data and data['error_details']: + lines.append("") + lines.append("详细错误日志:") + lines.append("-" * 60) + + # 优先显示关键类型的错误 + priority_types = ['kernel_panics', 'cpu_errors', 'memory_errors', 'power_errors'] + for error_type in priority_types: + if error_type in data['error_details'] and data['error_details'][error_type]: + lines.append(f"\n[{error_type}]:") + for idx, err in enumerate(data['error_details'][error_type][:3], 1): + message = err.get('message', '')[:200] + source = err.get('source', 'unknown') + lines.append(f" {idx}. [{source}] {message}") def _format_html_report(self, data: Dict[str, Any]) -> str: """生成 HTML 格式报告。"""