"""ServerGuard report generation module.

Formats diagnostic results as plain text, JSON, CSV or HTML.
"""

import csv
import html
import json
import os
from typing import Dict, Any, List
from datetime import datetime
from io import StringIO

try:
    from rich.console import Console
    from rich.table import Table
    from rich.panel import Panel
    from rich import box
    HAS_RICH = True
except ImportError:
    HAS_RICH = False


def _as_upper(value: Any, default: str = 'unknown') -> str:
    """Return ``value`` as upper-case text, using ``default`` when it is None."""
    return str(default if value is None else value).upper()


def _esc(value: Any) -> str:
    """Return ``value`` as text that is safe to embed in HTML text or attributes."""
    return html.escape(str(value), quote=True)


class ReportGenerator:
    """Render a diagnostic result dictionary in one of several report formats."""

    def __init__(self):
        # rich is an optional dependency; the console is None without it.
        self.console = Console() if HAS_RICH else None

    def generate_report(self, data: Dict[str, Any], format_type: str = 'text') -> str:
        """Build a report in the requested format.

        Args:
            data: Diagnostic result data.
            format_type: One of ``text``, ``json``, ``csv`` or ``html``.
                Any other value falls back to the plain-text report.

        Returns:
            The formatted report content.
        """
        formatters = {
            'json': self._format_json_report,
            'csv': self._format_csv_report,
            'html': self._format_html_report,
        }
        formatter = formatters.get(format_type, self._format_text_report)
        return formatter(data)

    def save_report(self, data: Dict[str, Any], format_type: str, filepath: str):
        """Generate a report and write it to ``filepath`` as UTF-8.

        Args:
            data: Diagnostic result data.
            format_type: Report format (see ``generate_report``).
            filepath: Output file path; missing parent directories are created.
        """
        report = self.generate_report(data, format_type)

        # Make sure the target directory exists ('.' when filepath is bare).
        os.makedirs(os.path.dirname(filepath) or '.', exist_ok=True)

        # csv.writer already emitted '\r\n' row terminators; disabling newline
        # translation keeps them from becoming '\r\r\n' on Windows.
        newline = '' if format_type == 'csv' else None
        with open(filepath, 'w', encoding='utf-8', newline=newline) as f:
            f.write(report)

    def _format_json_report(self, data: Dict[str, Any]) -> str:
        """Serialize ``data`` as indented JSON.

        Non-ASCII characters are kept as-is and values json cannot encode
        are converted with ``str``.
        """
        return json.dumps(data, indent=2, ensure_ascii=False, default=str)

    def _format_csv_report(self, data: Dict[str, Any]) -> str:
        """Render ``data`` as CSV: a short header, then one section per module."""
        output = StringIO()
        writer = csv.writer(output)

        # General information.
        writer.writerow(['ServerGuard Diagnostic Report'])
        writer.writerow(['Scan Type', data.get('scan_type', 'unknown')])
        writer.writerow(['Timestamp', data.get('timestamp', '')])
        writer.writerow([])

        # One section per module, with nested dictionaries flattened.
        for module_name, module_data in data.get('modules', {}).items():
            writer.writerow([f'Module: {_as_upper(module_name)}'])
            writer.writerow(['Status', module_data.get('status', 'unknown')])
            self._write_dict_to_csv(writer, module_data, prefix='')
            writer.writerow([])

        return output.getvalue()

    def _write_dict_to_csv(self, writer, data: Dict[str, Any], prefix: str = ''):
        """Write ``data`` as ``key, value`` rows, flattening nested dicts.

        Nested keys are joined with dots (``parent.child``) and lists are
        written as one comma-separated cell.

        Args:
            writer: A ``csv.writer``-like object exposing ``writerow``.
            data: Dictionary to write.
            prefix: Dotted key path of ``data``; empty at the top level.
        """
        for key, value in data.items():
            # The caller already wrote the module-level status row. Nested
            # 'status' keys are different data and must not be dropped.
            if key == 'status' and not prefix:
                continue

            full_key = f"{prefix}.{key}" if prefix else key

            if isinstance(value, dict):
                self._write_dict_to_csv(writer, value, full_key)
            elif isinstance(value, list):
                writer.writerow([full_key, ', '.join(str(v) for v in value)])
            else:
                writer.writerow([full_key, value])

    def _format_text_report(self, data: Dict[str, Any]) -> str:
        """Render ``data`` as a plain-text report."""
        rule = "=" * 70
        lines = [
            rule,
            "ServerGuard 硬件健康诊断报告",
            rule,
            f"扫描类型: {_as_upper(data.get('scan_type'))}",
            f"生成时间: {data.get('timestamp', '')}",
        ]
        if 'stress_duration' in data:
            lines.append(f"压力测试时长: {data['stress_duration']} 秒")
        lines.append(rule)
        lines.append("")

        symbols = {'success': '✓', 'warning': '⚠'}

        # Per-module results.
        for module_name, module_data in data.get('modules', {}).items():
            lines.append(f"\n[{_as_upper(module_name)}]")
            lines.append("-" * 70)

            status = module_data.get('status', 'unknown')
            # Anything other than success/warning is shown as a failure.
            status_symbol = symbols.get(status, '✗') if isinstance(status, str) else '✗'
            lines.append(f"状态: {status_symbol} {_as_upper(status)}")

            if 'error' in module_data:
                lines.append(f"错误: {module_data['error']}")

            # Module-specific details.
            self._format_module_text(lines, module_name, module_data)
            lines.append("")

        # Footer.
        lines.append(rule)
        lines.append("报告结束")
        lines.append(rule)

        return '\n'.join(lines)

    def _format_module_text(self, lines: List[str], module_name: str, data: Dict[str, Any]):
        """Append module-specific text lines for ``module_name`` to ``lines``.

        Modules without a dedicated formatter add nothing.

        Args:
            lines: Output list, extended in place.
            module_name: Module key (``system``, ``cpu``, ``memory``,
                ``storage``, ``sensors`` or ``logs``).
            data: That module's result dictionary.
        """
        handlers = {
            'system': self._text_system,
            'cpu': self._text_cpu,
            'memory': self._text_memory,
            'storage': self._text_storage,
            'sensors': self._text_sensors,
            'logs': self._text_logs,
        }
        handler = handlers.get(module_name)
        if handler is not None:
            handler(lines, data)

    def _text_system(self, lines: List[str], data: Dict[str, Any]):
        """Append the system inventory summary (CPU, memory, storage count)."""
        if 'cpu' in data:
            cpu = data['cpu']
            lines.append(f"CPU: {cpu.get('model', 'N/A')}")
            lines.append(f" 核心数: {cpu.get('cores', 'N/A')} 核 / {cpu.get('threads', 'N/A')} 线程")
        if 'memory' in data:
            mem = data['memory']
            lines.append(f"内存: 总计 {mem.get('total_gb', 'N/A')} GB, {mem.get('slots_used', 'N/A')} 个插槽")
        if 'storage' in data:
            lines.append(f"存储设备: {len(data['storage'])} 个设备")

    def _text_cpu(self, lines: List[str], data: Dict[str, Any]):
        """Append CPU temperature, MCE error count and stress-test result."""
        if 'temperature' in data:
            temp = data['temperature']
            lines.append(f"CPU 温度: {temp.get('current_c', 'N/A')}°C")
        if 'mce_errors' in data:
            mce = data['mce_errors']
            lines.append(f"MCE 错误: {mce.get('count', 0)} 个")
        if 'stress_test' in data:
            stress = data['stress_test']
            lines.append(f"压力测试: {'通过' if stress.get('passed') else '失败'}")
            lines.append(f" 运行时长: {stress.get('duration_seconds', 'N/A')} 秒")

    def _text_memory(self, lines: List[str], data: Dict[str, Any]):
        """Append ECC status and memory stress-test result."""
        if 'ecc_status' in data:
            ecc = data['ecc_status']
            lines.append(f"ECC 支持: {'是' if ecc.get('supported') else '否'}")
            # A missing or None error count is treated as zero.
            ecc_errors = ecc.get('errors') or 0
            if ecc_errors > 0:
                lines.append(f"ECC 错误: {ecc_errors} 个")
        if 'stress_test' in data:
            st = data['stress_test']
            lines.append(f"内存压力测试: {'通过' if st.get('passed') else '失败'}")
            if st.get('tool'):
                lines.append(f" 使用工具: {st.get('tool')}")
            if st.get('size_mb'):
                lines.append(f" 测试大小: {st.get('size_mb')} MB")

    def _text_storage(self, lines: List[str], data: Dict[str, Any]):
        """Append name, model, health and SMART summary for each device."""
        for device in data.get('devices', []):
            lines.append(f"设备 {device.get('name', 'N/A')}:")
            lines.append(f" 型号: {device.get('model', 'N/A')}")
            lines.append(f" 健康状态: {device.get('health', 'N/A')}")
            if 'smart_status' in device:
                smart = device['smart_status']
                lines.append(f" SMART: {smart.get('overall', 'N/A')}")

    def _text_sensors(self, lines: List[str], data: Dict[str, Any]):
        """Append temperature and voltage sensor readings."""
        if 'temperatures' in data:
            lines.append("温度传感器:")
            for name, value in data['temperatures'].items():
                lines.append(f" {name}: {value}°C")
        if 'voltages' in data:
            lines.append("电压传感器:")
            for name, value in data['voltages'].items():
                lines.append(f" {name}: {value}V")

    def _text_logs(self, lines: List[str], data: Dict[str, Any]):
        """Append hardware error totals, critical events and error details."""
        # Error statistics.
        if 'hardware_errors' in data:
            # None counts are treated as zero so the total can be computed.
            counts = {
                error_type: (count or 0)
                for error_type, count in data['hardware_errors'].items()
            }
            lines.append(f"硬件错误总计: {sum(counts.values())} 个")
            for error_type, count in counts.items():
                if count > 0:
                    lines.append(f" {error_type}: {count} 个")

        # Critical events. The heading says "sorted by severity", but no
        # sorting happens here; presumably the producer orders the list
        # (verify against the caller). Only the first five are shown.
        critical_events = data.get('critical_events')
        if critical_events:
            lines.append("")
            lines.append("关键事件 (按严重程度排序):")
            lines.append("-" * 60)
            for idx, event in enumerate(critical_events[:5], 1):
                severity = event.get('severity', 'unknown')
                description = event.get('description', 'Unknown')
                # Truncate long messages; a None message becomes empty.
                message = str(event.get('message') or '')[:150]
                source = event.get('source', 'unknown')

                severity_symbol = '🔴' if severity == 'critical' else '🟡'
                lines.append(f"{severity_symbol} [{idx}] {description} ({source})")
                lines.append(f" {message}")
                lines.append("")

        # Detailed error log, restricted to the highest-priority categories
        # and to the first three entries of each.
        error_details = data.get('error_details')
        if error_details:
            lines.append("")
            lines.append("详细错误日志:")
            lines.append("-" * 60)

            priority_types = ['kernel_panics', 'cpu_errors', 'memory_errors', 'power_errors']
            for error_type in priority_types:
                entries = error_details.get(error_type)
                if not entries:
                    continue
                lines.append(f"\n[{error_type}]:")
                for idx, err in enumerate(entries[:3], 1):
                    message = str(err.get('message') or '')[:200]
                    source = err.get('source', 'unknown')
                    lines.append(f" {idx}. [{source}] {message}")

    def _format_html_report(self, data: Dict[str, Any]) -> str:
        """Render ``data`` as a standalone HTML document.

        Every value taken from ``data`` is HTML-escaped before insertion, so
        text such as log messages cannot inject markup into the report.
        """
        html_parts = []

        # Document head.
        html_parts.append(
            '<!DOCTYPE html>\n'
            '<html lang="zh-CN">\n'
            '<head>\n'
            '<meta charset="UTF-8">\n'
            '<title>ServerGuard 诊断报告</title>\n'
            '<style>\n'
            'body { font-family: sans-serif; margin: 2em; }\n'
            '.module { border: 1px solid #ccc; margin: 1em 0; padding: 0.5em 1em; }\n'
            '.module-header { font-weight: bold; }\n'
            '.item-label { font-weight: bold; }\n'
            '.status-success { color: #2e7d32; }\n'
            '.status-warning { color: #f9a825; }\n'
            '.status-error, .error { color: #c62828; }\n'
            '</style>\n'
            '</head>\n'
            '<body>'
        )

        # Report header.
        scan_type = _esc(_as_upper(data.get('scan_type')))
        timestamp = _esc(data.get('timestamp', ''))
        html_parts.append(
            '<div class="header">\n'
            '<h1>🔧 ServerGuard 硬件健康诊断报告</h1>\n'
            f'<p>扫描类型: {scan_type} | 生成时间: {timestamp}</p>\n'
            '</div>'
        )

        # Per-module results.
        for module_name, module_data in data.get('modules', {}).items():
            status = module_data.get('status', 'unknown')
            status_class = _esc(f'status-{status}')

            html_parts.append(
                '<div class="module">\n'
                f'<div class="module-header">{_esc(_as_upper(module_name))} '
                f'<span class="status {status_class}">{_esc(_as_upper(status))}</span></div>'
            )

            if 'error' in module_data:
                # A failed module shows only its error message.
                html_parts.append(
                    f'<div class="error">错误: {_esc(module_data["error"])}</div>'
                )
            else:
                html_parts.append('<div class="module-content">')
                self._format_module_html(html_parts, module_name, module_data)
                html_parts.append('</div>')

            html_parts.append('</div>')

        # Document tail.
        html_parts.append('</body>\n</html>')

        return '\n'.join(html_parts)

    def _format_module_html(self, html_parts: List[str], module_name: str, data: Dict[str, Any]):
        """Append one labelled HTML item per key of ``data`` to ``html_parts``.

        Dictionaries and lists are summarized by their size; other values are
        shown directly. The ``status`` key is skipped because the module
        header already displays it.

        Args:
            html_parts: Output list, extended in place.
            module_name: Module key (currently unused).
            data: That module's result dictionary.
        """
        for key, value in data.items():
            if key == 'status':
                continue

            display_key = _esc(str(key).replace('_', ' ').title())

            if isinstance(value, dict):
                shown = f'{len(value)} 项数据'
            elif isinstance(value, list):
                shown = f'{len(value)} 个项目'
            else:
                shown = _esc(value)

            html_parts.append(
                '<div class="item">\n'
                f'<div class="item-label">{display_key}</div>\n'
                f'<div class="item-value">{shown}</div>\n'
                '</div>'
            )