# modbuslink.transport.rtu_transport 源代码 (source code)

"""
ModbusLink RTU传输层实现

ModbusLink RTU Transport Layer Implementation
"""

import serial
import asyncio
import threading
import serial_asyncio
from typing import Optional
from serial.rs485 import RS485Settings

from .base_transport import SyncBaseTransport, AsyncBaseTransport
from ..utils.crc import CRC16Modbus
from ..common.logging import get_logger
from ..common.language import get_message
from ..common.exceptions import ConnectError, TimeOutError, InvalidReplyError, ModbusException


class SyncRtuTransport(SyncBaseTransport):
    """
    Synchronous Modbus RTU transport layer.

    Wraps a ``serial.Serial`` port. Each request is framed as
    ``slave address + PDU + CRC`` and the matching response frame is read
    back, validated and stripped down to its PDU. A ``threading.Lock``
    serializes request/response exchanges on the port.
    """

    def __init__(
            self,
            port: str,
            baudrate: int = 9600,
            bytesize: int = 8,
            parity: str = "N",
            stopbits: float = 1,
            timeout: float = 1.0,
            rs485_mode: Optional[RS485Settings] = None
    ) -> None:
        """
        Initialize the synchronous RTU transport (the port is not opened here).

        Args:
            port: Serial port name (e.g. 'COM1', '/dev/ttyUSB0')
            baudrate: Baud rate (default 9600)
            bytesize: Data bits (default 8)
            parity: Parity (default "N", no parity)
            stopbits: Stop bits (default 1)
            timeout: Default read timeout in seconds (default 1.0)
            rs485_mode: Optional RS485 settings applied after opening (default None)

        Raises:
            ValueError: When a parameter is invalid
        """
        if not port or not isinstance(port, str):
            raise ValueError(get_message(
                cn="串口名称不能为空且必须是字符串",
                en="Port name cannot be empty and must be a string"
            ))
        if not isinstance(baudrate, int) or baudrate <= 0:
            raise ValueError(get_message(
                cn="波特率必须是正整数",
                en="Baud rate must be a positive integer"
            ))
        if not isinstance(timeout, (int, float)) or timeout < 0.0:
            raise ValueError(get_message(
                cn="超时时间必须是正数",
                en="Timeout time must be a positive number"
            ))
        if not isinstance(rs485_mode, (RS485Settings, type(None))):
            raise ValueError(get_message(
                cn="RS485模式必须是RS485Settings类型",
                en="RS485 mode must be RS485Settings type"
            ))

        self.port = port
        self.baudrate = baudrate
        self.bytesize = bytesize
        self.parity = parity
        self.stopbits = stopbits
        self.timeout = timeout
        self.rs485_mode = rs485_mode

        self._serial: Optional[serial.Serial] = None
        self._lock = threading.Lock()
        self._logger = get_logger("transport.sync_rtu")

    def open(self) -> None:
        """
        Open the serial port and, if configured, apply the RS485 settings.

        Raises:
            ConnectError: When the port cannot be opened
        """
        try:
            self._serial = serial.Serial(
                port=self.port,
                baudrate=self.baudrate,
                bytesize=self.bytesize,
                parity=self.parity,
                stopbits=self.stopbits,
                timeout=self.timeout
            )

            # is_open is a method and must be called; testing the bound
            # method itself is always truthy and would hide a failed open.
            if not self.is_open():
                raise ConnectError(
                    cn=f"无法建立RTU连接 ({self.port}@{self.baudrate})",
                    en=f"Unable to established RTU connection ({self.port}:{self.baudrate})"
                )

            self._logger.info(
                cn=f"RTU连接建立成功 ({self.port}@{self.baudrate})",
                en=f"RTU connection established successfully ({self.port}@{self.baudrate})"
            )

            if self.rs485_mode:
                self._serial.rs485_mode = self.rs485_mode
                self._logger.info(
                    cn="RTU RS485模式设置成功",
                    en="RTU RS485 mode set successfully"
                )
        except serial.SerialException as e:
            raise ConnectError(
                cn=f"RTU连接建立失败: {e}",
                en=f"RTU connection established failed: {e}"
            ) from e

    def close(self) -> None:
        """
        Close the serial port if one was created.

        Errors raised while closing are logged at debug level and ignored;
        the port reference is always dropped afterwards.
        """
        if self._serial is None:
            return
        try:
            self._serial.close()
            self._logger.info(
                cn=f"RTU连接关闭成功 ({self.port}@{self.baudrate})",
                en=f"RTU connection closed successfully ({self.port}@{self.baudrate})"
            )
        except serial.SerialException as e:
            self._logger.debug(
                cn=f"RTU连接关闭失败 (可忽略): {e}",
                en=f"RTU connection closed failed (ignored): {e}"
            )
        finally:
            self._serial = None

    def is_open(self) -> bool:
        """
        Check the connection status.

        Returns:
            True if a serial port object exists and reports itself open,
            otherwise False
        """
        return self._serial is not None and self._serial.is_open

    def flush(self) -> int:
        """
        Discard all pending data in the receive buffer.

        Returns:
            Number of bytes discarded (0 when the port is closed or the
            serial layer raises an error)
        """
        if not self.is_open():
            return 0
        try:
            # Number of bytes currently waiting in the input buffer
            count = self._serial.in_waiting
            if count > 0:
                self._serial.reset_input_buffer()
                self._logger.warning(
                    cn=f"RTU缓冲区刷新: 丢弃了 {count} 字节",
                    en=f"RTU buffer flush: discarded {count} bytes"
                )
            return count
        except serial.SerialException:
            return 0

    def send_and_receive(self, slave_id: int, pdu: bytes, timeout: Optional[float] = None) -> bytes:
        """
        Send a PDU to a slave and return the PDU of its response.

        Communication process:
            1. Build ADU (address + PDU + CRC)
            2. Send request
            3. Receive response
            4. Verify ADU (CRC, slave address, exception flag)
            5. Return response PDU

        Args:
            slave_id: Slave address / unit identifier
            pdu: Protocol data unit (function code + data)
            timeout: Read timeout in seconds for this exchange; when None
                the transport's default ``self.timeout`` is used

        Returns:
            Response PDU (function code + data), without address and CRC

        Raises:
            ConnectError: When the port is not open or a serial error occurs
            TimeOutError: When the response does not arrive in time
            InvalidReplyError: When the CRC or slave address does not match
            ModbusException: When the slave answers with an exception response
        """
        with self._lock:
            if not self.is_open():
                raise ConnectError(
                    cn="RTU连接未建立",
                    en="RTU connection is not established"
                )

            # 1. Build ADU (address + PDU + CRC)
            request_frame = bytes([slave_id]) + pdu
            request_adu = request_frame + CRC16Modbus.calculate(request_frame)

            # 2. Send request
            self._logger.debug(
                cn=f"RTU发送数据: {request_adu.hex(' ').upper()}",
                en=f"RTU Send data: {request_adu.hex(' ').upper()}"
            )

            # Honor the per-call timeout by temporarily overriding the
            # port's read timeout; it is restored in the finally clause.
            saved_timeout = self._serial.timeout
            override = timeout is not None and timeout != saved_timeout
            effective_timeout = timeout if timeout is not None else self.timeout

            try:
                if override:
                    self._serial.timeout = timeout

                # Drop stale bytes so they cannot be mistaken for the reply
                self.flush()

                self._serial.write(request_adu)
                self._serial.flush()

                # 3. Receive response
                response_adu = self._receive_response()

                self._logger.debug(
                    cn=f"RTU接收数据: {response_adu.hex(' ').upper()}",
                    en=f"RTU Receive data: {response_adu.hex(' ').upper()}"
                )

                # 4. Verify ADU
                if not CRC16Modbus.validate(response_adu):
                    raise InvalidReplyError(
                        cn="CRC校验失败",
                        en="CRC check failed"
                    )

                if response_adu[0] != slave_id:
                    raise InvalidReplyError(
                        cn=f"从机地址不匹配: 期望 {slave_id},实际 {response_adu[0]}",
                        en=f"Slave address does not match: expected {slave_id}, actual {response_adu[0]}"
                    )

                # High bit of the function code marks an exception response
                if response_adu[1] & 0x80:
                    exception_code = response_adu[2] if len(response_adu) > 2 else 0
                    raise ModbusException(exception_code, pdu[0])

                # 5. Return response PDU (strip address byte and 2 CRC bytes)
                return response_adu[1:-2]

            except serial.SerialTimeoutException as e:
                raise TimeOutError(
                    cn=f"RTU通信超时: ({effective_timeout}秒)",
                    en=f"RTU communication timeout: ({effective_timeout} seconds)"
                ) from e
            except serial.SerialException as e:
                raise ConnectError(
                    cn=f"RTU通信错误: {e}",
                    en=f"RTU communication error: {e}"
                ) from e
            finally:
                if override and self._serial is not None:
                    try:
                        self._serial.timeout = saved_timeout
                    except serial.SerialException as e:
                        # Must not mask the exception already in flight
                        self._logger.debug(
                            cn=f"RTU超时时间恢复失败 (可忽略): {e}",
                            en=f"RTU timeout restore failed (ignored): {e}"
                        )

    def _read_exact(self, size: int) -> bytes:
        """
        Read exactly ``size`` bytes from the port.

        Args:
            size: Number of bytes required

        Returns:
            The ``size`` bytes read

        Raises:
            TimeOutError: When fewer than ``size`` bytes arrive before the
                port's read timeout expires
        """
        data = bytes(self._serial.read(size))
        if len(data) < size:
            raise TimeOutError(
                cn=f"RTU接收数据超时,已接收 {len(data)}/{size} 字节",
                en=f"RTU receive data timeout, received {len(data)}/{size} bytes"
            )
        return data

    def _receive_response(self) -> bytes:
        """
        Receive one complete response frame.

        The frame length is derived from the function code:
        exception responses, read responses (0x01-0x04, length-prefixed)
        and write responses (0x05, 0x06, 0x0F, 0x10, fixed length). For any
        other function code up to 10 further bytes are read.

        Returns:
            The full response ADU (address + PDU + CRC)

        Raises:
            ConnectError: When the port is not open
            TimeOutError: When the frame is not completely received in time
        """
        if not self.is_open():
            raise ConnectError(
                cn="RTU连接未建立",
                en="RTU connection is not established"
            )

        # Address + function code
        header = self._read_exact(2)
        function_code = header[1]

        # Exception response: address + function code + exception code + CRC
        if function_code & 0x80:
            return header + self._read_exact(3)

        # Read coils / discrete inputs / holding registers / input registers:
        # address + function code + byte count + data + CRC
        if function_code in (0x01, 0x02, 0x03, 0x04):
            byte_count = self._read_exact(1)
            # Data bytes plus the 2 CRC bytes must all be present
            return header + byte_count + self._read_exact(byte_count[0] + 2)

        # Write single/multiple coils/registers:
        # address + function code + address + value + CRC
        if function_code in (0x05, 0x06, 0x0F, 0x10):
            return header + self._read_exact(6)

        # Unknown function code: read at most 10 more bytes
        return header + bytes(self._serial.read(10))

    def __repr__(self) -> str:
        """
        Return the string representation of the transport.

        Returns:
            A string showing port, baud rate and timeout
        """
        return f"<SyncRtuTransport port={self.port} baudrate={self.baudrate} timeout={self.timeout}>"
