From deb4fa0e7940a34b9ee9dd76019c0c863439d2c8 Mon Sep 17 00:00:00 2001 From: zj <1052308357@qq.com> Date: Mon, 2 Mar 2026 15:50:51 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A2=9E=E5=BC=BA=E6=97=A5=E5=BF=97?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- README.md | 57 ++++++++++++ main.py | 220 +++++++++++++++++++++++++++++++++++++++++----- modules/cpu.py | 26 ++++++ modules/memory.py | 19 ++++ utils.py | 87 +++++++++++++++++- 5 files changed, 384 insertions(+), 25 deletions(-) diff --git a/README.md b/README.md index bcce43c..92af609 100644 --- a/README.md +++ b/README.md @@ -19,6 +19,7 @@ ServerGuard 是一款基于 Python 的 Linux 命令行工具,用于诊断服 - **Python**: 3.6 或更高版本 - **权限**: root 权限(大多数硬件诊断功能需要) - **架构**: x86_64 (AMD64) +- **磁盘空间**: 至少 100MB 可用空间(用于日志和报告) ## 克隆及安装方法 @@ -313,6 +314,62 @@ python3 quick_test.py python3 -m unittest discover tests/ -v ``` +## 日志记录 + +ServerGuard 会实时记录详细的测试日志,方便排查问题: + +### 日志文件位置 + +默认日志文件路径:`/var/log/serverguard.log` + +```bash +# 查看实时日志 +tail -f /var/log/serverguard.log + +# 查看最近 100 行日志 +tail -n 100 /var/log/serverguard.log +``` + +### 日志内容说明 + +日志包含以下关键信息: +- **启动信息**: 程序启动时间、命令行参数、Python版本等 +- **进度记录**: 每个模块的检测开始和结束时间 +- **详细步骤**: 压力测试前后的温度、执行的命令等 +- **错误信息**: 详细的异常信息和堆栈跟踪 + +### 日志示例 + +``` +2026-03-02 15:41:28 - ServerGuard 启动 +2026-03-02 15:41:28 - [DIAGNOSTIC START] 全面硬件诊断 +2026-03-02 15:41:28 - [PROGRESS] 模块 1/7: system +2026-03-02 15:41:28 - [MODULE START] cpu - stress_test=True, duration=300 +2026-03-02 15:41:28 - [CPU STRESS TEST] 测试前温度: 45°C +2026-03-02 15:46:28 - [CPU STRESS TEST] 测试后温度: 78°C +2026-03-02 15:46:28 - [CPU STRESS TEST] 压力测试通过 +``` + +### 排查机器重启/关机问题 + +如果测试过程中机器意外关机或重启,查看日志文件: + +```bash +# 查找最后的日志记录 +tail -n 50 /var/log/serverguard.log + +# 查找压力测试相关的日志 +grep "STRESS TEST" /var/log/serverguard.log + +# 查找错误信息 +grep -i "error\|exception\|failed" /var/log/serverguard.log +``` + +**常见情况分析:** +- 如果在 `[CPU STRESS TEST]` 或 `[MEMORY STRESS TEST]` 后没有 `[END]` 记录,说明机器在压力测试中出现问题 +- 
查看压力测试前后的温度记录,判断是否因过热导致关机 +- 查看系统日志 `dmesg` 或 `/var/log/messages` 确认硬件错误 + ## 故障排除 ### 1. 提示 "未找到压力测试工具" diff --git a/main.py b/main.py index 86a11a5..99ceeff 100755 --- a/main.py +++ b/main.py @@ -167,7 +167,7 @@ def confirm_stress_test(duration: int, auto_confirm: bool = False) -> bool: return False -def run_module(module_name: str, stress_test: bool = False, stress_duration: int = 300) -> Dict[str, Any]: +def run_module(module_name: str, stress_test: bool = False, stress_duration: int = 300, progress_logger=None) -> Dict[str, Any]: """ 运行指定的检测模块。 @@ -175,6 +175,7 @@ def run_module(module_name: str, stress_test: bool = False, stress_duration: int module_name: 模块名称 stress_test: 是否执行压力测试 stress_duration: 压力测试持续时间 + progress_logger: 进度日志记录器 Returns: Dict[str, Any]: 模块检测结果 @@ -196,39 +197,89 @@ def run_module(module_name: str, stress_test: bool = False, stress_duration: int logger.error(f"未知模块: {module_name}") return {"status": "error", "error": f"未知模块: {module_name}"} + # 记录开始 + if progress_logger: + progress_logger.start(f"模块检测: {module_name}") + try: + logger.info(f"[MODULE START] {module_name} - stress_test={stress_test}, duration={stress_duration}") + module = __import__(module_map[module_name], fromlist=['']) + result = None if module_name == 'system': - return module.get_system_info() + if progress_logger: + progress_logger.log("开始收集系统信息") + result = module.get_system_info() elif module_name == 'cpu': - return module.run_cpu_check(stress_test, stress_duration) + if progress_logger: + progress_logger.log(f"开始CPU检测 (stress_test={stress_test})") + result = module.run_cpu_check(stress_test, stress_duration) elif module_name == 'memory': - return module.run_memory_check(stress_test, stress_duration) + if progress_logger: + progress_logger.log(f"开始内存检测 (stress_test={stress_test})") + result = module.run_memory_check(stress_test, stress_duration) elif module_name == 'storage': - return module.run_storage_check() + if progress_logger: + progress_logger.log("开始存储设备检测") 
+ result = module.run_storage_check() elif module_name == 'sensors': - return module.run_sensors_check() + if progress_logger: + progress_logger.log("开始传感器监控") + result = module.run_sensors_check() elif module_name == 'gpu': - return module.run_gpu_check() + if progress_logger: + progress_logger.log("开始显卡检测") + result = module.run_gpu_check() elif module_name == 'logs': - return module.analyze_logs() + if progress_logger: + progress_logger.log("开始日志分析") + result = module.analyze_logs() + + # 记录结果 + status = result.get("status", "unknown") if result else "unknown" + logger.info(f"[MODULE END] {module_name} - Status: {status}") + + # 如果结果中有错误信息,记录到日志 + if result and result.get("error"): + logger.error(f"[MODULE ERROR] {module_name}: {result['error']}") + + if progress_logger: + progress_logger.end(status=status) + + return result except Exception as e: - logger.error(f"运行模块 {module_name} 时出错: {e}") + error_msg = f"运行模块 {module_name} 时出错: {e}" + logger.exception(error_msg) + + if progress_logger: + progress_logger.end(status="error", message=str(e)) + return {"status": "error", "error": str(e)} -def run_quick_check() -> Dict[str, Any]: +def run_quick_check(progress_logger=None) -> Dict[str, Any]: """ 执行快速检测(非侵入性)。 + Args: + progress_logger: 进度日志记录器 + Returns: Dict[str, Any]: 检测结果 """ import logging logger = logging.getLogger(__name__) + # 记录检测开始 + logger.info("=" * 70) + logger.info("[DIAGNOSTIC START] 快速硬件检测") + logger.info("=" * 70) + + if progress_logger: + progress_logger.start("快速硬件检测") + print("正在执行快速硬件检测...") print("-" * 60) @@ -239,37 +290,54 @@ def run_quick_check() -> Dict[str, Any]: } modules_to_run = ['system', 'cpu', 'memory', 'storage', 'sensors', 'gpu', 'logs'] + total_modules = len(modules_to_run) - for module_name in modules_to_run: + for idx, module_name in enumerate(modules_to_run, 1): + logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}") print(f"正在检测: {module_name}...", end=' ', flush=True) + try: - result = run_module(module_name, 
stress_test=False) + result = run_module(module_name, stress_test=False, progress_logger=progress_logger) results["modules"][module_name] = result status = result.get("status", "unknown") + if status == "success": print("[完成]") + logger.info(f"[MODULE SUCCESS] {module_name}") elif status == "warning": print("[警告]") + logger.warning(f"[MODULE WARNING] {module_name}") elif status == "error": print("[错误]") + logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}") else: print(f"[{status}]") + logger.info(f"[MODULE {status.upper()}] {module_name}") + except Exception as e: - logger.error(f"模块 {module_name} 执行失败: {e}") + error_msg = f"模块 {module_name} 执行失败: {e}" + logger.exception(error_msg) results["modules"][module_name] = {"status": "error", "error": str(e)} print("[失败]") print("-" * 60) + logger.info("[DIAGNOSTIC END] 快速硬件检测完成") + logger.info("=" * 70) + + if progress_logger: + progress_logger.end(status="success") + return results -def run_full_diagnostic(stress_duration: int, auto_confirm: bool = False) -> Dict[str, Any]: +def run_full_diagnostic(stress_duration: int, auto_confirm: bool = False, progress_logger=None) -> Dict[str, Any]: """ 执行全面诊断(包含压力测试)。 Args: stress_duration: 压力测试持续时间 auto_confirm: 是否自动确认 + progress_logger: 进度日志记录器 Returns: Dict[str, Any]: 检测结果 @@ -281,6 +349,14 @@ def run_full_diagnostic(stress_duration: int, auto_confirm: bool = False) -> Dic print("诊断已取消") sys.exit(0) + # 记录诊断开始 + logger.info("=" * 70) + logger.info(f"[DIAGNOSTIC START] 全面硬件诊断 (stress_duration={stress_duration}s)") + logger.info("=" * 70) + + if progress_logger: + progress_logger.start("全面硬件诊断") + print("\n正在执行全面硬件诊断...") print("=" * 60) @@ -293,38 +369,73 @@ def run_full_diagnostic(stress_duration: int, auto_confirm: bool = False) -> Dic # 先执行快速检测 modules_to_run = ['system', 'cpu', 'memory', 'storage', 'sensors', 'gpu', 'logs'] + total_modules = len(modules_to_run) - for module_name in modules_to_run: + for idx, module_name in 
enumerate(modules_to_run, 1): + logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}") print(f"\n正在检测: {module_name}...") + try: # CPU 和内存执行压力测试 do_stress = module_name in ['cpu', 'memory'] - result = run_module(module_name, stress_test=do_stress, stress_duration=stress_duration) + + if do_stress: + logger.warning(f"[STRESS TEST] {module_name} 压力测试即将开始 (duration={stress_duration}s)") + print(f" ⚠ 即将执行 {module_name} 压力测试,持续时间 {stress_duration} 秒") + + result = run_module(module_name, stress_test=do_stress, stress_duration=stress_duration, progress_logger=progress_logger) results["modules"][module_name] = result status = result.get("status", "unknown") + print(f" 状态: {status}") + + if status == "success": + logger.info(f"[MODULE SUCCESS] {module_name}") + elif status == "warning": + logger.warning(f"[MODULE WARNING] {module_name}") + elif status == "error": + logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}") + except Exception as e: - logger.error(f"模块 {module_name} 执行失败: {e}") + error_msg = f"模块 {module_name} 执行失败: {e}" + logger.exception(error_msg) results["modules"][module_name] = {"status": "error", "error": str(e)} print(f" 状态: 失败 - {e}") print("\n" + "=" * 60) + logger.info("[DIAGNOSTIC END] 全面硬件诊断完成") + logger.info("=" * 70) + + if progress_logger: + progress_logger.end(status="success") + return results -def run_specific_modules(module_list: str, stress_duration: int) -> Dict[str, Any]: +def run_specific_modules(module_list: str, stress_duration: int, progress_logger=None) -> Dict[str, Any]: """ 运行指定的模块列表。 Args: module_list: 逗号分隔的模块名称 stress_duration: 压力测试持续时间 + progress_logger: 进度日志记录器 Returns: Dict[str, Any]: 检测结果 """ + import logging + logger = logging.getLogger(__name__) + modules = [m.strip() for m in module_list.split(',')] + logger.info("=" * 70) + logger.info(f"[DIAGNOSTIC START] 自定义模块检测: {', '.join(modules)}") + logger.info("=" * 70) + + if progress_logger: + progress_logger.start(f"自定义模块检测: {', 
'.join(modules)}") + results = { "scan_type": "custom", "timestamp": get_file_timestamp(), @@ -334,18 +445,38 @@ def run_specific_modules(module_list: str, stress_duration: int) -> Dict[str, An print(f"正在执行自定义模块检测: {', '.join(modules)}") print("-" * 60) - for module_name in modules: + total_modules = len(modules) + + for idx, module_name in enumerate(modules, 1): + logger.info(f"[PROGRESS] 模块 {idx}/{total_modules}: {module_name}") print(f"正在检测: {module_name}...", end=' ', flush=True) + try: - result = run_module(module_name, stress_test=False) + result = run_module(module_name, stress_test=False, progress_logger=progress_logger) results["modules"][module_name] = result status = result.get("status", "unknown") print(f"[{status}]") + + if status == "error": + logger.error(f"[MODULE ERROR] {module_name}: {result.get('error', 'Unknown error')}") + elif status == "warning": + logger.warning(f"[MODULE WARNING] {module_name}") + else: + logger.info(f"[MODULE SUCCESS] {module_name}") + except Exception as e: + error_msg = f"模块 {module_name} 执行失败: {e}" + logger.exception(error_msg) results["modules"][module_name] = {"status": "error", "error": str(e)} print(f"[失败: {e}]") print("-" * 60) + logger.info("[DIAGNOSTIC END] 自定义模块检测完成") + logger.info("=" * 70) + + if progress_logger: + progress_logger.end(status="success") + return results @@ -362,6 +493,19 @@ def main(): ) logger = logging.getLogger(__name__) + # 创建进度日志记录器 + from utils import ProgressLogger + progress_logger = ProgressLogger(log_file=args.log) + + # 记录程序启动信息 + logger.info("=" * 70) + logger.info("ServerGuard 启动") + logger.info(f"命令行参数: {' '.join(sys.argv)}") + logger.info(f"工作目录: {os.getcwd()}") + logger.info(f"Python版本: {sys.version}") + logger.info(f"用户ID: {os.getuid()}, 是否为root: {check_root_privileges()}") + logger.info("=" * 70) + # 列出模块 if args.list_modules: list_available_modules() @@ -375,21 +519,31 @@ def main(): # 执行诊断 try: + progress_logger.start("ServerGuard 诊断任务") + if args.quick: - results = 
run_quick_check() + logger.info("执行模式: 快速检测") + results = run_quick_check(progress_logger=progress_logger) elif args.full: - results = run_full_diagnostic(args.stress_duration, args.yes) + logger.info("执行模式: 全面诊断") + results = run_full_diagnostic(args.stress_duration, args.yes, progress_logger=progress_logger) elif args.module: - results = run_specific_modules(args.module, args.stress_duration) + logger.info(f"执行模式: 自定义模块 - {args.module}") + results = run_specific_modules(args.module, args.stress_duration, progress_logger=progress_logger) else: print("请指定操作模式: --quick, --full, --module 或 --list-modules") sys.exit(1) + # 记录诊断完成 + progress_logger.end(status="success") + logger.info("诊断执行完成,正在生成报告...") + # 生成报告 generator = ReportGenerator() if args.output: generator.save_report(results, args.format, args.output) + logger.info(f"报告已保存至: {args.output}") print(f"\n报告已保存至: {args.output}") else: report = generator.generate_report(results, args.format) @@ -403,14 +557,32 @@ def main(): m.get("status") == "error" for m in results.get("modules", {}).values() ) + + logger.info("=" * 70) + logger.info(f"ServerGuard 正常结束 - 退出码: {1 if has_errors else 0}") + logger.info("=" * 70) + sys.exit(1 if has_errors else 0) except KeyboardInterrupt: + logger.warning("操作已被用户中断 (KeyboardInterrupt)") print("\n\n操作已被用户中断") sys.exit(130) + except Exception as e: - logger.exception("程序执行过程中发生错误") + error_msg = f"程序执行过程中发生严重错误: {e}" + logger.exception(error_msg) + logger.error("=" * 70) + logger.error(f"异常类型: {type(e).__name__}") + logger.error(f"异常信息: {e}") + logger.error("=" * 70) + + # 尝试记录当前进度 + if progress_logger and progress_logger.current_step: + logger.error(f"错误发生时正在执行的步骤: {progress_logger.current_step}") + print(f"\n错误: {e}") + print(f"\n详细错误信息已记录到日志: {args.log}") sys.exit(1) diff --git a/modules/cpu.py b/modules/cpu.py index baf738d..ec62d14 100644 --- a/modules/cpu.py +++ b/modules/cpu.py @@ -402,6 +402,9 @@ def run_cpu_stress_test(duration: int = 300) -> Dict[str, Any]: Returns: 
Dict[str, Any]: 测试结果 """ + import logging + logger = logging.getLogger(__name__) + result = { "passed": False, "duration_seconds": duration, @@ -417,10 +420,13 @@ def run_cpu_stress_test(duration: int = 300) -> Dict[str, Any]: if check_command_exists('stress-ng'): result["tool_used"] = "stress-ng" try: + logger.info(f"[CPU STRESS TEST] 开始使用 stress-ng 进行压力测试,持续时间: {duration}秒") result["start_time"] = time.strftime('%Y-%m-%d %H:%M:%S') # 获取测试前温度 temp_before = get_cpu_temperature() + temp_before_val = temp_before.get("max_c", "N/A") + logger.info(f"[CPU STRESS TEST] 测试前温度: {temp_before_val}°C") # 运行 stress-ng # --cpu 0 使用所有 CPU 核心 @@ -433,6 +439,8 @@ def run_cpu_stress_test(duration: int = 300) -> Dict[str, Any]: '--metrics-brief' ] + logger.info(f"[CPU STRESS TEST] 执行命令: {' '.join(cmd)}") + _, stdout, stderr = execute_command( cmd, timeout=duration + 30, # 给一些额外时间 @@ -440,9 +448,12 @@ def run_cpu_stress_test(duration: int = 300) -> Dict[str, Any]: ) result["end_time"] = time.strftime('%Y-%m-%d %H:%M:%S') + logger.info("[CPU STRESS TEST] stress-ng 执行完成") # 获取测试后温度 temp_after = get_cpu_temperature() + temp_after_val = temp_after.get("max_c", "N/A") + logger.info(f"[CPU STRESS TEST] 测试后温度: {temp_after_val}°C") # 分析输出 output = stdout + stderr @@ -451,13 +462,16 @@ def run_cpu_stress_test(duration: int = 300) -> Dict[str, Any]: if 'error' in output.lower() or 'fail' in output.lower(): result["passed"] = False result["errors"].append("压力测试过程中发现错误") + logger.error("[CPU STRESS TEST] 压力测试执行过程中发现错误") else: result["passed"] = True + logger.info("[CPU STRESS TEST] 压力测试通过") # 提取性能指标 bogo_ops = re.search(r'stress-ng:\s+cpu:\s+(\d+)\s+bogo ops', output) if bogo_ops: result["bogo_ops"] = safe_int(bogo_ops.group(1)) + logger.info(f"[CPU STRESS TEST] Bogo ops: {result['bogo_ops']}") bogo_ops_per_sec = re.search(r'(\d+\.\d+)\s+bogo ops per second', output) if bogo_ops_per_sec: @@ -475,16 +489,22 @@ def run_cpu_stress_test(duration: int = 300) -> Dict[str, Any]: except Exception as e: 
result["passed"] = False result["errors"].append(str(e)) + logger.exception(f"[CPU STRESS TEST] stress-ng 执行异常: {e}") # 备选: 使用 stress elif check_command_exists('stress'): result["tool_used"] = "stress" try: + logger.info(f"[CPU STRESS TEST] 开始使用 stress 进行压力测试,持续时间: {duration}秒") result["start_time"] = time.strftime('%Y-%m-%d %H:%M:%S') temp_before = get_cpu_temperature() + temp_before_val = temp_before.get("max_c", "N/A") + logger.info(f"[CPU STRESS TEST] 测试前温度: {temp_before_val}°C") num_cores = os.cpu_count() or 1 + logger.info(f"[CPU STRESS TEST] 使用 {num_cores} 个 CPU 核心") + _, stdout, stderr = execute_command( ['stress', '--cpu', str(num_cores), '--timeout', str(duration)], timeout=duration + 30, @@ -492,7 +512,11 @@ def run_cpu_stress_test(duration: int = 300) -> Dict[str, Any]: ) result["end_time"] = time.strftime('%Y-%m-%d %H:%M:%S') + logger.info("[CPU STRESS TEST] stress 执行完成") + temp_after = get_cpu_temperature() + temp_after_val = temp_after.get("max_c", "N/A") + logger.info(f"[CPU STRESS TEST] 测试后温度: {temp_after_val}°C") result["passed"] = True result["temperature_before"] = temp_before @@ -504,11 +528,13 @@ def run_cpu_stress_test(duration: int = 300) -> Dict[str, Any]: except Exception as e: result["passed"] = False result["errors"].append(str(e)) + logger.exception(f"[CPU STRESS TEST] stress 执行异常: {e}") else: result["passed"] = False result["errors"].append("未找到压力测试工具 (stress-ng 或 stress)") result["note"] = "请安装 stress-ng 或 stress: yum install stress / apt install stress-ng" + logger.error("[CPU STRESS TEST] 未找到压力测试工具 (stress-ng 或 stress)") return result diff --git a/modules/memory.py b/modules/memory.py index 25e0b3b..5696dfc 100644 --- a/modules/memory.py +++ b/modules/memory.py @@ -358,6 +358,9 @@ def run_memtester(duration: int = 300) -> Dict[str, Any]: Returns: Dict[str, Any]: 测试结果 """ + import logging + logger = logging.getLogger(__name__) + result = { "passed": False, "size_mb": 0, @@ -371,9 +374,12 @@ def run_memtester(duration: int = 300) -> 
Dict[str, Any]: if not check_command_exists('memtester'): result["errors"].append("memtester 未安装") + logger.warning("[MEMORY STRESS TEST] memtester 未安装") return result try: + logger.info("[MEMORY STRESS TEST] 开始使用 memtester 进行内存测试") + # 计算测试内存大小 # 留出一些内存给系统和 stress-ng 使用 with open('/proc/meminfo', 'r') as f: @@ -391,8 +397,11 @@ def run_memtester(duration: int = 300) -> Dict[str, Any]: result["start_time"] = time.strftime('%Y-%m-%d %H:%M:%S') start_ts = time.time() + logger.info(f"[MEMORY STRESS TEST] 测试内存大小: {test_size_mb}MB") + # 运行 memtester cmd = ['memtester', f'{test_size_mb}M', '1'] + logger.info(f"[MEMORY STRESS TEST] 执行命令: {' '.join(cmd)}") _, stdout, stderr = execute_command( cmd, @@ -403,25 +412,32 @@ def run_memtester(duration: int = 300) -> Dict[str, Any]: result["end_time"] = time.strftime('%Y-%m-%d %H:%M:%S') result["duration_seconds"] = round(time.time() - start_ts, 2) + logger.info(f"[MEMORY STRESS TEST] memtester 执行完成,耗时: {result['duration_seconds']}秒") + output = stdout + stderr result["raw_output"] = output[:2000] # 保存部分原始输出 # 分析结果 if 'FAILURE' in output.upper(): result["passed"] = False + logger.error("[MEMORY STRESS TEST] 测试失败: 发现 FAILURE") # 提取错误信息 for line in output.split('\n'): if 'FAILURE' in line.upper() or 'error' in line.lower(): result["errors"].append(line.strip()) + logger.error(f"[MEMORY STRESS TEST] 错误详情: {line.strip()}") elif 'SUCCESS' in output.upper() or 'ok' in output.lower() or 'finished' in output.lower(): result["passed"] = True + logger.info("[MEMORY STRESS TEST] 测试通过") else: # 检查是否完成所有测试 if 'Done' in output or 'finished' in output.lower(): result["passed"] = True + logger.info("[MEMORY STRESS TEST] 测试完成") else: result["passed"] = False result["errors"].append("测试可能未完成") + logger.warning("[MEMORY STRESS TEST] 测试可能未完成") # 提取运行的测试 test_names = [ @@ -436,9 +452,12 @@ def run_memtester(duration: int = 300) -> Dict[str, Any]: if test in output: result["tests_run"].append(test) + logger.info(f"[MEMORY STRESS TEST] 执行的测试项: {', 
class ProgressLogger:
    """Record the progress of diagnostic steps so an interrupted run can be traced.

    Steps may be nested: every ``start()`` opens a new step on top of the ones
    that are already running, and every ``end()`` closes the innermost open
    step and makes its parent the current step again.  Callers that open an
    outer step, run several inner start/end pairs and then close the outer
    step therefore get an ``[END]`` record for every level.

    Attributes:
        logger: Logger used for all progress records.
        log_file: Path passed by the caller; stored only, never opened here.
        steps: Finished steps (dicts), in the order in which they ended.
        current_step: Dict describing the innermost open step, or ``None``.
        start_time: ``datetime`` at which the innermost open step started,
            or ``None`` when no step is open.
    """

    def __init__(self, log_file: Optional[str] = None):
        self.logger = logging.getLogger(__name__)
        self.log_file = log_file
        self.steps = []
        self.current_step = None
        self.start_time = None
        # Stack of (step_dict, start_datetime) pairs, outermost step first.
        # The datetime is kept beside the dict so the dict itself only holds
        # plain strings and numbers.
        self._open_steps = []

    def start(self, operation: str) -> None:
        """Open a new step named *operation* and log a ``[START]`` record."""
        from datetime import datetime

        # One clock reading, so the stored timestamp and the duration base agree.
        began = datetime.now()
        step = {
            "operation": operation,
            "start_time": began.isoformat(),
            "status": "running",
        }
        self._open_steps.append((step, began))
        self.current_step = step
        self.start_time = began

        self.logger.info(f"[START] {operation}")
        self._flush_log()

    def end(self, status: str = "success", message: str = "") -> None:
        """Close the innermost open step and log an ``[END]`` record.

        Args:
            status: Final status of the step.  ``"error"`` and ``"warning"``
                select the matching log level; anything else is logged at
                INFO level.
            message: Optional detail appended to the log record.

        Calling this while no step is open does nothing.
        """
        from datetime import datetime

        if not self._open_steps:
            return

        step, began = self._open_steps.pop()
        finished = datetime.now()
        duration = (finished - began).total_seconds()

        step["end_time"] = finished.isoformat()
        step["status"] = status
        step["duration_seconds"] = duration
        step["message"] = message
        self.steps.append(step)

        msg = f"[END] {step['operation']} - Status: {status}"
        if message:
            msg += f" - {message}"
        msg += f" (Duration: {duration:.2f}s)"

        if status == "error":
            self.logger.error(msg)
        elif status == "warning":
            self.logger.warning(msg)
        else:
            self.logger.info(msg)

        # The enclosing step (if any) becomes the current one again.
        if self._open_steps:
            self.current_step, self.start_time = self._open_steps[-1]
        else:
            self.current_step = None
            self.start_time = None

        self._flush_log()

    def log(self, message: str, level: str = "info") -> None:
        """Log an intermediate ``[PROGRESS]`` record for the current step.

        Args:
            message: Text of the record.
            level: ``"error"``, ``"warning"`` or ``"debug"``; any other value
                is logged at INFO level.
        """
        operation = self.current_step['operation'] if self.current_step else 'UNKNOWN'
        msg = f"[PROGRESS] {operation} - {message}"
        if level == "error":
            self.logger.error(msg)
        elif level == "warning":
            self.logger.warning(msg)
        elif level == "debug":
            self.logger.debug(msg)
        else:
            self.logger.info(msg)
        self._flush_log()

    def _flush_log(self) -> None:
        """Flush every handler that receives this logger's records.

        Records propagate to the handlers of ancestor loggers, so the walk
        follows ``parent`` links for as long as ``propagate`` is true instead
        of looking only at the handlers attached directly to ``self.logger``.
        File handlers are additionally fsync'ed so the last records are on
        disk even if the machine powers off right afterwards.
        """
        import os

        node = self.logger
        while node is not None:
            for handler in node.handlers:
                handler.flush()
                if isinstance(handler, logging.FileHandler):
                    stream = getattr(handler, "stream", None)
                    if stream is None:
                        # Handler opened with delay=True or already closed.
                        continue
                    try:
                        os.fsync(stream.fileno())
                    except (OSError, ValueError):
                        # Best effort only: some targets (e.g. /dev/null or a
                        # closed stream) cannot be synced, and progress
                        # logging must not abort the diagnostics.
                        pass
            if not node.propagate:
                break
            node = node.parent

    def get_summary(self) -> Dict[str, Any]:
        """Return the finished steps plus the step that is currently running."""
        return {
            "total_steps": len(self.steps),
            "steps": self.steps,
            "current_running": self.current_step,
        }