"""
ModbusLink TCP传输层实现
ModbusLink TCP Transport Layer Implementation
"""
import time
import socket
import struct
import asyncio
import threading
from typing import Optional
from .base_transport import SyncBaseTransport, AsyncBaseTransport
from ..common.logging import get_logger
from ..common.language import get_message
from ..common.exceptions import ConnectError, TimeOutError, InvalidReplyError, ModbusException
[文档]
class SyncTcpTransport(SyncBaseTransport):
"""
同步TCP传输层实现
Sync TCP Transport Layer Implementation
"""
[文档]
def __init__(
self,
host: str = "127.0.0.1",
port: int = 502,
timeout: float = 1.0
) -> None:
"""
初始化同步TCP传输层
Initialize Sync TCP Transport Layer
Args:
host: 目标主机IP地址或域名(默认"127.0.0.1") | Target host IP address or domain name (default "127.0.0.1")
port: 目标端口(默认502) | Target port (default 502)
timeout: 超时时间(默认1.0秒) | Timeout time (default 1.0 second)
Raises:
ValueError: 当参数无效时 | When parameters are invalid
"""
if not host or not isinstance(host, str):
raise ValueError(get_message(
cn="主机地址不能为空且必须是字符串",
en="Host address cannot be empty and must be a string"
))
if not isinstance(port, int) or port < 0 or port > 65535:
raise ValueError(get_message(
cn="端口必须是1-65535之间的整数",
en="Port must be an integer between 1-65535"
))
if not isinstance(timeout, (int, float)) or timeout < 0.0:
raise ValueError(get_message(
cn="超时时间必须是正数",
en="Timeout time must be a positive number"
))
self.host = host
self.port = port
self.timeout = timeout
self._socket: Optional[socket.socket] = None
self._transaction_id = 0
self._lock = threading.Lock()
self._logger = get_logger("transport.sync_tcp")
[文档]
def open(self) -> None:
"""
打开同步TCP传输层
Open Sync TCP Transport Layer
"""
try:
self._socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
self._socket.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1) # 禁用Nagle算法 | Disable Nagle algorithm
self._socket.settimeout(self.timeout)
self._socket.connect((self.host, self.port))
if not self.is_open:
raise ConnectError(
cn=f"无法建立TCP连接 ({self.host}:{self.port})",
en=f"Unable to established TCP connection ({self.host}:{self.port})"
)
self._logger.info(
cn=f"TCP连接建立成功 ({self.host}:{self.port})",
en=f"TCP connection established successfully ({self.host}:{self.port})"
)
except socket.error as e:
raise ConnectError(
cn=f"TCP连接建立失败: {e}",
en=f"TCP connection established failed: {e}"
) from e
[文档]
def close(self) -> None:
"""
关闭同步TCP传输层
Close Sync TCP Transport Layer
"""
if self._socket:
try:
self._socket.close()
self._logger.info(
cn=f"TCP连接关闭成功 ({self.host}:{self.port})",
en=f"TCP connection closed successfully ({self.host}:{self.port})"
)
except socket.error as e:
self._logger.debug(
cn=f"TCP连接关闭失败 (可忽略): {e}",
en=f"TCP connection closed failed (ignored): {e}"
)
finally:
self._socket = None
[文档]
def is_open(self) -> bool:
"""
检查同步TCP传输层连接状态
Check Sync TCP Transport Layer connection status
Returns:
如果传输层打开则返回True,否则返回False
True if the transport layer is open, otherwise False
"""
if self._socket is None:
return False
try:
self._socket.gettimeout()
return True
except socket.error:
return False
[文档]
def flush(self) -> int:
"""
同步清空接收缓冲区中的所有待处理数据
Sync Flush all pending data in receive buffer
Returns:
丢弃的字节数
Number of bytes discarded
"""
if not self.is_open():
return 0
discarded = 0
try:
# 设置非阻塞模式 | Set non-blocking mode
self._socket.setblocking(False)
while True:
try:
chunk = self._socket.recv(4096)
if not chunk:
break
discarded += len(chunk)
except BlockingIOError:
break
except socket.error:
break
finally:
# 恢复超时设置 | Restore timeout setting
self._socket.setblocking(True)
self._socket.settimeout(self.timeout)
if discarded > 0:
self._logger.warning(
cn=f"已清空接收缓冲区: 丢弃 {discarded} 字节的陈旧数据",
en=f"Flushed receive buffer: discarded {discarded} bytes of stale data"
)
return discarded
[文档]
def send_and_receive(self, slave_id: int, pdu: bytes, timeout: Optional[float] = None) -> bytes:
"""
同步TCP传输层PDU发送和接收数据
Sync TCP Transport Layer PDU send and receive data
通信流程 | Communication Process:
1. 构建MBAP头 | Build MBAP header
2. 发送MBAP头和PDU | Send MBAP header and PDU
3. 接收响应MBAP头 | Receive response MBAP header
4. 验证MBAP头 | Verify MBAP header
5. 接收响应PDU | Receive response PDU
6. 返回响应PDU | Return response PDU
Args:
slave_id: 从机地址/单元标识符 | Slave address/unit identifier
pdu: 协议数据单元(功能码 + 数据) | Protocol data unit (function code + data)
timeout: [ 未使用 ] 超时时间(秒) | [ unused ] Timeout time (seconds)
Returns:
响应的PDU部分(功能码 + 数据)
Response PDU part (function code + data)
"""
with self._lock:
if not self.is_open():
raise ConnectError(
cn=f"TCP连接未建立",
en=f"TCP connection is not established"
)
# 1. 构建MBAP头 | Build MBAP header
# 生成事务ID
transaction_id = self._transaction_id
self._transaction_id = (self._transaction_id + 1) % 0x10000 # 16位回绕 | 16-bit wraparound
# MBAP头格式: | MBAP header format:
# - Transaction ID (2字节): 事务标识符 | Transaction identifier
# - Protocol ID (2字节): 协议标识符,固定为0x0000 | Protocol identifier, fixed to 0x0000
# - Length (2字节): 后续字节长度(Unit ID + PDU) | Length of following bytes (Unit ID + PDU)
# - Unit ID (1字节): 单元标识符(从站地址) | Unit identifier (slave address)
mbap_header = struct.pack(
">HHHB",
transaction_id, # Transaction ID
0x0000, # Protocol ID
len(pdu) + 1, # Length
slave_id # Unit ID
)
# 构建完整请求帧 | Build complete request frame
request_frame = mbap_header + pdu
# 2. 发送MBAP头和PDU | Send MBAP header and PDU
self._logger.debug(
cn=f"TCP发送数据: {request_frame.hex(' ').upper()}",
en=f"TCP Send data: {request_frame.hex(' ').upper()}"
)
try:
# 发送请求 | Send request
self._socket.sendall(request_frame)
start_time = time.time()
while True:
if time.time() - start_time > self.timeout:
raise TimeOutError(
cn=f"TCP通信超时: ({self.timeout}秒)",
en=f"TCP communication timeout: ({self.timeout} seconds)"
)
# 3. 接收响应MBAP头 | Receive response MBAP header
response_mbap_header = self._receive_exact(7)
# 解析响应MBAP头 | Parse response MBAP header
(
response_transaction_id,
response_protocol_id,
response_length,
response_slave_id
) = struct.unpack(">HHHB", response_mbap_header)
# 4. 验证MBAP头 | Verify MBAP header
# 事务ID匹配检查 | Transaction ID match check
if response_transaction_id != transaction_id:
self._logger.warning(
cn=f"事务ID过期响应: 期望 {transaction_id},实际 {response_transaction_id},正在丢弃...",
en=f"Stale transaction ID response: expected {transaction_id}, actual {response_transaction_id}, discarding..."
)
# 读取完过期帧 | Read stale frame
self._receive_exact(response_length - 1)
continue
# 协议ID匹配检查 | Protocol ID match check
if response_protocol_id != 0x0000:
raise InvalidReplyError(
cn=f"协议ID不匹配: 期望 0x0000,实际 {response_protocol_id}",
en=f"Protocol ID does not match: expected 0x0000, actual {response_protocol_id}"
)
# 从机地址匹配检查 | Slave address match check
if response_slave_id != slave_id:
raise InvalidReplyError(
cn=f"从机地址不匹配: 期望 {slave_id},实际 {response_slave_id}",
en=f"Slave address does not match: expected {slave_id}, actual {response_slave_id}"
)
# 5.接收响应PDU | Receive response PDU
if response_length - 1 > 0:
pdu_length = response_length - 1
if pdu_length <= 0:
raise InvalidReplyError(
cn=f"无效的PDU长度: {pdu_length}",
en=f"Invalid PDU length: {pdu_length}"
)
response_pdu = self._receive_exact(pdu_length)
break
self._logger.debug(
cn=f"TCP接收数据: {(response_mbap_header + response_pdu).hex(' ').upper()}",
en=f"TCP Receive data: {(response_mbap_header + response_pdu).hex(' ').upper()}"
)
response_function_code = response_pdu[0]
if response_function_code & 0x80: # 异常响应 | Exception response
exception_code = response_pdu[1] if len(response_pdu) >= 2 else 0
raise ModbusException(exception_code, pdu[0])
return response_pdu
except socket.timeout:
raise TimeOutError(
cn=f"TCP通信超时: ({self.timeout}秒)",
en=f"TCP communication timeout: ({self.timeout} seconds)"
)
except socket.error as e:
raise ConnectError(
cn=f"TCP通信错误: {e}",
en=f"TCP communication error: {e}"
) from e
def _receive_exact(self, length: int, timeout: Optional[float] = None) -> bytes:
"""
接收指定长度的数据
Receive exact length of data
Args:
length: 需要接收的字节数 | Number of bytes to receive
timeout: [ 未使用 ] 接收超时时间(秒) | [ unused ] Receive timeout time (seconds)
Returns:
接收到的数据
Received data
Raises:
ConnectError: 连接错误 | Connection error
TimeOutError: 接收超时错误 | Receive timeout error
"""
if not self.is_open():
raise ConnectError(
cn=f"TCP连接未建立",
en=f"TCP connection is not established"
)
data = bytearray()
start_time = time.time()
while len(data) < length:
try:
if time.time() - start_time > self.timeout:
raise TimeOutError(
cn=f"TCP接收数据超时 ({self.timeout}s)",
en=f"TCP receive data timeout ({self.timeout}s)"
)
chunk = self._socket.recv(length - len(data))
if not chunk:
raise ConnectError(
cn=f"连接被远程主机关闭,已接收 {len(data)}/{length} 字节",
en=f"Connection closed by remote host, received {len(data)}/{length} bytes"
)
data.extend(chunk)
except socket.timeout:
raise TimeOutError(
cn=f"TCP接收数据超时,已接收 {len(data)}/{length} 字节",
en=f"TCP receive data timeout, received {len(data)}/{length} bytes"
)
except socket.error as e:
raise ConnectError(
cn=f"TCP接收数据错误: {e}",
en=f"TCP receive data error: {e}"
) from e
return data
[文档]
def __repr__(self) -> str:
"""
返回对象的字符串表示
Return object's string representation
Returns:
对象的字符串表示
Object's string representation
"""
return f"<SyncTcpTransport host={self.host} port={self.port} timeout={self.timeout}>"
[文档]
class AsyncTcpTransport(AsyncBaseTransport):
"""
异步TCP传输层实现
Async TCP Transport Layer Implementation
"""
[文档]
def __init__(
self,
host: str = "127.0.0.1",
port: int = 502,
timeout: float = 1.0
) -> None:
"""
初始化异步TCP传输层
Initialize Async TCP Transport Layer
Args:
host: 目标主机IP地址或域名(默认"127.0.0.1") | Target host IP address or domain name (default "127.0.0.1")
port: 目标端口(默认502) | Target port (default 502)
timeout: 超时时间(默认1.0秒) | Timeout time (default 1.0 second)
Raises:
ValueError: 当参数无效时 | When parameters are invalid
"""
if not host or not isinstance(host, str):
raise ValueError(get_message(
cn="主机地址不能为空且必须是字符串",
en="Host address cannot be empty and must be a string"
))
if not isinstance(port, int) or port < 0 or port > 65535:
raise ValueError(get_message(
cn="端口必须是1-65535之间的整数",
en="Port must be an integer between 1-65535"
))
if not isinstance(timeout, (int, float)) or timeout < 0.0:
raise ValueError(get_message(
cn="超时时间必须是正数",
en="Timeout time must be a positive number"
))
self.host = host
self.port = port
self.timeout = timeout
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self._transaction_id = 0
self._lock = asyncio.Lock()
self._logger = get_logger("transport.async_tcp")
[文档]
async def open(self) -> None:
"""
打开异步TCP传输层
Open Async TCP Transport Layer
"""
try:
self._reader, self._writer = await asyncio.wait_for(
asyncio.open_connection(self.host, self.port),
timeout=self.timeout
)
# 禁用Nagle算法 | Disable Nagle algorithm
sock = self._writer.get_extra_info('socket')
if sock is not None:
sock.setsockopt(socket.IPPROTO_TCP, socket.TCP_NODELAY, 1)
if not self.is_open:
raise ConnectError(
cn=f"无法建立TCP连接 ({self.host}:{self.port})",
en=f"Unable to established TCP connection ({self.host}:{self.port})"
)
self._logger.info(
cn=f"TCP连接建立成功 ({self.host}:{self.port})",
en=f"TCP connection established successfully ({self.host}:{self.port})"
)
except (ConnectionRefusedError, OSError) as e:
raise ConnectError(
cn=f"TCP连接建立失败: {e}",
en=f"TCP connection established failed: {e}"
) from e
[文档]
async def close(self) -> None:
"""
关闭异步TCP传输层
Close Async TCP Transport Layer
"""
if self._writer:
try:
self._writer.close()
await self._writer.wait_closed()
self._logger.info(
cn=f"TCP连接关闭成功 ({self.host}:{self.port})",
en=f"TCP connection closed successfully ({self.host}:{self.port})"
)
except (ConnectionRefusedError, OSError) as e:
self._logger.debug(
cn=f"TCP连接关闭失败 (可忽略): {e}",
en=f"TCP connection closed failed (ignored): {e}"
)
finally:
self._reader = None
self._writer = None
[文档]
def is_open(self) -> bool:
"""
检查异步TCP传输层连接状态
Check Async TCP Transport Layer connection status
Returns:
如果传输层打开则返回True,否则返回False
True if the transport layer is open, otherwise False
"""
if self._writer is None or self._reader is None:
return False
if self._writer.is_closing():
return False
transport = self._writer.transport
if transport is None or transport.is_closing():
return False
return True
[文档]
async def flush(self) -> int:
"""
异步清空接收缓冲区中的所有待处理数据
Async Flush all pending data in receive buffer
Returns:
丢弃的字节数
Number of bytes discarded
"""
if not self.is_open():
return 0
discarded = 0
try:
# 尝试读取所有可用数据,使用很短的超时时间
while True:
try:
chunk = await asyncio.wait_for(
self._reader.read(4096),
timeout=0.01 # 10ms 超时
)
if not chunk:
break
discarded += len(chunk)
except asyncio.TimeoutError:
break
except Exception:
pass
if discarded > 0:
self._logger.warning(
cn=f"已清空接收缓冲区: 丢弃 {discarded} 字节的陈旧数据",
en=f"Flushed receive buffer: discarded {discarded} bytes of stale data"
)
return discarded
[文档]
async def send_and_receive(self, slave_id: int, pdu: bytes, timeout: Optional[float] = None) -> bytes:
"""
异步TCP传输层PDU发送和接收数据
Async TCP Transport Layer PDU send and receive data
通信流程 | Communication Process:
1. 构建MBAP头 | Build MBAP header
2. 发送MBAP头和PDU | Send MBAP header and PDU
3. 接收响应MBAP头 | Receive response MBAP header
4. 验证MBAP头 | Verify MBAP header
5. 接收响应PDU | Receive response PDU
6. 返回响应PDU | Return response PDU
Args:
slave_id: 从机地址/单元标识符 | Slave address/unit identifier
pdu: 协议数据单元(功能码 + 数据) | Protocol data unit (function code + data)
timeout: [ 未使用 ] 超时时间(秒) | [ unused ] Timeout time (seconds)
Returns:
响应的PDU部分(功能码 + 数据)
Response PDU part (function code + data)
"""
async with self._lock:
if not self.is_open():
raise ConnectError(
cn=f"TCP连接未建立",
en=f"TCP connection is not established"
)
# 1. 构建MBAP头 | Build MBAP header
# 生成事务ID
transaction_id = self._transaction_id
self._transaction_id = (self._transaction_id + 1) % 0x10000 # 16位回绕 | 16-bit wraparound
# MBAP头格式: | MBAP header format:
# - Transaction ID (2字节): 事务标识符 | Transaction identifier
# - Protocol ID (2字节): 协议标识符,固定为0x0000 | Protocol identifier, fixed to 0x0000
# - Length (2字节): 后续字节长度(Unit ID + PDU) | Length of following bytes (Unit ID + PDU)
# - Unit ID (1字节): 单元标识符(从站地址) | Unit identifier (slave address)
mbap_header = struct.pack(
">HHHB",
transaction_id, # Transaction ID
0x0000, # Protocol ID
len(pdu) + 1, # Length
slave_id # Unit ID
)
# 构建完整请求帧 | Build complete request frame
request_frame = mbap_header + pdu
# 2. 发送MBAP头和PDU | Send MBAP header and PDU
self._logger.debug(
cn=f"TCP发送数据: {request_frame.hex(' ').upper()}",
en=f"TCP Send data: {request_frame.hex(' ').upper()}"
)
try:
# 发送请求 | Send request
self._writer.write(request_frame)
await asyncio.wait_for(
self._writer.drain(),
timeout=self.timeout
)
start_time = time.time()
while True:
elapsed_time = time.time() - start_time
remaining_time = self.timeout - elapsed_time
if remaining_time <= 0:
raise TimeOutError(
cn=f"TCP通信超时: ({self.timeout}秒)",
en=f"TCP communication timeout: ({self.timeout} seconds)"
)
# 3. 接收响应MBAP头 | Receive response MBAP header
response_mbap_header = await self._receive_exact(7, remaining_time)
# 解析响应MBAP头 | Parse response MBAP header
(
response_transaction_id,
response_protocol_id,
response_length,
response_slave_id
) = struct.unpack(">HHHB", response_mbap_header)
# 4. 验证MBAP头 | Verify MBAP header
# 事务ID匹配检查 | Transaction ID match check
if response_transaction_id != transaction_id:
self._logger.warning(
cn=f"事务ID过期响应: 期望 {transaction_id},实际 {response_transaction_id},正在丢弃...",
en=f"Stale transaction ID response: expected {transaction_id}, actual {response_transaction_id}, discarding..."
)
elapsed_time = time.time() - start_time
remaining_time = self.timeout - elapsed_time
if remaining_time <= 0:
raise TimeOutError(
cn=f"TCP通信超时: ({self.timeout}秒)",
en=f"TCP communication timeout: ({self.timeout} seconds)"
)
# 读取完过期帧 | Read stale frame
if response_length - 1 > 0:
await self._receive_exact(response_length - 1, remaining_time)
continue
# 协议ID匹配检查 | Protocol ID match check
if response_protocol_id != 0x0000:
raise InvalidReplyError(
cn=f"协议ID不匹配: 期望 0x0000,实际 {response_protocol_id}",
en=f"Protocol ID does not match: expected 0x0000, actual {response_protocol_id}"
)
# 从机地址匹配检查 | Slave address match check
if response_slave_id != slave_id:
raise InvalidReplyError(
cn=f"从机地址不匹配: 期望 {slave_id},实际 {response_slave_id}",
en=f"Slave address does not match: expected {slave_id}, actual {response_slave_id}"
)
# 5.接收响应PDU | Receive response PDU
pdu_length = response_length - 1
if pdu_length <= 0:
raise InvalidReplyError(
cn=f"无效的PDU长度: {pdu_length}",
en=f"Invalid PDU length: {pdu_length}"
)
elapsed_time = time.time() - start_time
remaining_time = self.timeout - elapsed_time
response_pdu = await self._receive_exact(pdu_length, remaining_time)
break
self._logger.debug(
cn=f"TCP接收数据: {(response_mbap_header + response_pdu).hex(' ').upper()}",
en=f"TCP Receive data: {(response_mbap_header + response_pdu).hex(' ').upper()}"
)
response_function_code = response_pdu[0]
if response_function_code & 0x80: # 异常响应 | Exception response
exception_code = response_pdu[1] if len(response_pdu) >= 2 else 0
raise ModbusException(exception_code, pdu[0])
return response_pdu
except asyncio.TimeoutError:
raise TimeOutError(
cn=f"TCP通信超时: ({self.timeout}秒)",
en=f"TCP communication timeout: ({self.timeout} seconds)"
)
except (ConnectionRefusedError, OSError) as e:
raise ConnectError(
cn=f"TCP通信错误: {e}",
en=f"TCP communication error: {e}"
) from e
async def _receive_exact(self, length: int, timeout: Optional[float] = None) -> bytes:
"""
接收指定长度的数据
Receive exact length of data
Args:
length: 需要接收的字节数 | Number of bytes to receive
timeout: 接收超时时间(秒) | Receive timeout time (seconds)
Returns:
接收到的数据
Received data
Raises:
ConnectError: 连接错误 | Connection error
TimeOutError: 接收超时错误 | Receive timeout error
"""
if not self.is_open():
raise ConnectError(
cn=f"TCP连接未建立",
en=f"TCP connection is not established"
)
data = bytearray()
try:
data = await asyncio.wait_for(
self._reader.readexactly(length),
timeout=self.timeout if timeout is None else timeout
)
except asyncio.IncompleteReadError:
raise ConnectError(
cn=f"连接被远程主机关闭,已接收 {len(data)}/{length} 字节",
en=f"Connection closed by remote host, received {len(data)}/{length} bytes"
)
except asyncio.TimeoutError:
raise TimeOutError(
cn=f"TCP接收数据超时,已接收 {len(data)}/{length} 字节",
en=f"TCP receive data timeout, received {len(data)}/{length} bytes"
)
except (ConnectionRefusedError, OSError) as e:
raise ConnectError(
cn=f"TCP接收数据错误: {e}",
en=f"TCP receive data error: {e}"
) from e
return data
[文档]
def __repr__(self) -> str:
"""
返回对象的字符串表示
Return object's string representation
Returns:
对象的字符串表示
Object's string representation
"""
return f"<AsyncTcpTransport host={self.host} port={self.port} timeout={self.timeout}>"