import tkinter as tk from tkinter import ttk, messagebox import json import os import sys import threading import time from functools import partial from datetime import datetime, timedelta import calendar import uuid import logging # 导入 NetworkManager 和 SettingsApp from network_manager import NetworkManager from settings_app import SettingsApp from scheduler_app import SchedulerApp # --- 修正点:从 utils.py 导入 center_toplevel_window --- from utils import center_toplevel_window # --- 新增:定义应用程序名称,用于创建用户配置目录下的子文件夹 --- APP_NAME = "DeviceControlApp" class MainControlApp(tk.Tk): def __init__(self): super().__init__() self.title("设备控制台") self.geometry("800x600") # config_file 和 log_dir 将在 _setup_user_dirs 中设置 self.config_file = None self.log_dir = None self.devices = [] self.scheduled_tasks = [] self.device_ui_elements = {} self.scheduler_window = None # 用于跟踪 SchedulerApp 实例 # --- 新增:在设置日志系统之前,先确定用户配置目录 --- self._setup_user_dirs() self._setup_logging() # 设置日志系统,现在会使用 self.log_dir self.network_manager = NetworkManager( log_callback=self.log_message, connection_status_callback=self.update_main_connection_status, ip_status_callback=self._update_ip_status_dot, tcp_status_callback=self._update_tcp_status_dot, app_logger=self.app_logger # <--- 修改点1:将 app_logger 传递给 NetworkManager ) self.create_widgets() self.load_config() self.refresh_device_panels() self.connection_check_interval = 5000 # 5秒 self._start_periodic_connection_check() self._scheduler_check_id = None self._start_scheduler_check() # 首次启动调度器检查 self.protocol("WM_DELETE_WINDOW", self.on_closing) def resource_path(self, relative_path): """获取资源文件的绝对路径,兼容 PyInstaller 打包""" try: base_path = sys._MEIPASS except Exception: base_path = os.path.abspath(".") return os.path.join(base_path, relative_path) # --- 新增:设置用户特定配置和日志目录的方法 --- def _setup_user_dirs(self): """ 根据操作系统确定用户特定的配置和日志目录。 Windows: %APPDATA%\APP_NAME macOS: ~/Library/Application Support/APP_NAME Linux: ~/.config/APP_NAME (遵循 XDG Base Directory Specification) """ base_dir = None if sys.platform == "win32": # Windows: %APPDATA% (通常是 C:\Users\\AppData\Roaming) base_dir = os.path.join(os.getenv('APPDATA'), APP_NAME) elif sys.platform == "darwin": # macOS: ~/Library/Application Support/ base_dir = os.path.join(os.path.expanduser('~/Library/Application Support'), APP_NAME) elif sys.platform.startswith("linux"): # Linux: 优先使用 XDG_CONFIG_HOME,否则默认 ~/.config xdg_config_home = os.getenv('XDG_CONFIG_HOME') if xdg_config_home: base_dir = os.path.join(xdg_config_home, APP_NAME) else: base_dir = os.path.join(os.path.expanduser('~/.config'), APP_NAME) else: # 对于其他或未知操作系统,回退到当前工作目录下的子文件夹 base_dir = os.path.join(os.path.abspath('.'), APP_NAME + "_data") # 确保基目录存在 os.makedirs(base_dir, exist_ok=True) self.config_file = os.path.join(base_dir, "config.json") self.log_dir = os.path.join(base_dir, "logs") # 日志文件放在应用数据目录下的 logs 子文件夹 # 确保日志目录存在 os.makedirs(self.log_dir, exist_ok=True) def _setup_logging(self): """配置日志系统,包括文件日志和控制台日志(如果需要)""" log_file_path = os.path.join(self.log_dir, "app_log.log") self.app_logger = logging.getLogger("DeviceControlApp") self.app_logger.setLevel(logging.DEBUG) if not self.app_logger.handlers: file_handler = logging.FileHandler(log_file_path, encoding='utf-8') file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s') file_handler.setFormatter(file_formatter) self.app_logger.addHandler(file_handler) # 如果需要控制台输出,可以取消注释以下行 # console_handler = logging.StreamHandler(sys.stdout) # console_handler.setFormatter(file_formatter) # self.app_logger.addHandler(console_handler) self.app_logger.info("日志系统已初始化。") self.app_logger.info(f"配置文件路径: {self.config_file}") self.app_logger.info(f"日志文件目录: {self.log_dir}") def create_widgets(self): self.main_frame = ttk.Frame(self, padding="10") self.main_frame.pack(fill=tk.BOTH, expand=True) top_buttons_frame = ttk.Frame(self.main_frame) top_buttons_frame.pack(side=tk.TOP, anchor=tk.NW, fill=tk.X, pady=5) self.settings_btn = ttk.Button(top_buttons_frame, text="设置", command=self.open_settings) self.settings_btn.pack(side=tk.LEFT, padx=5) self.scheduler_btn = ttk.Button(top_buttons_frame, text="定时任务", command=self.open_scheduler) self.scheduler_btn.pack(side=tk.LEFT, padx=5) # 新增:调度器状态显示标签 self.scheduler_status_var = tk.StringVar(value="调度器: 未启动") self.scheduler_status_label = ttk.Label(top_buttons_frame, textvariable=self.scheduler_status_var, font=("Arial", 9, "italic"), foreground="gray") self.scheduler_status_label.pack(side=tk.RIGHT, padx=10) self.device_panels_frame = ttk.Frame(self.main_frame) self.device_panels_frame.pack(fill=tk.BOTH, expand=True, pady=10) # 此帧将扩展以填充主框架的剩余空间 # 创建一个容器帧来管理 canvas 和滚动条的布局 canvas_container = ttk.Frame(self.device_panels_frame) canvas_container.pack(fill=tk.BOTH, expand=True) # 此容器帧将填充 device_panels_frame self.canvas = tk.Canvas(canvas_container) self.v_scrollbar = ttk.Scrollbar(canvas_container, orient="vertical", command=self.canvas.yview) self.h_scrollbar = ttk.Scrollbar(canvas_container, orient="horizontal", command=self.canvas.xview) # 先打包滚动条 self.v_scrollbar.pack(side="right", fill="y") self.h_scrollbar.pack(side="bottom", fill="x") # 再打包 canvas,使其填充剩余空间 self.canvas.pack(side="left", fill="both", expand=True) self.scrollable_frame = ttk.Frame(self.canvas) self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw", tags="scrollable_frame_window") self.canvas.configure(yscrollcommand=self.v_scrollbar.set, xscrollcommand=self.h_scrollbar.set) self.scrollable_frame.bind( "", self._on_scrollable_frame_configure ) # 绑定鼠标滚轮事件到 canvas 和 scrollable_frame self.canvas.bind("", self._on_mouse_wheel) self.canvas.bind("", self._on_mouse_wheel) # For Linux/macOS self.canvas.bind("", self._on_mouse_wheel) # For Linux/macOS self.scrollable_frame.bind("", self._on_mouse_wheel) self.scrollable_frame.bind("", self._on_mouse_wheel) # For Linux/macOS self.scrollable_frame.bind("", self._on_mouse_wheel) # For Linux/macOS self.log_frame = ttk.Frame(self.main_frame, padding="5", relief=tk.GROOVE, borderwidth=2) self.log_frame.pack(fill=tk.X, pady=5) # 移除 expand=True,让其只占据必要高度,并填充横向空间 ttk.Label(self.log_frame, text="日志和响应", font=("Arial", 10, "bold")).pack(anchor=tk.NW) # 增加日志文本框的高度 # 创建一个子框架来包含 Text 控件和 Scrollbar log_text_frame = ttk.Frame(self.log_frame) log_text_frame.pack(fill=tk.BOTH, expand=True, pady=(5,0)) self.log_text = tk.Text(log_text_frame, height=15, state=tk.DISABLED, wrap=tk.WORD) self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True) log_scrollbar = ttk.Scrollbar(log_text_frame, orient=tk.VERTICAL, command=self.log_text.yview) log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y) self.log_text.config(yscrollcommand=log_scrollbar.set) def _on_scrollable_frame_configure(self, event): """当 scrollable_frame 大小改变时,更新 canvas 的 scrollregion 和 create_window 的大小""" # 更新 scrollregion,包括横向和纵向 bbox = self.canvas.bbox("all") if bbox: # 添加一些内边距 self.canvas.configure(scrollregion=(bbox[0]-10, bbox[1]-10, bbox[2]+10, bbox[3]+10)) # 确保滚动区域的宽度与 canvas 相同,避免不必要的横向滚动 canvas_width = self.canvas.winfo_width() if canvas_width > 1: # 确保canvas已经初始化 self.canvas.itemconfig("scrollable_frame_window", width=canvas_width) def load_config(self): """从 JSON 文件加载配置""" if os.path.exists(self.config_file): try: with open(self.config_file, 'r', encoding='utf-8') as f: config_data = json.load(f) self.devices = config_data.get("devices", []) self.scheduled_tasks = config_data.get("scheduled_tasks", []) self.log_message(f"从 {self.config_file} 加载配置成功。") except json.JSONDecodeError: self.log_message(f"错误:{self.config_file} 文件格式不正确。", "red") self.devices = [] self.scheduled_tasks = [] except Exception as e: self.log_message(f"错误:加载配置失败 - {e}", "red") self.devices = [] self.scheduled_tasks = [] else: self.log_message(f"未找到 {self.config_file},将创建新配置。", "orange") self.devices = [] self.scheduled_tasks = [] for device in self.devices: if "id" not in device: device["id"] = str(uuid.uuid4()) # --- 修正点:为可能缺失的设备信息字段添加默认值 --- if "name" not in device: device["name"] = "未知设备" if "type" not in device: device["type"] = "通用" if "protocol" not in device: device["protocol"] = "TCP" # 默认使用TCP # --- 修正点:迁移旧的 'ip' 字段到 'address' --- if "ip" in device and "address" not in device: device["address"] = device["ip"] del device["ip"] # 移除旧的 'ip' 字段 elif "address" not in device: device["address"] = "127.0.0.1" # 默认本地回环地址 # --- 修正点结束 --- if "port" not in device: device["port"] = 0 # 默认端口 # --- 修正点结束 --- if "keep_alive" not in device: device["keep_alive"] = False if "timeout" not in device: device["timeout"] = 5 for command in device.get("commands", []): if "data" not in command: command["data"] = command.get("command_data", "") if "type" not in command: command["type"] = "text" if "command_data" in command: del command["command_data"] new_scheduled_tasks = [] for task in self.scheduled_tasks: if "schedule_type" not in task: self.log_message(f"迁移旧格式定时任务 (ID: {task.get('id', 'N/A')})", "orange") migrated_task = { "id": task.get("id", str(uuid.uuid4())), "name": f"旧任务_{task.get('id', str(uuid.uuid4()))[:4]}", "schedule_type": "daily", "trigger": { "hour": int(task["time"].split(":")[0]), "minute": int(task["time"].split(":")[1]), "second": 0 }, "actions": [], "enabled": task.get("enabled", True), "last_run_time": None, "next_run_time": None } device_obj = None if 0 <= task.get("device_index", -1) < len(self.devices): device_obj = self.devices[task["device_index"]] if device_obj and task.get("command_indices"): for cmd_idx in task["command_indices"]: migrated_task["actions"].append({ "device_id": device_obj["id"], "command_index": cmd_idx, "delay_ms": int(task.get("delay_between_commands", 0) * 1000) }) new_scheduled_tasks.append(migrated_task) else: if task.get("last_run_time") and isinstance(task["last_run_time"], str): try: task["last_run_time"] = datetime.fromisoformat(task["last_run_time"]) except ValueError: task["last_run_time"] = None if task.get("next_run_time") and isinstance(task["next_run_time"], str): try: task["next_run_time"] = datetime.fromisoformat(task["next_run_time"]) except ValueError: task["next_run_time"] = None new_scheduled_tasks.append(task) self.scheduled_tasks = new_scheduled_tasks self.app_logger.debug(f"Loaded {len(self.scheduled_tasks)} scheduled tasks from config.") for task in self.scheduled_tasks: self.app_logger.debug(f" Task ID: {task.get('id')}, Name: {task.get('name')}, Type: {task.get('schedule_type')}, Next Run: {task.get('next_run_time')}") self._recalculate_all_next_run_times() def save_config(self): """将当前配置保存到 JSON 文件""" serializable_scheduled_tasks = [] for task in self.scheduled_tasks: temp_task = task.copy() if isinstance(temp_task.get("last_run_time"), datetime): temp_task["last_run_time"] = temp_task["last_run_time"].isoformat() if isinstance(temp_task.get("next_run_time"), datetime): temp_task["next_run_time"] = temp_task["next_run_time"].isoformat() serializable_scheduled_tasks.append(temp_task) config_data = { "devices": self.devices, "scheduled_tasks": serializable_scheduled_tasks } try: with open(self.config_file, 'w', encoding='utf-8') as f: json.dump(config_data, f, indent=4, ensure_ascii=False) self.log_message(f"配置保存到 {self.config_file} 成功。", "blue") self.app_logger.debug(f"Config saved to {self.config_file}.") except Exception as e: self.log_message(f"错误:保存配置失败 - {e}", "red") self.app_logger.error(f"Failed to save config - {e}") def refresh_device_panels(self): """清除并重新创建所有设备面板""" for widget in self.scrollable_frame.winfo_children(): widget.destroy() self.device_ui_elements.clear() # 确保至少有一个设备面板,即使设备列表为空 if not self.devices: empty_frame = ttk.Frame(self.scrollable_frame, padding="10", relief=tk.RIDGE, borderwidth=2) empty_frame.pack(fill=tk.X, expand=True, pady=5, padx=5) ttk.Label(empty_frame, text="暂无设备配置", font=("Arial", 10)).pack(pady=10) else: for i, device in enumerate(self.devices): self._create_device_panel(i, device) self._set_device_command_buttons_state(i, tk.NORMAL) self.scrollable_frame.update_idletasks() # 更新滚动区域 bbox = self.canvas.bbox("all") if bbox: self.canvas.config(scrollregion=bbox) def _create_device_panel(self, device_index, device_info): """为单个设备创建UI面板""" panel_frame = ttk.Frame(self.scrollable_frame, padding="10", relief=tk.RIDGE, borderwidth=2) panel_frame.pack(fill=tk.X, expand=True, pady=5, padx=5) # 添加一个背景色以便调试 panel_frame.configure(style="DevicePanel.TFrame") style = ttk.Style() style.configure("DevicePanel.TFrame", background="lightgray") # 绑定滚轮事件到 panel_frame 及其所有子组件 panel_frame.bind("", self._on_mouse_wheel) panel_frame.bind("", self._on_mouse_wheel) panel_frame.bind("", self._on_mouse_wheel) # 递归绑定子组件的滚轮事件 def bind_children_mouse_wheel(widget): for child in widget.winfo_children(): child.bind("", self._on_mouse_wheel) child.bind("", self._on_mouse_wheel) child.bind("", self._on_mouse_wheel) bind_children_mouse_wheel(child) bind_children_mouse_wheel(panel_frame) # 创建设备信息行(名称和状态指示灯) info_frame = ttk.Frame(panel_frame) info_frame.pack(fill=tk.X, pady=(0, 10)) device_name_label = ttk.Label(info_frame, text=device_info["name"], font=("Arial", 12, "bold")) device_name_label.pack(side=tk.LEFT, padx=5) themed_bg = ttk.Style().lookup("TFrame", "background") # 添加设备状态指示灯 ip_status_canvas = tk.Canvas(info_frame, width=15, height=15, bg=themed_bg, highlightthickness=0) ip_status_canvas.pack(side=tk.LEFT, padx=5) ip_dot_id = ip_status_canvas.create_oval(2, 2, 13, 13, fill="gray", outline="gray") tcp_status_canvas = tk.Canvas(info_frame, width=15, height=15, bg=themed_bg, highlightthickness=0) tcp_status_canvas.pack(side=tk.LEFT, padx=5) tcp_dot_id = tcp_status_canvas.create_oval(2, 2, 13, 13, fill="gray", outline="gray") self.device_ui_elements[device_index] = { "ip_status_canvas": ip_status_canvas, "ip_dot_id": ip_dot_id, "tcp_status_canvas": tcp_status_canvas, "tcp_dot_id": tcp_dot_id, "command_buttons": [], "is_sending": False } # 创建命令按钮容器,使用网格布局来换行显示按钮 commands_frame = ttk.Frame(panel_frame) commands_frame.pack(fill=tk.X, expand=True) # 添加命令按钮,使用网格布局,每行最多显示5个按钮 commands = device_info.get("commands", []) for cmd_index, command in enumerate(commands): btn = ttk.Button(commands_frame, text=command["name"], command=partial(self._send_command_from_button, device_index, cmd_index)) # 计算网格位置(每行5个按钮) row = cmd_index // 5 col = cmd_index % 5 btn.grid(row=row, column=col, padx=5, pady=2, sticky="w") self.device_ui_elements[device_index]["command_buttons"].append(btn) # 配置网格列权重,使按钮能够正确对齐 for i in range(5): commands_frame.columnconfigure(i, weight=1) # 强制更新布局 panel_frame.update_idletasks() def _send_command_from_button(self, device_index, command_index): """处理指令按钮点击事件,实现设备内互斥""" device = self.devices[device_index] command = device["commands"][command_index] if self.device_ui_elements[device_index]["is_sending"]: self.log_message(f"设备 '{device['name']}' 正在发送指令,请稍候。", "orange") self.app_logger.info(f"设备 '{device['name']}' 正在发送指令,请稍候。") return self.log_message(f"准备发送指令 '{command['name']}' 到设备 '{device['name']}'...", "orange") self.app_logger.info(f"准备发送指令 '{command['name']}' 到设备 '{device['name']}'...") self.device_ui_elements[device_index]["is_sending"] = True self._set_device_command_buttons_state(device_index, tk.DISABLED) def re_enable_device_buttons(): """发送完成后重新启用当前设备的指令按钮""" self.device_ui_elements[device_index]["is_sending"] = False self._set_device_command_buttons_state(device_index, tk.NORMAL) self.network_manager.send_command_async( device_index, device, command, lambda: self.after(0, re_enable_device_buttons) ) def _set_device_command_buttons_state(self, device_index, state): """设置指定设备的所有指令按钮的状态""" if device_index in self.device_ui_elements: for btn in self.device_ui_elements[device_index]["command_buttons"]: btn.config(state=state) def _on_mouse_wheel(self, event): """处理鼠标滚轮事件,实现滚动""" if event.delta: # Windows self.canvas.yview_scroll(int(-1*(event.delta/120)), "units") elif event.num == 4: # Linux/macOS scroll up self.canvas.yview_scroll(-1, "units") elif event.num == 5: # Linux/macOS scroll down self.canvas.yview_scroll(1, "units") def _draw_dot(self, canvas, dot_id, color): """在画布上绘制指定颜色的圆点""" canvas.itemconfig(dot_id, fill=color, outline=color) def _update_ip_status_dot(self, device_index, is_connected): """更新 IP 状态点颜色""" if device_index in self.device_ui_elements: canvas = self.device_ui_elements[device_index]["ip_status_canvas"] dot_id = self.device_ui_elements[device_index]["ip_dot_id"] color = "green" if is_connected else "red" self.after(0, lambda: self._draw_dot(canvas, dot_id, color)) def _update_tcp_status_dot(self, device_index, is_connected): """更新 TCP 连接状态点颜色""" if device_index in self.device_ui_elements: canvas = self.device_ui_elements[device_index]["tcp_status_canvas"] dot_id = self.device_ui_elements[device_index]["tcp_dot_id"] color = "green" if is_connected else "red" self.after(0, lambda: self._draw_dot(canvas, dot_id, color)) def _start_periodic_connection_check(self): """启动周期性连接检查""" if not hasattr(self, '_periodic_check_id'): self._periodic_check_id = None if self._periodic_check_id: self.after_cancel(self._periodic_check_id) self._check_all_connections_periodic() self._periodic_check_id = self.after(self.connection_check_interval, self._start_periodic_connection_check) def _check_all_connections_periodic(self): """ 检查所有设备的 IP 可达性和 TCP 连接状态。 优化:对于 keep_alive 的 TCP 设备,只进行一次 TCP 连接状态检查。 """ for i, device in enumerate(self.devices): if device["protocol"] == "TCP": if device.get("keep_alive", False): # 对于保持连接的 TCP 设备,通过检查 TCP 连接来判断可达性 self.network_manager.check_tcp_connection_async(i, device) # Note: check_tcp_connection_async 内部会同时更新 IP 和 TCP 状态点 else: # 对于非保持连接的 TCP 设备,进行 IP 可达性检查 self.network_manager.check_ip_reachability_async(i, device) # TCP 状态点应始终显示为未连接 self._update_tcp_status_dot(i, False) elif device["protocol"] == "UDP": # 对于 UDP 设备,使用 UDP 方式检查可达性 self.network_manager.check_udp_reachability_async(i, device) # TCP 状态点应始终显示为未连接 self._update_tcp_status_dot(i, False) def open_settings(self): """打开设置窗口""" settings_window = SettingsApp(self, on_save_callback=self.on_settings_saved, devices_data=self.devices) # --- 移除此行:center_toplevel_window(settings_window, self) --- # SettingsApp 内部会自行居中 def on_settings_saved(self, updated_devices): """当设置窗口保存配置后,由回调函数调用""" self.log_message("设置已保存,正在刷新主界面...", "blue") self.app_logger.info("设置已保存,正在刷新主界面...") self.devices = updated_devices self.save_config() self.refresh_device_panels() self._start_periodic_connection_check() self._recalculate_all_next_run_times() self._start_scheduler_check() # 确保调度器检查也重新启动以反映新配置 def open_scheduler(self): """打开定时任务管理窗口,确保只有一个实例""" if self.scheduler_window is None or not self.scheduler_window.winfo_exists(): self.scheduler_window = SchedulerApp(self, self.devices, self.scheduled_tasks, self.save_config, self.log_message) # --- 移除此行:center_toplevel_window(self.scheduler_window, self) --- # SchedulerApp 内部会自行居中 self.scheduler_window.protocol("WM_DELETE_WINDOW", self._on_scheduler_closing) else: self.scheduler_window.lift() # 将现有窗口带到前面 def _on_scheduler_closing(self): """处理 SchedulerApp 关闭事件""" if self.scheduler_window: self.scheduler_window.destroy() self.scheduler_window = None self.log_message("定时任务管理窗口已关闭。", "blue") self.app_logger.info("定时任务管理窗口已关闭。") def _start_scheduler_check(self): """启动周期性检查定时任务""" if self._scheduler_check_id: self.after_cancel(self._scheduler_check_id) self._check_schedules() # 立即执行一次检查 self._scheduler_check_id = self.after(1000, self._start_scheduler_check) def refresh_scheduler_display(self): """ 供 SchedulerApp 调用,强制刷新主界面的调度器状态显示。 """ self.app_logger.debug(f"refresh_scheduler_display called at {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}") self._check_schedules() # 立即触发一次调度器检查以更新UI def _recalculate_all_next_run_times(self): """在配置加载或设备更新后,重新计算所有任务的下次运行时间""" now = datetime.now() self.app_logger.debug(f"Recalculating all next run times at {now.strftime('%Y-%m-%d %H:%M:%S.%f')}") for task in self.scheduled_tasks: task_name = task.get('name', task['id'][:8]) old_next_run = task.get("next_run_time") task["next_run_time"] = self._calculate_next_run_time(task, now) self.app_logger.debug(f" Task '{task_name}': Old next run: {old_next_run}, New next run: {task['next_run_time']}") self.save_config() def _check_schedules(self): """检查是否有定时任务到期""" now = datetime.now() self.app_logger.debug(f"\n--- 调度器检查 @ {now.strftime('%Y-%m-%d %H:%M:%S.%f')} ---") next_task_summary = "无" min_next_run_time = datetime.max tasks_to_trigger = [] # Collect tasks to trigger to avoid modifying list while iterating for task in self.scheduled_tasks: task_name = task.get('name', task['id'][:8]) self.app_logger.debug(f" 检查任务 '{task_name}' (启用: {task.get('enabled', True)})") if not task.get("enabled", True): self.app_logger.debug(f" 任务 '{task_name}' 已禁用,跳过。") continue current_next_run_time = task.get("next_run_time") if current_next_run_time: self.app_logger.debug(f" 任务 '{task_name}' 当前下次运行时间: {current_next_run_time.strftime('%Y-%m-%d %H:%M:%S.%f')}") # Check if it's time to trigger this task # Allow a small window for execution, e.g., within the last second if current_next_run_time <= now < (current_next_run_time + timedelta(seconds=1)): self.app_logger.info(f" !!! 任务 '{task_name}' 触发执行 !!!") tasks_to_trigger.append((task, now)) # Update next_task_summary based on the *current* next_run_time if current_next_run_time < min_next_run_time: min_next_run_time = current_next_run_time next_task_summary = f"'{task_name}' @ {min_next_run_time.strftime('%H:%M:%S')}" else: # If next_run_time is None, try to calculate it for the first time or if it was cleared self.app_logger.debug(f" 任务 '{task_name}' next_run_time 为 None,尝试计算。") new_next_run = self._calculate_next_run_time(task, now) if new_next_run: task["next_run_time"] = new_next_run self.app_logger.debug(f" 任务 '{task_name}' 首次计算下次运行时间: {new_next_run.strftime('%Y-%m-%d %H:%M:%S.%f')}") # Also check if this newly calculated time is due right now if new_next_run <= now < (new_next_run + timedelta(seconds=1)): self.app_logger.info(f" !!! 任务 '{task_name}' (首次计算后) 触发执行 !!!") tasks_to_trigger.append((task, now)) if new_next_run < min_next_run_time: min_next_run_time = new_next_run next_task_summary = f"'{task_name}' @ {min_next_run_time.strftime('%H:%M:%S')}" else: self.app_logger.debug(f" 任务 '{task_name}' 无法计算下次运行时间。") # Now trigger the collected tasks for task, trigger_time in tasks_to_trigger: self.log_message(f"调度器: 任务 '{task.get('name', task['id'][:8])}' 到期,准备执行。", "blue") self.app_logger.info(f"调度器: 任务 '{task.get('name', task['id'][:8])}' 到期,准备执行。") threading.Thread(target=self._execute_scheduled_commands, args=(task, trigger_time), name=f"SchedulerTask-{task.get('name', task['id'][:8])}").start() # Immediately update the task's last_run_time and recalculate next_run_time # This is crucial for 'once' tasks to prevent re-triggering self.after(0, lambda t=task, c_now=trigger_time: self._update_scheduled_task_after_execution(t, c_now)) # Update scheduler status label self.scheduler_status_var.set( f"调度器: 上次检查 {now.strftime('%H:%M:%S')} | 下个任务: {next_task_summary}" ) def _update_scheduled_task_after_execution(self, task, execution_time): """ 在主线程中更新定时任务的 last_run_time 和 next_run_time。 """ task_name = task.get("name", task["id"][:8]) self.app_logger.debug(f"_update_scheduled_task_after_execution for task '{task_name}' called at {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}.") task["last_run_time"] = execution_time task["next_run_time"] = self._calculate_next_run_time(task, execution_time + timedelta(seconds=1)) self.save_config() # 如果定时任务管理窗口是打开的,刷新其显示 if self.scheduler_window and self.scheduler_window.winfo_exists(): self.scheduler_window._update_schedule_display() def _calculate_next_run_time(self, task, reference_time): """ 根据任务的调度类型和触发器计算下次运行时间。 reference_time: datetime对象,作为计算下次运行时间的基准。 """ schedule_type = task["schedule_type"] trigger = task["trigger"] next_run = None if schedule_type == "once": try: target_time = datetime( trigger["year"], trigger["month"], trigger["day"], trigger["hour"], trigger["minute"], trigger["second"] ) self.app_logger.debug(f" _calculate_next_run_time for 'once' task '{task.get('name', task['id'][:8])}': target_time={target_time}, reference_time={reference_time}, last_run_time={task.get('last_run_time')}") if task.get("last_run_time") is None and target_time >= reference_time: next_run = target_time except ValueError: self.app_logger.warning(f"任务 '{task.get('name', task['id'][:8])}' 的一次性触发时间无效。") self.log_message(f"任务 '{task.get('name', task['id'][:8])}' 的一次性触发时间无效。", "red") elif schedule_type == "daily": target_hour = trigger["hour"] target_minute = trigger["minute"] target_second = trigger["second"] # Start with today's target time potential_run_time = reference_time.replace( hour=target_hour, minute=target_minute, second=target_second, microsecond=0 ) # If target time is in the past, set it for tomorrow if potential_run_time <= reference_time: potential_run_time += timedelta(days=1) next_run = potential_run_time elif schedule_type == "weekly": target_hour = trigger["hour"] target_minute = trigger["minute"] target_second = trigger["second"] days_of_week = sorted(trigger["days_of_week"]) # 0=Mon, 6=Sun min_next_run = None for day_of_week_target in days_of_week: # Calculate days ahead to reach the target day of week days_ahead = (day_of_week_target - reference_time.weekday() + 7) % 7 potential_run_time = (reference_time + timedelta(days=days_ahead)).replace( hour=target_hour, minute=target_minute, second=target_second, microsecond=0 ) # If it's the same day of week, but the time has passed, move to next week if days_ahead == 0 and potential_run_time <= reference_time: potential_run_time += timedelta(weeks=1) if min_next_run is None or potential_run_time < min_next_run: min_next_run = potential_run_time next_run = min_next_run elif schedule_type == "monthly": target_hour = trigger["hour"] target_minute = trigger["minute"] target_second = trigger["second"] current_year = reference_time.year current_month = reference_time.month potential_runs = [] # Check current month and next month for month_offset in range(2): check_year = current_year check_month = current_month + month_offset if check_month > 12: check_month -= 12 check_year += 1 if "month_day" in trigger: target_day = trigger["month_day"] try: # Ensure target_day is valid for the month if target_day > calendar.monthrange(check_year, check_month)[1]: continue # Skip if day is invalid for this month (e.g., Feb 30) potential_date = datetime(check_year, check_month, target_day, target_hour, target_minute, target_second, microsecond=0) if potential_date > reference_time: potential_runs.append(potential_date) except ValueError: pass # Date might be invalid (e.g., Feb 30) elif "week_of_month" in trigger: target_week_of_month = trigger["week_of_month"] target_day_of_week = trigger["day_of_week"] count = 0 for day in range(1, calendar.monthrange(check_year, check_month)[1] + 1): d = datetime(check_year, check_month, day) if d.weekday() == target_day_of_week: count += 1 if count == target_week_of_month: potential_date = d.replace( hour=target_hour, minute=target_minute, second=target_second, microsecond=0 ) if potential_date > reference_time: potential_runs.append(potential_date) break if potential_runs: next_run = min(potential_runs) else: next_run = None self.app_logger.debug(f" _calculate_next_run_time for '{task.get('name', task['id'][:8])}' returns: {next_run}") return next_run def _execute_scheduled_commands(self, task, trigger_time): """ 执行定时任务中的指令序列。 此方法在后台线程中运行。 """ task_name = task.get("name", task["id"][:8]) self.app_logger.debug(f"[SchedulerThread-{threading.current_thread().name}] _execute_scheduled_commands for task '{task_name}' STARTED @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}") time.sleep(0.05) # Give UI a moment to update log message self.log_message(f"调度器: 任务 '{task_name}':开始执行指令序列...", "blue") self.app_logger.info(f"调度器: 任务 '{task_name}':开始执行指令序列...") try: for i, action in enumerate(task["actions"]): device_id = action["device_id"] command_index = action["command_index"] # 获取当前指令的延迟时间 delay_ms = action.get("delay_ms", 0) device = next((d for d in self.devices if d["id"] == device_id), None) if not device: self.log_message(f"定时任务 '{task_name}' 失败: 设备ID '{device_id[:8]}...' 无效或不存在。", "red") self.app_logger.error(f"定时任务 '{task_name}' 失败: 设备ID '{device_id[:8]}...' 无效或不存在。") continue if not (0 <= command_index < len(device["commands"])): self.log_message(f"定时任务 '{task_name}' 失败: 设备 '{device['name']}' 的指令索引 {command_index} 无效。", "red") self.app_logger.error(f"定时任务 '{task_name}' 失败: 设备 '{device['name']}' 的指令索引 {command_index} 无效。") continue command = device["commands"][command_index] # 如果当前指令有延迟,则先等待 if delay_ms > 0: self.log_message(f"定时任务 '{task_name}':指令 '{command['name']}' 将延迟 {delay_ms} 毫秒后执行。", "gray") self.app_logger.info(f"定时任务 '{task_name}':指令 '{command['name']}' 将延迟 {delay_ms} 毫秒后执行。") time.sleep(delay_ms / 1000.0) self.log_message(f"定时任务 '{task_name}':执行指令 '{command['name']}' 到设备 '{device['name']}'...", "blue") self.app_logger.info(f"定时任务 '{task_name}':执行指令 '{command['name']}' 到设备 '{device['name']}'...") try: response = self.network_manager.send_command_sync_for_scheduler(device, command, is_scheduled_task=True) self.log_message(f"定时任务 '{task_name}':指令 '{command['name']}' 成功,响应: {response}", "green") self.app_logger.info(f"定时任务 '{task_name}':指令 '{command['name']}' 成功,响应: {response}") except Exception as e: self.log_message(f"定时任务 '{task_name}':指令 '{command['name']}' 执行失败: {e}", "red") self.app_logger.error(f"定时任务 '{task_name}':指令 '{command['name']}' 执行失败: {e}") self.log_message(f"定时任务 '{task_name}':指令序列执行完毕。", "blue") self.app_logger.info(f"定时任务 '{task_name}':指令序列执行完毕。") except Exception as e: self.log_message(f"定时任务 '{task_name}' 执行过程中发生未预期错误: {e}", "red") self.app_logger.critical(f"定时任务 '{task_name}' 执行过程中发生未预期错误: {e}") finally: self.app_logger.debug(f"[SchedulerThread-{threading.current_thread().name}] _execute_scheduled_commands for task '{task_name}' FINISHED @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}") def update_main_connection_status(self, status, color="black"): """MainControlApp 的连接状态更新回调,这里仅用于日志""" self.log_message(f"全局连接状态更新: {status}", color) self.app_logger.info(f"全局连接状态更新: {status}") def log_message(self, message, color="black"): """向日志文本框添加消息 (在 UI 线程中执行) 并写入文件""" timestamp_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]") ui_message = f"{timestamp_str} {message}" # 这里的日志级别判断保持不变,因为它控制的是 UI 显示的颜色和 app_logger 的级别 if color == "red": self.app_logger.error(message) elif color == "orange": self.app_logger.warning(message) elif color == "blue": self.app_logger.info(message) elif color == "green": self.app_logger.info(message) elif color == "gray": # UI日志中,灰色通常是调试信息 self.app_logger.debug(message) else: self.app_logger.info(message) self.after(0, lambda: self._log_message_safe(ui_message, color)) def _log_message_safe(self, message, color): """安全的 UI 更新函数,确保在主线程中被调用""" self.log_text.config(state=tk.NORMAL) self.log_text.insert(tk.END, message + "\n", color) self.log_text.tag_config("red", foreground="red") self.log_text.tag_config("green", foreground="green") self.log_text.tag_config("blue", foreground="blue") self.log_text.tag_config("orange", foreground="orange") self.log_text.tag_config("gray", foreground="gray") self.log_text.see(tk.END) self.log_text.config(state=tk.DISABLED) def on_closing(self): """窗口关闭时保存配置并关闭所有活动套接字""" self.network_manager.close_all_sockets() if hasattr(self, '_periodic_check_id') and self._periodic_check_id: self.after_cancel(self._periodic_check_id) if hasattr(self, '_scheduler_check_id') and self._scheduler_check_id: self.after_cancel(self._scheduler_check_id) self.destroy() if __name__ == "__main__": app = MainControlApp() app.mainloop()