Files
TCPUDP/scheduler_app.py
2026-02-04 21:16:33 +08:00

677 lines
36 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import tkinter as tk
from tkinter import ttk, messagebox
import json
from datetime import datetime, timedelta
import calendar
import uuid
from utils import center_toplevel_window
class SchedulerApp(tk.Toplevel):
    """Top-level window that lists the scheduled tasks and lets the user manage them."""

    def __init__(self, master, devices_data, scheduled_tasks, save_config_callback, log_message_callback):
        """Build the scheduler window as a child of ``master``.

        Args:
            master: Parent window. Other methods of this class call
                ``_recalculate_all_next_run_times()`` and
                ``refresh_scheduler_display()`` on it (the original comment
                says it is the MainControlApp instance).
            devices_data: Device list, kept by reference as ``self.devices``.
            scheduled_tasks: Task list, kept by reference as
                ``self.scheduled_tasks``.
            save_config_callback: Callable invoked with no arguments whenever
                the task list must be persisted.
            log_message_callback: Callable invoked as ``(message, color)``.
        """
        super().__init__(master)
        self.master = master

        # Shared data and callbacks supplied by the caller.
        self.devices, self.scheduled_tasks = devices_data, scheduled_tasks
        self.save_config_callback = save_config_callback
        self.log_message_callback = log_message_callback

        self.title("定时任务管理")
        self.geometry("800x600")

        self.create_widgets()
        # Populate the table once so the window opens with current data.
        self._update_schedule_display()

        # Center only after all widgets exist and the data has been loaded.
        center_toplevel_window(self, master)

        self.protocol("WM_DELETE_WINDOW", self.on_closing)
def create_widgets(self):
    """Create the button bar, the task table with its scrollbar, and the context menu."""
    # Button bar: "add task" on the left, "save and close" on the right.
    toolbar = ttk.Frame(self)
    toolbar.pack(side=tk.TOP, fill=tk.X, padx=10, pady=5)
    add_button = ttk.Button(toolbar, text="添加任务", command=self.add_task)
    add_button.pack(side=tk.LEFT, padx=5)
    close_button = ttk.Button(toolbar, text="保存并关闭", command=self.on_closing)
    close_button.pack(side=tk.RIGHT, padx=5)

    # Task table.
    table_frame = ttk.Frame(self)
    table_frame.pack(fill=tk.BOTH, expand=True, padx=10, pady=5)

    # (column id, heading text, width, anchor)
    column_specs = (
        ("Name", "任务名称", 120, tk.W),
        ("Type", "类型", 80, tk.CENTER),
        ("Trigger", "触发条件", 150, tk.W),
        ("Actions", "执行指令", 150, tk.W),
        ("Enabled", "启用", 50, tk.CENTER),
        ("Last Run", "上次运行", 120, tk.CENTER),
        ("Next Run", "下次运行", 120, tk.CENTER),
    )
    column_ids = tuple(spec[0] for spec in column_specs)
    self.task_tree = ttk.Treeview(table_frame, columns=column_ids, show="headings")
    for column_id, heading_text, width, anchor in column_specs:
        self.task_tree.heading(column_id, text=heading_text)
        self.task_tree.column(column_id, width=width, anchor=anchor)
    self.task_tree.pack(side=tk.LEFT, fill=tk.BOTH, expand=True)

    # Vertical scrollbar wired to the table in both directions.
    scrollbar = ttk.Scrollbar(table_frame, orient="vertical", command=self.task_tree.yview)
    scrollbar.pack(side=tk.RIGHT, fill=tk.Y)
    self.task_tree.config(yscrollcommand=scrollbar.set)

    self.task_tree.bind("<Double-1>", self.edit_selected_task)
    self.task_tree.bind("<<TreeviewSelect>>", self.on_task_select)

    # Right-click context menu for the task rows.
    self.context_menu = tk.Menu(self, tearoff=0)
    menu_entries = (
        ("编辑任务", self.edit_selected_task),
        ("删除任务", self.delete_selected_task),
        ("启用/禁用任务", self.toggle_task_enabled),
    )
    for label, handler in menu_entries:
        self.context_menu.add_command(label=label, command=handler)
    self.task_tree.bind("<Button-3>", self.show_context_menu)
def on_task_select(self, event):
    """Handle ``<<TreeviewSelect>>``; currently a deliberate no-op.

    Kept as a hook for enabling or disabling buttons based on the selection.
    """
    return None
