"""
ModbusLink TCP传输层实现
实现基于TCP/IP的Modbus TCP协议传输,包括MBAP头处理。
TCP Transport Layer Implementation
Implements Modbus TCP protocol transport based on TCP/IP, including MBAP header processing.
"""
import socket
import struct
from typing import Optional
from .base import BaseTransport
from ..common.exceptions import ConnectionError, TimeoutError, InvalidResponseError
from ..common.language import get_message
from ..utils.logging import get_logger
[docs]
class TcpTransport(BaseTransport):
"""
Modbus TCP传输层实现
处理基于TCP/IP的Modbus TCP通信,包括:
Modbus TCP Transport Layer Implementation
Handles Modbus TCP communication based on TCP/IP, including:
- TCP socket连接管理 | TCP socket connection management
- MBAP头的构建和解析 | MBAP header construction and parsing
- 事务标识符管理 | Transaction identifier management
- 错误处理和超时管理 | Error handling and timeout management
"""
[docs]
def __init__(self, host: str, port: int = 502, timeout: float = 10.0):
"""
初始化TCP传输层 | Initialize TCP transport layer
Args:
host: 目标主机IP地址或域名 | Target host IP address or domain name
port: 目标端口,默认502(Modbus TCP标准端口) | Target port, default 502 (Modbus TCP standard port)
timeout: 超时时间(秒),默认10.0秒 | Timeout in seconds, default 10.0 seconds
Raises:
ValueError: 当参数无效时 | When parameters are invalid
"""
if not host or not isinstance(host, str):
raise ValueError(get_message(
cn="主机地址不能为空且必须是字符串",
en="Host address cannot be empty and must be a string"
))
if not isinstance(port, int) or port <= 0 or port > 65535:
raise ValueError(get_message(
cn="端口必须是1-65535之间的整数",
en="Port must be an integer between 1-65535"
))
if not isinstance(timeout, (int, float)) or timeout <= 0:
raise ValueError(get_message(
cn="超时时间必须是正数",
en="Timeout must be a positive number"
))
self.host = host
self.port = port
self.timeout = timeout
self._socket: Optional[socket.socket] = None
self._transaction_id = 0
self._logger = get_logger("transport.tcp")
[docs]
def open(self) -> None:
"""建立TCP连接 | Establish TCP connection"""
try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.settimeout(self.timeout)
self._socket.connect((self.host, self.port))
self._logger.info(
cn=f"TCP连接已建立: {self.host}:{self.port}",
en=f"TCP connection established: {self.host}:{self.port}"
)
except socket.error as e:
raise ConnectionError(
cn=f"TCP连接失败: {e}",
en=f"TCP connection failed: {e}"
)
[docs]
def close(self) -> None:
"""关闭TCP连接 | Close TCP connection"""
if self._socket:
try:
self._socket.close()
self._logger.info(
cn=f"TCP连接已关闭: {self.host}:{self.port}",
en=f"TCP connection closed: {self.host}:{self.port}"
)
except socket.error as e:
self._logger.debug(
cn=f"关闭连接时出现错误(可忽略): {e}",
en=f"Error during connection close (ignorable): {e}"
)
finally:
self._socket = None
[docs]
def is_open(self) -> bool:
"""检查TCP连接状态 | Check TCP connection status"""
if self._socket is None:
return False
try:
# 尝试获取对端地址来检查连接状态 | Try to get peer address to check connection status
self._socket.getpeername()
return True
except socket.error:
return False
[docs]
def send_and_receive(self, slave_id: int, pdu: bytes) -> bytes:
"""
发送PDU并接收响应
实现TCP协议的完整通信流程:
Send PDU and receive response
Implements complete TCP protocol communication flow:
1. 构建MBAP头 | Build MBAP header
2. 发送请求(MBAP头 + PDU) | Send request (MBAP header + PDU)
3. 接收响应MBAP头 | Receive response MBAP header
4. 验证MBAP头 | Validate MBAP header
5. 接收响应PDU | Receive response PDU
6. 返回响应PDU | Return response PDU
"""
if not self.is_open():
raise ConnectionError(
cn="TCP连接未建立",
en="TCP connection not established"
)
# 1. 生成事务ID并构建MBAP头 | Generate transaction ID and build MBAP header
current_transaction_id = self._transaction_id
self._transaction_id = (
self._transaction_id + 1
) % 0x10000 # 16位回绕 | 16-bit wraparound
# MBAP头格式: | MBAP header format:
# - Transaction ID (2字节): 事务标识符 | Transaction identifier
# - Protocol ID (2字节): 协议标识符,固定为0x0000 | Protocol identifier, fixed to 0x0000
# - Length (2字节): 后续字节长度(Unit ID + PDU) | Length of following bytes (Unit ID + PDU)
# - Unit ID (1字节): 单元标识符(从站地址) | Unit identifier (slave address)
mbap_header = struct.pack(
">HHHB", # 大端序:2个short, 1个short, 1个byte | Big endian: 2 shorts, 1 short, 1 byte
current_transaction_id, # Transaction ID
0x0000, # Protocol ID
len(pdu)
+ 1, # Length (PDU长度 + Unit ID的1字节) | Length (PDU length + 1 byte for Unit ID)
slave_id, # Unit ID
)
# 2. 构建完整请求帧 | Build complete request frame
request_frame = mbap_header + pdu
self._logger.debug(
cn=f"TCP发送: {request_frame.hex(' ').upper()}",
en=f"TCP Send: {request_frame.hex(' ').upper()}"
)
try:
# 3. 发送请求 | Send request
if self._socket is None:
raise ConnectionError(
cn="TCP连接未建立",
en="TCP connection not established"
)
self._socket.sendall(request_frame)
# 4. 接收响应MBAP头(7字节) | Receive response MBAP header (7 bytes)
response_mbap = self._receive_exact(7)
# 5. 解析响应MBAP头 | Parse response MBAP header
(
response_transaction_id,
response_protocol_id,
response_length,
response_unit_id,
) = struct.unpack(">HHHB", response_mbap)
# 6. 验证MBAP头 | Validate MBAP header
if response_transaction_id != current_transaction_id:
raise InvalidResponseError(
cn=f"事务ID不匹配: 期望 {current_transaction_id}, 得到 {response_transaction_id}",
en=f"Transaction ID mismatch: Expected {current_transaction_id}, Received {response_transaction_id}"
)
if response_protocol_id != 0x0000:
raise InvalidResponseError(
cn=f"协议ID无效: 期望 0x0000, 得到 0x{response_protocol_id:04X}",
en=f"Invalid Protocol ID: Expected 0x0000, Received 0x{response_protocol_id:04X}"
)
if response_unit_id != slave_id:
raise InvalidResponseError(
cn=f"单元ID不匹配 : 期望 {slave_id}, 得到 {response_unit_id}",
en=f"Unit ID mismatch: Expected {slave_id}, Received {response_unit_id}"
)
# 7. 接收响应PDU | Receive response PDU
pdu_length = (
response_length - 1
) # 减去Unit ID的1字节 | Subtract 1 byte for Unit ID
if pdu_length <= 0:
raise InvalidResponseError(
cn=f"PDU长度无效: {pdu_length}",
en=f"Invalid PDU length: {pdu_length}"
)
response_pdu = self._receive_exact(pdu_length)
self._logger.debug(
cn=f"TCP接收: {(response_mbap + response_pdu).hex(' ').upper()}",
en=f"TCP Receive: {(response_mbap + response_pdu).hex(' ').upper()}"
)
# 8. 检查是否为异常响应 | Check if it's an exception response
if (
len(response_pdu) > 0 and response_pdu[0] & 0x80
): # 异常响应 | Exception response
from ..common.exceptions import ModbusException
function_code = (
response_pdu[0] & 0x7F
) # 去除异常标志位 | Remove exception flag bit
exception_code = response_pdu[1] if len(response_pdu) > 1 else 0
raise ModbusException(exception_code, function_code)
return response_pdu
except socket.timeout:
raise TimeoutError(
cn=f"TCP通信超时 ({self.timeout}秒)",
en=f"TCP communication timeout ({self.timeout}seconds)"
)
except socket.error as e:
raise ConnectionError(
cn=f"TCP通信错误: {e}",
en=f"TCP communication error: {e}"
)
def _receive_exact(self, length: int) -> bytes:
"""
精确接收指定长度的数据 | Receive exact length of data
Args:
length: 需要接收的字节数 | Number of bytes to receive
Returns:
接收到的数据 | Received data
Raises:
TimeoutError: 接收超时 | Receive timeout
ConnectionError: 连接错误 | Connection error
"""
data = b""
while len(data) < length:
try:
if self._socket is None:
raise ConnectionError(
cn="TCP连接未建立",
en="TCP connection not established"
)
chunk = self._socket.recv(length - len(data))
if not chunk:
raise ConnectionError(
cn="连接被远程主机关闭",
en="Connection closed by remote host"
)
data += chunk
except socket.timeout:
raise TimeoutError(
cn=f"接收数据超时,已接收 {len(data)}/{length} 字节",
en=f"Data receive timeout,Received {len(data)}/{length} bytes"
)
except socket.error as e:
raise ConnectionError(
cn=f"接收数据错误 : {e}",
en=f"Data receive error: {e}"
)
return data
[docs]
def __repr__(self) -> str:
"""字符串表示 | String representation"""
status = "Connected" if self.is_open() else "Disconnected"
return f"TcpTransport({self.host}:{self.port}, {status})"