Files
ServerGuard/main.py
2026-03-02 15:50:51 +08:00

592 lines
20 KiB
Python
Executable File

#!/usr/bin/env python3
"""
ServerGuard - 服务器硬件健康诊断系统
主程序入口,负责命令行参数解析、模块调度和报告生成。
使用方法:
sudo python3 main.py --quick # 快速检测
sudo python3 main.py --full # 全面诊断(含压力测试)
sudo python3 main.py --module cpu # 仅检测 CPU
sudo python3 main.py --full --format json --output report.json
"""
import argparse
import sys
import os
from typing import Optional, Dict, Any
from utils import setup_logging, check_root_privileges, get_file_timestamp
from reporter import ReportGenerator
def parse_arguments() -> argparse.Namespace:
"""
解析命令行参数。
Returns:
argparse.Namespace: 解析后的参数
"""
parser = argparse.ArgumentParser(
prog='ServerGuard',
description='服务器硬件健康诊断系统 - 用于诊断 Linux 服务器硬件故障',
formatter_class=argparse.RawDescriptionHelpFormatter,
epilog="""
示例:
%(prog)s --quick # 快速硬件检测
%(prog)s --full # 全面诊断(含压力测试)
%(prog)s --module cpu # 仅检测 CPU
%(prog)s --module memory,storage # 检测内存和存储
%(prog)s --full --format json # 生成 JSON 格式报告
%(prog)s --list-modules # 列出所有可用模块
注意: 大多数诊断功能需要 root 权限,请使用 sudo 运行。
"""
)
# 主要操作模式(互斥)
mode_group = parser.add_mutually_exclusive_group(required=True)
mode_group.add_argument(
'--quick', '-q',
action='store_true',
help='快速检测模式(非侵入性,仅收集信息)'
)
mode_group.add_argument(
'--full', '-f',
action='store_true',
help='全面诊断模式(包含压力测试,耗时较长)'
)
mode_group.add_argument(
'--module', '-m',
type=str,
metavar='MODULE',
help='运行指定模块,多个模块用逗号分隔 (cpu,memory,storage,sensors,gpu,logs)'
)
mode_group.add_argument(
'--list-modules', '-l',
action='store_true',
help='列出所有可用的检测模块'
)
# 报告选项
parser.add_argument(
'--format',
type=str,
choices=['text', 'json', 'csv', 'html'],
default='text',
help='报告格式 (默认: text)'
)
parser.add_argument(
'--output', '-o',
type=str,
metavar='FILE',
help='输出文件路径(不指定则输出到控制台)'
)
parser.add_argument(
'--log',
type=str,
metavar='FILE',
default='/var/log/serverguard.log',
help='日志文件路径 (默认: /var/log/serverguard.log)'
)
# 测试参数
parser.add_argument(
'--stress-duration',
type=int,
default=300,
metavar='SECONDS',
help='压力测试持续时间,单位秒 (默认: 300)'
)
parser.add_argument(
'--verbose', '-v',
action='store_true',
help='显示详细输出'
)
parser.add_argument(
'--yes', '-y',
action='store_true',
help='自动确认所有警告提示(如压力测试警告)'
)
return parser.parse_args()
def list_available_modules():
"""列出所有可用的检测模块。"""
modules = {
'system': '系统信息概览',
'cpu': 'CPU 检测与压力测试',
'memory': '内存检测与压力测试',
'storage': '存储设备检测',
'sensors': '电源与传感器监控',
'gpu': '显卡检测',
'logs': '日志分析'
}
print("可用的检测模块:")
print("-" * 40)
for name, description in modules.items():
print(f" {name:12} - {description}")
print("-" * 40)
print("\n使用示例:")
print(" sudo python3 main.py --module cpu")
print(" sudo python3 main.py --module cpu,memory,storage")
def confirm_stress_test(duration: int, auto_confirm: bool = False) -> bool:
"""
确认是否执行压力测试。
Args:
duration: 压力测试持续时间
auto_confirm: 是否自动确认
Returns:
bool: 是否继续
"""
if auto_confirm:
return True
print("\n" + "=" * 60)
print("警告:即将执行压力测试")
print("=" * 60)
print(f"测试持续时间: {duration} 秒 ({duration // 60} 分钟)")
print("此测试将占用大量系统资源,可能导致:")
print(" - CPU 和内存使用率接近 100%")
print(" - 系统响应变慢")
print(" - 温度升高")
print("建议在维护窗口期进行,并确保服务器可接受高负载。")
print("=" * 60)
try:
response = input("\n是否继续? [y/N]: ").strip().lower()
return response in ('y', 'yes')
except KeyboardInterrupt:
print("\n操作已取消")
return False
def run_module(module_name: str, stress_test: bool = False, stress_duration: int = 300, progress_logger=None) -> Dict[str, Any]:
"""
运行指定的检测模块。
Args:
module_name: 模块名称
stress_test: 是否执行压力测试
stress_duration: 压力测试持续时间
progress_logger: 进度日志记录器
Returns:
Dict[str, Any]: 模块检测结果
"""
import logging
logger = logging.getLogger(__name__)
module_map = {
'system': 'modules.system_info',
'cpu': 'modules.cpu',
'memory': 'modules.memory',
'storage': 'modules.storage',
'sensors': 'modules.sensors',
'gpu': 'modules.gpu',
'logs': 'modules.log_analyzer'
}
if module_name not in module_map:
logger.error(f"未知模块: {module_name}")
return {"status": "error", "error": f"未知模块: {module_name}"}
# 记录开始
if progress_logger:
progress_logger.start(f"模块检测: {module_name}")
try:
logger.info(f"[MODULE START] {module_name} - stress_test={stress_test}, duration={stress_duration}")
module = __import__(module_map[module_name], fromlist=[''])
result = None
if module_name == 'system':
if progress_logger:
progress_logger.log("开始收集系统信息")
result = module.get_system_info()
elif module_name == 'cpu':
if progress_logger:
progress_logger.log(f"开始CPU检测 (stress_test={stress_test})")
result = module.run_cpu_check(stress_test, stress_duration)
elif module_name == 'memory':
if progress_logger:
progress_logger.log(f"开始内存检测 (stress_test={stress_test})")
result = module.run_memory_check(stress_test, stress_duration)
elif module_name == 'storage':
if progress_logger:
progress_logger.log("开始存储设备检测")
result = module.run_storage_check()
elif module_name == 'sensors':
if progress_logger:
progress_logger.log("开始传感器监控")
result = module.run_sensors_check()
elif module_name == 'gpu':
if progress_logger:
progress_logger.log("开始显卡检测")
result = module.run_gpu_check()
elif module_name == 'logs':
if progress_logger:
progress_logger.log("开始日志分析")
result = module.analyze_logs()
# 记录结果
status = result.get("status", "unknown") if result else "unknown"
logger.info(f"[MODULE END] {module_name} - Status: {status}")
# 如果结果中有错误信息,记录到日志
if result and result.get("error"):
logger.error(f"[MODULE ERROR] {module_name}: {result['error']}")
if progress_logger:
progress_logger.end(status=status)
return result
except Exception as e:
error_msg = f"运行模块 {module_name} 时出错: {e}"
logger.exception(error_msg)
if progress_logger:
progress_logger.end(status="error", message=str(e))
return {"status": "error", "error": str(e)}
def run_quick_check(progress_logger=None) -> Dict[str, Any]:
"""
执行快速检测(非侵入性)。
Args:
progress_logger: 进度日志记录器
Returns:
Dict[str, Any]: 检测结果
"""
import logging
logger = logging.getLogger(__name__)
# 记录检测开始
logger.info("=" * 70)
logger.info("[DIAGNOSTIC START] 快速硬件检测")
logger.info("=" * 70)
if progress_logger:
progress_logger.start("快速硬件检测")
print("正在执行快速硬件检测...")
print("-" * 60)
results = {
"scan_type": "quick",
"timestamp": get_file_timestamp(),
"modules": {}
}
modules_to_run = ['system', 'cpu', 'memory', 'storage', 'sensors', 'gpu', 'logs']
total_modules = len(modules_to_run)
for idx, module_name in enumerate(modules_to_run, 1):
logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}")
print(f"正在检测: {module_name}...", end=' ', flush=True)
try:
result = run_module(module_name, stress_test=False, progress_logger=progress_logger)
results["modules"][module_name] = result
status = result.get("status", "unknown")
if status == "success":
print("[完成]")
logger.info(f"[MODULE SUCCESS] {module_name}")
elif status == "warning":
print("[警告]")
logger.warning(f"[MODULE WARNING] {module_name}")
elif status == "error":
print("[错误]")
logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}")
else:
print(f"[{status}]")
logger.info(f"[MODULE {status.upper()}] {module_name}")
except Exception as e:
error_msg = f"模块 {module_name} 执行失败: {e}"
logger.exception(error_msg)
results["modules"][module_name] = {"status": "error", "error": str(e)}
print("[失败]")
print("-" * 60)
logger.info("[DIAGNOSTIC END] 快速硬件检测完成")
logger.info("=" * 70)
if progress_logger:
progress_logger.end(status="success")
return results
def run_full_diagnostic(stress_duration: int, auto_confirm: bool = False, progress_logger=None) -> Dict[str, Any]:
"""
执行全面诊断(包含压力测试)。
Args:
stress_duration: 压力测试持续时间
auto_confirm: 是否自动确认
progress_logger: 进度日志记录器
Returns:
Dict[str, Any]: 检测结果
"""
import logging
logger = logging.getLogger(__name__)
if not confirm_stress_test(stress_duration, auto_confirm):
print("诊断已取消")
sys.exit(0)
# 记录诊断开始
logger.info("=" * 70)
logger.info(f"[DIAGNOSTIC START] 全面硬件诊断 (stress_duration={stress_duration}s)")
logger.info("=" * 70)
if progress_logger:
progress_logger.start("全面硬件诊断")
print("\n正在执行全面硬件诊断...")
print("=" * 60)
results = {
"scan_type": "full",
"timestamp": get_file_timestamp(),
"stress_duration": stress_duration,
"modules": {}
}
# 先执行快速检测
modules_to_run = ['system', 'cpu', 'memory', 'storage', 'sensors', 'gpu', 'logs']
total_modules = len(modules_to_run)
for idx, module_name in enumerate(modules_to_run, 1):
logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}")
print(f"\n正在检测: {module_name}...")
try:
# CPU 和内存执行压力测试
do_stress = module_name in ['cpu', 'memory']
if do_stress:
logger.warning(f"[STRESS TEST] {module_name} 压力测试即将开始 (duration={stress_duration}s)")
print(f" ⚠ 即将执行 {module_name} 压力测试,持续时间 {stress_duration}")
result = run_module(module_name, stress_test=do_stress, stress_duration=stress_duration, progress_logger=progress_logger)
results["modules"][module_name] = result
status = result.get("status", "unknown")
print(f" 状态: {status}")
if status == "success":
logger.info(f"[MODULE SUCCESS] {module_name}")
elif status == "warning":
logger.warning(f"[MODULE WARNING] {module_name}")
elif status == "error":
logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}")
except Exception as e:
error_msg = f"模块 {module_name} 执行失败: {e}"
logger.exception(error_msg)
results["modules"][module_name] = {"status": "error", "error": str(e)}
print(f" 状态: 失败 - {e}")
print("\n" + "=" * 60)
logger.info("[DIAGNOSTIC END] 全面硬件诊断完成")
logger.info("=" * 70)
if progress_logger:
progress_logger.end(status="success")
return results
def run_specific_modules(module_list: str, stress_duration: int, progress_logger=None) -> Dict[str, Any]:
"""
运行指定的模块列表。
Args:
module_list: 逗号分隔的模块名称
stress_duration: 压力测试持续时间
progress_logger: 进度日志记录器
Returns:
Dict[str, Any]: 检测结果
"""
import logging
logger = logging.getLogger(__name__)
modules = [m.strip() for m in module_list.split(',')]
logger.info("=" * 70)
logger.info(f"[DIAGNOSTIC START] 自定义模块检测: {', '.join(modules)}")
logger.info("=" * 70)
if progress_logger:
progress_logger.start(f"自定义模块检测: {', '.join(modules)}")
results = {
"scan_type": "custom",
"timestamp": get_file_timestamp(),
"modules": {}
}
print(f"正在执行自定义模块检测: {', '.join(modules)}")
print("-" * 60)
total_modules = len(modules)
for idx, module_name in enumerate(modules, 1):
logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}")
print(f"正在检测: {module_name}...", end=' ', flush=True)
try:
result = run_module(module_name, stress_test=False, progress_logger=progress_logger)
results["modules"][module_name] = result
status = result.get("status", "unknown")
print(f"[{status}]")
if status == "error":
logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}")
elif status == "warning":
logger.warning(f"[MODULE WARNING] {module_name}")
else:
logger.info(f"[MODULE SUCCESS] {module_name}")
except Exception as e:
error_msg = f"模块 {module_name} 执行失败: {e}"
logger.exception(error_msg)
results["modules"][module_name] = {"status": "error", "error": str(e)}
print(f"[失败: {e}]")
print("-" * 60)
logger.info("[DIAGNOSTIC END] 自定义模块检测完成")
logger.info("=" * 70)
if progress_logger:
progress_logger.end(status="success")
return results
def main():
"""程序主入口。"""
args = parse_arguments()
# 设置日志
log_level = logging.DEBUG if args.verbose else logging.INFO
setup_logging(
log_file=args.log if check_root_privileges() else None,
level=log_level,
console_output=True
)
logger = logging.getLogger(__name__)
# 创建进度日志记录器
from utils import ProgressLogger
progress_logger = ProgressLogger(log_file=args.log)
# 记录程序启动信息
logger.info("=" * 70)
logger.info("ServerGuard 启动")
logger.info(f"命令行参数: {' '.join(sys.argv)}")
logger.info(f"工作目录: {os.getcwd()}")
logger.info(f"Python版本: {sys.version}")
logger.info(f"用户ID: {os.getuid()}, 是否为root: {check_root_privileges()}")
logger.info("=" * 70)
# 列出模块
if args.list_modules:
list_available_modules()
sys.exit(0)
# 检查 root 权限警告
if not check_root_privileges():
logger.warning("未以 root 权限运行,部分功能可能受限")
print("警告: 未检测到 root 权限,部分硬件信息可能无法获取")
print("建议: 使用 sudo 运行以获得完整的诊断信息\n")
# 执行诊断
try:
progress_logger.start("ServerGuard 诊断任务")
if args.quick:
logger.info("执行模式: 快速检测")
results = run_quick_check(progress_logger=progress_logger)
elif args.full:
logger.info("执行模式: 全面诊断")
results = run_full_diagnostic(args.stress_duration, args.yes, progress_logger=progress_logger)
elif args.module:
logger.info(f"执行模式: 自定义模块 - {args.module}")
results = run_specific_modules(args.module, args.stress_duration, progress_logger=progress_logger)
else:
print("请指定操作模式: --quick, --full, --module 或 --list-modules")
sys.exit(1)
# 记录诊断完成
progress_logger.end(status="success")
logger.info("诊断执行完成,正在生成报告...")
# 生成报告
generator = ReportGenerator()
if args.output:
generator.save_report(results, args.format, args.output)
logger.info(f"报告已保存至: {args.output}")
print(f"\n报告已保存至: {args.output}")
else:
report = generator.generate_report(results, args.format)
print("\n" + "=" * 60)
print("诊断报告")
print("=" * 60)
print(report)
# 返回退出码:如果有错误则返回 1
has_errors = any(
m.get("status") == "error"
for m in results.get("modules", {}).values()
)
logger.info("=" * 70)
logger.info(f"ServerGuard 正常结束 - 退出码: {1 if has_errors else 0}")
logger.info("=" * 70)
sys.exit(1 if has_errors else 0)
except KeyboardInterrupt:
logger.warning("操作已被用户中断 (KeyboardInterrupt)")
print("\n\n操作已被用户中断")
sys.exit(130)
except Exception as e:
error_msg = f"程序执行过程中发生严重错误: {e}"
logger.exception(error_msg)
logger.error("=" * 70)
logger.error(f"异常类型: {type(e).__name__}")
logger.error(f"异常信息: {e}")
logger.error("=" * 70)
# 尝试记录当前进度
if progress_logger and progress_logger.current_step:
logger.error(f"错误发生时正在执行的步骤: {progress_logger.current_step}")
print(f"\n错误: {e}")
print(f"\n详细错误信息已记录到日志: {args.log}")
sys.exit(1)
if __name__ == '__main__':
import logging
main()