def show_context_menu(self, event):
    """Pop up the task context menu for the row under the mouse pointer.

    The menu commands (edit / delete / toggle) look up the task through
    ``self.task_tree.focus()``, so the right-clicked row is made both the
    selected row and the focus item; otherwise the commands could act on a
    previously focused row. When the click is not over any row, no menu is
    shown, because every menu entry needs a target row.

    Args:
        event: Mouse event; ``y`` is used to find the row and
            ``x_root`` / ``y_root`` to position the menu.
    """
    row_id = self.task_tree.identify_row(event.y)
    if not row_id:
        # Empty string: the click landed on empty space below the rows.
        return
    self.task_tree.selection_set(row_id)
    self.task_tree.focus(row_id)
    try:
        self.context_menu.tk_popup(event.x_root, event.y_root)
    finally:
        # Always release the grab taken by tk_popup, even if it raised.
        self.context_menu.grab_release()
def _update_schedule_display(self):
"""刷新任务列表显示"""
for i in self.task_tree.get_children():
self.task_tree.delete(i)
for task in self.scheduled_tasks:
task_name = task.get("name", "无名称")
# 调度类型显示为中文
schedule_type_display = {
"once": "一次性",
"daily": "每天",
"weekly": "每周",
"monthly": "每月"
}.get(task["schedule_type"], "未知")
trigger_str = self._get_trigger_string(task)
actions_str = self._get_actions_string(task)
enabled_str = "" if task["enabled"] else ""
last_run_str = task["last_run_time"].strftime("%Y-%m-%d %H:%M:%S") if task["last_run_time"] else "N/A"
next_run_str = task["next_run_time"].strftime("%Y-%m-%d %H:%M:%S") if task["next_run_time"] else "N/A"
self.task_tree.insert("", tk.END, iid=task["id"],
values=(task_name, schedule_type_display, trigger_str, actions_str, enabled_str, last_run_str, next_run_str))
def _get_trigger_string(self, task):
trigger = task["trigger"]
if task["schedule_type"] == "once":
return f"{trigger['year']}-{trigger['month']:02d}-{trigger['day']:02d} {trigger['hour']:02d}:{trigger['minute']:02d}:{trigger['second']:02d}"
elif task["schedule_type"] == "daily":
return f"每天 {trigger['hour']:02d}:{trigger['minute']:02d}:{trigger['second']:02d}"
elif task["schedule_type"] == "weekly":
day_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
days = ", ".join([day_names[d] for d in sorted(trigger["days_of_week"])])
return f"{days} {trigger['hour']:02d}:{trigger['minute']:02d}:{trigger['second']:02d}"
elif task["schedule_type"] == "monthly":
if "month_day" in trigger:
return f"每月 {trigger['month_day']}{trigger['hour']:02d}:{trigger['minute']:02d}:{trigger['second']:02d}"
elif "week_of_month" in trigger:
week_of_month_map = {1: "第一个", 2: "第二个", 3: "第三个", 4: "第四个", -1: "最后一个"}
day_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
return f"每月 {week_of_month_map.get(trigger['week_of_month'], '')}{day_names[trigger['day_of_week']]} {trigger['hour']:02d}:{trigger['minute']:02d}:{trigger['second']:02d}"
return "N/A"
def _get_actions_string(self, task):
if not task["actions"]:
return "无指令"
action_names = []
for action in task["actions"]:
device = next((d for d in self.devices if d["id"] == action["device_id"]), None)
if device and 0 <= action["command_index"] < len(device["commands"]):
cmd_name = device["commands"][action["command_index"]]["name"]
action_names.append(f"{device['name']}:{cmd_name}")
else:
action_names.append("未知设备/指令")
return ", ".join(action_names)
def add_task(self):
    """Open the task editor for a new task."""
    editor_class = self.TaskEditor
    # This SchedulerApp is passed as the editor's master_app; None means
    # there is no existing task to edit.
    editor_class(self, None)
def edit_selected_task(self, event=None):
    """Open the editor for the task whose row currently has the tree focus.

    Used both as a menu command and as the double-click handler, hence the
    optional, unused ``event`` argument. Shows a warning when no row is
    focused and an error when the focused row id matches no task.
    """
    task_id = self.task_tree.focus()
    if not task_id:
        messagebox.showwarning("提示", "请选择一个任务进行编辑。")
        return

    # Row ids are the task ids, so the focused row id identifies the task.
    matching = None
    for candidate in self.scheduled_tasks:
        if candidate["id"] == task_id:
            matching = candidate
            break

    if not matching:
        messagebox.showerror("错误", "未找到选中的任务。")
        return
    self.TaskEditor(self, matching)
def delete_selected_task(self):
    """Delete the focused task after confirmation, then persist and refresh.

    Shows a warning when no row is focused. Nothing happens if the user
    declines the confirmation dialog.
    """
    task_id = self.task_tree.focus()
    if not task_id:
        messagebox.showwarning("提示", "请选择一个任务进行删除。")
        return
    if not messagebox.askyesno("确认删除", "确定要删除选中的任务吗?"):
        return

    # Slice assignment mutates the shared list in place, so every holder of
    # the list object sees the removal.
    remaining = [t for t in self.scheduled_tasks if t["id"] != task_id]
    self.scheduled_tasks[:] = remaining
    self.save_config_callback()

    main_app = self.master
    main_app._recalculate_all_next_run_times()  # recompute next run times
    main_app.refresh_scheduler_display()  # refresh the main window status
    self._update_schedule_display()
    self.log_message_callback(f"定时任务 (ID: {task_id[:8]}...) 已删除。", "blue")
