336 lines
20 KiB
Python
336 lines
20 KiB
Python
import socket
|
||
import threading
|
||
import time
|
||
import sys
|
||
# import subprocess # <--- 移除 subprocess 模块导入
|
||
import logging
|
||
|
||
class NetworkManager:
|
||
def __init__(self, log_callback, connection_status_callback, ip_status_callback, tcp_status_callback, app_logger):
|
||
self.log_callback = log_callback
|
||
self.connection_status_callback = connection_status_callback
|
||
self.ip_status_callback = ip_status_callback
|
||
self.tcp_status_callback = tcp_status_callback
|
||
self.app_logger = app_logger # 保存 app_logger 实例
|
||
self.sockets = {} # Dictionary to hold persistent sockets if keep_alive is true
|
||
self.socket_lock = threading.Lock() # Lock for managing self.sockets
|
||
|
||
def _prepare_command_data(self, command):
|
||
# Placeholder for actual command data preparation
|
||
if command["type"] == "hex":
|
||
return bytes.fromhex(command["data"].replace(" ", ""))
|
||
else: # Default to text
|
||
return command["data"].encode('utf-8')
|
||
|
||
def _log_with_prefix(self, message, color, is_scheduled_task):
|
||
prefix = "[调度任务] " if is_scheduled_task else ""
|
||
full_message = prefix + message
|
||
|
||
# 调用 UI 的 log_callback
|
||
self.log_callback(full_message, color)
|
||
|
||
# 根据颜色将消息发送到 app_logger 的对应级别
|
||
if color == "red":
|
||
self.app_logger.error(full_message)
|
||
elif color == "orange":
|
||
self.app_logger.warning(full_message)
|
||
elif color == "blue" or color == "green":
|
||
self.app_logger.info(full_message)
|
||
elif color == "gray":
|
||
self.app_logger.debug(full_message) # 'gray' 通常用于更详细的内部信息,映射到 DEBUG
|
||
else:
|
||
self.app_logger.info(full_message) # 默认使用 INFO
|
||
|
||
def _connect_tcp_and_send(self, device, command, is_scheduled_task=False):
|
||
# This method handles connection, sending, and receiving for TCP
|
||
# It's called by send_command_async and send_command_sync_for_scheduler
|
||
sock = None
|
||
response_data = b""
|
||
device_id = device["id"]
|
||
|
||
self.app_logger.debug(f"[NetworkManager] _connect_tcp_and_send called for device '{device['name']}', scheduled: {is_scheduled_task}")
|
||
|
||
try:
|
||
# Check for existing persistent connection if keep_alive is true
|
||
if device.get("keep_alive"):
|
||
with self.socket_lock:
|
||
sock = self.sockets.get(device_id)
|
||
if sock:
|
||
# Try to use existing socket, check if it's still open
|
||
try:
|
||
sock.send(b'') # Send a dummy byte to check connection
|
||
self.app_logger.debug(f"[NetworkManager] Using existing keep-alive socket for '{device['name']}'.")
|
||
except (socket.error, BrokenPipeError):
|
||
self._log_with_prefix(f"设备 '{device['name']}' ({device['address']}:{device['port']}) 的保持连接已断开,重新连接。", "orange", is_scheduled_task)
|
||
self.app_logger.debug(f"[NetworkManager] Keep-alive for '{device['name']}' broken, re-establishing.")
|
||
sock.close()
|
||
sock = None
|
||
if not sock:
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(device.get("timeout", 5))
|
||
sock.connect((device["address"], device["port"]))
|
||
self._log_with_prefix(f"成功建立与设备 '{device['name']}' ({device['address']}:{device['port']}) 的保持连接。", "green", is_scheduled_task)
|
||
self.app_logger.debug(f"[NetworkManager] Established new keep-alive socket for '{device['name']}'.")
|
||
self.sockets[device_id] = sock
|
||
else:
|
||
# Non-persistent connection: create new socket
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(device.get("timeout", 5))
|
||
sock.connect((device["address"], device["port"]))
|
||
self._log_with_prefix(f"成功连接到设备 '{device['name']}' ({device['address']}:{device['port']})。", "green", is_scheduled_task)
|
||
self.app_logger.debug(f"[NetworkManager] Established non-persistent connection for '{device['name']}'.")
|
||
|
||
data_to_send = self._prepare_command_data(command)
|
||
sock.sendall(data_to_send)
|
||
self._log_with_prefix(f"发送到设备 '{device['name']}' ({device['address']}:{device['port']}) 指令 '{command['name']}': {data_to_send!r}", "gray", is_scheduled_task)
|
||
self.app_logger.debug(f"[NetworkManager] Sent data to '{device['name']}': {data_to_send!r}")
|
||
|
||
# Receive response
|
||
response_data = sock.recv(1024)
|
||
response_str = response_data.decode('utf-8', errors='ignore').strip()
|
||
self._log_with_prefix(f"从设备 '{device['name']}' ({device['address']}:{device['port']}) 收到响应: {response_str!r}", "gray", is_scheduled_task)
|
||
self.app_logger.debug(f"[NetworkManager] Received response from '{device['name']}': {response_str!r}")
|
||
return response_str
|
||
|
||
except socket.timeout:
|
||
error_msg = f"与设备 '{device['name']}' ({device['address']}:{device['port']}) 通信超时。"
|
||
self._log_with_prefix(error_msg, "red", is_scheduled_task)
|
||
self.app_logger.error(f"[NetworkManager] {error_msg}")
|
||
raise ConnectionError(error_msg)
|
||
except (socket.error, ConnectionRefusedError, OSError) as e:
|
||
error_msg = f"与设备 '{device['name']}' ({device['address']}:{device['port']}) 连接或通信失败: {e}"
|
||
self_log_with_prefix(error_msg, "red", is_scheduled_task)
|
||
self.app_logger.error(f"[NetworkManager] {error_msg}")
|
||
raise ConnectionError(error_msg)
|
||
except Exception as e:
|
||
error_msg = f"发送指令 '{command['name']}' 到设备 '{device['name']}' 时发生未知错误: {e}"
|
||
self._log_with_prefix(error_msg, "red", is_scheduled_task)
|
||
self.app_logger.error(f"[NetworkManager] {error_msg}")
|
||
raise RuntimeError(error_msg)
|
||
finally:
|
||
if sock and not device.get("keep_alive"):
|
||
sock.close()
|
||
self._log_with_prefix(f"已关闭与设备 '{device['name']}' ({device['address']}:{device['port']}) 的非保持连接。", "gray", is_scheduled_task)
|
||
self.app_logger.debug(f"[NetworkManager] Closed non-persistent connection for '{device['name']}'.")
|
||
|
||
def send_command_async(self, device_index, device, command, on_finish_callback=None):
|
||
def _send_task():
|
||
self.app_logger.debug(f"[NetworkManager] send_command_async: _send_task started for '{device['name']}'.")
|
||
try:
|
||
if device["protocol"] == "TCP":
|
||
self._connect_tcp_and_send(device, command, is_scheduled_task=False) # Not a scheduled task
|
||
elif device["protocol"] == "UDP":
|
||
# UDP send logic (simplified, assuming no response needed for async)
|
||
# For UDP, you'd typically create a socket, sendto, and then close it.
|
||
# No explicit 'connection' is made.
|
||
try:
|
||
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
data_to_send = self._prepare_command_data(command)
|
||
udp_sock.sendto(data_to_send, (device["address"], device["port"]))
|
||
self._log_with_prefix(f"UDP指令 '{command['name']}' 已发送到设备 '{device['name']}' ({device['address']}:{device['port']})。", "green", False)
|
||
self.app_logger.debug(f"[NetworkManager] UDP command '{command['name']}' sent to '{device['name']}' (async).")
|
||
except Exception as e:
|
||
self._log_with_prefix(f"UDP指令 '{command['name']}' 发送失败到设备 '{device['name']}': {e}", "red", False)
|
||
self.app_logger.error(f"[NetworkManager] UDP command '{command['name']}' send failed to '{device['name']}': {e}")
|
||
finally:
|
||
if udp_sock:
|
||
udp_sock.close()
|
||
else:
|
||
self._log_with_prefix(f"不支持的协议 '{device['protocol']}'。", "red", False)
|
||
self.app_logger.error(f"[NetworkManager] Unsupported protocol '{device['protocol']}' (async).")
|
||
|
||
except Exception as e:
|
||
# Errors are already logged by _connect_tcp_and_send or specific UDP logic
|
||
self.app_logger.error(f"[NetworkManager] send_command_async error for '{device['name']}': {e}")
|
||
finally:
|
||
if on_finish_callback:
|
||
on_finish_callback()
|
||
self.app_logger.debug(f"[NetworkManager] send_command_async: _send_task finished for '{device['name']}'.")
|
||
|
||
threading.Thread(target=_send_task).start()
|
||
|
||
def send_command_sync_for_scheduler(self, device, command, is_scheduled_task=True):
|
||
"""
|
||
为调度器同步发送指令,并返回响应。
|
||
此方法在后台线程中被调用。
|
||
"""
|
||
self.app_logger.debug(f"[NetworkManager] send_command_sync_for_scheduler called for '{device['name']}', command '{command['name']}'.")
|
||
if device["protocol"] == "TCP":
|
||
return self._connect_tcp_and_send(device, command, is_scheduled_task=is_scheduled_task)
|
||
elif device["protocol"] == "UDP":
|
||
# UDP logic for scheduler (simplified)
|
||
try:
|
||
udp_sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
data_to_send = self._prepare_command_data(command)
|
||
udp_sock.sendto(data_to_send, (device["address"], device["port"]))
|
||
self._log_with_prefix(f"UDP指令 '{command['name']}' 已发送到设备 '{device['name']}' ({device['address']}:{device['port']})。", "green", is_scheduled_task)
|
||
self.app_logger.debug(f"[NetworkManager] UDP command '{command['name']}' sent to '{device['name']}' (scheduled).")
|
||
return "UDP command sent (no response expected)" # UDP typically doesn't have a direct response
|
||
except Exception as e:
|
||
self._log_with_prefix(f"UDP指令 '{command['name']}' 发送失败到设备 '{device['name']}': {e}", "red", is_scheduled_task)
|
||
self.app_logger.error(f"[NetworkManager] UDP command '{command['name']}' send failed to '{device['name']}': {e}")
|
||
raise RuntimeError(f"UDP command send failed: {e}")
|
||
finally:
|
||
if udp_sock:
|
||
udp_sock.close()
|
||
else:
|
||
self._log_with_prefix(f"不支持的协议 '{device['protocol']}'。", "red", is_scheduled_task)
|
||
self.app_logger.error(f"[NetworkManager] Unsupported protocol '{device['protocol']}' (scheduled).")
|
||
raise ValueError(f"不支持的协议: {device['protocol']}")
|
||
|
||
def check_ip_reachability_async(self, device_index, device):
|
||
"""
|
||
使用 socket 尝试 TCP 连接来检查 IP 可达性。
|
||
此方法在后台线程中运行。
|
||
"""
|
||
def _check():
|
||
is_reachable = False
|
||
sock = None
|
||
target_address = device["address"]
|
||
target_port = device["port"] # 使用设备配置的端口进行检查
|
||
|
||
self.app_logger.debug(f"[NetworkManager] Starting IP reachability check for '{device['name']}' ({target_address}:{target_port}).")
|
||
try:
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
sock.settimeout(1) # 设置一个短的超时时间,例如1秒
|
||
|
||
# connect_ex() 方法尝试连接,如果成功返回0,否则返回错误码
|
||
result_code = sock.connect_ex((target_address, target_port))
|
||
|
||
if result_code == 0:
|
||
is_reachable = True
|
||
self.app_logger.debug(f"[NetworkManager] IP reachable: TCP connect successful for '{device['name']}' ({target_address}:{target_port}).")
|
||
else:
|
||
self.app_logger.debug(f"[NetworkManager] IP not reachable: TCP connect failed for '{device['name']}' ({target_address}:{target_port}) with error code {result_code}.")
|
||
|
||
except socket.timeout:
|
||
self.app_logger.debug(f"[NetworkManager] IP not reachable: TCP connect timeout for '{device['name']}' ({target_address}:{target_port}).")
|
||
except socket.error as e:
|
||
self.app_logger.debug(f"[NetworkManager] IP not reachable: TCP connect error for '{device['name']}' ({target_address}:{target_port}): {e}")
|
||
except Exception as e:
|
||
self.app_logger.error(f"[NetworkManager] Unexpected error during TCP reachability check for '{device['name']}': {e}")
|
||
finally:
|
||
if sock:
|
||
sock.close() # 确保关闭套接字
|
||
self.ip_status_callback(device_index, is_reachable)
|
||
self.app_logger.debug(f"[NetworkManager] IP reachability check for '{device['name']}' ({device['address']}:{device['port']}) finished. Status: {'Reachable' if is_reachable else 'Unreachable'}")
|
||
threading.Thread(target=_check).start()
|
||
|
||
def check_tcp_connection_async(self, device_index, device):
|
||
"""
|
||
检查 TCP 保持连接的状态。
|
||
如果设备是 keep_alive,且 socket 存在,尝试发送空数据。
|
||
如果 socket 不存在,则尝试建立新的 keep_alive 连接。
|
||
此方法在后台线程中运行。
|
||
"""
|
||
def _check():
|
||
is_connected = False
|
||
sock_to_close = None # 用于临时创建的 socket
|
||
device_id = device["id"]
|
||
|
||
self.app_logger.debug(f"[NetworkManager] Starting TCP connection check for '{device['name']}' (keep_alive: {device.get('keep_alive')}).")
|
||
|
||
try:
|
||
if device.get("keep_alive"):
|
||
with self.socket_lock:
|
||
sock = self.sockets.get(device_id)
|
||
if sock:
|
||
try:
|
||
sock.send(b'') # Try sending dummy data to check connection
|
||
is_connected = True
|
||
self.app_logger.debug(f"[NetworkManager] TCP check: Keep-alive socket for '{device['name']}' is active.")
|
||
except (socket.error, BrokenPipeError):
|
||
self.app_logger.debug(f"[NetworkManager] TCP check: Keep-alive socket for '{device['name']}' broken, closing and removing.")
|
||
sock.close()
|
||
del self.sockets[device_id]
|
||
is_connected = False
|
||
else:
|
||
# No persistent socket yet, try to establish one to check
|
||
self.app_logger.debug(f"[NetworkManager] TCP check: No existing keep-alive for '{device['name']}', trying to establish new one.")
|
||
new_sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
|
||
new_sock.settimeout(device.get("timeout", 5))
|
||
new_sock.connect((device["address"], device["port"]))
|
||
self.sockets[device_id] = new_sock # Store it for future use
|
||
is_connected = True
|
||
self.app_logger.debug(f"[NetworkManager] TCP check: New keep-alive socket established for '{device['name']}'.")
|
||
sock_to_close = None # Don't close, it's now persistent
|
||
else:
|
||
# 如果 keep_alive 是 False,我们不在此处进行 TCP 连接检查,因为它是非保持连接。
|
||
# IP 可达性检查(通过 TCP 连接尝试)已经足够。
|
||
self.app_logger.debug(f"[NetworkManager] TCP check: Device '{device['name']}' is not keep-alive, skipping detailed TCP connection check.")
|
||
is_connected = False # 明确设置为 False,因为没有保持连接
|
||
sock_to_close = None
|
||
except Exception as e:
|
||
self.app_logger.error(f"[NetworkManager] TCP connection check error for '{device['name']}': {e}")
|
||
is_connected = False
|
||
finally:
|
||
if sock_to_close: # 如果是临时创建的 socket 且未被存储为 persistent
|
||
sock_to_close.close()
|
||
self.tcp_status_callback(device_index, is_connected)
|
||
# 同时更新 IP 状态点,因为 TCP 连接成功必然意味着 IP 可达
|
||
self.ip_status_callback(device_index, is_connected) # <--- 新增:根据 TCP 状态更新 IP 状态点
|
||
threading.Thread(target=_check).start()
|
||
|
||
def check_udp_reachability_async(self, device_index, device):
|
||
"""
|
||
尝试发送一个UDP数据包来检查UDP端口的可达性。
|
||
注意:这只能确认数据包是否能发送出去,不能确认远程服务是否在监听。
|
||
"""
|
||
def _check():
|
||
is_reachable = False
|
||
sock = None
|
||
target_address = device["address"]
|
||
target_port = device["port"]
|
||
|
||
self.app_logger.debug(f"[NetworkManager] Starting UDP reachability check for '{device['name']}' ({target_address}:{target_port}).")
|
||
try:
|
||
sock = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
|
||
sock.settimeout(1) # 短暂超时
|
||
|
||
# 尝试发送一个空数据包或一个小的"心跳"数据包
|
||
sock.sendto(b'', (target_address, target_port))
|
||
is_reachable = True # 如果sendto没有抛出异常,我们认为IP可达
|
||
self.app_logger.debug(f"[NetworkManager] UDP send successful for '{device['name']}' ({target_address}:{target_port}).")
|
||
|
||
except socket.timeout:
|
||
self.app_logger.debug(f"[NetworkManager] UDP send timeout for '{device['name']}' ({target_address}:{target_port}).")
|
||
except socket.error as e:
|
||
# 常见的错误可能是"Network is unreachable"或"Host is down"
|
||
self.app_logger.debug(f"[NetworkManager] UDP send failed for '{device['name']}' ({target_address}:{target_port}): {e}")
|
||
except Exception as e:
|
||
self.app_logger.error(f"[NetworkManager] Unexpected error during UDP reachability check for '{device['name']}': {e}")
|
||
finally:
|
||
if sock:
|
||
sock.close()
|
||
self.ip_status_callback(device_index, is_reachable)
|
||
self.app_logger.debug(f"[NetworkManager] UDP reachability check for '{device['name']}' ({device['address']}:{device['port']}) finished. Status: {'Reachable' if is_reachable else 'Unreachable'}")
|
||
threading.Thread(target=_check).start()
|
||
|
||
def close_connection_for_device(self, device):
|
||
"""关闭指定设备的保持连接"""
|
||
device_id = device["id"]
|
||
with self.socket_lock:
|
||
if device_id in self.sockets:
|
||
try:
|
||
self.sockets[device_id].close()
|
||
self.log_callback(f"已关闭设备 {device_id} 的保持连接。", "gray")
|
||
self.app_logger.debug(f"[NetworkManager] Closed keep-alive socket for device {device_id}.")
|
||
except Exception as e:
|
||
self.log_callback(f"关闭设备 {device_id} 的保持连接时出错: {e}", "red")
|
||
self.app_logger.error(f"[NetworkManager] Error closing keep-alive socket for device {device_id}: {e}")
|
||
finally:
|
||
del self.sockets[device_id]
|
||
|
||
def close_all_sockets(self):
|
||
with self.socket_lock:
|
||
for device_id, sock in self.sockets.items():
|
||
try:
|
||
sock.close()
|
||
self.log_callback(f"已关闭设备 {device_id} 的保持连接。", "gray")
|
||
self.app_logger.debug(f"[NetworkManager] Closed keep-alive socket for device {device_id}.")
|
||
except Exception as e:
|
||
self.log_callback(f"关闭设备 {device_id} 的保持连接时出错: {e}", "red")
|
||
self.app_logger.error(f"[NetworkManager] Error closing keep-alive socket for device {device_id}: {e}")
|
||
self.sockets.clear()
|
||
|