modbuslink.server.async_rtu_server 源代码

"""
ModbusLink 异步RTU服务器实现
提供基于串口的异步Modbus RTU服务器功能。

ModbusLink Async RTU Server Implementation
Provides serial port-based async Modbus RTU server functionality.
"""

import asyncio
import struct
from typing import Optional
import serial_asyncio
from .async_base_server import AsyncBaseModbusServer
from .data_store import ModbusDataStore
from ..common.exceptions import ConnectionError
from ..utils.crc import CRC16Modbus
from ..utils.logging import get_logger


[文档] class AsyncRtuModbusServer(AsyncBaseModbusServer): """ 异步RTU Modbus服务器 实现基于串口的异步Modbus RTU服务器。 使用CRC16校验确保数据完整性。 Async RTU Modbus Server Implements serial port-based async Modbus RTU server. Uses CRC16 checksum to ensure data integrity. """
[文档] def __init__(self, port: str, baudrate: int = 9600, bytesize: int = 8, parity: str = 'N', stopbits: int = 1, timeout: float = 1.0, data_store: Optional[ModbusDataStore] = None, slave_id: int = 1): """ 初始化异步RTU Modbus服务器 Initialize Async RTU Modbus Server Args: port: 串口名称 | Serial port name baudrate: 波特率 | Baud rate bytesize: 数据位 | Data bits parity: 校验位 ('N', 'E', 'O') | Parity ('N', 'E', 'O') stopbits: 停止位 | Stop bits timeout: 超时时间(秒) | Timeout in seconds data_store: 数据存储实例 | Data store instance slave_id: 从站地址 | Slave address """ super().__init__(data_store, slave_id) self.port = port self.baudrate = baudrate self.bytesize = bytesize self.parity = parity self.stopbits = stopbits self.timeout = timeout self._reader: Optional[asyncio.StreamReader] = None self._writer: Optional[asyncio.StreamWriter] = None self._server_task: Optional[asyncio.Task] = None self._logger = get_logger("server.rtu") # 计算字符间隔时间(3.5个字符时间) | Calculate character interval time (3.5 character times) self._char_time = 11.0 / baudrate # 11位每字符(起始位+8数据位+校验位+停止位) | 11 bits per character self._frame_timeout = max(self._char_time * 3.5, 0.001) # 最小1ms | Minimum 1ms self._logger.info(f"RTU服务器初始化 | RTU server initialized: {port}@{baudrate}, 帧超时 | Frame timeout: {self._frame_timeout:.3f}s")
[文档] async def start(self) -> None: """ 启动异步RTU服务器 Start Async RTU Server Raises: ConnectionError: 当无法打开串口时 | When serial port cannot be opened """ if self._running: self._logger.warning("服务器已在运行 | Server is already running") return try: # 打开串口连接 | Open serial connection self._reader, self._writer = await serial_asyncio.open_serial_connection( url=self.port, baudrate=self.baudrate, bytesize=self.bytesize, parity=self.parity, stopbits=self.stopbits ) self._running = True # 启动服务器任务 | Start server task self._server_task = asyncio.create_task(self._server_loop()) self._logger.info(f"RTU服务器启动成功 | RTU server started successfully: {self.port}") except Exception as e: self._logger.error(f"启动RTU服务器失败 | Failed to start RTU server: {e}") raise ConnectionError(f"无法打开串口 | Cannot open serial port: {e}")
[文档] async def stop(self) -> None: """ 停止异步RTU服务器 Stop Async RTU Server """ if not self._running: self._logger.warning("服务器未运行 | Server is not running") return self._running = False # 取消服务器任务 | Cancel server task if self._server_task: self._server_task.cancel() try: await self._server_task except asyncio.CancelledError: pass self._server_task = None # 关闭串口连接 | Close serial connection if self._writer: self._writer.close() await self._writer.wait_closed() self._writer = None self._reader = None self._logger.info("RTU服务器已停止 | RTU server stopped")
[文档] async def is_running(self) -> bool: """ 检查服务器运行状态 Check Server Running Status Returns: 如果服务器正在运行返回True,否则返回False | True if server is running, False otherwise """ return self._running and self._reader is not None and self._writer is not None
async def _server_loop(self) -> None: """ 服务器主循环 Server Main Loop """ self._logger.info("RTU服务器主循环启动 | RTU server main loop started") buffer = bytearray() try: while self._running and self._reader: try: # 读取数据 | Read data data = await asyncio.wait_for( self._reader.read(256), timeout=self.timeout ) if not data: continue buffer.extend(data) # 等待帧间隔 | Wait for frame interval await asyncio.sleep(self._frame_timeout) # 检查是否还有更多数据 | Check if there's more data try: additional_data = await asyncio.wait_for( self._reader.read(256), timeout=self._frame_timeout ) if additional_data: buffer.extend(additional_data) continue # 继续读取 | Continue reading except asyncio.TimeoutError: pass # 没有更多数据,处理当前帧 | No more data, process current frame # 处理完整帧 | Process complete frame if len(buffer) >= 4: # 最小帧长度:地址+功能码+CRC | Minimum frame length: address + function code + CRC await self._process_frame(bytes(buffer)) buffer.clear() except asyncio.TimeoutError: # 超时是正常的,继续循环 | Timeout is normal, continue loop if buffer: buffer.clear() continue except Exception as e: self._logger.error(f"服务器循环异常 | Server loop exception: {e}") if buffer: buffer.clear() await asyncio.sleep(0.1) # 短暂延迟后继续 | Brief delay before continuing except asyncio.CancelledError: self._logger.info("RTU服务器主循环被取消 | RTU server main loop cancelled") except Exception as e: self._logger.error(f"RTU服务器主循环异常 | RTU server main loop exception: {e}") finally: self._logger.info("RTU服务器主循环结束 | RTU server main loop ended") async def _process_frame(self, frame: bytes) -> None: """ 处理接收到的RTU帧 Process Received RTU Frame Args: frame: 接收到的RTU帧 | Received RTU frame """ try: if len(frame) < 4: self._logger.debug(f"帧长度不足 | Frame length insufficient: {len(frame)}") return # 提取地址、PDU和CRC | Extract address, PDU and CRC slave_id = frame[0] pdu = frame[1:-2] received_crc = struct.unpack("<H", frame[-2:])[0] # RTU使用小端序CRC | RTU uses little-endian CRC # 验证CRC | Verify CRC calculated_crc = CRC16Modbus.crc16_to_int(frame[:-2]) if received_crc != calculated_crc: self._logger.warning(f"CRC校验失败 | CRC verification failed: 接收 | Received 0x{received_crc:04X}, 计算 | Calculated 0x{calculated_crc:04X}") return self._logger.debug(f"接收到RTU帧 | Received RTU frame: 从站 | Slave {slave_id}, PDU长度 | PDU Length {len(pdu)}") # 处理请求 | Process request response_pdu = self.process_request(slave_id, pdu) if response_pdu: # 只有非广播请求才响应 | Only respond to non-broadcast requests # 构建响应帧 | Build response frame response_frame = struct.pack("B", slave_id) + response_pdu response_crc = CRC16Modbus.crc16_to_int(response_frame) response_frame += struct.pack("<H", response_crc) # 发送响应 | Send response if self._writer: self._writer.write(response_frame) await self._writer.drain() self._logger.debug(f"发送RTU响应 | Sent RTU response: 从站 | Slave {slave_id}, 帧长度 | Frame Length {len(response_frame)}") except Exception as e: self._logger.error(f"处理RTU帧时出错 | Error processing RTU frame: {e}")
[文档] async def serve_forever(self) -> None: """ 持续运行服务器直到被停止 Run Server Forever Until Stopped """ if not self._running: await self.start() if self._server_task: try: await self._server_task except asyncio.CancelledError: self._logger.info("服务器被取消 | Server cancelled") except Exception as e: self._logger.error(f"服务器运行异常 | Server running exception: {e}") raise else: raise ConnectionError("服务器未启动 | Server not started")