"""
ModbusLink ASCII传输层实现
ModbusLink ASCII Transport Layer Implementation
"""
import serial
import asyncio
import threading
import serial_asyncio
from typing import Optional
from serial.rs485 import RS485Settings
from .base_transport import SyncBaseTransport, AsyncBaseTransport
from ..utils.lrc import LRCModbus
from ..common.logging import get_logger
from ..common.language import get_message
from ..common.exceptions import ConnectError, TimeOutError, InvalidReplyError, ModbusException
[文档]
class SyncAsciiTransport(SyncBaseTransport):
"""
同步ASCII传输层实现
Sync ASCII Transport Layer Implementation
"""
[文档]
def __init__(
self,
port: str,
baudrate: int = 9600,
bytesize: int = 7,
parity: str = "E",
stopbits: float = 1,
timeout: float = 1.0,
rs485_mode: Optional[RS485Settings] = None
) -> None:
"""
初始化同步ASCII传输层
Initialize Sync ASCII Transport Layer
Args:
port: 串口名称(如 'COM1', '/dev/ttyUSB0') | Serial port name (e.g. 'COM1', '/dev/ttyUSB0')
baudrate: 波特率(默认9600) | Baud rate (default 9600)
bytesize: 数据位(默认7) | Data bits (default 7)
parity: 校验位(默认偶校验) | Parity (default even parity)
stopbits: 停止位(默认1) | Stop bits (default 1)
timeout: 超时时间(默认1.0秒) | Timeout time (default 1.0 second)
rs485_mode: RS485模式(默认None) | RS485 mode (default None)
Raises:
ValueError: 当参数无效时 | When parameters are invalid
"""
if not port or not isinstance(port, str):
raise ValueError(get_message(
cn="串口名称不能为空且必须是字符串",
en="Port name cannot be empty and must be a string"
))
if not isinstance(baudrate, int) or baudrate <= 0:
raise ValueError(get_message(
cn="波特率必须是正整数",
en="Baud rate must be a positive integer"
))
if not isinstance(timeout, (int, float)) or timeout < 0.0:
raise ValueError(get_message(
cn="超时时间必须是正数",
en="Timeout time must be a positive number"
))
if not isinstance(rs485_mode, (RS485Settings, type(None))):
raise ValueError(get_message(
cn="RS485模式必须是RS485Settings类型",
en="RS485 mode must be RS485Settings type"
))
self.port = port
self.baudrate = baudrate
self.bytesize = bytesize
self.parity = parity
self.stopbits = stopbits
self.timeout = timeout
self.rs485_mode = rs485_mode
self._serial: Optional[serial.Serial] = None
self._lock = threading.Lock()
self._logger = get_logger("transport.sync_ascii")
[文档]
def open(self) -> None:
"""
打开同步ASCII传输层
Open Sync ASCII Transport Layer
"""
try:
self._serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=self.bytesize,
parity=self.parity,
stopbits=self.stopbits,
timeout=self.timeout
)
if not self.is_open:
raise ConnectError(
cn=f"无法建立ASCII连接 ({self.port}@{self.baudrate})",
en=f"Unable to established ASCII connection ({self.port}:{self.baudrate})"
)
self._logger.info(
cn=f"ASCII连接建立成功 ({self.port}@{self.baudrate})",
en=f"ASCII connection established successfully ({self.port}@{self.baudrate})"
)
if self.rs485_mode:
self._serial.rs485_mode = self.rs485_mode
self._logger.info(
cn=f"RTU RS485模式设置成功",
en=f"RTU RS485 mode set successfully"
)
except serial.SerialException as e:
raise ConnectError(
cn=f"ASCII连接建立失败: {e}",
en=f"ASCII connection established failed: {e}"
) from e
[文档]
def close(self) -> None:
"""
关闭同步ASCII传输层
Close Sync ASCII Transport Layer
"""
if self._serial and self.is_open:
try:
self._serial.close()
self._logger.info(
cn=f"ASCII连接关闭成功 ({self.port}@{self.baudrate})",
en=f"ASCII connection closed successfully ({self.port}@{self.baudrate})"
)
except serial.SerialException as e:
self._logger.debug(
cn=f"ASCII连接关闭失败 (可忽略): {e}",
en=f"ASCII connection closed failed (ignored): {e}"
)
finally:
self._serial = None
[文档]
def is_open(self) -> bool:
"""
检查同步ASCII传输层连接状态
Check Sync ASCII Transport Layer connection status
Returns:
如果传输层打开则返回True,否则返回False
True if the transport layer is open, otherwise False
"""
return self._serial is not None and self._serial.is_open
[文档]
def flush(self) -> int:
"""
同步清空接收缓冲区中的所有待处理数据
Sync Flush all pending data in receive buffer
Returns:
丢弃的字节数
Number of bytes discarded
"""
if not self.is_open():
return 0
try:
# 获取当前缓冲区中的字节数 | Get number of bytes currently in buffer
count = self._serial.in_waiting
if count > 0:
self._serial.reset_input_buffer()
self._logger.warning(
cn=f"ASCII缓冲区刷新: 丢弃了 {count} 字节",
en=f"ASCII buffer flush: discarded {count} bytes"
)
return count
except serial.SerialException:
return 0
[文档]
def send_and_receive(self, slave_id: int, pdu: bytes, timeout: Optional[float] = None) -> bytes:
"""
同步ASCII传输层PDU发送和接收数据
Sync ASCII Transport Layer PDU send and receive data
通信流程 | Communication Process:
1. 构建ASCII帧(: + 地址 + PDU + LRC + CR LF) | Build ASCII frame (: + address + PDU + LRC + CR LF)
2. 发送请求 | Send request
3. 接收响应 | Receive response
4. 验证ASCII帧 | Verify ASCII frame
5. 返回响应PDU | Return response PDU
Args:
slave_id: 从机地址/单元标识符 | Slave address/unit identifier
pdu: 协议数据单元(功能码 + 数据) | Protocol data unit (function code + data)
timeout: 超时时间(秒) | Timeout time (seconds)
Returns:
响应的PDU部分(功能码 + 数据)
Response PDU part (function code + data)
"""
with self._lock:
if not self.is_open():
raise ConnectError(
cn=f"ASCII连接未建立",
en=f"ASCII connection is not established"
)
# 1. 构建ASCII帧(: + 地址 + PDU + LRC + CR LF) | Build ASCII frame (: + address + PDU + LRC + CR LF)
request_frame = bytes([slave_id]) + pdu
crc = LRCModbus.calculate(request_frame)
request_ascii = b':' + (request_frame + crc).hex().upper().encode('ascii') + b'\r\n'
# 2. 发送请求 | Send request
self._logger.debug(
cn=f"ASCII发送数据: {(request_frame + crc).hex(' ').upper()}",
en=f"ASCII Send data: {(request_frame + crc).hex(' ').upper()}"
)
try:
# 清空接收缓冲区 | Clear the receive buffer
self.flush()
# 发送请求 | Send request
self._serial.write(request_ascii)
self._serial.flush()
# 3. 接收响应 | Receive response
hex_ascii = self._receive_response()
# self._logger.debug(
# cn=f"ASCII接收数据: {hex_ascii.decode('ascii', errors='ignore')}",
# en=f"ASCII Receive data: {hex_ascii.decode('ascii', errors='ignore')}"
# )
# 4. 验证ASCII帧 | Verify ASCII frame
# 验证ASCII帧格式 | Validate ASCII frame format
if not hex_ascii.startswith(b':'):
raise InvalidReplyError(
cn="ASCII帧格式错误:缺少起始符号",
en="ASCII frame format error: missing start colon"
)
if not hex_ascii.endswith(b'\r\n'):
raise InvalidReplyError(
cn="ASCII帧格式错误:缺少结束符",
en="ASCII frame format error: missing end symbol"
)
# 提取 (地址 + PDU + LRC) | Extract (address + PDU + LRC)
str_ascii = hex_ascii[1:-2].decode('ascii')
# 验证ASCII帧长度 | Validate ASCII frame length
if len(str_ascii) % 2 != 0:
raise InvalidReplyError(
cn="ASCII帧格式错误:无效的帧长度(应该是偶数)",
en="ASCII frame format error: invalid frame length (should be even)"
)
# 十六进制字符串转字节 | Hex string to byte
try:
hex_ascii = bytes.fromhex(str_ascii)
except ValueError:
raise InvalidReplyError(
cn="无效的十六进制数据",
en="Invalid hexadecimal data"
)
self._logger.debug(
cn=f"ASCII接收数据: {hex_ascii.hex(' ').upper()}",
en=f"ASCII Receive data: {hex_ascii.hex(' ').upper()}"
)
# 检验字节长度 | Validate byte length
if len(hex_ascii) < 3: # 至少包含地址+功能码+LRC | At least contain address+function code+LRC
raise InvalidReplyError(
cn="无效的帧长度(应该大于3)",
en="Invalid frame length (should be greater than 3)"
)
# 检验LRC | Validate LRC
if not LRCModbus.validate(hex_ascii):
raise InvalidReplyError(
cn="LRC校验失败",
en="LRC check failed"
)
# 验证从机地址 | Validate slave address
if hex_ascii[0] != slave_id:
raise InvalidReplyError(
cn=f"从机地址不匹配: 期望 {slave_id},实际 {hex_ascii[0]}",
en=f"Slave address does not match: expected {slave_id}, actual {hex_ascii[0]}"
)
# 验证异常响应 | Validate exception response
response_function_code = hex_ascii[1]
if response_function_code & 0x80: # 异常响应 | Exception response
exception_code = hex_ascii[2] if len(hex_ascii) > 2 else 0
raise ModbusException(exception_code, pdu[0])
# 5. 返回响应PDU | Return response PDU
return hex_ascii[1:-1] # 去掉地址和LRC | Remove address and LRC
except serial.SerialTimeoutException:
raise TimeOutError(
cn=f"ASCII通信超时: ({self.timeout}秒)",
en=f"ASCII communication timeout: ({self.timeout} seconds)"
)
except serial.SerialException as e:
raise ConnectError(
cn=f"ASCII通信错误: {e}",
en=f"ASCII communication error: {e}"
) from e
def _receive_response(self) -> bytes:
"""
接收完整的响应帧
Receive complete response frame
Returns:
响应的PDU部分
PDU part of response
Raises:
ConnectError: 连接错误 | Connection error
TimeOutError: 接收超时错误 | Receive timeout error
"""
if not self.is_open():
raise ConnectError(
cn=f"ASCII连接未建立",
en=f"ASCII connection is not established"
)
response = self._serial.read_until(b'\r\n')
if not response.endswith(b'\r\n'):
raise TimeOutError(
cn=f"ASCII接收数据超时 ({self.timeout}s)",
en=f"ASCII receive data timeout ({self.timeout}s)"
)
return response
[文档]
def __repr__(self) -> str:
"""
返回对象的字符串表示
Return object's string representation
Returns:
对象的字符串表示
Object's string representation
"""
return f"<SyncAsciiTransport port={self.port} baudrate={self.baudrate} timeout={self.timeout}>"
[文档]
class AsyncAsciiTransport(AsyncBaseTransport):
"""
异步ASCII传输层实现
Async ASCII Transport Layer Implementation
"""
[文档]
def __init__(
self,
port: str,
baudrate: int = 9600,
bytesize: int = 7,
parity: str = "E",
stopbits: float = 1,
timeout: float = 1.0,
rs485_mode: Optional[RS485Settings] = None
) -> None:
"""
初始化同步ASCII传输层
Initialize Sync ASCII Transport Layer
Args:
port: 串口名称(如 'COM1', '/dev/ttyUSB0') | Serial port name (e.g. 'COM1', '/dev/ttyUSB0')
baudrate: 波特率(默认9600) | Baud rate (default 9600)
bytesize: 数据位(默认7) | Data bits (default 7)
parity: 校验位(默认偶校验) | Parity (default even parity)
stopbits: 停止位(默认1) | Stop bits (default 1)
timeout: 超时时间(默认1.0秒) | Timeout time (default 1.0 second)
rs485_mode: RS485模式(默认None) | RS485 mode (default None)
Raises:
ValueError: 当参数无效时 | When parameters are invalid
"""
if not port or not isinstance(port, str):
raise ValueError(get_message(
cn="串口名称不能为空且必须是字符串",
en="Port name cannot be empty and must be a string"
))
if not isinstance(baudrate, int) or baudrate <= 0:
raise ValueError(get_message(
cn="波特率必须是正整数",
en="Baud rate must be a positive integer"
))
if not isinstance(timeout, (int, float)) or timeout < 0.0:
raise ValueError(get_message(
cn="超时时间必须是正数",
en="Timeout time must be a positive number"
))
if not isinstance(rs485_mode, (RS485Settings, type(None))):
raise ValueError(get_message(
cn="RS485模式必须是RS485Settings类型",
en="RS485 mode must be RS485Settings type"
))
self.port = port
self.baudrate = baudrate
self.bytesize = bytesize
self.parity = parity
self.stopbits = stopbits
self.timeout = timeout
self.rs485_mode = rs485_mode
self._reader: Optional[asyncio.StreamReader] = None
self._writer: Optional[asyncio.StreamWriter] = None
self._lock = asyncio.Lock()
self._logger = get_logger("transport.sync_ascii")
[文档]
async def open(self) -> None:
"""
打开异步ASCII传输层
Open Async ASCII Transport Layer
"""
try:
self._reader, self._writer = await serial_asyncio.open_serial_connection(
url=self.port,
baudrate=self.baudrate,
bytesize=self.bytesize,
parity=self.parity,
stopbits=self.stopbits,
)
if not self.is_open:
raise ConnectError(
cn=f"无法建立ASCII连接 ({self.port}@{self.baudrate})",
en=f"Unable to established ASCII connection ({self.port}:{self.baudrate})"
)
self._logger.info(
cn=f"ASCII连接建立成功 ({self.port}@{self.baudrate})",
en=f"ASCII connection established successfully ({self.port}@{self.baudrate})"
)
if self.rs485_mode:
transport = self._writer.transport
if hasattr(transport, 'serial'):
transport.serial.rs485_mode = self.rs485_mode
self._logger.info(
cn=f"RTU RS485模式设置成功",
en=f"RTU RS485 mode set successfully"
)
else:
self._logger.warning(
cn=f"RTU RS485模式设置失败(无法访问底层串口对象)",
en=f"RTU RS485 mode set failed (Unable to access the underlying serial port object)"
)
except serial.SerialException as e:
raise ConnectError(
cn=f"ASCII连接建立失败: {e}",
en=f"ASCII connection established failed: {e}"
) from e
[文档]
async def close(self) -> None:
"""
关闭异步ASCII传输层
Close Async ASCII Transport Layer
"""
if self._writer:
try:
self._writer.close()
await self._writer.wait_closed()
self._logger.info(
cn=f"ASCII连接关闭成功 ({self.port}@{self.baudrate})",
en=f"ASCII connection closed successfully ({self.port}@{self.baudrate})"
)
except serial.SerialException as e:
self._logger.debug(
cn=f"ASCII连接关闭失败 (可忽略): {e}",
en=f"ASCII connection closed failed (ignored): {e}"
)
finally:
self._reader = None
self._writer = None
[文档]
def is_open(self) -> bool:
"""
检查异步ASCII传输层连接状态
Check Async ASCII Transport Layer connection status
Returns:
如果传输层打开则返回True,否则返回False
True if the transport layer is open, otherwise False
"""
return self._reader is not None and self._writer is not None and not self._writer.is_closing()
[文档]
async def flush(self) -> int:
"""
异步清空接收缓冲区中的所有待处理数据
Async Flush all pending data in receive buffer
Returns:
丢弃的字节数
Number of bytes discarded
"""
if not self.is_open():
return 0
discarded_count = 0
try:
while True:
# 尝试以极短的超时读取数据,清除缓冲区
# Try reading with extremely short timeout to clear buffer
try:
data = await asyncio.wait_for(self._reader.read(4096), timeout=0.001)
if not data:
break
discarded_count += len(data)
except asyncio.TimeoutError:
break
except Exception:
pass
if discarded_count > 0:
self._logger.warning(
cn=f"ASCII缓冲区刷新: 丢弃了 {discarded_count} 字节",
en=f"ASCII buffer flush: discarded {discarded_count} bytes"
)
return discarded_count
[文档]
async def send_and_receive(self, slave_id: int, pdu: bytes, timeout: Optional[float] = None) -> bytes:
"""
异步ASCII传输层PDU发送和接收数据
Async ASCII Transport Layer PDU send and receive data
通信流程 | Communication Process:
1. 构建ASCII帧(: + 地址 + PDU + LRC + CR LF) | Build ASCII frame (: + address + PDU + LRC + CR LF)
2. 发送请求 | Send request
3. 接收响应 | Receive response
4. 验证ASCII帧 | Verify ASCII frame
5. 返回响应PDU | Return response PDU
Args:
slave_id: 从机地址/单元标识符 | Slave address/unit identifier
pdu: 协议数据单元(功能码 + 数据) | Protocol data unit (function code + data)
timeout: 超时时间(秒) | Timeout time (seconds)
Returns:
响应的PDU部分(功能码 + 数据)
Response PDU part (function code + data)
"""
async with self._lock:
if not self.is_open():
raise ConnectError(
cn=f"ASCII连接未建立",
en=f"ASCII connection is not established"
)
# 1. 构建ASCII帧(: + 地址 + PDU + LRC + CR LF) | Build ASCII frame (: + address + PDU + LRC + CR LF)
request_frame = bytes([slave_id]) + pdu
crc = LRCModbus.calculate(request_frame)
request_ascii = b':' + (request_frame + crc).hex().upper().encode('ascii') + b'\r\n'
# 2. 发送请求 | Send request
self._logger.debug(
cn=f"ASCII发送数据: {(request_frame + crc).hex(' ').upper()}",
en=f"ASCII Send data: {(request_frame + crc).hex(' ').upper()}"
)
try:
# 清空接收缓冲区 (使用封装好的 flush 方法)
# Clear receive buffer (use encapsulated flush method)
await self.flush()
self._writer.write(request_ascii)
await self._writer.drain()
# 3. 接收响应 | Receive response
hex_ascii = await self._receive_response()
# self._logger.debug(
# cn=f"ASCII接收数据: {hex_ascii.decode('ascii', errors='ignore')}",
# en=f"ASCII Receive data: {hex_ascii.decode('ascii', errors='ignore')}"
# )
# 4. 验证ASCII帧 | Verify ASCII frame
# 验证ASCII帧格式 | Validate ASCII frame format
if not hex_ascii.startswith(b':'):
raise InvalidReplyError(
cn="ASCII帧格式错误:缺少起始符号",
en="ASCII frame format error: missing start colon"
)
if not hex_ascii.endswith(b'\r\n'):
raise InvalidReplyError(
cn="ASCII帧格式错误:缺少结束符",
en="ASCII frame format error: missing end symbol"
)
# 提取 (地址 + PDU + LRC) | Extract (address + PDU + LRC)
str_ascii = hex_ascii[1:-2].decode('ascii')
# 验证ASCII帧长度 | Validate ASCII frame length
if len(str_ascii) % 2 != 0:
raise InvalidReplyError(
cn="ASCII帧格式错误:无效的帧长度(应该是偶数)",
en="ASCII frame format error: invalid frame length (should be even)"
)
# 十六进制字符串转字节 | Hex string to byte
try:
hex_ascii = bytes.fromhex(str_ascii)
except ValueError:
raise InvalidReplyError(
cn="无效的十六进制数据",
en="Invalid hexadecimal data"
)
self._logger.debug(
cn=f"ASCII接收数据: {hex_ascii.hex(' ').upper()}",
en=f"ASCII Receive data: {hex_ascii.hex(' ').upper()}"
)
# 检验字节长度 | Validate byte length
if len(hex_ascii) < 3: # 至少包含地址+功能码+LRC | At least contain address+function code+LRC
raise InvalidReplyError(
cn="无效的帧长度(应该大于3)",
en="Invalid frame length (should be greater than 3)"
)
# 检验LRC | Validate LRC
if not LRCModbus.validate(hex_ascii):
raise InvalidReplyError(
cn="LRC校验失败",
en="LRC check failed"
)
# 验证从机地址 | Validate slave address
if hex_ascii[0] != slave_id:
raise InvalidReplyError(
cn=f"从机地址不匹配: 期望 {slave_id},实际 {hex_ascii[0]}",
en=f"Slave address does not match: expected {slave_id}, actual {hex_ascii[0]}"
)
# 验证异常响应 | Validate exception response
response_function_code = hex_ascii[1]
if response_function_code & 0x80: # 异常响应 | Exception response
exception_code = hex_ascii[2] if len(hex_ascii) > 2 else 0
raise ModbusException(exception_code, pdu[0])
# 5. 返回响应PDU | Return response PDU
return hex_ascii[1:-1] # 去掉地址和LRC | Remove address and LRC
except serial.SerialTimeoutException:
raise TimeOutError(
cn=f"ASCII通信超时: ({self.timeout}秒)",
en=f"ASCII communication timeout: ({self.timeout} seconds)"
)
except serial.SerialException as e:
raise ConnectError(
cn=f"ASCII通信错误: {e}",
en=f"ASCII communication error: {e}"
) from e
async def _receive_response(self) -> bytes:
"""
接收完整的响应帧
Receive complete response frame
Returns:
响应的PDU部分
PDU part of response
Raises:
ConnectError: 连接错误 | Connection error
TimeOutError: 接收超时错误 | Receive timeout error
"""
if not self.is_open():
raise ConnectError(
cn=f"ASCII连接未建立",
en=f"ASCII connection is not established"
)
try:
response = await asyncio.wait_for(
self._reader.readuntil(b'\r\n'),
timeout=self.timeout
)
except asyncio.TimeoutError:
raise TimeOutError(
cn=f"ASCII接收数据超时 ({self.timeout}s)",
en=f"ASCII receive data timeout ({self.timeout}s)"
)
return response
[文档]
def __repr__(self) -> str:
"""
返回对象的字符串表示
Return object's string representation
Returns:
对象的字符串表示
Object's string representation
"""
return f"<AsyncAsciiTransport port={self.port} baudrate={self.baudrate} timeout={self.timeout}>"