Files
TCPUDP/network_manager.py
2026-02-04 21:16:33 +08:00

336 lines
20 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

import socket
import threading
import time
import sys
# import subprocess # <--- 移除 subprocess 模块导入
import logging
class NetworkManager:
    """Send TCP/UDP commands to devices and report connection status.

    Devices and commands are plain dicts.  The code reads these keys:

    * device: ``"id"``, ``"name"``, ``"address"``, ``"port"``,
      ``"protocol"`` (``"TCP"`` or ``"UDP"``) and the optional
      ``"keep_alive"`` and ``"timeout"`` (seconds, default 5).
    * command: ``"name"``, ``"type"`` (``"hex"`` or anything else for
      text) and ``"data"``.

    Sockets of devices with ``keep_alive`` set are cached in
    ``self.sockets`` (keyed by device id) and guarded by
    ``self.socket_lock``.
    """

    # Maps a UI colour to the app_logger method used for the same message.
    # Colours not listed here ("blue", "green", anything else) use "info".
    _COLOR_TO_LEVEL = {"red": "error", "orange": "warning", "gray": "debug"}

    def __init__(self, log_callback, connection_status_callback, ip_status_callback, tcp_status_callback, app_logger):
        """Store the callbacks and create the empty keep-alive socket cache.

        Args:
            log_callback: called as ``log_callback(message, color)``.
            connection_status_callback: stored only; not called in this class.
            ip_status_callback: called as ``ip_status_callback(device_index, bool)``.
            tcp_status_callback: called as ``tcp_status_callback(device_index, bool)``.
            app_logger: object exposing ``debug``/``info``/``warning``/``error``.
        """
        self.log_callback = log_callback
        self.connection_status_callback = connection_status_callback
        self.ip_status_callback = ip_status_callback
        self.tcp_status_callback = tcp_status_callback
        self.app_logger = app_logger
        self.sockets = {}  # device id -> persistent socket (keep_alive devices only)
        self.socket_lock = threading.Lock()  # guards self.sockets

    def _prepare_command_data(self, command):
        """Return the bytes to transmit for *command*.

        ``"hex"`` commands are parsed as hexadecimal (spaces ignored); every
        other type is encoded as UTF-8 text.

        Raises:
            ValueError: if a ``"hex"`` command contains invalid hex digits.
        """
        if command["type"] == "hex":
            return bytes.fromhex(command["data"].replace(" ", ""))
        return command["data"].encode('utf-8')

    def _log_with_prefix(self, message, color, is_scheduled_task):
        """Send *message* to the UI log and to ``app_logger``.

        Scheduled-task messages get a fixed prefix.  The colour selects the
        logger level: red -> error, orange -> warning, gray -> debug,
        everything else -> info.
        """
        prefix = "[调度任务] " if is_scheduled_task else ""
        full_message = prefix + message
        self.log_callback(full_message, color)
        level = self._COLOR_TO_LEVEL.get(color, "info")
        getattr(self.app_logger, level)(full_message)

    def _open_tcp_socket(self, device):
        """Create a TCP socket, apply the device timeout and connect it.

        The socket is closed before the exception propagates when the
        connection attempt fails, so a failed attempt does not leak a
        descriptor.
        """
        sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
        try:
            sock.settimeout(device.get("timeout", 5))
            sock.connect((device["address"], device["port"]))
        except BaseException:
            sock.close()
            raise
        return sock

    def _discard_socket(self, device_id, sock):
        """Forget and close a persistent socket that is no longer usable."""
        with self.socket_lock:
            # Only drop the cache entry if it is still this very socket;
            # another thread may already have replaced it.
            if self.sockets.get(device_id) is sock:
                del self.sockets[device_id]
        try:
            sock.close()
        except OSError as exc:
            # Best effort: a failing close must not mask the original error.
            self.app_logger.debug(f"[NetworkManager] Error closing discarded socket for device {device_id}: {exc}")

    def _send_udp_datagram(self, device, command):
        """Send *command* to *device* as one UDP datagram and close the socket."""
        with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as udp_sock:
            payload = self._prepare_command_data(command)
            udp_sock.sendto(payload, (device["address"], device["port"]))

    def _connect_tcp_and_send(self, device, command, is_scheduled_task=False):
        """Send *command* over TCP and return the decoded response.

        For ``keep_alive`` devices the cached socket is reused (or created
        and cached); otherwise a fresh socket is opened and closed again.
        Up to 1024 bytes are read as the response.

        Returns:
            The response decoded as UTF-8 (undecodable bytes dropped) and
            stripped of surrounding whitespace.

        Raises:
            ConnectionError: on timeout or any other socket-level failure.
            RuntimeError: on any other unexpected error.
        """
        sock = None
        device_id = device["id"]
        keep_alive = device.get("keep_alive")
        self.app_logger.debug(f"[NetworkManager] _connect_tcp_and_send called for device '{device['name']}', scheduled: {is_scheduled_task}")
        try:
            if keep_alive:
                with self.socket_lock:
                    sock = self.sockets.get(device_id)
                    if sock is not None:
                        try:
                            # NOTE(review): a zero-length send is only a
                            # best-effort liveness probe; it may not notice a
                            # peer that went away silently.
                            sock.send(b'')
                            self.app_logger.debug(f"[NetworkManager] Using existing keep-alive socket for '{device['name']}'.")
                        except OSError:
                            self._log_with_prefix(f"设备 '{device['name']}' ({device['address']}:{device['port']}) 的保持连接已断开,重新连接。", "orange", is_scheduled_task)
                            self.app_logger.debug(f"[NetworkManager] Keep-alive for '{device['name']}' broken, re-establishing.")
                            # Drop the stale cache entry so a failed reconnect
                            # does not leave a closed socket behind.
                            self.sockets.pop(device_id, None)
                            sock.close()
                            sock = None
                    if sock is None:
                        sock = self._open_tcp_socket(device)
                        self.sockets[device_id] = sock
                        self._log_with_prefix(f"成功建立与设备 '{device['name']}' ({device['address']}:{device['port']}) 的保持连接。", "green", is_scheduled_task)
                        self.app_logger.debug(f"[NetworkManager] Established new keep-alive socket for '{device['name']}'.")
            else:
                # One-shot connection; it is closed in the finally clause.
                sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
                sock.settimeout(device.get("timeout", 5))
                sock.connect((device["address"], device["port"]))
                self._log_with_prefix(f"成功连接到设备 '{device['name']}' ({device['address']}:{device['port']})。", "green", is_scheduled_task)
                self.app_logger.debug(f"[NetworkManager] Established non-persistent connection for '{device['name']}'.")

            data_to_send = self._prepare_command_data(command)
            sock.sendall(data_to_send)
            self._log_with_prefix(f"发送到设备 '{device['name']}' ({device['address']}:{device['port']}) 指令 '{command['name']}': {data_to_send!r}", "gray", is_scheduled_task)
            self.app_logger.debug(f"[NetworkManager] Sent data to '{device['name']}': {data_to_send!r}")

            response_data = sock.recv(1024)
            if keep_alive and not response_data:
                # An empty read means the peer closed the connection, so the
                # cached socket cannot be reused.
                self._discard_socket(device_id, sock)
            response_str = response_data.decode('utf-8', errors='ignore').strip()
            self._log_with_prefix(f"从设备 '{device['name']}' ({device['address']}:{device['port']}) 收到响应: {response_str!r}", "gray", is_scheduled_task)
            self.app_logger.debug(f"[NetworkManager] Received response from '{device['name']}': {response_str!r}")
            return response_str
        except socket.timeout as exc:
            # Must precede the OSError handler: socket.timeout is an OSError.
            # The persistent socket is kept; the device may simply not reply.
            error_msg = f"与设备 '{device['name']}' ({device['address']}:{device['port']}) 通信超时。"
            self._log_with_prefix(error_msg, "red", is_scheduled_task)
            self.app_logger.error(f"[NetworkManager] {error_msg}")
            raise ConnectionError(error_msg) from exc
        except OSError as exc:
            if keep_alive and sock is not None:
                # The persistent connection is broken; reconnect next time.
                self._discard_socket(device_id, sock)
            error_msg = f"与设备 '{device['name']}' ({device['address']}:{device['port']}) 连接或通信失败: {exc}"
            self._log_with_prefix(error_msg, "red", is_scheduled_task)
            self.app_logger.error(f"[NetworkManager] {error_msg}")
            raise ConnectionError(error_msg) from exc
        except Exception as exc:
            error_msg = f"发送指令 '{command['name']}' 到设备 '{device['name']}' 时发生未知错误: {exc}"
            self._log_with_prefix(error_msg, "red", is_scheduled_task)
            self.app_logger.error(f"[NetworkManager] {error_msg}")
            raise RuntimeError(error_msg) from exc
        finally:
            if sock is not None and not keep_alive:
                sock.close()
                self._log_with_prefix(f"已关闭与设备 '{device['name']}' ({device['address']}:{device['port']}) 的非保持连接。", "gray", is_scheduled_task)
                self.app_logger.debug(f"[NetworkManager] Closed non-persistent connection for '{device['name']}'.")

    def send_command_async(self, device_index, device, command, on_finish_callback=None):
        """Send *command* to *device* on a new thread.

        Errors are logged, never raised to the caller.  ``on_finish_callback``
        (if given) is called with no arguments when the attempt is over,
        whether it succeeded or not.  ``device_index`` is accepted but not
        used here.
        """
        def _send_task():
            self.app_logger.debug(f"[NetworkManager] send_command_async: _send_task started for '{device['name']}'.")
            try:
                protocol = device["protocol"]
                if protocol == "TCP":
                    self._connect_tcp_and_send(device, command, is_scheduled_task=False)
                elif protocol == "UDP":
                    # Fire-and-forget datagram; no response is read.
                    try:
                        self._send_udp_datagram(device, command)
                        self._log_with_prefix(f"UDP指令 '{command['name']}' 已发送到设备 '{device['name']}' ({device['address']}:{device['port']})。", "green", False)
                        self.app_logger.debug(f"[NetworkManager] UDP command '{command['name']}' sent to '{device['name']}' (async).")
                    except Exception as exc:
                        self._log_with_prefix(f"UDP指令 '{command['name']}' 发送失败到设备 '{device['name']}': {exc}", "red", False)
                        self.app_logger.error(f"[NetworkManager] UDP command '{command['name']}' send failed to '{device['name']}': {exc}")
                else:
                    self._log_with_prefix(f"不支持的协议 '{protocol}'", "red", False)
                    self.app_logger.error(f"[NetworkManager] Unsupported protocol '{protocol}' (async).")
            except Exception as exc:
                # Details were already logged by the TCP/UDP code paths.
                self.app_logger.error(f"[NetworkManager] send_command_async error for '{device['name']}': {exc}")
            finally:
                if on_finish_callback:
                    on_finish_callback()
                self.app_logger.debug(f"[NetworkManager] send_command_async: _send_task finished for '{device['name']}'.")

        threading.Thread(target=_send_task).start()

    def send_command_sync_for_scheduler(self, device, command, is_scheduled_task=True):
        """Send *command* synchronously and return the response.

        Returns:
            The TCP response string, or a fixed placeholder string for UDP
            (no response is read for UDP).

        Raises:
            ConnectionError: TCP timeout or socket failure.
            RuntimeError: unexpected TCP error, or any UDP send failure.
            ValueError: the device protocol is neither "TCP" nor "UDP".
        """
        self.app_logger.debug(f"[NetworkManager] send_command_sync_for_scheduler called for '{device['name']}', command '{command['name']}'.")
        protocol = device["protocol"]
        if protocol == "TCP":
            return self._connect_tcp_and_send(device, command, is_scheduled_task=is_scheduled_task)
        if protocol == "UDP":
            try:
                self._send_udp_datagram(device, command)
                self._log_with_prefix(f"UDP指令 '{command['name']}' 已发送到设备 '{device['name']}' ({device['address']}:{device['port']})。", "green", is_scheduled_task)
                self.app_logger.debug(f"[NetworkManager] UDP command '{command['name']}' sent to '{device['name']}' (scheduled).")
                return "UDP command sent (no response expected)"
            except Exception as exc:
                self._log_with_prefix(f"UDP指令 '{command['name']}' 发送失败到设备 '{device['name']}': {exc}", "red", is_scheduled_task)
                self.app_logger.error(f"[NetworkManager] UDP command '{command['name']}' send failed to '{device['name']}': {exc}")
                raise RuntimeError(f"UDP command send failed: {exc}") from exc
        self._log_with_prefix(f"不支持的协议 '{protocol}'", "red", is_scheduled_task)
        self.app_logger.error(f"[NetworkManager] Unsupported protocol '{protocol}' (scheduled).")
        raise ValueError(f"不支持的协议: {protocol}")

    def check_ip_reachability_async(self, device_index, device):
        """Check reachability by attempting a TCP connect on a new thread.

        The device's own address and port are used with a 1 second timeout.
        The result is reported via ``ip_status_callback(device_index, bool)``.
        """
        def _check():
            is_reachable = False
            target_address = device["address"]
            target_port = device["port"]
            self.app_logger.debug(f"[NetworkManager] Starting IP reachability check for '{device['name']}' ({target_address}:{target_port}).")
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as sock:
                    sock.settimeout(1)
                    # connect_ex returns 0 on success and an error code otherwise.
                    result_code = sock.connect_ex((target_address, target_port))
                if result_code == 0:
                    is_reachable = True
                    self.app_logger.debug(f"[NetworkManager] IP reachable: TCP connect successful for '{device['name']}' ({target_address}:{target_port}).")
                else:
                    self.app_logger.debug(f"[NetworkManager] IP not reachable: TCP connect failed for '{device['name']}' ({target_address}:{target_port}) with error code {result_code}.")
            except socket.timeout:
                self.app_logger.debug(f"[NetworkManager] IP not reachable: TCP connect timeout for '{device['name']}' ({target_address}:{target_port}).")
            except OSError as exc:
                self.app_logger.debug(f"[NetworkManager] IP not reachable: TCP connect error for '{device['name']}' ({target_address}:{target_port}): {exc}")
            except Exception as exc:
                self.app_logger.error(f"[NetworkManager] Unexpected error during TCP reachability check for '{device['name']}': {exc}")
            finally:
                self.ip_status_callback(device_index, is_reachable)
                self.app_logger.debug(f"[NetworkManager] IP reachability check for '{device['name']}' ({device['address']}:{device['port']}) finished. Status: {'Reachable' if is_reachable else 'Unreachable'}")

        threading.Thread(target=_check).start()

    def check_tcp_connection_async(self, device_index, device):
        """Check the keep-alive TCP connection of *device* on a new thread.

        For ``keep_alive`` devices the cached socket is probed with an empty
        send; if no socket is cached a new persistent connection is opened
        and cached.  Devices without ``keep_alive`` are reported as not
        connected without any network activity.  The result is passed to both
        ``tcp_status_callback`` and ``ip_status_callback``.
        """
        def _check():
            is_connected = False
            device_id = device["id"]
            self.app_logger.debug(f"[NetworkManager] Starting TCP connection check for '{device['name']}' (keep_alive: {device.get('keep_alive')}).")
            try:
                if device.get("keep_alive"):
                    with self.socket_lock:
                        sock = self.sockets.get(device_id)
                        if sock is not None:
                            try:
                                # NOTE(review): best-effort probe; a zero-length
                                # send may not detect every dead connection.
                                sock.send(b'')
                                is_connected = True
                                self.app_logger.debug(f"[NetworkManager] TCP check: Keep-alive socket for '{device['name']}' is active.")
                            except OSError:
                                self.app_logger.debug(f"[NetworkManager] TCP check: Keep-alive socket for '{device['name']}' broken, closing and removing.")
                                sock.close()
                                del self.sockets[device_id]
                        else:
                            self.app_logger.debug(f"[NetworkManager] TCP check: No existing keep-alive for '{device['name']}', trying to establish new one.")
                            # Closed automatically if the connect fails.
                            self.sockets[device_id] = self._open_tcp_socket(device)
                            is_connected = True
                            self.app_logger.debug(f"[NetworkManager] TCP check: New keep-alive socket established for '{device['name']}'.")
                else:
                    # No persistent connection exists for this device, so
                    # nothing is probed and it is reported as not connected.
                    self.app_logger.debug(f"[NetworkManager] TCP check: Device '{device['name']}' is not keep-alive, skipping detailed TCP connection check.")
            except Exception as exc:
                self.app_logger.error(f"[NetworkManager] TCP connection check error for '{device['name']}': {exc}")
                is_connected = False
            finally:
                self.tcp_status_callback(device_index, is_connected)
                # NOTE(review): the IP status mirrors the TCP status, so a
                # non-keep-alive device is reported as IP-unreachable here
                # although nothing was tested -- confirm this is intended.
                self.ip_status_callback(device_index, is_connected)

        threading.Thread(target=_check).start()

    def check_udp_reachability_async(self, device_index, device):
        """Send an empty UDP datagram to *device* on a new thread.

        Success only means the datagram could be sent locally; it does not
        prove that a remote service is listening.  The result is reported via
        ``ip_status_callback(device_index, bool)``.
        """
        def _check():
            is_reachable = False
            target_address = device["address"]
            target_port = device["port"]
            self.app_logger.debug(f"[NetworkManager] Starting UDP reachability check for '{device['name']}' ({target_address}:{target_port}).")
            try:
                with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as sock:
                    sock.settimeout(1)
                    sock.sendto(b'', (target_address, target_port))
                # No exception from sendto is treated as "reachable".
                is_reachable = True
                self.app_logger.debug(f"[NetworkManager] UDP send successful for '{device['name']}' ({target_address}:{target_port}).")
            except socket.timeout:
                self.app_logger.debug(f"[NetworkManager] UDP send timeout for '{device['name']}' ({target_address}:{target_port}).")
            except OSError as exc:
                self.app_logger.debug(f"[NetworkManager] UDP send failed for '{device['name']}' ({target_address}:{target_port}): {exc}")
            except Exception as exc:
                self.app_logger.error(f"[NetworkManager] Unexpected error during UDP reachability check for '{device['name']}': {exc}")
            finally:
                self.ip_status_callback(device_index, is_reachable)
                self.app_logger.debug(f"[NetworkManager] UDP reachability check for '{device['name']}' ({device['address']}:{device['port']}) finished. Status: {'Reachable' if is_reachable else 'Unreachable'}")

        threading.Thread(target=_check).start()

    def close_connection_for_device(self, device):
        """Close and forget the persistent socket of *device*, if one exists."""
        device_id = device["id"]
        with self.socket_lock:
            # pop() removes the entry even if close() fails below.
            sock = self.sockets.pop(device_id, None)
            if sock is None:
                return
            try:
                sock.close()
                self.log_callback(f"已关闭设备 {device_id} 的保持连接。", "gray")
                self.app_logger.debug(f"[NetworkManager] Closed keep-alive socket for device {device_id}.")
            except Exception as exc:
                self.log_callback(f"关闭设备 {device_id} 的保持连接时出错: {exc}", "red")
                self.app_logger.error(f"[NetworkManager] Error closing keep-alive socket for device {device_id}: {exc}")

    def close_all_sockets(self):
        """Close every persistent socket and empty the cache."""
        with self.socket_lock:
            for device_id, sock in self.sockets.items():
                try:
                    sock.close()
                    self.log_callback(f"已关闭设备 {device_id} 的保持连接。", "gray")
                    self.app_logger.debug(f"[NetworkManager] Closed keep-alive socket for device {device_id}.")
                except Exception as exc:
                    self.log_callback(f"关闭设备 {device_id} 的保持连接时出错: {exc}", "red")
                    self.app_logger.error(f"[NetworkManager] Error closing keep-alive socket for device {device_id}: {exc}")
            self.sockets.clear()