def toggle_task_enabled(self):
    """Flip the ``enabled`` flag of the focused task, then persist and refresh.

    Shows a warning when no row is focused. If the focused row id matches no
    task, nothing happens.
    """
    task_id = self.task_tree.focus()
    if not task_id:
        messagebox.showwarning("提示", "请选择一个任务进行操作。")
        return

    target = next((t for t in self.scheduled_tasks if t["id"] == task_id), None)
    if not target:
        return

    target["enabled"] = not target["enabled"]
    self.save_config_callback()

    main_app = self.master
    main_app._recalculate_all_next_run_times()  # recompute next run times
    main_app.refresh_scheduler_display()  # refresh the main window status
    self._update_schedule_display()
    self.log_message_callback(f"定时任务 '{target.get('name', task_id[:8])}' (ID: {task_id[:8]}...) 已更新。", "blue")
def on_closing(self):
    """Persist the configuration, refresh the main window, and close this window."""
    self.save_config_callback()

    main_app = self.master
    # Recompute every task's next run time, then refresh the scheduler
    # status shown by the main window.
    main_app._recalculate_all_next_run_times()
    main_app.refresh_scheduler_display()

    self.destroy()
class TaskEditor(tk.Toplevel):
    """Modal dialog for adding a new scheduled task or editing an existing one."""

    def __init__(self, master_app, task_data=None):
        """Build the dialog and block until it is closed.

        Args:
            master_app: The owning SchedulerApp instance (per the original
                comment); used as the parent window and as the source of
                devices, tasks and callbacks.
            task_data: Task dict to edit. When falsy, a new task template is
                created and the dialog is titled as "add".
        """
        super().__init__(master_app)
        self.master_app = master_app

        window_title = "编辑定时任务" if task_data else "添加定时任务"
        self.title(window_title)
        self.geometry("500x700")

        self.task_data = task_data or self._create_new_task_template()
        # Remembered so saving can tell an update from a newly added task.
        self.original_task_id = self.task_data["id"]

        # Internal schedule type key -> label shown to the user, and back.
        self.schedule_type_map = {
            "once": "一次性",
            "daily": "每天",
            "weekly": "每周",
            "monthly": "每月",
        }
        self.schedule_type_reverse_map = {
            label: key for key, label in self.schedule_type_map.items()
        }

        self.create_widgets()
        self._load_task_data()

        # Center only after all widgets exist and the data has been loaded.
        center_toplevel_window(self, master_app)

        # Modal behaviour: stay on top of the parent, grab all events, and
        # wait here until this window is destroyed.
        self.transient(master_app)
        self.grab_set()
        self.wait_window(self)
def _create_new_task_template(self):
now = datetime.now()
return {
"id": str(uuid.uuid4()),
"name": "新任务",
"schedule_type": "daily", # 保持英文
"trigger": {"hour": now.hour, "minute": now.minute + 1, "second": 0}, # Default to 1 min from now
"actions": [],
"enabled": True,
"last_run_time": None,
"next_run_time": None
}
def create_widgets(self):
    """Lay out the editor form: name, schedule type, trigger, actions, enabled flag, buttons."""
    stretch = (tk.W, tk.E)
    form = ttk.Frame(self, padding="10")
    form.pack(fill=tk.BOTH, expand=True)

    # Row 0: task name.
    name_label = ttk.Label(form, text="任务名称:")
    name_label.grid(row=0, column=0, sticky=tk.W, pady=2)
    self.name_entry = ttk.Entry(form)
    self.name_entry.grid(row=0, column=1, sticky=stretch, pady=2)

    # Row 1: schedule type. Two variables are kept: the internal one holds
    # the English key that is saved, the display one holds the label shown
    # in the option menu.
    type_label = ttk.Label(form, text="调度类型:")
    type_label.grid(row=1, column=0, sticky=tk.W, pady=2)
    self.schedule_type_internal_var = tk.StringVar(self)
    self.schedule_type_display_var = tk.StringVar(self)
    initial_label = self.schedule_type_map.get(self.task_data["schedule_type"], "每天")
    self.schedule_type_display_var.set(initial_label)
    self.schedule_type_optionmenu = ttk.OptionMenu(
        form,
        self.schedule_type_display_var,
        self.schedule_type_display_var.get(),  # initially displayed value
        *self.schedule_type_map.values(),  # the choices are the labels
        command=self._on_schedule_type_change,
    )
    self.schedule_type_optionmenu.grid(row=1, column=1, sticky=stretch, pady=2)

    # Row 2: trigger settings. The internal type variable is still empty
    # here; _load_task_data rebuilds these widgets once it has been set.
    self.trigger_frame = ttk.LabelFrame(form, text="触发条件", padding="5")
    self.trigger_frame.grid(row=2, column=0, columnspan=2, sticky=stretch, pady=5)
    self._create_trigger_widgets(self.trigger_frame)

    # Row 3: actions.
    self.actions_frame = ttk.LabelFrame(form, text="执行指令", padding="5")
    self.actions_frame.grid(row=3, column=0, columnspan=2, sticky=stretch, pady=5)
    self._create_actions_widgets(self.actions_frame)

    # Row 4: enabled flag.
    self.enabled_var = tk.BooleanVar(self)
    self.enabled_checkbutton = ttk.Checkbutton(form, text="启用任务", variable=self.enabled_var)
    self.enabled_checkbutton.grid(row=4, column=0, columnspan=2, sticky=tk.W, pady=5)

    # Row 5: save / cancel buttons.
    buttons = ttk.Frame(form)
    buttons.grid(row=5, column=0, columnspan=2, sticky=tk.E, pady=10)
    save_button = ttk.Button(buttons, text="保存", command=self._save_task)
    save_button.pack(side=tk.LEFT, padx=5)
    cancel_button = ttk.Button(buttons, text="取消", command=self.destroy)
    cancel_button.pack(side=tk.LEFT, padx=5)

    # Let the input column absorb extra horizontal space.
    form.columnconfigure(1, weight=1)
