"""ModbusLink 异步RTU传输层实现
实现基于asyncio的异步Modbus RTU协议传输,包括CRC16校验。
Async RTU Transport Layer Implementation
Implements async Modbus RTU protocol transport based on asyncio, including CRC16 validation.
"""
import asyncio
from typing import Optional
import serial_asyncio
from .async_base import AsyncBaseTransport
from ..common.exceptions import (
ConnectionError,
TimeoutError,
CRCError,
InvalidResponseError,
)
from ..utils.crc import CRC16Modbus
from ..utils.logging import get_logger
[docs]
class AsyncRtuTransport(AsyncBaseTransport):
"""
异步Modbus RTU传输层实现
处理基于asyncio的异步Modbus RTU通信,包括:
Async Modbus RTU Transport Layer Implementation
Handles async Modbus RTU communication based on asyncio, including:
- 异步串口连接管理 | Async serial port connection management
- CRC16校验码的计算和验证 | CRC16 checksum calculation and validation
- ADU(应用数据单元)的构建和解析 | ADU (Application Data Unit) construction and parsing
- 异步错误处理和超时管理 | Async error handling and timeout management
"""
[docs]
def __init__(
self,
port: str,
baudrate: int = 9600,
bytesize: int = 8,
parity: str = "N",
stopbits: float = 1,
timeout: float = 1.0,
):
"""
初始化异步RTU传输层
Initialize async RTU transport layer
Args:
port: 串口名称 (如 'COM1', '/dev/ttyUSB0') | Serial port name (e.g. 'COM1', '/dev/ttyUSB0')
baudrate: 波特率,默认9600 | Baud rate, default 9600
bytesize: 数据位,默认8位 | Data bits, default 8 bits
parity: 校验位,默认无校验 | Parity, default no parity
stopbits: 停止位,默认1位 | Stop bits, default 1 bit
timeout: 超时时间(秒),默认1.0秒 | Timeout in seconds, default 1.0 seconds
Raises:
ValueError: 当参数无效时 | When parameters are invalid
TypeError: 当参数类型错误时 | When parameter types are incorrect
"""
if not port or not isinstance(port, str):
raise ValueError(
"串口名称不能为空且必须是字符串 | Port name cannot be empty and must be a string"
)
if not isinstance(baudrate, int) or baudrate <= 0:
raise ValueError("波特率必须是正整数 | Baudrate must be a positive integer")
if not isinstance(timeout, (int, float)) or timeout <= 0:
raise ValueError("超时时间必须是正数 | Timeout must be a positive number")
self.port = port
self.baudrate = baudrate
self.bytesize = bytesize
self.parity = parity
self.stopbits = stopbits
self.timeout = timeout
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self._logger = get_logger("transport.async_rtu")
[docs]
async def open(self) -> None:
"""
异步打开串口连接
Async open serial port connection
"""
try:
self._reader, self._writer = await serial_asyncio.open_serial_connection(
url=self.port,
baudrate=self.baudrate,
bytesize=self.bytesize,
parity=self.parity,
stopbits=self.stopbits,
)
self._logger.info(
f"异步RTU连接已建立 | Async RTU connection established: {self.port} @ {self.baudrate}bps"
)
except Exception as e:
raise ConnectionError(f"异步串口连接失败 | Async serial port connection failed: {e}")
[docs]
async def close(self) -> None:
"""
异步关闭串口连接
Async close serial port connection
"""
if self._writer:
try:
self._writer.close()
await self._writer.wait_closed()
self._logger.info(f"异步RTU连接已关闭 | Async RTU connection closed: {self.port}")
except Exception as e:
self._logger.debug(
f"关闭异步串口连接时出现错误(可忽略)| Error during async serial connection close (ignorable): {e}"
)
finally:
self._reader = None
self._writer = None
[docs]
async def is_open(self) -> bool:
"""
异步检查串口连接状态
Async check serial port connection status
"""
return self._reader is not None and self._writer is not None and not self._writer.is_closing()
[docs]
async def send_and_receive(self, slave_id: int, pdu: bytes) -> bytes:
"""
异步发送PDU并接收响应
实现异步RTU协议的完整通信流程:
Async send PDU and receive response
Implements complete async RTU protocol communication flow:
1. 构建ADU(地址 + PDU + CRC) | Build ADU (Address + PDU + CRC)
2. 异步发送请求 | Async send request
3. 异步接收响应 | Async receive response
4. 验证CRC | Validate CRC
5. 返回响应PDU | Return response PDU
"""
if not await self.is_open():
raise ConnectionError(
"异步串口连接未建立 | Async serial port connection not established"
)
# 1. 构建请求帧 | Build request frame
frame_prefix = bytes([slave_id]) + pdu
crc = CRC16Modbus.calculate(frame_prefix)
request_adu = frame_prefix + crc
self._logger.debug(
f"发送异步RTU请求 | Sending async RTU request: {request_adu.hex()}"
)
try:
# 2. 清空接收缓冲区并发送请求 | Clear receive buffer and send request
if self._reader.at_eof():
raise ConnectionError("异步串口连接已断开 | Async serial connection lost")
# 清空可能存在的旧数据 | Clear any existing old data
while True:
try:
await asyncio.wait_for(self._reader.read(1024), timeout=0.01)
except asyncio.TimeoutError:
break
self._writer.write(request_adu)
await self._writer.drain()
# 3. 接收响应 | Receive response
function_code = pdu[0] if pdu else 0
response_pdu = await self._receive_response(slave_id, function_code)
self._logger.debug(
f"接收到异步RTU响应PDU | Received async RTU response PDU: {response_pdu.hex()}"
)
return response_pdu
except asyncio.TimeoutError:
raise TimeoutError(
f"异步RTU通信超时 | Async RTU communication timeout: {self.timeout}s"
)
except Exception as e:
if isinstance(e, (ConnectionError, TimeoutError, CRCError, InvalidResponseError)):
raise
raise ConnectionError(f"异步RTU通信错误 | Async RTU communication error: {e}")
async def _receive_response(self, expected_slave_id: int, function_code: int) -> bytes:
"""
异步接收并验证响应帧
Async receive and validate response frame
Args:
expected_slave_id: 期望的从站地址 | Expected slave address
function_code: 功能码 | Function code
Returns:
响应的PDU部分 | PDU part of response
Raises:
TimeoutError: 接收超时 | Receive timeout
CRCError: CRC校验失败 | CRC validation failed
InvalidResponseError: 响应格式无效 | Invalid response format
"""
try:
# 接收从站地址 | Receive slave address
slave_addr_bytes = await asyncio.wait_for(
self._reader.read(1), timeout=self.timeout
)
if len(slave_addr_bytes) != 1:
raise InvalidResponseError("未接收到从站地址 | No slave address received")
received_slave_id = slave_addr_bytes[0]
if received_slave_id != expected_slave_id:
raise InvalidResponseError(
f"从站地址不匹配 | Slave address mismatch: expected {expected_slave_id}, got {received_slave_id}"
)
# 接收功能码 | Receive function code
func_code_bytes = await asyncio.wait_for(
self._reader.read(1), timeout=self.timeout
)
if len(func_code_bytes) != 1:
raise InvalidResponseError("未接收到功能码 | No function code received")
received_function_code = func_code_bytes[0]
# 检查是否为异常响应 | Check if it's an exception response
if received_function_code & 0x80:
# 异常响应:功能码 + 异常码 + CRC | Exception response: function code + exception code + CRC
exception_code_bytes = await asyncio.wait_for(
self._reader.read(1), timeout=self.timeout
)
if len(exception_code_bytes) != 1:
raise InvalidResponseError("未接收到异常码 | No exception code received")
# 接收CRC | Receive CRC
crc_bytes = await asyncio.wait_for(
self._reader.read(2), timeout=self.timeout
)
if len(crc_bytes) != 2:
raise InvalidResponseError("CRC数据不完整 | Incomplete CRC data")
# 验证CRC | Validate CRC
response_without_crc = slave_addr_bytes + func_code_bytes + exception_code_bytes
expected_crc = CRC16Modbus.calculate(response_without_crc)
if crc_bytes != expected_crc:
raise CRCError(
f"异常响应CRC校验失败 | Exception response CRC validation failed: "
f"expected {expected_crc.hex()}, got {crc_bytes.hex()}"
)
# 返回异常响应PDU | Return exception response PDU
return func_code_bytes + exception_code_bytes
# 正常响应处理 | Normal response handling
if received_function_code != function_code:
raise InvalidResponseError(
f"功能码不匹配 | Function code mismatch: expected {function_code}, got {received_function_code}"
)
# 根据功能码确定数据长度 | Determine data length based on function code
if function_code in [0x01, 0x02]: # 读线圈/离散输入 | Read coils/discrete inputs
data_length_bytes = await asyncio.wait_for(
self._reader.read(1), timeout=self.timeout
)
if len(data_length_bytes) != 1:
raise InvalidResponseError("未接收到数据长度 | No data length received")
data_length = data_length_bytes[0]
elif function_code in [0x03, 0x04]: # 读保持寄存器/输入寄存器 | Read holding/input registers
data_length_bytes = await asyncio.wait_for(
self._reader.read(1), timeout=self.timeout
)
if len(data_length_bytes) != 1:
raise InvalidResponseError("未接收到数据长度 | No data length received")
data_length = data_length_bytes[0]
elif function_code in [0x05, 0x06]: # 写单个线圈/寄存器 | Write single coil/register
data_length = 4 # 地址(2) + 值(2) | Address(2) + Value(2)
data_length_bytes = b''
elif function_code in [0x0F, 0x10]: # 写多个线圈/寄存器 | Write multiple coils/registers
data_length = 4 # 地址(2) + 数量(2) | Address(2) + Quantity(2)
data_length_bytes = b''
else:
raise InvalidResponseError(f"不支持的功能码 | Unsupported function code: {function_code}")
# 接收数据部分 | Receive data part
data_bytes = await asyncio.wait_for(
self._reader.read(data_length), timeout=self.timeout
)
if len(data_bytes) != data_length:
raise InvalidResponseError(
f"数据长度不匹配 | Data length mismatch: expected {data_length}, got {len(data_bytes)}"
)
# 接收CRC | Receive CRC
crc_bytes = await asyncio.wait_for(
self._reader.read(2), timeout=self.timeout
)
if len(crc_bytes) != 2:
raise InvalidResponseError("CRC数据不完整 | Incomplete CRC data")
# 验证CRC | Validate CRC
response_without_crc = slave_addr_bytes + func_code_bytes + data_length_bytes + data_bytes
expected_crc = CRC16Modbus.calculate(response_without_crc)
if crc_bytes != expected_crc:
raise CRCError(
f"CRC校验失败 | CRC validation failed: "
f"expected {expected_crc.hex()}, got {crc_bytes.hex()}"
)
# 返回PDU(功能码 + 数据) | Return PDU (function code + data)
return func_code_bytes + data_length_bytes + data_bytes
except asyncio.TimeoutError:
raise TimeoutError(f"异步接收响应超时 | Async receive response timeout: {self.timeout}s")
[docs]
def __repr__(self) -> str:
"""
返回传输层的字符串表示
Return string representation of transport layer
"""
status = "已连接 | Connected" if asyncio.run(self.is_open()) else "未连接 | Disconnected"
return (
f"AsyncRtuTransport(port='{self.port}', baudrate={self.baudrate}, "
f"timeout={self.timeout}, status='{status}')"
)