modbuslink.transport.async_ascii 源代码

"""ModbusLink 异步ASCII传输层实现
实现基于asyncio的异步Modbus ASCII协议传输,包括LRC校验。

Async ASCII Transport Layer Implementation
Implements async Modbus ASCII protocol transport based on asyncio, including LRC validation.
"""

import asyncio
from typing import Optional

import serial_asyncio

from .async_base import AsyncBaseTransport
from ..common.exceptions import (
    ConnectionError,
    TimeoutError,
    CRCError,
    InvalidResponseError,
)
from ..utils.logging import get_logger


[文档] class AsyncAsciiTransport(AsyncBaseTransport): """ 异步Modbus ASCII传输层实现 处理基于asyncio的异步Modbus ASCII通信,包括: Async Modbus ASCII Transport Layer Implementation Handles async Modbus ASCII communication based on asyncio, including: - 异步串口连接管理 | Async serial port connection management - LRC校验码的计算和验证 | LRC checksum calculation and validation - ASCII编码和解码 | ASCII encoding and decoding - ADU(应用数据单元)的构建和解析 | ADU (Application Data Unit) construction and parsing - 异步错误处理和超时管理 | Async error handling and timeout management """
[文档] def __init__( self, port: str, baudrate: int = 9600, bytesize: int = 7, parity: str = "E", stopbits: float = 1, timeout: float = 1.0, ): """ 初始化异步ASCII传输层 Initialize async ASCII transport layer Args: port: 串口名称 (如 'COM1', '/dev/ttyUSB0') | Serial port name (e.g. 'COM1', '/dev/ttyUSB0') baudrate: 波特率,默认9600 | Baud rate, default 9600 bytesize: 数据位,默认7位 | Data bits, default 7 bits parity: 校验位,默认偶校验 | Parity, default even parity stopbits: 停止位,默认1位 | Stop bits, default 1 bit timeout: 超时时间(秒),默认1.0秒 | Timeout in seconds, default 1.0 seconds Raises: ValueError: 当参数无效时 | When parameters are invalid TypeError: 当参数类型错误时 | When parameter types are incorrect """ if not port or not isinstance(port, str): raise ValueError( "串口名称不能为空且必须是字符串 | Port name cannot be empty and must be a string" ) if not isinstance(baudrate, int) or baudrate <= 0: raise ValueError("波特率必须是正整数 | Baudrate must be a positive integer") if not isinstance(timeout, (int, float)) or timeout <= 0: raise ValueError("超时时间必须是正数 | Timeout must be a positive number") self.port = port self.baudrate = baudrate self.bytesize = bytesize self.parity = parity self.stopbits = stopbits self.timeout = timeout self._reader: Optional[asyncio.StreamReader] = None self._writer: Optional[asyncio.StreamWriter] = None self._logger = get_logger("transport.async_ascii")
[文档] async def open(self) -> None: """ 异步打开串口连接 Async open serial port connection """ try: self._reader, self._writer = await serial_asyncio.open_serial_connection( url=self.port, baudrate=self.baudrate, bytesize=self.bytesize, parity=self.parity, stopbits=self.stopbits, ) self._logger.info( f"异步ASCII连接已建立 | Async ASCII connection established: {self.port} @ {self.baudrate}bps" ) except Exception as e: raise ConnectionError(f"异步串口连接失败 | Async serial port connection failed: {e}")
[文档] async def close(self) -> None: """ 异步关闭串口连接 Async close serial port connection """ if self._writer: try: self._writer.close() await self._writer.wait_closed() self._logger.info(f"异步ASCII连接已关闭 | Async ASCII connection closed: {self.port}") except Exception as e: self._logger.debug( f"关闭异步串口连接时出现错误(可忽略)| Error during async serial connection close (ignorable): {e}" ) finally: self._reader = None self._writer = None
[文档] async def is_open(self) -> bool: """ 异步检查串口连接状态 Async check serial port connection status """ return self._reader is not None and self._writer is not None and not self._writer.is_closing()
[文档] async def send_and_receive(self, slave_id: int, pdu: bytes) -> bytes: """ 异步发送PDU并接收响应 实现异步ASCII协议的完整通信流程: Async send PDU and receive response Implements complete async ASCII protocol communication flow: 1. 构建ASCII帧(:地址PDU LRC CR LF) | Build ASCII frame (:Address PDU LRC CR LF) 2. 异步发送请求 | Async send request 3. 异步接收响应 | Async receive response 4. 验证LRC | Validate LRC 5. 返回响应PDU | Return response PDU """ if not await self.is_open(): raise ConnectionError( "异步串口连接未建立 | Async serial port connection not established" ) # 1. 构建请求帧 | Build request frame frame_data = bytes([slave_id]) + pdu lrc = self._calculate_lrc(frame_data) # ASCII编码:冒号 + 数据的十六进制表示 + LRC + CRLF | ASCII encoding: colon + hex data + LRC + CRLF ascii_data = frame_data + bytes([lrc]) ascii_frame = b':' + ascii_data.hex().upper().encode('ascii') + b'\r\n' self._logger.debug( f"发送异步ASCII请求 | Sending async ASCII request: {ascii_frame.decode('ascii', errors='ignore')}" ) try: # 2. 清空接收缓冲区并发送请求 | Clear receive buffer and send request if self._reader.at_eof(): raise ConnectionError("异步串口连接已断开 | Async serial connection lost") # 清空可能存在的旧数据 | Clear any existing old data while True: try: await asyncio.wait_for(self._reader.read(1024), timeout=0.01) except asyncio.TimeoutError: break self._writer.write(ascii_frame) await self._writer.drain() # 3. 接收响应 | Receive response function_code = pdu[0] if pdu else 0 response_pdu = await self._receive_response(slave_id, function_code) self._logger.debug( f"接收到异步ASCII响应PDU | Received async ASCII response PDU: {response_pdu.hex()}" ) return response_pdu except asyncio.TimeoutError: raise TimeoutError( f"异步ASCII通信超时 | Async ASCII communication timeout: {self.timeout}s" ) except Exception as e: if isinstance(e, (ConnectionError, TimeoutError, CRCError, InvalidResponseError)): raise raise ConnectionError(f"异步ASCII通信错误 | Async ASCII communication error: {e}")
async def _receive_response(self, expected_slave_id: int, function_code: int) -> bytes: """ 异步接收并验证响应帧 Async receive and validate response frame Args: expected_slave_id: 期望的从站地址 | Expected slave address function_code: 功能码 | Function code Returns: 响应的PDU部分 | PDU part of response Raises: TimeoutError: 接收超时 | Receive timeout CRCError: LRC校验失败 | LRC validation failed InvalidResponseError: 响应格式无效 | Invalid response format """ try: # 接收完整的ASCII帧直到CRLF | Receive complete ASCII frame until CRLF response_line = await asyncio.wait_for( self._reader.readuntil(b'\r\n'), timeout=self.timeout ) # 验证帧格式 | Validate frame format if not response_line.startswith(b':'): raise InvalidResponseError("ASCII响应帧格式无效:缺少起始冒号 | Invalid ASCII response frame format: missing start colon") if not response_line.endswith(b'\r\n'): raise InvalidResponseError("ASCII响应帧格式无效:缺少结束符 | Invalid ASCII response frame format: missing end markers") # 提取十六进制数据部分 | Extract hex data part hex_data = response_line[1:-2].decode('ascii') if len(hex_data) % 2 != 0: raise InvalidResponseError("ASCII响应帧格式无效:十六进制数据长度不是偶数 | Invalid ASCII response frame format: hex data length is not even") # 将十六进制字符串转换为字节 | Convert hex string to bytes try: response_bytes = bytes.fromhex(hex_data) except ValueError as e: raise InvalidResponseError(f"ASCII响应帧格式无效:十六进制数据解析失败 | Invalid ASCII response frame format: hex data parsing failed: {e}") if len(response_bytes) < 3: # 至少需要:地址 + 功能码 + LRC | At least need: address + function code + LRC raise InvalidResponseError("ASCII响应帧太短 | ASCII response frame too short") # 分离数据和LRC | Separate data and LRC frame_data = response_bytes[:-1] received_lrc = response_bytes[-1] # 验证LRC | Validate LRC expected_lrc = self._calculate_lrc(frame_data) if received_lrc != expected_lrc: raise CRCError( f"LRC校验失败 | LRC validation failed: " f"expected {expected_lrc:02X}, got {received_lrc:02X}" ) # 验证从站地址 | Validate slave address received_slave_id = frame_data[0] if received_slave_id != expected_slave_id: raise InvalidResponseError( f"从站地址不匹配 | Slave address mismatch: expected {expected_slave_id}, got {received_slave_id}" ) # 提取PDU | Extract PDU response_pdu = frame_data[1:] if len(response_pdu) == 0: raise InvalidResponseError("响应PDU为空 | Response PDU is empty") received_function_code = response_pdu[0] # 检查是否为异常响应 | Check if it's an exception response if received_function_code & 0x80: if len(response_pdu) != 2: raise InvalidResponseError("异常响应格式无效 | Invalid exception response format") return response_pdu # 验证功能码 | Validate function code if received_function_code != function_code: raise InvalidResponseError( f"功能码不匹配 | Function code mismatch: expected {function_code}, got {received_function_code}" ) return response_pdu except asyncio.TimeoutError: raise TimeoutError(f"异步接收ASCII响应超时 | Async receive ASCII response timeout: {self.timeout}s") @staticmethod def _calculate_lrc(data: bytes) -> int: """ 计算LRC(纵向冗余校验) LRC = (-(所有字节的和)) & 0xFF Calculate LRC (Longitudinal Redundancy Check) LRC = (-(sum of all bytes)) & 0xFF Args: data: 要计算LRC的数据 | Data to calculate LRC for Returns: LRC校验码 | LRC checksum """ lrc = 0 for byte in data: lrc += byte return (-lrc) & 0xFF
[文档] def __repr__(self) -> str: """ 返回传输层的字符串表示 Return string representation of transport layer """ status = "已连接 | Connected" if asyncio.run(self.is_open()) else "未连接 | Disconnected" return ( f"AsyncAsciiTransport(port='{self.port}', baudrate={self.baudrate}, " f"timeout={self.timeout}, status='{status}')" )