def _create_trigger_widgets(self, parent_frame):
    """(Re)build the trigger inputs for the current internal schedule type.

    All children of ``parent_frame`` are destroyed first. The time spinboxes
    are always created; the date entries ("once"), weekday checkboxes
    ("weekly") or monthly options ("monthly") are added according to
    ``self.schedule_type_internal_var``. Values are filled in afterwards by
    ``_load_trigger_data``.

    Args:
        parent_frame: Container that receives the trigger widgets.
    """
    # Clear existing trigger widgets.
    for child in parent_frame.winfo_children():
        child.destroy()

    # Time inputs (common to all schedule types).
    ttk.Label(parent_frame, text="时间 (HH:MM:SS):").grid(row=0, column=0, sticky=tk.W, pady=2)
    self.hour_var = tk.StringVar(self)
    self.minute_var = tk.StringVar(self)
    self.second_var = tk.StringVar(self)
    self.hour_spinbox = ttk.Spinbox(parent_frame, from_=0, to=23, wrap=True, width=3, format="%02.0f", textvariable=self.hour_var)
    self.minute_spinbox = ttk.Spinbox(parent_frame, from_=0, to=59, wrap=True, width=3, format="%02.0f", textvariable=self.minute_var)
    self.second_spinbox = ttk.Spinbox(parent_frame, from_=0, to=59, wrap=True, width=3, format="%02.0f", textvariable=self.second_var)
    self.hour_spinbox.grid(row=0, column=1, sticky=tk.W, padx=(0, 2))
    ttk.Label(parent_frame, text=":").grid(row=0, column=2, sticky=tk.W)
    self.minute_spinbox.grid(row=0, column=3, sticky=tk.W, padx=(0, 2))
    ttk.Label(parent_frame, text=":").grid(row=0, column=4, sticky=tk.W)
    self.second_spinbox.grid(row=0, column=5, sticky=tk.W)

    # The internal variable holds the English schedule type key.
    schedule_type = self.schedule_type_internal_var.get()

    if schedule_type == "once":
        ttk.Label(parent_frame, text="日期 (YYYY-MM-DD):").grid(row=1, column=0, sticky=tk.W, pady=2)
        self.year_var = tk.StringVar(self)
        self.month_var = tk.StringVar(self)
        self.day_var = tk.StringVar(self)
        # No date stored in the task's trigger means "once" was just
        # chosen: pre-fill today's date.
        if "year" not in self.task_data["trigger"]:
            today = datetime.now()
            self.year_var.set(str(today.year))
            self.month_var.set(f"{today.month:02d}")
            self.day_var.set(f"{today.day:02d}")
        self.year_entry = ttk.Entry(parent_frame, width=5, textvariable=self.year_var)
        self.month_entry = ttk.Entry(parent_frame, width=3, textvariable=self.month_var)
        self.day_entry = ttk.Entry(parent_frame, width=3, textvariable=self.day_var)
        self.year_entry.grid(row=1, column=1, sticky=tk.W, padx=(0, 2))
        ttk.Label(parent_frame, text="-").grid(row=1, column=2, sticky=tk.W)
        self.month_entry.grid(row=1, column=3, sticky=tk.W, padx=(0, 2))
        ttk.Label(parent_frame, text="-").grid(row=1, column=4, sticky=tk.W)
        self.day_entry.grid(row=1, column=5, sticky=tk.W)

    elif schedule_type == "weekly":
        ttk.Label(parent_frame, text="选择星期:").grid(row=1, column=0, sticky=tk.W, pady=2)
        self.days_of_week_vars = []
        day_names = ["周一", "周二", "周三", "周四", "周五", "周六", "周日"]
        for day_index, day_name in enumerate(day_names):
            checked = tk.BooleanVar(self)
            checkbox = ttk.Checkbutton(parent_frame, text=day_name, variable=checked)
            # Four checkboxes per row, starting in column 1.
            checkbox.grid(row=1 + (day_index // 4), column=1 + (day_index % 4), sticky=tk.W, padx=2)
            self.days_of_week_vars.append((day_index, checked))  # (day index, BooleanVar)

    elif schedule_type == "monthly":
        self.monthly_type_var = tk.StringVar(self)
        self.monthly_type_var.set("month_day")  # default: specific day of the month
        ttk.Radiobutton(parent_frame, text="每月指定日期", variable=self.monthly_type_var, value="month_day", command=lambda: self._on_monthly_type_change(parent_frame)).grid(row=1, column=0, columnspan=3, sticky=tk.W)
        ttk.Radiobutton(parent_frame, text="每月第N个星期X", variable=self.monthly_type_var, value="week_of_month", command=lambda: self._on_monthly_type_change(parent_frame)).grid(row=2, column=0, columnspan=3, sticky=tk.W)

        self.monthly_specific_day_frame = ttk.Frame(parent_frame)
        self.monthly_specific_day_frame.grid(row=1, column=3, columnspan=3, sticky=(tk.W, tk.E))
        ttk.Label(self.monthly_specific_day_frame, text="日期:").pack(side=tk.LEFT)
        self.month_day_var = tk.StringVar(self)
        # Start from a valid day so saving without touching the spinbox
        # does not fail on an empty string.
        self.month_day_var.set("1")
        self.month_day_spinbox = ttk.Spinbox(self.monthly_specific_day_frame, from_=1, to=31, wrap=True, width=3, textvariable=self.month_day_var)
        self.month_day_spinbox.pack(side=tk.LEFT, padx=2)

        self.monthly_week_day_frame = ttk.Frame(parent_frame)
        self.monthly_week_day_frame.grid(row=2, column=3, columnspan=3, sticky=(tk.W, tk.E))
        ttk.Label(self.monthly_week_day_frame, text="").pack(side=tk.LEFT)
        self.week_of_month_var = tk.StringVar(self)
        self.week_of_month_optionmenu = ttk.OptionMenu(self.monthly_week_day_frame, self.week_of_month_var, "第一个", "第一个", "第二个", "第三个", "第四个", "最后一个")
        self.week_of_month_optionmenu.pack(side=tk.LEFT, padx=2)
        ttk.Label(self.monthly_week_day_frame, text="").pack(side=tk.LEFT)
        self.day_of_week_var = tk.StringVar(self)
        # ttk.OptionMenu takes the default value first and the menu entries
        # after it, so the default must be repeated in the entries;
        # otherwise "星期一" could not be picked again once changed.
        weekday_labels = ("星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日")
        self.day_of_week_optionmenu = ttk.OptionMenu(self.monthly_week_day_frame, self.day_of_week_var, weekday_labels[0], *weekday_labels)
        self.day_of_week_optionmenu.pack(side=tk.LEFT, padx=2)

        self._on_monthly_type_change(parent_frame)  # initialise visibility
def _on_schedule_type_change(self, selected_display_type):
# 将选中的中文显示文本映射回英文内部类型
internal_type = self.schedule_type_reverse_map.get(selected_display_type, "daily")
self.schedule_type_internal_var.set(internal_type) # 更新内部变量
# 根据内部类型重新创建触发条件部件
self._create_trigger_widgets(self.trigger_frame)
self._load_trigger_data() # 重新加载触发条件数据以填充新创建的部件
def _on_monthly_type_change(self, parent_frame):
    """Show the inputs of the selected monthly mode and hide the other ones.

    Args:
        parent_frame: Accepted for the callers' convenience; not used here.
    """
    day_frame = self.monthly_specific_day_frame
    week_frame = self.monthly_week_day_frame
    if self.monthly_type_var.get() == "month_day":
        visible, hidden, grid_row = day_frame, week_frame, 1
    else:
        visible, hidden, grid_row = week_frame, day_frame, 2
    hidden.grid_forget()
    visible.grid(row=grid_row, column=3, columnspan=3, sticky=(tk.W, tk.E))
def _create_actions_widgets(self, parent_frame):
    """Build the actions area: an "add" button plus a scrollable list of action rows.

    All children of ``parent_frame`` are destroyed first; one row is then
    added for each action already stored in ``self.task_data``.
    """
    for child in parent_frame.winfo_children():
        child.destroy()

    # One tuple per action row; the rows are appended by _add_action_row.
    self.action_entries = []

    add_button = ttk.Button(parent_frame, text="添加指令", command=self._add_action_row)
    add_button.pack(pady=5)

    # Scrollable area: a frame embedded in a canvas that is tied to a
    # vertical scrollbar.
    self.action_canvas = tk.Canvas(parent_frame)
    self.action_scrollbar = ttk.Scrollbar(parent_frame, orient="vertical", command=self.action_canvas.yview)
    self.action_scrollable_frame = ttk.Frame(self.action_canvas)

    def sync_scrollregion(_event):
        # Keep the scrollable region equal to the bounding box of the content.
        self.action_canvas.configure(scrollregion=self.action_canvas.bbox("all"))

    self.action_scrollable_frame.bind("<Configure>", sync_scrollregion)
    self.action_canvas.create_window((0, 0), window=self.action_scrollable_frame, anchor="nw")
    self.action_canvas.configure(yscrollcommand=self.action_scrollbar.set)
    self.action_canvas.pack(side="left", fill="both", expand=True)
    self.action_scrollbar.pack(side="right", fill="y")

    # Rows for the actions the task already has, if any.
    for stored_action in self.task_data["actions"] or ():
        self._add_action_row(stored_action)
def _add_action_row(self, action_data=None):
# Append one action row (device menu, command menu, delay entry, delete
# button) to the scrollable actions frame and record it in
# self.action_entries.
# action_data: optional action dict; its "device_id", "command_index" and
# optional "delay_ms" values pre-fill the row. It is None when the method
# is used as the command of the "add" button.
row_frame = ttk.Frame(self.action_scrollable_frame, padding=5, relief=tk.RIDGE, borderwidth=1)
row_frame.pack(fill=tk.X, pady=2)
# Device selection
ttk.Label(row_frame, text="设备:").pack(side=tk.LEFT)
device_names = [d["name"] for d in self.master_app.devices]
device_var = tk.StringVar(self)
# --- Fix 1: make sure the lambda captures the right command_dropdown and command_var ---
# Create command_dropdown and command_var first, then device_dropdown.
command_var = tk.StringVar(self)
command_dropdown = ttk.OptionMenu(row_frame, command_var, "无指令") # Placeholder
# The lambda binds this row's widgets as default arguments so the callback
# updates this row's command menu.
device_dropdown = ttk.OptionMenu(row_frame, device_var, device_names[0] if device_names else "无设备", *device_names,
command=lambda selected_device_name, dv=device_var, cdd=command_dropdown, cv=command_var: self._on_device_select(selected_device_name, dv, cdd, cv))
device_dropdown.pack(side=tk.LEFT, padx=5)
# Command selection (will be updated based on device)
ttk.Label(row_frame, text="指令:").pack(side=tk.LEFT)
command_dropdown.pack(side=tk.LEFT, padx=5) # Fix 2: place command_dropdown
# Delay input
ttk.Label(row_frame, text="延迟(ms):").pack(side=tk.LEFT)
delay_var = tk.StringVar(self)
delay_entry = ttk.Entry(row_frame, width=5, textvariable=delay_var)
delay_entry.pack(side=tk.LEFT, padx=5)
delay_var.set("0") # Default delay
# Delete button
delete_btn = ttk.Button(row_frame, text="X", command=lambda: self._delete_action_row(row_frame))
delete_btn.pack(side=tk.RIGHT)
# Entry layout: (row_frame, device_var, device_dropdown, command_var,
# command_dropdown, delay_var); _delete_action_row and _save_task unpack
# six-element tuples from this list.
self.action_entries.append((row_frame, device_var, device_dropdown, command_var, command_dropdown, delay_var))
# Initialize dropdowns if action_data is provided (editing existing task)
if action_data:
# Look the device up by id, then select it by name in the device menu.
device = next((d for d in self.master_app.devices if d["id"] == action_data["device_id"]), None)
if device:
device_var.set(device["name"])
self._on_device_select(device["name"], device_var, command_dropdown, command_var)
# Select the stored command only when its index is still in range.
if 0 <= action_data["command_index"] < len(device["commands"]):
command_var.set(device["commands"][action_data["command_index"]]["name"])
# NOTE(review): indentation was lost in this copy of the file; confirm
# whether the stored delay is restored for every action_data or only when
# the device was found.
delay_var.set(str(action_data.get("delay_ms", 0)))
else: # For new rows, set default device if available
if device_names:
device_var.set(device_names[0])
self._on_device_select(device_names[0], device_var, command_dropdown, command_var)
# Let Tk process the pending geometry work, then resize the scrollable
# region to cover the new row.
self.action_scrollable_frame.update_idletasks()
self.action_canvas.config(scrollregion=self.action_canvas.bbox("all"))
def _delete_action_row(self, row_frame):
for i, (frame, _, _, _, _, _) in enumerate(self.action_entries):
if frame == row_frame:
self.action_entries.pop(i)
row_frame.destroy()
break
self.action_scrollable_frame.update_idletasks()
self.action_canvas.config(scrollregion=self.action_canvas.bbox("all"))
def _on_device_select(self, selected_device_name, device_var, command_dropdown, command_var):
    """Refill one row's command menu with the commands of the chosen device.

    Args:
        selected_device_name: Device name picked in the device menu.
        device_var: The row's device variable (not used here).
        command_dropdown: The row's command OptionMenu to repopulate.
        command_var: Variable bound to ``command_dropdown``; set to the
            first menu entry.

    When the device is unknown or has no commands, the menu gets the single
    entry "无指令".
    """
    chosen = next(
        (d for d in self.master_app.devices if d["name"] == selected_device_name),
        None,
    )
    if chosen:
        labels = [command["name"] for command in chosen.get("commands", [])]
    else:
        labels = []
    if not labels:
        labels = ["无指令"]

    # Replace every menu entry; each entry sets command_var when picked.
    menu = command_dropdown["menu"]
    menu.delete(0, "end")
    for label in labels:
        menu.add_command(label=label, command=tk._setit(command_var, label))
    command_var.set(labels[0])
def _load_task_data(self):
    """Copy ``self.task_data`` into the form: name, schedule type, trigger, enabled flag."""
    task = self.task_data

    self.name_entry.delete(0, tk.END)
    self.name_entry.insert(0, task["name"])

    # Internal variable: the English key. Display variable: its label.
    internal_var = self.schedule_type_internal_var
    internal_var.set(task["schedule_type"])
    display_label = self.schedule_type_map.get(internal_var.get(), "每天")
    self.schedule_type_display_var.set(display_label)

    # Run the change handler so the trigger widgets are rebuilt for this
    # type and filled from the stored trigger.
    self._on_schedule_type_change(self.schedule_type_display_var.get())

    self.enabled_var.set(task["enabled"])
def _load_trigger_data(self):
trigger = self.task_data["trigger"]
self.hour_var.set(f"{trigger['hour']:02d}")
self.minute_var.set(f"{trigger['minute']:02d}")
self.second_var.set(f"{trigger['second']:02d}")
# 使用内部变量获取调度类型
if self.schedule_type_internal_var.get() == "once":
# 只有当 task_data["trigger"] 中有日期信息时才加载,否则保持 _create_trigger_widgets 中设置的当前日期
if "year" in trigger:
self.year_var.set(str(trigger["year"]))
self.month_var.set(f"{trigger['month']:02d}")
self.day_var.set(f"{trigger['day']:02d}")
elif self.schedule_type_internal_var.get() == "weekly":
if hasattr(self, 'days_of_week_vars'):
for i, var in self.days_of_week_vars:
var.set(i in trigger["days_of_week"])
elif self.schedule_type_internal_var.get() == "monthly":
if "month_day" in trigger:
self.monthly_type_var.set("month_day")
self.month_day_var.set(str(trigger["month_day"]))
elif "week_of_month" in trigger:
self.monthly_type_var.set("week_of_month")
week_of_month_map = {1: "第一个", 2: "第二个", 3: "第三个", 4: "第四个", -1: "最后一个"}
day_names = ["星期一", "星期二", "星期三", "星期四", "星期五", "星期六", "星期日"]
self.week_of_month_var.set(week_of_month_map.get(trigger["week_of_month"], "第一个"))
self.day_of_week_var.set(day_names[trigger["day_of_week"]])
self._on_monthly_type_change(self.trigger_frame) # Update visibility after loading
def _save_task(self):
    """Validate the form, store the task, persist the configuration and close.

    Validation failures are reported through an error dialog and leave the
    dialog open. On success ``self.task_data`` is updated in place (and
    appended to the scheduler's task list if its id is not there yet), the
    run times are cleared for recalculation, and the owning windows are
    refreshed.
    """
    try:
        name = self.name_entry.get().strip()
        if not name:
            messagebox.showerror("错误", "任务名称不能为空。")
            return

        # The internal variable holds the English schedule type key.
        schedule_type = self.schedule_type_internal_var.get()

        new_trigger = self._collect_trigger(schedule_type)
        if new_trigger is None:
            return  # the helper already reported the problem

        new_actions = self._collect_actions()
        if new_actions is None:
            return  # the helper already reported the problem
        if not new_actions:
            messagebox.showerror("错误", "请至少添加一个执行指令。")
            return

        self.task_data.update({
            "name": name,
            "schedule_type": schedule_type,  # saved as the English key
            "trigger": new_trigger,
            "actions": new_actions,
            "enabled": self.enabled_var.get(),
        })
        # Clear both run times so they are recalculated below.
        self.task_data["last_run_time"] = None
        self.task_data["next_run_time"] = None

        # A task whose id is not in the list yet is a new one: append it.
        scheduler = self.master_app
        already_listed = any(
            t["id"] == self.original_task_id for t in scheduler.scheduled_tasks
        )
        if not already_listed:
            scheduler.scheduled_tasks.append(self.task_data)

        scheduler.save_config_callback()
        # scheduler.master is the main application window.
        scheduler.master._recalculate_all_next_run_times()
        scheduler.master.refresh_scheduler_display()
        scheduler._update_schedule_display()
        scheduler.log_message_callback(f"定时任务 '{name}' (ID: {self.task_data['id'][:8]}...) 已保存。", "blue")
        self.destroy()
    except ValueError as e:
        # Raised by int() on non-numeric input in the helpers.
        messagebox.showerror("输入错误", f"请检查输入值是否正确: {e}")
    except Exception as e:
        messagebox.showerror("错误", f"保存任务时发生未知错误: {e}")


def _collect_trigger(self, schedule_type):
    """Read and validate the trigger inputs for ``schedule_type``.

    Returns:
        The trigger dict, or None after showing an error dialog for an
        out-of-range value.

    Raises:
        ValueError: If a numeric field does not contain an integer.
    """
    hour = int(self.hour_var.get())
    minute = int(self.minute_var.get())
    second = int(self.second_var.get())
    if not (0 <= hour <= 23 and 0 <= minute <= 59 and 0 <= second <= 59):
        messagebox.showerror("错误", "时间格式不正确。小时(0-23), 分钟(0-59), 秒(0-59)。")
        return None
    trigger = {"hour": hour, "minute": minute, "second": second}

    if schedule_type == "once":
        year = int(self.year_var.get())
        month = int(self.month_var.get())
        day = int(self.day_var.get())
        try:
            datetime(year, month, day)  # constructed only to validate the date
        except ValueError:
            messagebox.showerror("错误", "一次性任务日期无效。")
            return None
        trigger.update({"year": year, "month": month, "day": day})
    elif schedule_type == "weekly":
        selected_days = [idx for idx, var in self.days_of_week_vars if var.get()]
        if not selected_days:
            messagebox.showerror("错误", "每周任务至少需要选择一天。")
            return None
        trigger["days_of_week"] = selected_days
    elif schedule_type == "monthly":
        if self.monthly_type_var.get() == "month_day":
            month_day = int(self.month_day_var.get())
            if not (1 <= month_day <= 31):
                messagebox.showerror("错误", "每月指定日期必须在1-31之间。")
                return None
            trigger["month_day"] = month_day
        else:  # week_of_month
            ordinal_values = {"第一个": 1, "第二个": 2, "第三个": 3, "第四个": 4, "最后一个": -1}
            weekday_values = {"星期一": 0, "星期二": 1, "星期三": 2, "星期四": 3, "星期五": 4, "星期六": 5, "星期日": 6}
            week_of_month = ordinal_values.get(self.week_of_month_var.get())
            day_of_week = weekday_values.get(self.day_of_week_var.get())
            if week_of_month is None or day_of_week is None:
                messagebox.showerror("错误", "每月第N个星期X任务的设置无效。")
                return None
            trigger["week_of_month"] = week_of_month
            trigger["day_of_week"] = day_of_week
    return trigger


def _collect_actions(self):
    """Read and validate every action row.

    Returns:
        A list of ``{"device_id", "command_index", "delay_ms"}`` dicts
        (empty when there are no rows), or None after showing an error
        dialog for an invalid row.

    Raises:
        ValueError: If a delay field does not contain an integer.
    """
    actions = []
    for _, device_var, _, command_var, _, delay_var in self.action_entries:
        device_name = device_var.get()
        command_name = command_var.get()
        delay = int(delay_var.get())
        # These two values are the placeholders shown when nothing can be chosen.
        if device_name == "无设备" or command_name == "无指令":
            messagebox.showerror("错误", "指令列表中有未选择设备或指令的项。")
            return None
        if delay < 0:
            messagebox.showerror("错误", "指令延迟不能为负数。")
            return None
        device = next((d for d in self.master_app.devices if d["name"] == device_name), None)
        if not device:
            messagebox.showerror("错误", f"设备 '{device_name}' 未找到。")
            return None
        # Index of the first command carrying the selected name, or -1.
        command_index = next(
            (i for i, cmd in enumerate(device["commands"]) if cmd["name"] == command_name),
            -1,
        )
        if command_index == -1:
            messagebox.showerror("错误", f"指令 '{command_name}' 在设备 '{device_name}' 中未找到。")
            return None
        actions.append({
            "device_id": device["id"],
            "command_index": command_index,
            "delay_ms": delay,
        })
    return actions