"""
ModbusLink 异步ASCII服务器实现
ModbusLink Async ASCII Server Implementation
"""
import asyncio
import struct
import binascii
from typing import Optional
from .serial_server import AsyncSerialModbusServer
from .data_store import ModbusDataStore
from ..utils.lrc import LRCModbus
[docs]
class AsyncAsciiModbusServer(AsyncSerialModbusServer):
"""
异步ASCII Modbus服务器
实现基于串口的异步Modbus ASCII服务器。
使用LRC校验和ASCII编码。
Async ASCII Modbus Server
Implements serial port-based async Modbus ASCII server.
Uses LRC checksum and ASCII encoding.
"""
[docs]
def __init__(
self,
port: str,
baudrate: int = 9600,
bytesize: int = 7,
parity: str = 'E',
stopbits: int = 1,
data_store: Optional[ModbusDataStore] = None,
slave_id: int = 1
):
"""
初始化异步ASCII Modbus服务器 | Initialize Async ASCII Modbus Server
Args:
port: 串口名称 | Serial port name
baudrate: 波特率 | Baud rate
bytesize: 数据位 | Data bits
parity: 校验位 ('N', 'E', 'O') | Parity ('N', 'E', 'O')
stopbits: 停止位 | Stop bits
data_store: 数据存储实例 | Data store instance
slave_id: 从站地址 | Slave address
"""
super().__init__(
port=port,
baudrate=baudrate,
bytesize=bytesize,
parity=parity,
stopbits=stopbits,
data_store=data_store,
slave_id=slave_id,
log_name="server.ascii",
protocol_name="ASCII"
)
self._logger.info(
cn=f"ASCII服务器初始化: {port}@{baudrate}",
en=f"ASCII server initialized: {port}@{baudrate}"
)
async def _server_loop(self) -> None:
"""
服务器主循环
Server Main Loop
"""
self._logger.info(
cn="ASCII服务器主循环启动",
en="ASCII server main loop started"
)
try:
while self._running and self._reader:
try:
# Modbus ASCII 使用 CRLF (\r\n) 作为结束符
# Modbus ASCII uses CRLF (\r\n) as terminator
line = await self._reader.readuntil(b'\r\n')
# 移除首尾空白字符 (CRLF)
line = line.strip()
if not line:
# 如果读取到空字节,可能是端口关闭或暂时无数据
await asyncio.sleep(0.01)
continue
# ASCII帧必须以 ':' 开头
# ASCII frame must start with ':'
if not line.startswith(b':'):
# 尝试寻找中间的 ':' (处理垃圾数据)
idx = line.find(b':')
if idx != -1:
line = line[idx:]
else:
continue # 无效帧,丢弃
# 处理这一行数据
await self._process_frame(line)
except asyncio.IncompleteReadError:
# 流被关闭或部分读取
if self._running:
await asyncio.sleep(0.1)
continue
except asyncio.LimitOverrunError:
# 缓冲区溢出(非常长地行),可能是噪声
# 读取所有数据清空缓冲区
try:
await self._reader.read(1024)
except Exception:
pass
except asyncio.CancelledError:
raise
except Exception as e:
self._logger.error(
cn=f"服务器循环异常: {e}",
en=f"Server loop exception: {e}"
)
await asyncio.sleep(0.1) # 短暂延迟后继续 | Brief delay before continuing
except asyncio.CancelledError:
self._logger.info(
cn="ASCII服务器主循环被取消",
en="ASCII server main loop cancelled"
)
except Exception as e:
self._logger.error(
cn=f"ASCII服务器主循环异常: {e}",
en=f"ASCII server main loop exception: {e}"
)
finally:
self._logger.info(
cn="ASCII服务器主循环结束d",
en="ASCII server main loop ended"
)
async def _process_frame(self, ascii_frame: bytes) -> None:
"""
处理接收到的ASCII帧
Process Received RTU Frame
Args:
ascii_frame: 接收到的ASCII帧 | Received RTU frame
"""
try:
# 移除起始符 ':' | Remove start delimiter ':'
hex_payload = ascii_frame[1:]
# 长度检查:长度必须是偶数(每字节两字符)
if len(hex_payload) % 2 != 0:
self._logger.warning(
cn="ASCII帧长度无效(非偶数)",
en="Invalid ASCII frame length (odd)"
)
return
# 将Hex字符串转换为二进制数据 | Convert Hex string to binary data
try:
binary_frame = binascii.unhexlify(hex_payload)
except binascii.Error:
self._logger.warning(
cn="ASCII帧包含非十六进制字符",
en="ASCII frame contains non-hex characters"
)
return
if len(binary_frame) < 2: # 至少包含地址和LRC
self._logger.debug(
cn=f"ASCII帧长度不足: {len(binary_frame)}",
en=f"ASCII frame length insufficient: {len(binary_frame)}"
)
return
# 提取地址、PDU和LRC | Extract address, PDU and LRC
slave_id = binary_frame[0]
pdu = binary_frame[1:-1]
received_lrc = binary_frame[-1:]
# 验证LRC | Validate LRC
calculated_lrc = LRCModbus.calculate(binary_frame[:-1])
if received_lrc != calculated_lrc:
self._logger.warning(
cn=f"LRC校验失败: 接收 0x{received_lrc.hex().upper()}, 计算 0x{calculated_lrc.hex().upper()}",
en=f"LRC verification failed: Received 0x{received_lrc.hex().upper()}, Calculated 0x{calculated_lrc.hex().upper()}"
)
return
self._logger.debug(
cn=f"接收到ASCII帧: 从站 {slave_id}, PDU长度: {len(pdu)}",
en=f"Received ASCII frame: Slave {slave_id}, PDU Length: {len(pdu)}"
)
# 处理请求 | Process request
response_pdu = self.process_request(slave_id, pdu)
if response_pdu: # 只有非广播请求才响应 | Only respond to non-broadcast requests
# 构建响应帧 | Build response frame
response_frame = struct.pack("B", slave_id) + response_pdu
# 添加LRC | Add LRC
response_frame += LRCModbus.calculate(response_frame)
# 编码为ASCII Hex | Encoded as ASCII Hex
response_ascii = b':' + binascii.hexlify(response_frame).upper() + b'\r\n'
if self._writer:
self._writer.write(response_ascii)
await self._writer.drain()
self._logger.debug(
cn=f"发送ASCII响应: 从站 {slave_id}, 帧长度 {len(response_ascii)}",
en=f"Sent ASCII response: Slave {slave_id}, Frame Length {len(response_ascii)}"
)
except Exception as e:
self._logger.error(
cn=f"处理ASCII帧时出错: {e}",
en=f"Error processing ASCII frame: {e}"
)