class AsyncRtuTransport(AsyncBaseTransport):
    """
    Asynchronous Modbus RTU transport layer.

    Uses a ``serial_asyncio`` stream reader/writer pair. Each request is
    framed as ``slave address + PDU + CRC`` and the matching response
    frame is read back, validated and stripped down to its PDU. An
    ``asyncio.Lock`` serializes request/response exchanges.
    """

    def __init__(
            self,
            port: str,
            baudrate: int = 9600,
            bytesize: int = 8,
            parity: str = "N",
            stopbits: float = 1,
            timeout: float = 1.0,
            rs485_mode: Optional[RS485Settings] = None
    ) -> None:
        """
        Initialize the asynchronous RTU transport (the port is not opened here).

        Args:
            port: Serial port name (e.g. 'COM1', '/dev/ttyUSB0')
            baudrate: Baud rate (default 9600)
            bytesize: Data bits (default 8)
            parity: Parity (default "N", no parity)
            stopbits: Stop bits (default 1)
            timeout: Default read timeout in seconds (default 1.0)
            rs485_mode: Optional RS485 settings applied after opening (default None)

        Raises:
            ValueError: When a parameter is invalid
        """
        if not port or not isinstance(port, str):
            raise ValueError(get_message(
                cn="串口名称不能为空且必须是字符串",
                en="Port name cannot be empty and must be a string"
            ))
        if not isinstance(baudrate, int) or baudrate <= 0:
            raise ValueError(get_message(
                cn="波特率必须是正整数",
                en="Baud rate must be a positive integer"
            ))
        if not isinstance(timeout, (int, float)) or timeout < 0.0:
            raise ValueError(get_message(
                cn="超时时间必须是正数",
                en="Timeout time must be a positive number"
            ))
        if not isinstance(rs485_mode, (RS485Settings, type(None))):
            raise ValueError(get_message(
                cn="RS485模式必须是RS485Settings类型",
                en="RS485 mode must be RS485Settings type"
            ))

        self.port = port
        self.baudrate = baudrate
        self.bytesize = bytesize
        self.parity = parity
        self.stopbits = stopbits
        self.timeout = timeout
        self.rs485_mode = rs485_mode

        self._reader: Optional[asyncio.StreamReader] = None
        self._writer: Optional[asyncio.StreamWriter] = None
        self._lock = asyncio.Lock()
        self._logger = get_logger("transport.async_rtu")

    async def open(self) -> None:
        """
        Open the serial connection and, if configured, apply the RS485 settings.

        Raises:
            ConnectError: When the connection cannot be established
        """
        try:
            self._reader, self._writer = await serial_asyncio.open_serial_connection(
                url=self.port,
                baudrate=self.baudrate,
                bytesize=self.bytesize,
                parity=self.parity,
                stopbits=self.stopbits,
            )

            # is_open is a method and must be called; testing the bound
            # method itself is always truthy and would hide a failed open.
            if not self.is_open():
                raise ConnectError(
                    cn=f"无法建立RTU连接 ({self.port}@{self.baudrate})",
                    en=f"Unable to established RTU connection ({self.port}:{self.baudrate})"
                )

            self._logger.info(
                cn=f"RTU连接建立成功 ({self.port}@{self.baudrate})",
                en=f"RTU connection established successfully ({self.port}@{self.baudrate})"
            )

            if self.rs485_mode:
                # The RS485 settings live on the underlying serial object,
                # which is reached through the writer's transport.
                transport = self._writer.transport
                if hasattr(transport, 'serial'):
                    transport.serial.rs485_mode = self.rs485_mode
                    self._logger.info(
                        cn="RTU RS485模式设置成功",
                        en="RTU RS485 mode set successfully"
                    )
                else:
                    self._logger.warning(
                        cn="RTU RS485模式设置失败(无法访问底层串口对象)",
                        en="RTU RS485 mode set failed (Unable to access the underlying serial port object)"
                    )
        except serial.SerialException as e:
            raise ConnectError(
                cn=f"RTU连接建立失败: {e}",
                en=f"RTU connection established failed: {e}"
            ) from e

    async def close(self) -> None:
        """
        Close the stream writer if one exists.

        Serial errors raised while closing are logged at debug level and
        ignored; the reader/writer references are always dropped afterwards.
        """
        if self._writer is None:
            return
        try:
            self._writer.close()
            await self._writer.wait_closed()
            self._logger.info(
                cn=f"RTU连接关闭成功 ({self.port}@{self.baudrate})",
                en=f"RTU connection closed successfully ({self.port}@{self.baudrate})"
            )
        except serial.SerialException as e:
            self._logger.debug(
                cn=f"RTU连接关闭失败 (可忽略): {e}",
                en=f"RTU connection closed failed (ignored): {e}"
            )
        finally:
            self._reader = None
            self._writer = None

    def is_open(self) -> bool:
        """
        Check the connection status.

        Returns:
            True if both reader and writer exist and the writer is not
            closing, otherwise False
        """
        return (
            self._reader is not None
            and self._writer is not None
            and not self._writer.is_closing()
        )

    async def flush(self) -> int:
        """
        Discard all pending data in the receive buffer.

        Reads repeatedly with a very short timeout until nothing more is
        available. This is best-effort: unexpected errors are logged at
        debug level and end the flush instead of being raised.

        Returns:
            Number of bytes discarded
        """
        if not self.is_open():
            return 0

        discarded_count = 0
        try:
            while True:
                try:
                    data = await asyncio.wait_for(self._reader.read(4096), timeout=0.001)
                except asyncio.TimeoutError:
                    # Nothing buffered any more
                    break
                if not data:
                    # EOF
                    break
                discarded_count += len(data)
        except Exception as e:
            # Best-effort: keep going with the exchange, but leave a trace
            self._logger.debug(
                cn=f"RTU缓冲区刷新失败 (可忽略): {e}",
                en=f"RTU buffer flush failed (ignored): {e}"
            )

        if discarded_count > 0:
            self._logger.warning(
                cn=f"RTU缓冲区刷新: 丢弃了 {discarded_count} 字节",
                en=f"RTU buffer flush: discarded {discarded_count} bytes"
            )
        return discarded_count

    async def send_and_receive(self, slave_id: int, pdu: bytes, timeout: Optional[float] = None) -> bytes:
        """
        Send a PDU to a slave and return the PDU of its response.

        Communication process:
            1. Build ADU (address + PDU + CRC)
            2. Send request
            3. Receive response
            4. Verify ADU (CRC, slave address, exception flag)
            5. Return response PDU

        Args:
            slave_id: Slave address / unit identifier
            pdu: Protocol data unit (function code + data)
            timeout: Read timeout in seconds for this exchange; when None
                the transport's default ``self.timeout`` is used

        Returns:
            Response PDU (function code + data), without address and CRC

        Raises:
            ConnectError: When the connection is not open, is closed while
                reading, or a serial error occurs
            TimeOutError: When the response does not arrive in time
            InvalidReplyError: When the CRC or slave address does not match
            ModbusException: When the slave answers with an exception response
        """
        async with self._lock:
            if not self.is_open():
                raise ConnectError(
                    cn="RTU连接未建立",
                    en="RTU connection is not established"
                )

            # 1. Build ADU (address + PDU + CRC)
            request_frame = bytes([slave_id]) + pdu
            request_adu = request_frame + CRC16Modbus.calculate(request_frame)

            # 2. Send request
            self._logger.debug(
                cn=f"RTU发送数据: {request_adu.hex(' ').upper()}",
                en=f"RTU Send data: {request_adu.hex(' ').upper()}"
            )

            effective_timeout = self.timeout if timeout is None else timeout

            try:
                # Drop stale bytes so they cannot be mistaken for the reply
                await self.flush()

                self._writer.write(request_adu)
                await self._writer.drain()

                # 3. Receive response
                response_adu = await self._receive_response(effective_timeout)

                self._logger.debug(
                    cn=f"RTU接收数据: {response_adu.hex(' ').upper()}",
                    en=f"RTU Receive data: {response_adu.hex(' ').upper()}"
                )

                # 4. Verify ADU
                if not CRC16Modbus.validate(response_adu):
                    raise InvalidReplyError(
                        cn="CRC校验失败",
                        en="CRC check failed"
                    )

                if response_adu[0] != slave_id:
                    raise InvalidReplyError(
                        cn=f"从机地址不匹配: 期望 {slave_id},实际 {response_adu[0]}",
                        en=f"Slave address does not match: expected {slave_id}, actual {response_adu[0]}"
                    )

                # High bit of the function code marks an exception response
                if response_adu[1] & 0x80:
                    exception_code = response_adu[2] if len(response_adu) > 2 else 0
                    raise ModbusException(exception_code, pdu[0])

                # 5. Return response PDU (strip address byte and 2 CRC bytes)
                return response_adu[1:-2]

            except serial.SerialTimeoutException as e:
                raise TimeOutError(
                    cn=f"RTU通信超时: ({effective_timeout}秒)",
                    en=f"RTU communication timeout: ({effective_timeout} seconds)"
                ) from e
            except serial.SerialException as e:
                raise ConnectError(
                    cn=f"RTU通信错误: {e}",
                    en=f"RTU communication error: {e}"
                ) from e

    async def _read_exact(self, size: int, timeout: float) -> bytes:
        """
        Read exactly ``size`` bytes from the stream within ``timeout`` seconds.

        Args:
            size: Number of bytes required
            timeout: Maximum time to wait, in seconds

        Returns:
            The ``size`` bytes read

        Raises:
            TimeOutError: When the bytes do not arrive before the timeout
            ConnectError: When the stream reaches EOF before ``size`` bytes
        """
        try:
            return await asyncio.wait_for(self._reader.readexactly(size), timeout=timeout)
        except asyncio.TimeoutError as e:
            raise TimeOutError(
                cn=f"RTU接收数据超时 ({timeout}秒),期望 {size} 字节",
                en=f"RTU receive data timeout ({timeout} seconds), expected {size} bytes"
            ) from e
        except asyncio.IncompleteReadError as e:
            # readexactly raises this when EOF arrives before `size` bytes
            raise ConnectError(
                cn=f"RTU连接已关闭,已接收 {len(e.partial)}/{size} 字节",
                en=f"RTU connection closed, received {len(e.partial)}/{size} bytes"
            ) from e

    async def _read_up_to(self, limit: int, timeout: float) -> bytes:
        """
        Read at most ``limit`` bytes, returning whatever arrived in time.

        Unlike ``_read_exact`` this never raises on a short read: it stops
        at ``limit`` bytes, at EOF, or when ``timeout`` seconds have passed.

        Args:
            limit: Maximum number of bytes to read
            timeout: Overall time budget, in seconds

        Returns:
            Between 0 and ``limit`` bytes
        """
        loop = asyncio.get_running_loop()
        deadline = loop.time() + timeout
        collected = bytearray()
        while len(collected) < limit:
            time_left = deadline - loop.time()
            if time_left <= 0:
                break
            try:
                chunk = await asyncio.wait_for(
                    self._reader.read(limit - len(collected)),
                    timeout=time_left
                )
            except asyncio.TimeoutError:
                break
            if not chunk:
                # EOF
                break
            collected += chunk
        return bytes(collected)

    async def _receive_response(self, timeout: Optional[float] = None) -> bytes:
        """
        Receive one complete response frame.

        The frame length is derived from the function code:
        exception responses, read responses (0x01-0x04, length-prefixed)
        and write responses (0x05, 0x06, 0x0F, 0x10, fixed length). For any
        other function code up to 10 further bytes are read.

        Args:
            timeout: Timeout in seconds applied to each read; when None
                the transport's default ``self.timeout`` is used

        Returns:
            The full response ADU (address + PDU + CRC)

        Raises:
            ConnectError: When the connection is not open or is closed
                while reading
            TimeOutError: When the frame is not completely received in time
        """
        if not self.is_open():
            raise ConnectError(
                cn="RTU连接未建立",
                en="RTU connection is not established"
            )

        read_timeout = self.timeout if timeout is None else timeout

        # Address + function code
        header = await self._read_exact(2, read_timeout)
        function_code = header[1]

        # Exception response: address + function code + exception code + CRC
        if function_code & 0x80:
            return header + await self._read_exact(3, read_timeout)

        # Read coils / discrete inputs / holding registers / input registers:
        # address + function code + byte count + data + CRC
        if function_code in (0x01, 0x02, 0x03, 0x04):
            byte_count = await self._read_exact(1, read_timeout)
            # Data bytes plus the 2 CRC bytes
            payload = await self._read_exact(byte_count[0] + 2, read_timeout)
            return header + byte_count + payload

        # Write single/multiple coils/registers:
        # address + function code + address + value + CRC
        if function_code in (0x05, 0x06, 0x0F, 0x10):
            return header + await self._read_exact(6, read_timeout)

        # Unknown function code: read at most 10 more bytes
        return header + await self._read_up_to(10, read_timeout)

    def __repr__(self) -> str:
        """
        Return the string representation of the transport.

        Returns:
            A string showing port, baud rate and timeout
        """
        return f"<AsyncRtuTransport port={self.port} baudrate={self.baudrate} timeout={self.timeout}>"