first commit

This commit is contained in:
zj
2026-02-04 21:16:33 +08:00
commit 1104e7dc13
36 changed files with 18789 additions and 0 deletions

911
main_control_app.py Normal file
View File

@@ -0,0 +1,911 @@
import tkinter as tk
from tkinter import ttk, messagebox
import json
import os
import sys
import threading
import time
from functools import partial
from datetime import datetime, timedelta
import calendar
import uuid
import logging
# 导入 NetworkManager 和 SettingsApp
from network_manager import NetworkManager
from settings_app import SettingsApp
from scheduler_app import SchedulerApp
# --- 修正点:从 utils.py 导入 center_toplevel_window ---
from utils import center_toplevel_window
# --- 新增:定义应用程序名称,用于创建用户配置目录下的子文件夹 ---
APP_NAME = "DeviceControlApp"
class MainControlApp(tk.Tk):
def __init__(self):
super().__init__()
self.title("设备控制台")
self.geometry("800x600")
# config_file 和 log_dir 将在 _setup_user_dirs 中设置
self.config_file = None
self.log_dir = None
self.devices = []
self.scheduled_tasks = []
self.device_ui_elements = {}
self.scheduler_window = None # 用于跟踪 SchedulerApp 实例
# --- 新增:在设置日志系统之前,先确定用户配置目录 ---
self._setup_user_dirs()
self._setup_logging() # 设置日志系统,现在会使用 self.log_dir
self.network_manager = NetworkManager(
log_callback=self.log_message,
connection_status_callback=self.update_main_connection_status,
ip_status_callback=self._update_ip_status_dot,
tcp_status_callback=self._update_tcp_status_dot,
app_logger=self.app_logger # <--- 修改点1将 app_logger 传递给 NetworkManager
)
self.create_widgets()
self.load_config()
self.refresh_device_panels()
self.connection_check_interval = 5000 # 5秒
self._start_periodic_connection_check()
self._scheduler_check_id = None
self._start_scheduler_check() # 首次启动调度器检查
self.protocol("WM_DELETE_WINDOW", self.on_closing)
def resource_path(self, relative_path):
"""获取资源文件的绝对路径,兼容 PyInstaller 打包"""
try:
base_path = sys._MEIPASS
except Exception:
base_path = os.path.abspath(".")
return os.path.join(base_path, relative_path)
# --- 新增:设置用户特定配置和日志目录的方法 ---
def _setup_user_dirs(self):
"""
根据操作系统确定用户特定的配置和日志目录。
Windows: %APPDATA%\APP_NAME
macOS: ~/Library/Application Support/APP_NAME
Linux: ~/.config/APP_NAME (遵循 XDG Base Directory Specification)
"""
base_dir = None
if sys.platform == "win32":
# Windows: %APPDATA% (通常是 C:\Users\<User>\AppData\Roaming)
base_dir = os.path.join(os.getenv('APPDATA'), APP_NAME)
elif sys.platform == "darwin":
# macOS: ~/Library/Application Support/
base_dir = os.path.join(os.path.expanduser('~/Library/Application Support'), APP_NAME)
elif sys.platform.startswith("linux"):
# Linux: 优先使用 XDG_CONFIG_HOME否则默认 ~/.config
xdg_config_home = os.getenv('XDG_CONFIG_HOME')
if xdg_config_home:
base_dir = os.path.join(xdg_config_home, APP_NAME)
else:
base_dir = os.path.join(os.path.expanduser('~/.config'), APP_NAME)
else:
# 对于其他或未知操作系统,回退到当前工作目录下的子文件夹
base_dir = os.path.join(os.path.abspath('.'), APP_NAME + "_data")
# 确保基目录存在
os.makedirs(base_dir, exist_ok=True)
self.config_file = os.path.join(base_dir, "config.json")
self.log_dir = os.path.join(base_dir, "logs") # 日志文件放在应用数据目录下的 logs 子文件夹
# 确保日志目录存在
os.makedirs(self.log_dir, exist_ok=True)
def _setup_logging(self):
"""配置日志系统,包括文件日志和控制台日志(如果需要)"""
log_file_path = os.path.join(self.log_dir, "app_log.log")
self.app_logger = logging.getLogger("DeviceControlApp")
self.app_logger.setLevel(logging.DEBUG)
if not self.app_logger.handlers:
file_handler = logging.FileHandler(log_file_path, encoding='utf-8')
file_formatter = logging.Formatter('%(asctime)s - %(levelname)s - %(message)s')
file_handler.setFormatter(file_formatter)
self.app_logger.addHandler(file_handler)
# 如果需要控制台输出,可以取消注释以下行
# console_handler = logging.StreamHandler(sys.stdout)
# console_handler.setFormatter(file_formatter)
# self.app_logger.addHandler(console_handler)
self.app_logger.info("日志系统已初始化。")
self.app_logger.info(f"配置文件路径: {self.config_file}")
self.app_logger.info(f"日志文件目录: {self.log_dir}")
def create_widgets(self):
self.main_frame = ttk.Frame(self, padding="10")
self.main_frame.pack(fill=tk.BOTH, expand=True)
top_buttons_frame = ttk.Frame(self.main_frame)
top_buttons_frame.pack(side=tk.TOP, anchor=tk.NW, fill=tk.X, pady=5)
self.settings_btn = ttk.Button(top_buttons_frame, text="设置", command=self.open_settings)
self.settings_btn.pack(side=tk.LEFT, padx=5)
self.scheduler_btn = ttk.Button(top_buttons_frame, text="定时任务", command=self.open_scheduler)
self.scheduler_btn.pack(side=tk.LEFT, padx=5)
# 新增:调度器状态显示标签
self.scheduler_status_var = tk.StringVar(value="调度器: 未启动")
self.scheduler_status_label = ttk.Label(top_buttons_frame, textvariable=self.scheduler_status_var,
font=("Arial", 9, "italic"), foreground="gray")
self.scheduler_status_label.pack(side=tk.RIGHT, padx=10)
self.device_panels_frame = ttk.Frame(self.main_frame)
self.device_panels_frame.pack(fill=tk.BOTH, expand=True, pady=10) # 此帧将扩展以填充主框架的剩余空间
# 创建一个容器帧来管理 canvas 和滚动条的布局
canvas_container = ttk.Frame(self.device_panels_frame)
canvas_container.pack(fill=tk.BOTH, expand=True) # 此容器帧将填充 device_panels_frame
self.canvas = tk.Canvas(canvas_container)
self.v_scrollbar = ttk.Scrollbar(canvas_container, orient="vertical", command=self.canvas.yview)
self.h_scrollbar = ttk.Scrollbar(canvas_container, orient="horizontal", command=self.canvas.xview)
# 先打包滚动条
self.v_scrollbar.pack(side="right", fill="y")
self.h_scrollbar.pack(side="bottom", fill="x")
# 再打包 canvas使其填充剩余空间
self.canvas.pack(side="left", fill="both", expand=True)
self.scrollable_frame = ttk.Frame(self.canvas)
self.canvas.create_window((0, 0), window=self.scrollable_frame, anchor="nw", tags="scrollable_frame_window")
self.canvas.configure(yscrollcommand=self.v_scrollbar.set, xscrollcommand=self.h_scrollbar.set)
self.scrollable_frame.bind(
"<Configure>",
self._on_scrollable_frame_configure
)
# 绑定鼠标滚轮事件到 canvas 和 scrollable_frame
self.canvas.bind("<MouseWheel>", self._on_mouse_wheel)
self.canvas.bind("<Button-4>", self._on_mouse_wheel) # For Linux/macOS
self.canvas.bind("<Button-5>", self._on_mouse_wheel) # For Linux/macOS
self.scrollable_frame.bind("<MouseWheel>", self._on_mouse_wheel)
self.scrollable_frame.bind("<Button-4>", self._on_mouse_wheel) # For Linux/macOS
self.scrollable_frame.bind("<Button-5>", self._on_mouse_wheel) # For Linux/macOS
self.log_frame = ttk.Frame(self.main_frame, padding="5", relief=tk.GROOVE, borderwidth=2)
self.log_frame.pack(fill=tk.X, pady=5) # 移除 expand=True让其只占据必要高度并填充横向空间
ttk.Label(self.log_frame, text="日志和响应", font=("Arial", 10, "bold")).pack(anchor=tk.NW)
# 增加日志文本框的高度
# 创建一个子框架来包含 Text 控件和 Scrollbar
log_text_frame = ttk.Frame(self.log_frame)
log_text_frame.pack(fill=tk.BOTH, expand=True, pady=(5,0))
self.log_text = tk.Text(log_text_frame, height=15, state=tk.DISABLED, wrap=tk.WORD)
self.log_text.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)
log_scrollbar = ttk.Scrollbar(log_text_frame, orient=tk.VERTICAL, command=self.log_text.yview)
log_scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
self.log_text.config(yscrollcommand=log_scrollbar.set)
def _on_scrollable_frame_configure(self, event):
"""当 scrollable_frame 大小改变时,更新 canvas 的 scrollregion 和 create_window 的大小"""
# 更新 scrollregion包括横向和纵向
bbox = self.canvas.bbox("all")
if bbox:
# 添加一些内边距
self.canvas.configure(scrollregion=(bbox[0]-10, bbox[1]-10, bbox[2]+10, bbox[3]+10))
# 确保滚动区域的宽度与 canvas 相同,避免不必要的横向滚动
canvas_width = self.canvas.winfo_width()
if canvas_width > 1: # 确保canvas已经初始化
self.canvas.itemconfig("scrollable_frame_window", width=canvas_width)
def load_config(self):
"""从 JSON 文件加载配置"""
if os.path.exists(self.config_file):
try:
with open(self.config_file, 'r', encoding='utf-8') as f:
config_data = json.load(f)
self.devices = config_data.get("devices", [])
self.scheduled_tasks = config_data.get("scheduled_tasks", [])
self.log_message(f"{self.config_file} 加载配置成功。")
except json.JSONDecodeError:
self.log_message(f"错误:{self.config_file} 文件格式不正确。", "red")
self.devices = []
self.scheduled_tasks = []
except Exception as e:
self.log_message(f"错误:加载配置失败 - {e}", "red")
self.devices = []
self.scheduled_tasks = []
else:
self.log_message(f"未找到 {self.config_file},将创建新配置。", "orange")
self.devices = []
self.scheduled_tasks = []
for device in self.devices:
if "id" not in device:
device["id"] = str(uuid.uuid4())
# --- 修正点:为可能缺失的设备信息字段添加默认值 ---
if "name" not in device:
device["name"] = "未知设备"
if "type" not in device:
device["type"] = "通用"
if "protocol" not in device:
device["protocol"] = "TCP" # 默认使用TCP
# --- 修正点:迁移旧的 'ip' 字段到 'address' ---
if "ip" in device and "address" not in device:
device["address"] = device["ip"]
del device["ip"] # 移除旧的 'ip' 字段
elif "address" not in device:
device["address"] = "127.0.0.1" # 默认本地回环地址
# --- 修正点结束 ---
if "port" not in device:
device["port"] = 0 # 默认端口
# --- 修正点结束 ---
if "keep_alive" not in device:
device["keep_alive"] = False
if "timeout" not in device:
device["timeout"] = 5
for command in device.get("commands", []):
if "data" not in command:
command["data"] = command.get("command_data", "")
if "type" not in command:
command["type"] = "text"
if "command_data" in command:
del command["command_data"]
new_scheduled_tasks = []
for task in self.scheduled_tasks:
if "schedule_type" not in task:
self.log_message(f"迁移旧格式定时任务 (ID: {task.get('id', 'N/A')})", "orange")
migrated_task = {
"id": task.get("id", str(uuid.uuid4())),
"name": f"旧任务_{task.get('id', str(uuid.uuid4()))[:4]}",
"schedule_type": "daily",
"trigger": {
"hour": int(task["time"].split(":")[0]),
"minute": int(task["time"].split(":")[1]),
"second": 0
},
"actions": [],
"enabled": task.get("enabled", True),
"last_run_time": None,
"next_run_time": None
}
device_obj = None
if 0 <= task.get("device_index", -1) < len(self.devices):
device_obj = self.devices[task["device_index"]]
if device_obj and task.get("command_indices"):
for cmd_idx in task["command_indices"]:
migrated_task["actions"].append({
"device_id": device_obj["id"],
"command_index": cmd_idx,
"delay_ms": int(task.get("delay_between_commands", 0) * 1000)
})
new_scheduled_tasks.append(migrated_task)
else:
if task.get("last_run_time") and isinstance(task["last_run_time"], str):
try:
task["last_run_time"] = datetime.fromisoformat(task["last_run_time"])
except ValueError:
task["last_run_time"] = None
if task.get("next_run_time") and isinstance(task["next_run_time"], str):
try:
task["next_run_time"] = datetime.fromisoformat(task["next_run_time"])
except ValueError:
task["next_run_time"] = None
new_scheduled_tasks.append(task)
self.scheduled_tasks = new_scheduled_tasks
self.app_logger.debug(f"Loaded {len(self.scheduled_tasks)} scheduled tasks from config.")
for task in self.scheduled_tasks:
self.app_logger.debug(f" Task ID: {task.get('id')}, Name: {task.get('name')}, Type: {task.get('schedule_type')}, Next Run: {task.get('next_run_time')}")
self._recalculate_all_next_run_times()
def save_config(self):
"""将当前配置保存到 JSON 文件"""
serializable_scheduled_tasks = []
for task in self.scheduled_tasks:
temp_task = task.copy()
if isinstance(temp_task.get("last_run_time"), datetime):
temp_task["last_run_time"] = temp_task["last_run_time"].isoformat()
if isinstance(temp_task.get("next_run_time"), datetime):
temp_task["next_run_time"] = temp_task["next_run_time"].isoformat()
serializable_scheduled_tasks.append(temp_task)
config_data = {
"devices": self.devices,
"scheduled_tasks": serializable_scheduled_tasks
}
try:
with open(self.config_file, 'w', encoding='utf-8') as f:
json.dump(config_data, f, indent=4, ensure_ascii=False)
self.log_message(f"配置保存到 {self.config_file} 成功。", "blue")
self.app_logger.debug(f"Config saved to {self.config_file}.")
except Exception as e:
self.log_message(f"错误:保存配置失败 - {e}", "red")
self.app_logger.error(f"Failed to save config - {e}")
def refresh_device_panels(self):
"""清除并重新创建所有设备面板"""
for widget in self.scrollable_frame.winfo_children():
widget.destroy()
self.device_ui_elements.clear()
# 确保至少有一个设备面板,即使设备列表为空
if not self.devices:
empty_frame = ttk.Frame(self.scrollable_frame, padding="10", relief=tk.RIDGE, borderwidth=2)
empty_frame.pack(fill=tk.X, expand=True, pady=5, padx=5)
ttk.Label(empty_frame, text="暂无设备配置", font=("Arial", 10)).pack(pady=10)
else:
for i, device in enumerate(self.devices):
self._create_device_panel(i, device)
self._set_device_command_buttons_state(i, tk.NORMAL)
self.scrollable_frame.update_idletasks()
# 更新滚动区域
bbox = self.canvas.bbox("all")
if bbox:
self.canvas.config(scrollregion=bbox)
def _create_device_panel(self, device_index, device_info):
"""为单个设备创建UI面板"""
panel_frame = ttk.Frame(self.scrollable_frame, padding="10", relief=tk.RIDGE, borderwidth=2)
panel_frame.pack(fill=tk.X, expand=True, pady=5, padx=5)
# 添加一个背景色以便调试
panel_frame.configure(style="DevicePanel.TFrame")
style = ttk.Style()
style.configure("DevicePanel.TFrame", background="lightgray")
# 绑定滚轮事件到 panel_frame 及其所有子组件
panel_frame.bind("<MouseWheel>", self._on_mouse_wheel)
panel_frame.bind("<Button-4>", self._on_mouse_wheel)
panel_frame.bind("<Button-5>", self._on_mouse_wheel)
# 递归绑定子组件的滚轮事件
def bind_children_mouse_wheel(widget):
for child in widget.winfo_children():
child.bind("<MouseWheel>", self._on_mouse_wheel)
child.bind("<Button-4>", self._on_mouse_wheel)
child.bind("<Button-5>", self._on_mouse_wheel)
bind_children_mouse_wheel(child)
bind_children_mouse_wheel(panel_frame)
# 创建设备信息行(名称和状态指示灯)
info_frame = ttk.Frame(panel_frame)
info_frame.pack(fill=tk.X, pady=(0, 10))
device_name_label = ttk.Label(info_frame, text=device_info["name"], font=("Arial", 12, "bold"))
device_name_label.pack(side=tk.LEFT, padx=5)
themed_bg = ttk.Style().lookup("TFrame", "background")
# 添加设备状态指示灯
ip_status_canvas = tk.Canvas(info_frame, width=15, height=15, bg=themed_bg, highlightthickness=0)
ip_status_canvas.pack(side=tk.LEFT, padx=5)
ip_dot_id = ip_status_canvas.create_oval(2, 2, 13, 13, fill="gray", outline="gray")
tcp_status_canvas = tk.Canvas(info_frame, width=15, height=15, bg=themed_bg, highlightthickness=0)
tcp_status_canvas.pack(side=tk.LEFT, padx=5)
tcp_dot_id = tcp_status_canvas.create_oval(2, 2, 13, 13, fill="gray", outline="gray")
self.device_ui_elements[device_index] = {
"ip_status_canvas": ip_status_canvas,
"ip_dot_id": ip_dot_id,
"tcp_status_canvas": tcp_status_canvas,
"tcp_dot_id": tcp_dot_id,
"command_buttons": [],
"is_sending": False
}
# 创建命令按钮容器,使用网格布局来换行显示按钮
commands_frame = ttk.Frame(panel_frame)
commands_frame.pack(fill=tk.X, expand=True)
# 添加命令按钮使用网格布局每行最多显示5个按钮
commands = device_info.get("commands", [])
for cmd_index, command in enumerate(commands):
btn = ttk.Button(commands_frame, text=command["name"],
command=partial(self._send_command_from_button, device_index, cmd_index))
# 计算网格位置每行5个按钮
row = cmd_index // 5
col = cmd_index % 5
btn.grid(row=row, column=col, padx=5, pady=2, sticky="w")
self.device_ui_elements[device_index]["command_buttons"].append(btn)
# 配置网格列权重,使按钮能够正确对齐
for i in range(5):
commands_frame.columnconfigure(i, weight=1)
# 强制更新布局
panel_frame.update_idletasks()
def _send_command_from_button(self, device_index, command_index):
"""处理指令按钮点击事件,实现设备内互斥"""
device = self.devices[device_index]
command = device["commands"][command_index]
if self.device_ui_elements[device_index]["is_sending"]:
self.log_message(f"设备 '{device['name']}' 正在发送指令,请稍候。", "orange")
self.app_logger.info(f"设备 '{device['name']}' 正在发送指令,请稍候。")
return
self.log_message(f"准备发送指令 '{command['name']}' 到设备 '{device['name']}'...", "orange")
self.app_logger.info(f"准备发送指令 '{command['name']}' 到设备 '{device['name']}'...")
self.device_ui_elements[device_index]["is_sending"] = True
self._set_device_command_buttons_state(device_index, tk.DISABLED)
def re_enable_device_buttons():
"""发送完成后重新启用当前设备的指令按钮"""
self.device_ui_elements[device_index]["is_sending"] = False
self._set_device_command_buttons_state(device_index, tk.NORMAL)
self.network_manager.send_command_async(
device_index,
device,
command,
lambda: self.after(0, re_enable_device_buttons)
)
def _set_device_command_buttons_state(self, device_index, state):
"""设置指定设备的所有指令按钮的状态"""
if device_index in self.device_ui_elements:
for btn in self.device_ui_elements[device_index]["command_buttons"]:
btn.config(state=state)
def _on_mouse_wheel(self, event):
"""处理鼠标滚轮事件,实现滚动"""
if event.delta: # Windows
self.canvas.yview_scroll(int(-1*(event.delta/120)), "units")
elif event.num == 4: # Linux/macOS scroll up
self.canvas.yview_scroll(-1, "units")
elif event.num == 5: # Linux/macOS scroll down
self.canvas.yview_scroll(1, "units")
def _draw_dot(self, canvas, dot_id, color):
"""在画布上绘制指定颜色的圆点"""
canvas.itemconfig(dot_id, fill=color, outline=color)
def _update_ip_status_dot(self, device_index, is_connected):
"""更新 IP 状态点颜色"""
if device_index in self.device_ui_elements:
canvas = self.device_ui_elements[device_index]["ip_status_canvas"]
dot_id = self.device_ui_elements[device_index]["ip_dot_id"]
color = "green" if is_connected else "red"
self.after(0, lambda: self._draw_dot(canvas, dot_id, color))
def _update_tcp_status_dot(self, device_index, is_connected):
"""更新 TCP 连接状态点颜色"""
if device_index in self.device_ui_elements:
canvas = self.device_ui_elements[device_index]["tcp_status_canvas"]
dot_id = self.device_ui_elements[device_index]["tcp_dot_id"]
color = "green" if is_connected else "red"
self.after(0, lambda: self._draw_dot(canvas, dot_id, color))
def _start_periodic_connection_check(self):
"""启动周期性连接检查"""
if not hasattr(self, '_periodic_check_id'):
self._periodic_check_id = None
if self._periodic_check_id:
self.after_cancel(self._periodic_check_id)
self._check_all_connections_periodic()
self._periodic_check_id = self.after(self.connection_check_interval, self._start_periodic_connection_check)
def _check_all_connections_periodic(self):
"""
检查所有设备的 IP 可达性和 TCP 连接状态。
优化:对于 keep_alive 的 TCP 设备,只进行一次 TCP 连接状态检查。
"""
for i, device in enumerate(self.devices):
if device["protocol"] == "TCP":
if device.get("keep_alive", False):
# 对于保持连接的 TCP 设备,通过检查 TCP 连接来判断可达性
self.network_manager.check_tcp_connection_async(i, device)
# Note: check_tcp_connection_async 内部会同时更新 IP 和 TCP 状态点
else:
# 对于非保持连接的 TCP 设备,进行 IP 可达性检查
self.network_manager.check_ip_reachability_async(i, device)
# TCP 状态点应始终显示为未连接
self._update_tcp_status_dot(i, False)
elif device["protocol"] == "UDP":
# 对于 UDP 设备,使用 UDP 方式检查可达性
self.network_manager.check_udp_reachability_async(i, device)
# TCP 状态点应始终显示为未连接
self._update_tcp_status_dot(i, False)
def open_settings(self):
"""打开设置窗口"""
settings_window = SettingsApp(self, on_save_callback=self.on_settings_saved, devices_data=self.devices)
# --- 移除此行center_toplevel_window(settings_window, self) ---
# SettingsApp 内部会自行居中
def on_settings_saved(self, updated_devices):
"""当设置窗口保存配置后,由回调函数调用"""
self.log_message("设置已保存,正在刷新主界面...", "blue")
self.app_logger.info("设置已保存,正在刷新主界面...")
self.devices = updated_devices
self.save_config()
self.refresh_device_panels()
self._start_periodic_connection_check()
self._recalculate_all_next_run_times()
self._start_scheduler_check() # 确保调度器检查也重新启动以反映新配置
def open_scheduler(self):
"""打开定时任务管理窗口,确保只有一个实例"""
if self.scheduler_window is None or not self.scheduler_window.winfo_exists():
self.scheduler_window = SchedulerApp(self, self.devices, self.scheduled_tasks, self.save_config, self.log_message)
# --- 移除此行center_toplevel_window(self.scheduler_window, self) ---
# SchedulerApp 内部会自行居中
self.scheduler_window.protocol("WM_DELETE_WINDOW", self._on_scheduler_closing)
else:
self.scheduler_window.lift() # 将现有窗口带到前面
def _on_scheduler_closing(self):
"""处理 SchedulerApp 关闭事件"""
if self.scheduler_window:
self.scheduler_window.destroy()
self.scheduler_window = None
self.log_message("定时任务管理窗口已关闭。", "blue")
self.app_logger.info("定时任务管理窗口已关闭。")
def _start_scheduler_check(self):
"""启动周期性检查定时任务"""
if self._scheduler_check_id:
self.after_cancel(self._scheduler_check_id)
self._check_schedules() # 立即执行一次检查
self._scheduler_check_id = self.after(1000, self._start_scheduler_check)
def refresh_scheduler_display(self):
"""
供 SchedulerApp 调用,强制刷新主界面的调度器状态显示。
"""
self.app_logger.debug(f"refresh_scheduler_display called at {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}")
self._check_schedules() # 立即触发一次调度器检查以更新UI
def _recalculate_all_next_run_times(self):
"""在配置加载或设备更新后,重新计算所有任务的下次运行时间"""
now = datetime.now()
self.app_logger.debug(f"Recalculating all next run times at {now.strftime('%Y-%m-%d %H:%M:%S.%f')}")
for task in self.scheduled_tasks:
task_name = task.get('name', task['id'][:8])
old_next_run = task.get("next_run_time")
task["next_run_time"] = self._calculate_next_run_time(task, now)
self.app_logger.debug(f" Task '{task_name}': Old next run: {old_next_run}, New next run: {task['next_run_time']}")
self.save_config()
def _check_schedules(self):
"""检查是否有定时任务到期"""
now = datetime.now()
self.app_logger.debug(f"\n--- 调度器检查 @ {now.strftime('%Y-%m-%d %H:%M:%S.%f')} ---")
next_task_summary = ""
min_next_run_time = datetime.max
tasks_to_trigger = [] # Collect tasks to trigger to avoid modifying list while iterating
for task in self.scheduled_tasks:
task_name = task.get('name', task['id'][:8])
self.app_logger.debug(f" 检查任务 '{task_name}' (启用: {task.get('enabled', True)})")
if not task.get("enabled", True):
self.app_logger.debug(f" 任务 '{task_name}' 已禁用,跳过。")
continue
current_next_run_time = task.get("next_run_time")
if current_next_run_time:
self.app_logger.debug(f" 任务 '{task_name}' 当前下次运行时间: {current_next_run_time.strftime('%Y-%m-%d %H:%M:%S.%f')}")
# Check if it's time to trigger this task
# Allow a small window for execution, e.g., within the last second
if current_next_run_time <= now < (current_next_run_time + timedelta(seconds=1)):
self.app_logger.info(f" !!! 任务 '{task_name}' 触发执行 !!!")
tasks_to_trigger.append((task, now))
# Update next_task_summary based on the *current* next_run_time
if current_next_run_time < min_next_run_time:
min_next_run_time = current_next_run_time
next_task_summary = f"'{task_name}' @ {min_next_run_time.strftime('%H:%M:%S')}"
else:
# If next_run_time is None, try to calculate it for the first time or if it was cleared
self.app_logger.debug(f" 任务 '{task_name}' next_run_time 为 None尝试计算。")
new_next_run = self._calculate_next_run_time(task, now)
if new_next_run:
task["next_run_time"] = new_next_run
self.app_logger.debug(f" 任务 '{task_name}' 首次计算下次运行时间: {new_next_run.strftime('%Y-%m-%d %H:%M:%S.%f')}")
# Also check if this newly calculated time is due right now
if new_next_run <= now < (new_next_run + timedelta(seconds=1)):
self.app_logger.info(f" !!! 任务 '{task_name}' (首次计算后) 触发执行 !!!")
tasks_to_trigger.append((task, now))
if new_next_run < min_next_run_time:
min_next_run_time = new_next_run
next_task_summary = f"'{task_name}' @ {min_next_run_time.strftime('%H:%M:%S')}"
else:
self.app_logger.debug(f" 任务 '{task_name}' 无法计算下次运行时间。")
# Now trigger the collected tasks
for task, trigger_time in tasks_to_trigger:
self.log_message(f"调度器: 任务 '{task.get('name', task['id'][:8])}' 到期,准备执行。", "blue")
self.app_logger.info(f"调度器: 任务 '{task.get('name', task['id'][:8])}' 到期,准备执行。")
threading.Thread(target=self._execute_scheduled_commands, args=(task, trigger_time), name=f"SchedulerTask-{task.get('name', task['id'][:8])}").start()
# Immediately update the task's last_run_time and recalculate next_run_time
# This is crucial for 'once' tasks to prevent re-triggering
self.after(0, lambda t=task, c_now=trigger_time: self._update_scheduled_task_after_execution(t, c_now))
# Update scheduler status label
self.scheduler_status_var.set(
f"调度器: 上次检查 {now.strftime('%H:%M:%S')} | 下个任务: {next_task_summary}"
)
def _update_scheduled_task_after_execution(self, task, execution_time):
"""
在主线程中更新定时任务的 last_run_time 和 next_run_time。
"""
task_name = task.get("name", task["id"][:8])
self.app_logger.debug(f"_update_scheduled_task_after_execution for task '{task_name}' called at {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}.")
task["last_run_time"] = execution_time
task["next_run_time"] = self._calculate_next_run_time(task, execution_time + timedelta(seconds=1))
self.save_config()
# 如果定时任务管理窗口是打开的,刷新其显示
if self.scheduler_window and self.scheduler_window.winfo_exists():
self.scheduler_window._update_schedule_display()
def _calculate_next_run_time(self, task, reference_time):
"""
根据任务的调度类型和触发器计算下次运行时间。
reference_time: datetime对象作为计算下次运行时间的基准。
"""
schedule_type = task["schedule_type"]
trigger = task["trigger"]
next_run = None
if schedule_type == "once":
try:
target_time = datetime(
trigger["year"], trigger["month"], trigger["day"],
trigger["hour"], trigger["minute"], trigger["second"]
)
self.app_logger.debug(f" _calculate_next_run_time for 'once' task '{task.get('name', task['id'][:8])}': target_time={target_time}, reference_time={reference_time}, last_run_time={task.get('last_run_time')}")
if task.get("last_run_time") is None and target_time >= reference_time:
next_run = target_time
except ValueError:
self.app_logger.warning(f"任务 '{task.get('name', task['id'][:8])}' 的一次性触发时间无效。")
self.log_message(f"任务 '{task.get('name', task['id'][:8])}' 的一次性触发时间无效。", "red")
elif schedule_type == "daily":
target_hour = trigger["hour"]
target_minute = trigger["minute"]
target_second = trigger["second"]
# Start with today's target time
potential_run_time = reference_time.replace(
hour=target_hour,
minute=target_minute,
second=target_second,
microsecond=0
)
# If target time is in the past, set it for tomorrow
if potential_run_time <= reference_time:
potential_run_time += timedelta(days=1)
next_run = potential_run_time
elif schedule_type == "weekly":
target_hour = trigger["hour"]
target_minute = trigger["minute"]
target_second = trigger["second"]
days_of_week = sorted(trigger["days_of_week"]) # 0=Mon, 6=Sun
min_next_run = None
for day_of_week_target in days_of_week:
# Calculate days ahead to reach the target day of week
days_ahead = (day_of_week_target - reference_time.weekday() + 7) % 7
potential_run_time = (reference_time + timedelta(days=days_ahead)).replace(
hour=target_hour, minute=target_minute, second=target_second, microsecond=0
)
# If it's the same day of week, but the time has passed, move to next week
if days_ahead == 0 and potential_run_time <= reference_time:
potential_run_time += timedelta(weeks=1)
if min_next_run is None or potential_run_time < min_next_run:
min_next_run = potential_run_time
next_run = min_next_run
elif schedule_type == "monthly":
target_hour = trigger["hour"]
target_minute = trigger["minute"]
target_second = trigger["second"]
current_year = reference_time.year
current_month = reference_time.month
potential_runs = []
# Check current month and next month
for month_offset in range(2):
check_year = current_year
check_month = current_month + month_offset
if check_month > 12:
check_month -= 12
check_year += 1
if "month_day" in trigger:
target_day = trigger["month_day"]
try:
# Ensure target_day is valid for the month
if target_day > calendar.monthrange(check_year, check_month)[1]:
continue # Skip if day is invalid for this month (e.g., Feb 30)
potential_date = datetime(check_year, check_month, target_day,
target_hour, target_minute, target_second, microsecond=0)
if potential_date > reference_time:
potential_runs.append(potential_date)
except ValueError:
pass # Date might be invalid (e.g., Feb 30)
elif "week_of_month" in trigger:
target_week_of_month = trigger["week_of_month"]
target_day_of_week = trigger["day_of_week"]
count = 0
for day in range(1, calendar.monthrange(check_year, check_month)[1] + 1):
d = datetime(check_year, check_month, day)
if d.weekday() == target_day_of_week:
count += 1
if count == target_week_of_month:
potential_date = d.replace(
hour=target_hour, minute=target_minute, second=target_second, microsecond=0
)
if potential_date > reference_time:
potential_runs.append(potential_date)
break
if potential_runs:
next_run = min(potential_runs)
else:
next_run = None
self.app_logger.debug(f" _calculate_next_run_time for '{task.get('name', task['id'][:8])}' returns: {next_run}")
return next_run
def _execute_scheduled_commands(self, task, trigger_time):
"""
执行定时任务中的指令序列。
此方法在后台线程中运行。
"""
task_name = task.get("name", task["id"][:8])
self.app_logger.debug(f"[SchedulerThread-{threading.current_thread().name}] _execute_scheduled_commands for task '{task_name}' STARTED @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}")
time.sleep(0.05) # Give UI a moment to update log message
self.log_message(f"调度器: 任务 '{task_name}':开始执行指令序列...", "blue")
self.app_logger.info(f"调度器: 任务 '{task_name}':开始执行指令序列...")
try:
for i, action in enumerate(task["actions"]):
device_id = action["device_id"]
command_index = action["command_index"]
# 获取当前指令的延迟时间
delay_ms = action.get("delay_ms", 0)
device = next((d for d in self.devices if d["id"] == device_id), None)
if not device:
self.log_message(f"定时任务 '{task_name}' 失败: 设备ID '{device_id[:8]}...' 无效或不存在。", "red")
self.app_logger.error(f"定时任务 '{task_name}' 失败: 设备ID '{device_id[:8]}...' 无效或不存在。")
continue
if not (0 <= command_index < len(device["commands"])):
self.log_message(f"定时任务 '{task_name}' 失败: 设备 '{device['name']}' 的指令索引 {command_index} 无效。", "red")
self.app_logger.error(f"定时任务 '{task_name}' 失败: 设备 '{device['name']}' 的指令索引 {command_index} 无效。")
continue
command = device["commands"][command_index]
# 如果当前指令有延迟,则先等待
if delay_ms > 0:
self.log_message(f"定时任务 '{task_name}':指令 '{command['name']}' 将延迟 {delay_ms} 毫秒后执行。", "gray")
self.app_logger.info(f"定时任务 '{task_name}':指令 '{command['name']}' 将延迟 {delay_ms} 毫秒后执行。")
time.sleep(delay_ms / 1000.0)
self.log_message(f"定时任务 '{task_name}':执行指令 '{command['name']}' 到设备 '{device['name']}'...", "blue")
self.app_logger.info(f"定时任务 '{task_name}':执行指令 '{command['name']}' 到设备 '{device['name']}'...")
try:
response = self.network_manager.send_command_sync_for_scheduler(device, command, is_scheduled_task=True)
self.log_message(f"定时任务 '{task_name}':指令 '{command['name']}' 成功,响应: {response}", "green")
self.app_logger.info(f"定时任务 '{task_name}':指令 '{command['name']}' 成功,响应: {response}")
except Exception as e:
self.log_message(f"定时任务 '{task_name}':指令 '{command['name']}' 执行失败: {e}", "red")
self.app_logger.error(f"定时任务 '{task_name}':指令 '{command['name']}' 执行失败: {e}")
self.log_message(f"定时任务 '{task_name}':指令序列执行完毕。", "blue")
self.app_logger.info(f"定时任务 '{task_name}':指令序列执行完毕。")
except Exception as e:
self.log_message(f"定时任务 '{task_name}' 执行过程中发生未预期错误: {e}", "red")
self.app_logger.critical(f"定时任务 '{task_name}' 执行过程中发生未预期错误: {e}")
finally:
self.app_logger.debug(f"[SchedulerThread-{threading.current_thread().name}] _execute_scheduled_commands for task '{task_name}' FINISHED @ {datetime.now().strftime('%Y-%m-%d %H:%M:%S.%f')}")
def update_main_connection_status(self, status, color="black"):
"""MainControlApp 的连接状态更新回调,这里仅用于日志"""
self.log_message(f"全局连接状态更新: {status}", color)
self.app_logger.info(f"全局连接状态更新: {status}")
def log_message(self, message, color="black"):
"""向日志文本框添加消息 (在 UI 线程中执行) 并写入文件"""
timestamp_str = datetime.now().strftime("[%Y-%m-%d %H:%M:%S]")
ui_message = f"{timestamp_str} {message}"
# 这里的日志级别判断保持不变,因为它控制的是 UI 显示的颜色和 app_logger 的级别
if color == "red":
self.app_logger.error(message)
elif color == "orange":
self.app_logger.warning(message)
elif color == "blue":
self.app_logger.info(message)
elif color == "green":
self.app_logger.info(message)
elif color == "gray": # UI日志中灰色通常是调试信息
self.app_logger.debug(message)
else:
self.app_logger.info(message)
self.after(0, lambda: self._log_message_safe(ui_message, color))
def _log_message_safe(self, message, color):
"""安全的 UI 更新函数,确保在主线程中被调用"""
self.log_text.config(state=tk.NORMAL)
self.log_text.insert(tk.END, message + "\n", color)
self.log_text.tag_config("red", foreground="red")
self.log_text.tag_config("green", foreground="green")
self.log_text.tag_config("blue", foreground="blue")
self.log_text.tag_config("orange", foreground="orange")
self.log_text.tag_config("gray", foreground="gray")
self.log_text.see(tk.END)
self.log_text.config(state=tk.DISABLED)
def on_closing(self):
"""窗口关闭时保存配置并关闭所有活动套接字"""
self.network_manager.close_all_sockets()
if hasattr(self, '_periodic_check_id') and self._periodic_check_id:
self.after_cancel(self._periodic_check_id)
if hasattr(self, '_scheduler_check_id') and self._scheduler_check_id:
self.after_cancel(self._scheduler_check_id)
self.destroy()
if __name__ == "__main__":
app = MainControlApp()
app.mainloop()