#!/usr/bin/env python3 """ ServerGuard - 服务器硬件健康诊断系统 主程序入口,负责命令行参数解析、模块调度和报告生成。 使用方法: sudo python3 main.py --quick # 快速检测 sudo python3 main.py --full # 全面诊断(含压力测试) sudo python3 main.py --module cpu # 仅检测 CPU sudo python3 main.py --full --format json --output report.json """ import argparse import sys import os from typing import Optional, Dict, Any from utils import setup_logging, check_root_privileges, get_file_timestamp from reporter import ReportGenerator def parse_arguments() -> argparse.Namespace: """ 解析命令行参数。 Returns: argparse.Namespace: 解析后的参数 """ parser = argparse.ArgumentParser( prog='ServerGuard', description='服务器硬件健康诊断系统 - 用于诊断 Linux 服务器硬件故障', formatter_class=argparse.RawDescriptionHelpFormatter, epilog=""" 示例: %(prog)s --quick # 快速硬件检测 %(prog)s --full # 全面诊断(含压力测试) %(prog)s --module cpu # 仅检测 CPU %(prog)s --module memory,storage # 检测内存和存储 %(prog)s --full --format json # 生成 JSON 格式报告 %(prog)s --list-modules # 列出所有可用模块 注意: 大多数诊断功能需要 root 权限,请使用 sudo 运行。 """ ) # 主要操作模式(互斥) mode_group = parser.add_mutually_exclusive_group(required=True) mode_group.add_argument( '--quick', '-q', action='store_true', help='快速检测模式(非侵入性,仅收集信息)' ) mode_group.add_argument( '--full', '-f', action='store_true', help='全面诊断模式(包含压力测试,耗时较长)' ) mode_group.add_argument( '--module', '-m', type=str, metavar='MODULE', help='运行指定模块,多个模块用逗号分隔 (cpu,memory,storage,sensors,gpu,logs)' ) mode_group.add_argument( '--list-modules', '-l', action='store_true', help='列出所有可用的检测模块' ) # 报告选项 parser.add_argument( '--format', type=str, choices=['text', 'json', 'csv', 'html'], default='text', help='报告格式 (默认: text)' ) parser.add_argument( '--output', '-o', type=str, metavar='FILE', help='输出文件路径(不指定则输出到控制台)' ) parser.add_argument( '--log', type=str, metavar='FILE', default='/var/log/serverguard.log', help='日志文件路径 (默认: /var/log/serverguard.log)' ) # 测试参数 parser.add_argument( '--stress-duration', type=int, default=300, metavar='SECONDS', help='压力测试持续时间,单位秒 (默认: 300)' ) parser.add_argument( '--verbose', '-v', action='store_true', help='显示详细输出' ) parser.add_argument( '--yes', '-y', action='store_true', help='自动确认所有警告提示(如压力测试警告)' ) return parser.parse_args() def list_available_modules(): """列出所有可用的检测模块。""" modules = { 'system': '系统信息概览', 'cpu': 'CPU 检测与压力测试', 'memory': '内存检测与压力测试', 'storage': '存储设备检测', 'sensors': '电源与传感器监控', 'gpu': '显卡检测', 'logs': '日志分析' } print("可用的检测模块:") print("-" * 40) for name, description in modules.items(): print(f" {name:12} - {description}") print("-" * 40) print("\n使用示例:") print(" sudo python3 main.py --module cpu") print(" sudo python3 main.py --module cpu,memory,storage") def confirm_stress_test(duration: int, auto_confirm: bool = False) -> bool: """ 确认是否执行压力测试。 Args: duration: 压力测试持续时间 auto_confirm: 是否自动确认 Returns: bool: 是否继续 """ if auto_confirm: return True print("\n" + "=" * 60) print("警告:即将执行压力测试") print("=" * 60) print(f"测试持续时间: {duration} 秒 ({duration // 60} 分钟)") print("此测试将占用大量系统资源,可能导致:") print(" - CPU 和内存使用率接近 100%") print(" - 系统响应变慢") print(" - 温度升高") print("建议在维护窗口期进行,并确保服务器可接受高负载。") print("=" * 60) try: response = input("\n是否继续? [y/N]: ").strip().lower() return response in ('y', 'yes') except KeyboardInterrupt: print("\n操作已取消") return False def run_module(module_name: str, stress_test: bool = False, stress_duration: int = 300, progress_logger=None) -> Dict[str, Any]: """ 运行指定的检测模块。 Args: module_name: 模块名称 stress_test: 是否执行压力测试 stress_duration: 压力测试持续时间 progress_logger: 进度日志记录器 Returns: Dict[str, Any]: 模块检测结果 """ import logging logger = logging.getLogger(__name__) module_map = { 'system': 'modules.system_info', 'cpu': 'modules.cpu', 'memory': 'modules.memory', 'storage': 'modules.storage', 'sensors': 'modules.sensors', 'gpu': 'modules.gpu', 'logs': 'modules.log_analyzer' } if module_name not in module_map: logger.error(f"未知模块: {module_name}") return {"status": "error", "error": f"未知模块: {module_name}"} # 记录开始 if progress_logger: progress_logger.start(f"模块检测: {module_name}") try: logger.info(f"[MODULE START] {module_name} - stress_test={stress_test}, duration={stress_duration}") module = __import__(module_map[module_name], fromlist=['']) result = None if module_name == 'system': if progress_logger: progress_logger.log("开始收集系统信息") result = module.get_system_info() elif module_name == 'cpu': if progress_logger: progress_logger.log(f"开始CPU检测 (stress_test={stress_test})") result = module.run_cpu_check(stress_test, stress_duration) elif module_name == 'memory': if progress_logger: progress_logger.log(f"开始内存检测 (stress_test={stress_test})") result = module.run_memory_check(stress_test, stress_duration) elif module_name == 'storage': if progress_logger: progress_logger.log("开始存储设备检测") result = module.run_storage_check() elif module_name == 'sensors': if progress_logger: progress_logger.log("开始传感器监控") result = module.run_sensors_check() elif module_name == 'gpu': if progress_logger: progress_logger.log("开始显卡检测") result = module.run_gpu_check() elif module_name == 'logs': if progress_logger: progress_logger.log("开始日志分析") result = module.analyze_logs() # 记录结果 status = result.get("status", "unknown") if result else "unknown" logger.info(f"[MODULE END] {module_name} - Status: {status}") # 如果结果中有错误信息,记录到日志 if result and result.get("error"): logger.error(f"[MODULE ERROR] {module_name}: {result['error']}") if progress_logger: progress_logger.end(status=status) return result except Exception as e: error_msg = f"运行模块 {module_name} 时出错: {e}" logger.exception(error_msg) if progress_logger: progress_logger.end(status="error", message=str(e)) return {"status": "error", "error": str(e)} def run_quick_check(progress_logger=None) -> Dict[str, Any]: """ 执行快速检测(非侵入性)。 Args: progress_logger: 进度日志记录器 Returns: Dict[str, Any]: 检测结果 """ import logging logger = logging.getLogger(__name__) # 记录检测开始 logger.info("=" * 70) logger.info("[DIAGNOSTIC START] 快速硬件检测") logger.info("=" * 70) if progress_logger: progress_logger.start("快速硬件检测") print("正在执行快速硬件检测...") print("-" * 60) results = { "scan_type": "quick", "timestamp": get_file_timestamp(), "modules": {} } modules_to_run = ['system', 'cpu', 'memory', 'storage', 'sensors', 'gpu', 'logs'] total_modules = len(modules_to_run) for idx, module_name in enumerate(modules_to_run, 1): logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}") print(f"正在检测: {module_name}...", end=' ', flush=True) try: result = run_module(module_name, stress_test=False, progress_logger=progress_logger) results["modules"][module_name] = result status = result.get("status", "unknown") if status == "success": print("[完成]") logger.info(f"[MODULE SUCCESS] {module_name}") elif status == "warning": print("[警告]") logger.warning(f"[MODULE WARNING] {module_name}") elif status == "error": print("[错误]") logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}") else: print(f"[{status}]") logger.info(f"[MODULE {status.upper()}] {module_name}") except Exception as e: error_msg = f"模块 {module_name} 执行失败: {e}" logger.exception(error_msg) results["modules"][module_name] = {"status": "error", "error": str(e)} print("[失败]") print("-" * 60) logger.info("[DIAGNOSTIC END] 快速硬件检测完成") logger.info("=" * 70) if progress_logger: progress_logger.end(status="success") return results def run_full_diagnostic(stress_duration: int, auto_confirm: bool = False, progress_logger=None) -> Dict[str, Any]: """ 执行全面诊断(包含压力测试)。 Args: stress_duration: 压力测试持续时间 auto_confirm: 是否自动确认 progress_logger: 进度日志记录器 Returns: Dict[str, Any]: 检测结果 """ import logging logger = logging.getLogger(__name__) if not confirm_stress_test(stress_duration, auto_confirm): print("诊断已取消") sys.exit(0) # 记录诊断开始 logger.info("=" * 70) logger.info(f"[DIAGNOSTIC START] 全面硬件诊断 (stress_duration={stress_duration}s)") logger.info("=" * 70) if progress_logger: progress_logger.start("全面硬件诊断") print("\n正在执行全面硬件诊断...") print("=" * 60) results = { "scan_type": "full", "timestamp": get_file_timestamp(), "stress_duration": stress_duration, "modules": {} } # 先执行快速检测 modules_to_run = ['system', 'cpu', 'memory', 'storage', 'sensors', 'gpu', 'logs'] total_modules = len(modules_to_run) for idx, module_name in enumerate(modules_to_run, 1): logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}") print(f"\n正在检测: {module_name}...") try: # CPU 和内存执行压力测试 do_stress = module_name in ['cpu', 'memory'] if do_stress: logger.warning(f"[STRESS TEST] {module_name} 压力测试即将开始 (duration={stress_duration}s)") print(f" ⚠ 即将执行 {module_name} 压力测试,持续时间 {stress_duration} 秒") result = run_module(module_name, stress_test=do_stress, stress_duration=stress_duration, progress_logger=progress_logger) results["modules"][module_name] = result status = result.get("status", "unknown") print(f" 状态: {status}") if status == "success": logger.info(f"[MODULE SUCCESS] {module_name}") elif status == "warning": logger.warning(f"[MODULE WARNING] {module_name}") elif status == "error": logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}") except Exception as e: error_msg = f"模块 {module_name} 执行失败: {e}" logger.exception(error_msg) results["modules"][module_name] = {"status": "error", "error": str(e)} print(f" 状态: 失败 - {e}") print("\n" + "=" * 60) logger.info("[DIAGNOSTIC END] 全面硬件诊断完成") logger.info("=" * 70) if progress_logger: progress_logger.end(status="success") return results def run_specific_modules(module_list: str, stress_duration: int, progress_logger=None) -> Dict[str, Any]: """ 运行指定的模块列表。 Args: module_list: 逗号分隔的模块名称 stress_duration: 压力测试持续时间 progress_logger: 进度日志记录器 Returns: Dict[str, Any]: 检测结果 """ import logging logger = logging.getLogger(__name__) modules = [m.strip() for m in module_list.split(',')] logger.info("=" * 70) logger.info(f"[DIAGNOSTIC START] 自定义模块检测: {', '.join(modules)}") logger.info("=" * 70) if progress_logger: progress_logger.start(f"自定义模块检测: {', '.join(modules)}") results = { "scan_type": "custom", "timestamp": get_file_timestamp(), "modules": {} } print(f"正在执行自定义模块检测: {', '.join(modules)}") print("-" * 60) total_modules = len(modules) for idx, module_name in enumerate(modules, 1): logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}") print(f"正在检测: {module_name}...", end=' ', flush=True) try: result = run_module(module_name, stress_test=False, progress_logger=progress_logger) results["modules"][module_name] = result status = result.get("status", "unknown") print(f"[{status}]") if status == "error": logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}") elif status == "warning": logger.warning(f"[MODULE WARNING] {module_name}") else: logger.info(f"[MODULE SUCCESS] {module_name}") except Exception as e: error_msg = f"模块 {module_name} 执行失败: {e}" logger.exception(error_msg) results["modules"][module_name] = {"status": "error", "error": str(e)} print(f"[失败: {e}]") print("-" * 60) logger.info("[DIAGNOSTIC END] 自定义模块检测完成") logger.info("=" * 70) if progress_logger: progress_logger.end(status="success") return results def main(): """程序主入口。""" args = parse_arguments() # 设置日志 log_level = logging.DEBUG if args.verbose else logging.INFO setup_logging( log_file=args.log if check_root_privileges() else None, level=log_level, console_output=True ) logger = logging.getLogger(__name__) # 创建进度日志记录器 from utils import ProgressLogger progress_logger = ProgressLogger(log_file=args.log) # 记录程序启动信息 logger.info("=" * 70) logger.info("ServerGuard 启动") logger.info(f"命令行参数: {' '.join(sys.argv)}") logger.info(f"工作目录: {os.getcwd()}") logger.info(f"Python版本: {sys.version}") logger.info(f"用户ID: {os.getuid()}, 是否为root: {check_root_privileges()}") logger.info("=" * 70) # 列出模块 if args.list_modules: list_available_modules() sys.exit(0) # 检查 root 权限警告 if not check_root_privileges(): logger.warning("未以 root 权限运行,部分功能可能受限") print("警告: 未检测到 root 权限,部分硬件信息可能无法获取") print("建议: 使用 sudo 运行以获得完整的诊断信息\n") # 执行诊断 try: progress_logger.start("ServerGuard 诊断任务") if args.quick: logger.info("执行模式: 快速检测") results = run_quick_check(progress_logger=progress_logger) elif args.full: logger.info("执行模式: 全面诊断") results = run_full_diagnostic(args.stress_duration, args.yes, progress_logger=progress_logger) elif args.module: logger.info(f"执行模式: 自定义模块 - {args.module}") results = run_specific_modules(args.module, args.stress_duration, progress_logger=progress_logger) else: print("请指定操作模式: --quick, --full, --module 或 --list-modules") sys.exit(1) # 记录诊断完成 progress_logger.end(status="success") logger.info("诊断执行完成,正在生成报告...") # 生成报告 generator = ReportGenerator() if args.output: generator.save_report(results, args.format, args.output) logger.info(f"报告已保存至: {args.output}") print(f"\n报告已保存至: {args.output}") else: report = generator.generate_report(results, args.format) print("\n" + "=" * 60) print("诊断报告") print("=" * 60) print(report) # 返回退出码:如果有错误则返回 1 has_errors = any( m.get("status") == "error" for m in results.get("modules", {}).values() ) logger.info("=" * 70) logger.info(f"ServerGuard 正常结束 - 退出码: {1 if has_errors else 0}") logger.info("=" * 70) sys.exit(1 if has_errors else 0) except KeyboardInterrupt: logger.warning("操作已被用户中断 (KeyboardInterrupt)") print("\n\n操作已被用户中断") sys.exit(130) except Exception as e: error_msg = f"程序执行过程中发生严重错误: {e}" logger.exception(error_msg) logger.error("=" * 70) logger.error(f"异常类型: {type(e).__name__}") logger.error(f"异常信息: {e}") logger.error("=" * 70) # 尝试记录当前进度 if progress_logger and progress_logger.current_step: logger.error(f"错误发生时正在执行的步骤: {progress_logger.current_step}") print(f"\n错误: {e}") print(f"\n详细错误信息已记录到日志: {args.log}") sys.exit(1) if __name__ == '__main__': import logging main()