"""
ModbusLink TCP传输层实现
实现基于TCP/IP的Modbus TCP协议传输,包括MBAP头处理。
TCP Transport Layer Implementation
Implements Modbus TCP protocol transport based on TCP/IP, including MBAP header processing.
"""
import socket
import struct
from typing import Optional
from .base import BaseTransport
from ..common.exceptions import ConnectionError, TimeoutError, InvalidResponseError
from ..utils.logging import get_logger
[docs]
class TcpTransport(BaseTransport):
"""
Modbus TCP传输层实现
处理基于TCP/IP的Modbus TCP通信,包括:
Modbus TCP Transport Layer Implementation
Handles Modbus TCP communication based on TCP/IP, including:
- TCP socket连接管理 | TCP socket connection management
- MBAP头的构建和解析 | MBAP header construction and parsing
- 事务标识符管理 | Transaction identifier management
- 错误处理和超时管理 | Error handling and timeout management
"""
[docs]
def __init__(self, host: str, port: int = 502, timeout: float = 10.0):
"""
初始化TCP传输层
Initialize TCP transport layer
Args:
host: 目标主机IP地址或域名 | Target host IP address or domain name
port: 目标端口,默认502(Modbus TCP标准端口) | Target port, default 502 (Modbus TCP standard port)
timeout: 超时时间(秒),默认10.0秒 | Timeout in seconds, default 10.0 seconds
Raises:
ValueError: 当参数无效时 | When parameters are invalid
TypeError: 当参数类型错误时 | When parameter types are incorrect
"""
if not host or not isinstance(host, str):
raise ValueError(
"主机地址不能为空且必须是字符串 | Host address cannot be empty and must be a string"
)
if not isinstance(port, int) or port <= 0 or port > 65535:
raise ValueError(
"端口必须是1-65535之间的整数 | Port must be an integer between 1-65535"
)
if not isinstance(timeout, (int, float)) or timeout <= 0:
raise ValueError("超时时间必须是正数 | Timeout must be a positive number")
self.host = host
self.port = port
self.timeout = timeout
self._socket: Optional[socket.socket] = None
self._transaction_id = 0
self._logger = get_logger("transport.tcp")
[docs]
def open(self) -> None:
"""建立TCP连接 | Establish TCP connection"""
try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(self.timeout)
self._socket.connect((self.host, self.port))
self._logger.info(
f"TCP连接已建立 | TCP connection established: {self.host}:{self.port}"
)
except socket.error as e:
raise ConnectionError(f"TCP连接失败 | TCP connection failed: {e}")
[docs]
def close(self) -> None:
"""关闭TCP连接 | Close TCP connection"""
if self._socket:
try:
self._socket.close()
self._logger.info(
f"TCP连接已关闭 | TCP connection closed: {self.host}:{self.port}"
)
except socket.error as e:
self._logger.debug(
f"关闭连接时出现错误(可忽略)| Error during connection close (ignorable): {e}"
)
finally:
self._socket = None
[docs]
def is_open(self) -> bool:
"""检查TCP连接状态 | Check TCP connection status"""
if self._socket is None:
return False
try:
# 尝试获取对端地址来检查连接状态 | Try to get peer address to check connection status
self._socket.getpeername()
return True
except socket.error:
return False
[docs]
def send_and_receive(self, slave_id: int, pdu: bytes) -> bytes:
"""
发送PDU并接收响应
实现TCP协议的完整通信流程:
Send PDU and receive response
Implements complete TCP protocol communication flow:
1. 构建MBAP头 | Build MBAP header
2. 发送请求(MBAP头 + PDU) | Send request (MBAP header + PDU)
3. 接收响应MBAP头 | Receive response MBAP header
4. 验证MBAP头 | Validate MBAP header
5. 接收响应PDU | Receive response PDU
6. 返回响应PDU | Return response PDU
"""
if not self.is_open():
raise ConnectionError("TCP连接未建立 | TCP connection not established")
# 1. 生成事务ID并构建MBAP头 | Generate transaction ID and build MBAP header
current_transaction_id = self._transaction_id
self._transaction_id = (
self._transaction_id + 1
) % 0x10000 # 16位回绕 | 16-bit wraparound
# MBAP头格式: | MBAP header format:
# - Transaction ID (2字节): 事务标识符 | Transaction identifier
# - Protocol ID (2字节): 协议标识符,固定为0x0000 | Protocol identifier, fixed to 0x0000
# - Length (2字节): 后续字节长度(Unit ID + PDU) | Length of following bytes (Unit ID + PDU)
# - Unit ID (1字节): 单元标识符(从站地址) | Unit identifier (slave address)
mbap_header = struct.pack(
">HHHB", # 大端序:2个short, 1个short, 1个byte | Big endian: 2 shorts, 1 short, 1 byte
current_transaction_id, # Transaction ID
0x0000, # Protocol ID
len(pdu)
+ 1, # Length (PDU长度 + Unit ID的1字节) | Length (PDU length + 1 byte for Unit ID)
slave_id, # Unit ID
)
# 2. 构建完整请求帧 | Build complete request frame
request_frame = mbap_header + pdu
self._logger.debug(f"TCP发送 | TCP Send: {request_frame.hex(' ').upper()}")
try:
# 3. 发送请求 | Send request
if self._socket is None:
raise ConnectionError("TCP连接未建立 | TCP connection not established")
self._socket.sendall(request_frame)
# 4. 接收响应MBAP头(7字节) | Receive response MBAP header (7 bytes)
response_mbap = self._receive_exact(7)
# 5. 解析响应MBAP头 | Parse response MBAP header
(
response_transaction_id,
response_protocol_id,
response_length,
response_unit_id,
) = struct.unpack(">HHHB", response_mbap)
# 6. 验证MBAP头 | Validate MBAP header
if response_transaction_id != current_transaction_id:
raise InvalidResponseError(
f"事务ID不匹配 | Transaction ID mismatch: 期望 | Expected {current_transaction_id}, 收到 | Received {response_transaction_id}"
)
if response_protocol_id != 0x0000:
raise InvalidResponseError(
f"协议ID无效 | Invalid Protocol ID: 期望 | Expected 0x0000, 收到 | Received 0x{response_protocol_id:04X}"
)
if response_unit_id != slave_id:
raise InvalidResponseError(
f"单元ID不匹配 | Unit ID mismatch: 期望 | Expected {slave_id}, 收到 | Received {response_unit_id}"
)
# 7. 接收响应PDU | Receive response PDU
pdu_length = (
response_length - 1
) # 减去Unit ID的1字节 | Subtract 1 byte for Unit ID
if pdu_length <= 0:
raise InvalidResponseError(
f"PDU长度无效 | Invalid PDU length: {pdu_length}"
)
response_pdu = self._receive_exact(pdu_length)
self._logger.debug(
f"TCP接收 | TCP Receive: {(response_mbap + response_pdu).hex(' ').upper()}"
)
# 8. 检查是否为异常响应 | Check if it's an exception response
if (
len(response_pdu) > 0 and response_pdu[0] & 0x80
): # 异常响应 | Exception response
from ..common.exceptions import ModbusException
function_code = (
response_pdu[0] & 0x7F
) # 去除异常标志位 | Remove exception flag bit
exception_code = response_pdu[1] if len(response_pdu) > 1 else 0
raise ModbusException(exception_code, function_code)
return response_pdu
except socket.timeout:
raise TimeoutError(
f"TCP通信超时 | TCP communication timeout ({self.timeout}秒 | seconds)"
)
except socket.error as e:
raise ConnectionError(f"TCP通信错误 | TCP communication error: {e}")
def _receive_exact(self, length: int) -> bytes:
"""
精确接收指定长度的数据
Receive exact length of data
Args:
length: 需要接收的字节数 | Number of bytes to receive
Returns:
接收到的数据 | Received data
Raises:
TimeoutError: 接收超时 | Receive timeout
ConnectionError: 连接错误 | Connection error
"""
data = b""
while len(data) < length:
try:
if self._socket is None:
raise ConnectionError(
"TCP连接未建立 | TCP connection not established"
)
chunk = self._socket.recv(length - len(data))
if not chunk:
raise ConnectionError(
"连接被远程主机关闭 | Connection closed by remote host"
)
data += chunk
except socket.timeout:
raise TimeoutError(
f"接收数据超时 | Data receive timeout,已接收 | Received {len(data)}/{length} 字节 | bytes"
)
except socket.error as e:
raise ConnectionError(f"接收数据错误 | Data receive error: {e}")
return data
[docs]
def __repr__(self) -> str:
"""字符串表示 | String representation"""
status = "已连接 | Connected" if self.is_open() else "未连接 | Disconnected"
return f"TcpTransport({self.host}:{self.port}, {status})"