Files
ServerGuard/reporter.py
2026-03-02 16:40:22 +08:00

421 lines
15 KiB
Python

"""
ServerGuard - 报告生成模块
负责将检测结果格式化为各种输出格式。
"""
import csv
import html
import json
import os
from datetime import datetime
from io import StringIO
from typing import Any, Dict, List

try:
    from rich.console import Console
    from rich.table import Table
    from rich.panel import Panel
    from rich import box
    HAS_RICH = True
except ImportError:
    HAS_RICH = False
class ReportGenerator:
    """Format ServerGuard diagnostic results as text, JSON, CSV or HTML.

    The ``data`` dictionary passed to the public methods is read through
    these keys: ``scan_type``, ``timestamp``, optionally
    ``stress_duration``, and ``modules`` (a mapping of module name to a
    result dictionary whose ``status`` key holds that module's status).
    """

    # Statuses that have a matching ``.status-*`` rule in the stylesheet below.
    _STYLED_STATUSES = ('success', 'warning', 'error', 'unknown')

    # Static document head (markup + stylesheet) of the HTML report.
    _HTML_HEAD = """<!DOCTYPE html>
<html lang="zh-CN">
<head>
<meta charset="UTF-8">
<meta name="viewport" content="width=device-width, initial-scale=1.0">
<title>ServerGuard 诊断报告</title>
<style>
body {
font-family: -apple-system, BlinkMacSystemFont, 'Segoe UI', Roboto, 'Helvetica Neue', Arial, sans-serif;
line-height: 1.6;
color: #333;
max-width: 1200px;
margin: 0 auto;
padding: 20px;
background: #f5f5f5;
}
.header {
background: linear-gradient(135deg, #667eea 0%, #764ba2 100%);
color: white;
padding: 30px;
border-radius: 10px;
margin-bottom: 20px;
}
.header h1 {
margin: 0;
font-size: 2em;
}
.header .meta {
margin-top: 10px;
opacity: 0.9;
}
.module {
background: white;
border-radius: 8px;
padding: 20px;
margin-bottom: 20px;
box-shadow: 0 2px 4px rgba(0,0,0,0.1);
}
.module-header {
display: flex;
justify-content: space-between;
align-items: center;
border-bottom: 2px solid #eee;
padding-bottom: 10px;
margin-bottom: 15px;
}
.module-title {
font-size: 1.5em;
font-weight: bold;
color: #444;
}
.status {
padding: 5px 15px;
border-radius: 20px;
font-weight: bold;
font-size: 0.9em;
}
.status-success { background: #d4edda; color: #155724; }
.status-warning { background: #fff3cd; color: #856404; }
.status-error { background: #f8d7da; color: #721c24; }
.status-unknown { background: #e2e3e5; color: #383d41; }
.info-grid {
display: grid;
grid-template-columns: repeat(auto-fit, minmax(300px, 1fr));
gap: 15px;
}
.info-item {
padding: 10px;
background: #f8f9fa;
border-radius: 5px;
}
.info-label {
font-weight: bold;
color: #666;
font-size: 0.9em;
}
.info-value {
margin-top: 5px;
font-size: 1.1em;
}
.footer {
text-align: center;
color: #666;
margin-top: 30px;
padding: 20px;
}
.error-box {
background: #f8d7da;
border: 1px solid #f5c6cb;
color: #721c24;
padding: 15px;
border-radius: 5px;
margin: 10px 0;
}
</style>
</head>
<body>"""

    # Static footer that closes the HTML document.
    _HTML_FOOT = """
<div class="footer">
<p>由 ServerGuard 生成</p>
</div>
</body>
</html>"""

    def __init__(self):
        # A rich Console is created only when the optional ``rich`` import
        # succeeded; otherwise the attribute is None.
        self.console = Console() if HAS_RICH else None

    def generate_report(self, data: Dict[str, Any], format_type: str = 'text') -> str:
        """Render ``data`` in the requested format.

        Args:
            data: Diagnostic results.
            format_type: One of ``'text'``, ``'json'``, ``'csv'`` or
                ``'html'``. Any other value falls back to the text report.

        Returns:
            The formatted report as a string.
        """
        if format_type == 'json':
            return self._format_json_report(data)
        if format_type == 'csv':
            return self._format_csv_report(data)
        if format_type == 'html':
            return self._format_html_report(data)
        return self._format_text_report(data)

    def save_report(self, data: Dict[str, Any], format_type: str, filepath: str):
        """Render ``data`` and write the result to ``filepath`` as UTF-8.

        Args:
            data: Diagnostic results.
            format_type: Report format, as accepted by ``generate_report``.
            filepath: Destination path; missing parent directories are
                created.
        """
        report = self.generate_report(data, format_type)
        # A bare file name has an empty dirname; fall back to the current dir.
        os.makedirs(os.path.dirname(filepath) or '.', exist_ok=True)
        # csv.writer already emits '\r\n' row terminators, so newline
        # translation must be disabled for CSV or Windows would write '\r\r\n'.
        newline = '' if format_type == 'csv' else None
        with open(filepath, 'w', encoding='utf-8', newline=newline) as f:
            f.write(report)

    def _format_json_report(self, data: Dict[str, Any]) -> str:
        """Serialize ``data`` as indented JSON.

        Non-ASCII characters are kept as-is, and values the JSON encoder
        does not support are converted with ``str``.
        """
        return json.dumps(data, indent=2, ensure_ascii=False, default=str)

    def _format_csv_report(self, data: Dict[str, Any]) -> str:
        """Build the CSV report: a short header, then one section per module."""
        output = StringIO()
        writer = csv.writer(output)
        # Report header rows.
        writer.writerow(['ServerGuard Diagnostic Report'])
        writer.writerow(['Scan Type', data.get('scan_type', 'unknown')])
        writer.writerow(['Timestamp', data.get('timestamp', '')])
        writer.writerow([])
        # One section per module, separated by an empty row.
        for module_name, module_data in (data.get('modules') or {}).items():
            writer.writerow([f'Module: {str(module_name).upper()}'])
            writer.writerow(['Status', module_data.get('status', 'unknown')])
            # Flatten the remaining (possibly nested) values into key/value rows.
            self._write_dict_to_csv(writer, module_data, prefix='')
            writer.writerow([])
        return output.getvalue()

    def _write_dict_to_csv(self, writer, data: Dict[str, Any], prefix: str = ''):
        """Write ``data`` as ``key, value`` rows, flattening nested dicts.

        Nested keys are joined with dots (``outer.inner``) and lists are
        joined into one comma-separated cell.

        Args:
            writer: A ``csv.writer``-like object exposing ``writerow``.
            data: The dictionary to write.
            prefix: Dotted key path of the enclosing dictionaries; empty at
                the top level.
        """
        for key, value in data.items():
            # The module's own status already has a dedicated row, so skip it
            # at the top level only; nested "status" keys are real data.
            if key == 'status' and not prefix:
                continue
            full_key = f"{prefix}.{key}" if prefix else key
            if isinstance(value, dict):
                self._write_dict_to_csv(writer, value, full_key)
            elif isinstance(value, list):
                writer.writerow([full_key, ', '.join(str(v) for v in value)])
            else:
                writer.writerow([full_key, value])

    def _format_text_report(self, data: Dict[str, Any]) -> str:
        """Build the plain-text report."""
        rule = "=" * 70
        # ``scan_type`` may be missing or None; coerce before upper-casing.
        scan_type = str(data.get('scan_type') or 'unknown').upper()
        lines = [
            rule,
            "ServerGuard 硬件健康诊断报告",
            rule,
            f"扫描类型: {scan_type}",
            f"生成时间: {data.get('timestamp', '')}",
        ]
        if 'stress_duration' in data:
            lines.append(f"压力测试时长: {data['stress_duration']}")
        lines.append(rule)
        lines.append("")
        # One section per module.
        for module_name, module_data in (data.get('modules') or {}).items():
            lines.append(f"\n[{str(module_name).upper()}]")
            lines.append("-" * 70)
            status = module_data.get('status', 'unknown')
            if status == 'success':
                status_symbol = '✓'
            elif status == 'warning':
                status_symbol = '⚠'
            else:
                status_symbol = '✗'
            lines.append(f"状态: {status_symbol} {str(status).upper()}")
            if 'error' in module_data:
                lines.append(f"错误: {module_data['error']}")
            # Module-specific details follow the status line.
            self._format_module_text(lines, module_name, module_data)
            lines.append("")
        lines.extend([rule, "报告结束", rule])
        return '\n'.join(lines)

    def _format_module_text(self, lines: List[str], module_name: str, data: Dict[str, Any]):
        """Append the module-specific text lines for ``module_name``.

        Args:
            lines: Output list, extended in place.
            module_name: Module identifier; names without a dedicated
                formatter add nothing.
            data: The module's result dictionary.
        """
        formatters = {
            'system': self._text_system,
            'cpu': self._text_cpu,
            'memory': self._text_memory,
            'storage': self._text_storage,
            'sensors': self._text_sensors,
            'logs': self._text_logs,
        }
        formatter = formatters.get(module_name)
        if formatter is not None:
            formatter(lines, data)

    def _text_system(self, lines: List[str], data: Dict[str, Any]):
        """Append the CPU, memory and storage summary of the system module."""
        if 'cpu' in data:
            cpu = data['cpu']
            lines.append(f"CPU: {cpu.get('model', 'N/A')}")
            lines.append(f" 核心数: {cpu.get('cores', 'N/A')} 核 / {cpu.get('threads', 'N/A')} 线程")
        if 'memory' in data:
            mem = data['memory']
            lines.append(f"内存: 总计 {mem.get('total_gb', 'N/A')} GB, {mem.get('slots_used', 'N/A')} 个插槽")
        if 'storage' in data:
            lines.append(f"存储设备: {len(data['storage'])} 个设备")

    def _text_cpu(self, lines: List[str], data: Dict[str, Any]):
        """Append temperature, MCE count and stress-test result of the cpu module."""
        if 'temperature' in data:
            temp = data['temperature']
            lines.append(f"CPU 温度: {temp.get('current_c', 'N/A')}°C")
        if 'mce_errors' in data:
            mce = data['mce_errors']
            lines.append(f"MCE 错误: {mce.get('count', 0)}")
        if 'stress_test' in data:
            stress = data['stress_test']
            lines.append(f"压力测试: {'通过' if stress.get('passed') else '失败'}")
            lines.append(f" 运行时长: {stress.get('duration_seconds', 'N/A')}")

    def _text_memory(self, lines: List[str], data: Dict[str, Any]):
        """Append ECC status and stress-test result of the memory module."""
        if 'ecc_status' in data:
            ecc = data['ecc_status']
            lines.append(f"ECC 支持: {'是' if ecc.get('supported') else '否'}")
            # A present-but-None error count is treated as zero.
            ecc_errors = ecc.get('errors') or 0
            if ecc_errors > 0:
                lines.append(f"ECC 错误: {ecc_errors}")
        if 'stress_test' in data:
            st = data['stress_test']
            lines.append(f"内存压力测试: {'通过' if st.get('passed') else '失败'}")
            if st.get('tool'):
                lines.append(f" 使用工具: {st.get('tool')}")
            if st.get('size_mb'):
                lines.append(f" 测试大小: {st.get('size_mb')} MB")

    def _text_storage(self, lines: List[str], data: Dict[str, Any]):
        """Append name, model, health and SMART status of each storage device."""
        for device in data.get('devices', []):
            lines.append(f"设备 {device.get('name', 'N/A')}:")
            lines.append(f" 型号: {device.get('model', 'N/A')}")
            lines.append(f" 健康状态: {device.get('health', 'N/A')}")
            if 'smart_status' in device:
                smart = device['smart_status']
                lines.append(f" SMART: {smart.get('overall', 'N/A')}")

    def _text_sensors(self, lines: List[str], data: Dict[str, Any]):
        """Append the temperature and voltage readings of the sensors module."""
        if 'temperatures' in data:
            lines.append("温度传感器:")
            for name, value in data['temperatures'].items():
                lines.append(f" {name}: {value}°C")
        if 'voltages' in data:
            lines.append("电压传感器:")
            for name, value in data['voltages'].items():
                lines.append(f" {name}: {value}V")

    def _text_logs(self, lines: List[str], data: Dict[str, Any]):
        """Append error totals, critical events and error details of the logs module."""
        # Error counters: the total, then every non-zero category.
        if 'hardware_errors' in data:
            errors = data['hardware_errors']
            lines.append(f"硬件错误总计: {sum(errors.values())}")
            for error_type, count in errors.items():
                if count > 0:
                    lines.append(f" {error_type}: {count}")
        # Critical events: only the first five, in the order supplied
        # (no sorting is done here).
        critical_events = data.get('critical_events')
        if critical_events:
            lines.append("")
            lines.append("关键事件 (按严重程度排序):")
            lines.append("-" * 60)
            for idx, event in enumerate(critical_events[:5], 1):
                severity = event.get('severity', 'unknown')
                description = event.get('description', 'Unknown')
                # Truncate long messages; tolerate a None message.
                message = str(event.get('message') or '')[:150]
                source = event.get('source', 'unknown')
                severity_symbol = '🔴' if severity == 'critical' else '🟡'
                lines.append(f"{severity_symbol} [{idx}] {description} ({source})")
                lines.append(f" {message}")
                lines.append("")
        # Detailed entries: only the listed categories, at most three each.
        error_details = data.get('error_details')
        if error_details:
            lines.append("")
            lines.append("详细错误日志:")
            lines.append("-" * 60)
            for error_type in ('kernel_panics', 'cpu_errors', 'memory_errors', 'power_errors'):
                entries = error_details.get(error_type)
                if not entries:
                    continue
                lines.append(f"\n[{error_type}]:")
                for idx, err in enumerate(entries[:3], 1):
                    message = str(err.get('message') or '')[:200]
                    source = err.get('source', 'unknown')
                    lines.append(f" {idx}. [{source}] {message}")

    def _format_html_report(self, data: Dict[str, Any]) -> str:
        """Build the HTML report.

        Every value taken from ``data`` is HTML-escaped before being
        inserted, so text such as log messages cannot break or inject
        markup.
        """
        html_parts = [self._HTML_HEAD]
        # Report header.
        scan_type = html.escape(str(data.get('scan_type') or 'unknown').upper())
        timestamp = html.escape(str(data.get('timestamp', '')))
        html_parts.append(
            '\n'
            '<div class="header">\n'
            '<h1>🔧 ServerGuard 硬件健康诊断报告</h1>\n'
            '<div class="meta">\n'
            f'扫描类型: {scan_type} |\n'
            f'生成时间: {timestamp}\n'
            '</div>\n'
            '</div>'
        )
        # One card per module.
        for module_name, module_data in (data.get('modules') or {}).items():
            status = str(module_data.get('status', 'unknown'))
            # Only known statuses have a CSS rule; anything else uses the
            # neutral "unknown" style (this also keeps the class attribute safe).
            if status in self._STYLED_STATUSES:
                status_class = f'status-{status}'
            else:
                status_class = 'status-unknown'
            title = html.escape(str(module_name).upper())
            status_label = html.escape(status.upper())
            html_parts.append(
                '\n'
                '<div class="module">\n'
                '<div class="module-header">\n'
                f'<span class="module-title">{title}</span>\n'
                f'<span class="status {status_class}">{status_label}</span>\n'
                '</div>'
            )
            if 'error' in module_data:
                # A failed module shows its error instead of the info grid.
                error_text = html.escape(str(module_data['error']))
                html_parts.append(
                    '\n'
                    '<div class="error-box">\n'
                    f'<strong>错误:</strong> {error_text}\n'
                    '</div>'
                )
            else:
                html_parts.append(' <div class="info-grid">')
                self._format_module_html(html_parts, module_name, module_data)
                html_parts.append(' </div>')
            html_parts.append(' </div>')
        html_parts.append(self._HTML_FOOT)
        return '\n'.join(html_parts)

    def _format_module_html(self, html_parts: List[str], module_name: str, data: Dict[str, Any]):
        """Append one ``info-item`` element per entry of a module's data.

        Dict and list values are summarised by their size; other values are
        shown as escaped text. The ``status`` key is skipped.

        Args:
            html_parts: Output list of HTML fragments, extended in place.
            module_name: Module identifier (not used by this method).
            data: The module's result dictionary.
        """
        for key, value in data.items():
            if key == 'status':
                continue
            label = html.escape(str(key).replace('_', ' ').title())
            if isinstance(value, dict):
                shown = f'{len(value)} 项数据'
            elif isinstance(value, list):
                shown = f'{len(value)} 个项目'
            else:
                shown = html.escape(str(value))
            html_parts.append(
                '\n'
                '<div class="info-item">\n'
                f'<div class="info-label">{label}</div>\n'
                f'<div class="info-value">{shown}</div>\n'
                '</div>'
            )