"""ModbusLink ASCII传输层实现
实现基于串口的Modbus ASCII协议传输,包括LRC校验。
ASCII Transport Layer Implementation
Implements Modbus ASCII protocol transport based on serial port, including LRC validation.
"""
import time
from typing import Optional
import serial
from .base import BaseTransport
from ..common.exceptions import (
ConnectionError,
TimeoutError,
CRCError,
InvalidResponseError,
)
from ..utils.logging import get_logger
[docs]
class AsciiTransport(BaseTransport):
"""
Modbus ASCII传输层实现
处理基于串口的Modbus ASCII通信,包括:
Modbus ASCII Transport Layer Implementation
Handles Modbus ASCII communication based on serial port, including:
- 串口连接管理 | Serial port connection management
- LRC校验码的计算和验证 | LRC checksum calculation and validation
- ASCII编码和解码 | ASCII encoding and decoding
- ADU(应用数据单元)的构建和解析 | ADU (Application Data Unit) construction and parsing
- 错误处理和超时管理 | Error handling and timeout management
"""
[docs]
def __init__(
self,
port: str,
baudrate: int = 9600,
bytesize: int = 7,
parity: str = "E",
stopbits: float = 1,
timeout: float = 1.0,
):
"""
初始化ASCII传输层
Initialize ASCII transport layer
Args:
port: 串口名称 (如 'COM1', '/dev/ttyUSB0') | Serial port name (e.g. 'COM1', '/dev/ttyUSB0')
baudrate: 波特率,默认9600 | Baud rate, default 9600
bytesize: 数据位,默认7位 | Data bits, default 7 bits
parity: 校验位,默认偶校验 | Parity, default even parity
stopbits: 停止位,默认1位 | Stop bits, default 1 bit
timeout: 超时时间(秒),默认1.0秒 | Timeout in seconds, default 1.0 seconds
Raises:
ValueError: 当参数无效时 | When parameters are invalid
TypeError: 当参数类型错误时 | When parameter types are incorrect
"""
if not port or not isinstance(port, str):
raise ValueError(
"串口名称不能为空且必须是字符串 | Port name cannot be empty and must be a string"
)
if not isinstance(baudrate, int) or baudrate <= 0:
raise ValueError("波特率必须是正整数 | Baudrate must be a positive integer")
if not isinstance(timeout, (int, float)) or timeout <= 0:
raise ValueError("超时时间必须是正数 | Timeout must be a positive number")
self.port = port
self.baudrate = baudrate
self.bytesize = bytesize
self.parity = parity
self.stopbits = stopbits
self.timeout = timeout
self._serial: Optional[serial.Serial] = None
self._logger = get_logger("transport.ascii")
[docs]
def open(self) -> None:
"""
打开串口连接
Open serial port connection
"""
try:
self._serial = serial.Serial(
port=self.port,
baudrate=self.baudrate,
bytesize=self.bytesize,
parity=self.parity,
stopbits=self.stopbits,
timeout=self.timeout,
)
self._logger.info(
f"ASCII连接已建立 | ASCII connection established: {self.port} @ {self.baudrate}bps"
)
except Exception as e:
raise ConnectionError(f"串口连接失败 | Serial port connection failed: {e}")
[docs]
def close(self) -> None:
"""
关闭串口连接
Close serial port connection
"""
if self._serial and self._serial.is_open:
try:
self._serial.close()
self._logger.info(f"ASCII连接已关闭 | ASCII connection closed: {self.port}")
except Exception as e:
self._logger.debug(
f"关闭串口连接时出现错误(可忽略)| Error during serial connection close (ignorable): {e}"
)
finally:
self._serial = None
[docs]
def is_open(self) -> bool:
"""
检查串口连接状态
Check serial port connection status
"""
return self._serial is not None and self._serial.is_open
[docs]
def send_and_receive(self, slave_id: int, pdu: bytes) -> bytes:
"""
发送PDU并接收响应
实现ASCII协议的完整通信流程:
Send PDU and receive response
Implements complete ASCII protocol communication flow:
1. 构建ASCII帧(:地址PDU LRC CR LF) | Build ASCII frame (:Address PDU LRC CR LF)
2. 发送请求 | Send request
3. 接收响应 | Receive response
4. 验证LRC | Validate LRC
5. 返回响应PDU | Return response PDU
"""
if not self.is_open():
raise ConnectionError("串口连接未建立 | Serial port connection not established")
# 1. 构建请求帧 | Build request frame
frame_data = bytes([slave_id]) + pdu
lrc = self._calculate_lrc(frame_data)
# ASCII编码:冒号 + 数据的十六进制表示 + LRC + CRLF | ASCII encoding: colon + hex data + LRC + CRLF
ascii_data = frame_data + bytes([lrc])
ascii_frame = b':' + ascii_data.hex().upper().encode('ascii') + b'\r\n'
self._logger.debug(
f"发送ASCII请求 | Sending ASCII request: {ascii_frame.decode('ascii', errors='ignore')}"
)
try:
# 2. 清空接收缓冲区并发送请求 | Clear receive buffer and send request
if self._serial.in_waiting > 0:
self._serial.read(self._serial.in_waiting)
self._serial.write(ascii_frame)
self._serial.flush()
# 3. 接收响应 | Receive response
function_code = pdu[0] if pdu else 0
response_pdu = self._receive_response(slave_id, function_code)
self._logger.debug(
f"接收到ASCII响应PDU | Received ASCII response PDU: {response_pdu.hex()}"
)
return response_pdu
except serial.SerialTimeoutException:
raise TimeoutError(
f"ASCII通信超时 | ASCII communication timeout: {self.timeout}s"
)
except Exception as e:
if isinstance(e, (ConnectionError, TimeoutError, CRCError, InvalidResponseError)):
raise
raise ConnectionError(f"ASCII通信错误 | ASCII communication error: {e}")
def _receive_response(self, expected_slave_id: int, function_code: int) -> bytes:
"""
接收并验证响应帧
Receive and validate response frame
Args:
expected_slave_id: 期望的从站地址 | Expected slave address
function_code: 功能码 | Function code
Returns:
响应的PDU部分 | PDU part of response
Raises:
TimeoutError: 接收超时 | Receive timeout
CRCError: LRC校验失败 | LRC validation failed
InvalidResponseError: 响应格式无效 | Invalid response format
"""
try:
# 接收完整的ASCII帧直到CRLF | Receive complete ASCII frame until CRLF
response_line = b''
start_time = time.time()
while True:
if time.time() - start_time > self.timeout:
raise TimeoutError(f"接收ASCII响应超时 | Receive ASCII response timeout: {self.timeout}s")
if self._serial.in_waiting > 0:
char = self._serial.read(1)
if not char:
continue
response_line += char
# 检查是否接收到完整帧 | Check if complete frame received
if response_line.endswith(b'\r\n'):
break
else:
time.sleep(0.001) # 短暂等待避免CPU占用过高 | Short wait to avoid high CPU usage
# 验证帧格式 | Validate frame format
if not response_line.startswith(b':'):
raise InvalidResponseError("ASCII响应帧格式无效:缺少起始冒号 | Invalid ASCII response frame format: missing start colon")
if not response_line.endswith(b'\r\n'):
raise InvalidResponseError("ASCII响应帧格式无效:缺少结束符 | Invalid ASCII response frame format: missing end markers")
# 提取十六进制数据部分 | Extract hex data part
hex_data = response_line[1:-2].decode('ascii')
if len(hex_data) % 2 != 0:
raise InvalidResponseError("ASCII响应帧格式无效:十六进制数据长度不是偶数 | Invalid ASCII response frame format: hex data length is not even")
# 将十六进制字符串转换为字节 | Convert hex string to bytes
try:
response_bytes = bytes.fromhex(hex_data)
except ValueError as e:
raise InvalidResponseError(f"ASCII响应帧格式无效:十六进制数据解析失败 | Invalid ASCII response frame format: hex data parsing failed: {e}")
if len(response_bytes) < 3: # 至少需要:地址 + 功能码 + LRC | At least need: address + function code + LRC
raise InvalidResponseError("ASCII响应帧太短 | ASCII response frame too short")
# 分离数据和LRC | Separate data and LRC
frame_data = response_bytes[:-1]
received_lrc = response_bytes[-1]
# 验证LRC | Validate LRC
expected_lrc = self._calculate_lrc(frame_data)
if received_lrc != expected_lrc:
raise CRCError(
f"LRC校验失败 | LRC validation failed: "
f"expected {expected_lrc:02X}, got {received_lrc:02X}"
)
# 验证从站地址 | Validate slave address
received_slave_id = frame_data[0]
if received_slave_id != expected_slave_id:
raise InvalidResponseError(
f"从站地址不匹配 | Slave address mismatch: expected {expected_slave_id}, got {received_slave_id}"
)
# 提取PDU | Extract PDU
response_pdu = frame_data[1:]
if len(response_pdu) == 0:
raise InvalidResponseError("响应PDU为空 | Response PDU is empty")
received_function_code = response_pdu[0]
# 检查是否为异常响应 | Check if it's an exception response
if received_function_code & 0x80:
if len(response_pdu) != 2:
raise InvalidResponseError("异常响应格式无效 | Invalid exception response format")
return response_pdu
# 验证功能码 | Validate function code
if received_function_code != function_code:
raise InvalidResponseError(
f"功能码不匹配 | Function code mismatch: expected {function_code}, got {received_function_code}"
)
return response_pdu
except serial.SerialTimeoutException:
raise TimeoutError(f"接收ASCII响应超时 | Receive ASCII response timeout: {self.timeout}s")
@staticmethod
def _calculate_lrc(data: bytes) -> int:
"""
计算LRC(纵向冗余校验)
LRC = (-(所有字节的和)) & 0xFF
Calculate LRC (Longitudinal Redundancy Check)
LRC = (-(sum of all bytes)) & 0xFF
Args:
data: 要计算LRC的数据 | Data to calculate LRC for
Returns:
LRC校验码 | LRC checksum
"""
lrc = 0
for byte in data:
lrc += byte
return (-lrc) & 0xFF
[docs]
def __repr__(self) -> str:
"""
返回传输层的字符串表示
Return string representation of transport layer
"""
status = "已连接 | Connected" if self.is_open() else "未连接 | Disconnected"
return (
f"AsciiTransport(port='{self.port}', baudrate={self.baudrate}, "
f"timeout={self.timeout}, status='{status}')"
)