Source code for modbuslink.server.rtu_server

"""
ModbusLink 异步RTU服务器实现

ModbusLink Async RTU Server Implementation
"""

import asyncio
import struct
from typing import Optional

from .serial_server import AsyncSerialModbusServer
from .data_store import ModbusDataStore
from ..utils.crc import CRC16Modbus


[docs] class AsyncRtuModbusServer(AsyncSerialModbusServer): """ 异步RTU Modbus服务器 实现基于串口的异步Modbus RTU服务器。 使用CRC16校验确保数据完整性。 Async RTU Modbus Server Implements serial port-based async Modbus RTU server. Uses CRC16 checksum to ensure data integrity. """
[docs] def __init__( self, port: str, baudrate: int = 9600, bytesize: int = 8, parity: str = 'N', stopbits: int = 1, data_store: Optional[ModbusDataStore] = None, slave_id: int = 1 ): """ 初始化异步RTU Modbus服务器 Initialize Async RTU Modbus Server Args: port: 串口名称 | Serial port name baudrate: 波特率 | Baud rate bytesize: 数据位 | Data bits parity: 校验位 ('N', 'E', 'O') | Parity ('N', 'E', 'O') stopbits: 停止位 | Stop bits data_store: 数据存储实例 | Data store instance slave_id: 从站地址 | Slave address """ super().__init__( port=port, baudrate=baudrate, bytesize=bytesize, parity=parity, stopbits=stopbits, data_store=data_store, slave_id=slave_id, log_name="server.rtu", protocol_name="RTU" ) # 计算字符间隔时间(3.5个字符时间) | Calculate character interval time (3.5 character times) self._char_time = 11.0 / baudrate # 11位每字符(起始位+8数据位+校验位+停止位) | 11 bits per character self._frame_timeout = max(self._char_time * 3.5, 0.001) # 最小1ms | Minimum 1ms self._logger.info( cn=f"RTU服务器初始化: {port}@{baudrate}, 帧超时: {self._frame_timeout:.3f}s", en=f"RTU server initialized: {port}@{baudrate}, Frame timeout: {self._frame_timeout:.3f}s" )
async def _server_loop(self) -> None: """ 服务器主循环 Server Main Loop """ self._logger.info( cn="RTU服务器主循环启动", en="RTU server main loop started" ) buffer = bytearray() try: while self._running and self._reader: try: # 1. 等待第一块数据 (无超时或长超时,取决于业务需求) # 1. Wait for the first chunk of data # serial_asyncio 的 read 会尽可能多地读取缓冲区的数据,最大为指定字节数 data = await self._reader.read(1024) if not data: # 如果读取到空字节,可能是端口关闭或暂时无数据 await asyncio.sleep(0.01) continue buffer.extend(data) # 2. 累积读取:只要在帧超时时间内有新数据,就一直读 # 2. Accumulate read: Keep reading as long as new data arrives within frame timeout while True: try: # 尝试在静默时间内读取更多数据 (Modbus RTU 3.5字符时间) # Try to read more data within the silence interval more_data = await asyncio.wait_for( self._reader.read(1024), timeout=self._frame_timeout ) if more_data: buffer.extend(more_data) else: # 读到空数据,退出内层循环进行处理 break except asyncio.TimeoutError: # 超时意味着静默时间已到,认定帧结束 # Timeout means silence interval reached, frame end detected break except Exception: # 其他读取错误,中断累积 break # 3. 处理完整帧 # 3. Process complete frame if len(buffer) > 0: # 转换为 bytes 传递,防止引用被修改 await self._process_frame(bytes(buffer)) except asyncio.CancelledError: raise except Exception as e: self._logger.error( cn=f"RTU循环处理异常: {e}", en=f"RTU loop processing exception: {e}" ) await asyncio.sleep(0.1) # 出错后短暂暂停防止死循环耗尽CPU finally: # 无论成功与否,清空缓冲区准备下一帧 # Clear buffer for the next frame regardless of success or failure buffer.clear() except asyncio.CancelledError: self._logger.info( cn="RTU服务器主循环被取消", en="RTU server main loop cancelled" ) except Exception as e: self._logger.error( cn=f"RTU服务器主循环异常: {e}", en=f"RTU server main loop exception: {e}" ) finally: self._logger.info( cn="RTU服务器主循环结束", en="RTU server main loop ended" ) async def _process_frame(self, frame: bytes) -> None: """ 处理接收到的RTU帧 Process Received RTU Frame Args: frame: 接收到的RTU帧 | Received RTU frame """ try: if len(frame) < 4: self._logger.debug( cn=f"帧长度不足: {len(frame)}", en=f"Frame length insufficient: {len(frame)}" ) return # 提取地址、PDU和CRC | Extract address, PDU and CRC slave_id = frame[0] pdu = frame[1:-2] received_crc = frame[-2:] # 验证CRC | Verify CRC calculated_crc = CRC16Modbus.calculate(frame[:-2]) if received_crc != calculated_crc: self._logger.warning( cn=f"CRC校验失败: 接收 0x{received_crc.hex().upper()}, 计算 0x{calculated_crc.hex().upper()}", en=f"CRC verification failed: Received 0x{received_crc.hex().upper()}, Calculated 0x{calculated_crc.hex().upper()}" ) return self._logger.debug( cn=f"接收到RTU帧: 从站 {slave_id}, PDU长度 {len(pdu)}", en=f"Received RTU frame: Slave {slave_id}, PDU Length {len(pdu)}" ) # 处理请求 | Process request response_pdu = self.process_request(slave_id, pdu) if response_pdu: # 只有非广播请求才响应 | Only respond to non-broadcast requests # 构建响应帧 | Build response frame response_frame = struct.pack("B", slave_id) + response_pdu response_frame += CRC16Modbus.calculate(response_frame) # 发送响应 | Send response if self._writer: self._writer.write(response_frame) await self._writer.drain() self._logger.debug( cn=f"发送RTU响应: 从站 {slave_id}, 帧长度 {len(response_frame)}", en=f"Sent RTU response: Slave {slave_id}, Frame Length {len(response_frame)}" ) except Exception as e: self._logger.error( cn=f"处理RTU帧时出错: {e}", en=f"Error processing RTU frame: {e}" )