架构设计
1. 概述
ModbusLink 采用**现代分层架构**设计,严格遵循**单一职责原则**和**开闭原则**,确保代码的可维护性、可扩展性和可测试性。
2. 整体架构图
┌─────────────────────────────────────────────────────────────┐
│ 应用层 (Application Layer) │
│ 用户代码和业务逻辑 │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ 客户端层 (Client Layer) │
│ ModbusClient, AsyncModbusClient │
│ • 高级API封装 • 数据类型转换 • 错误处理 │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ 协议层 (Protocol Layer) │
│ Modbus协议实现 │
│ • PDU构造 • 功能码处理 • 数据验证 │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ 传输层 (Transport Layer) │
│ TCP, RTU, ASCII传输实现 │
│ • 连接管理 • 数据帧处理 • 校验计算 │
└─────────────────────────────────────────────────────────────┘
│
┌─────────────────────────────────────────────────────────────┐
│ 物理层 (Physical Layer) │
│ 以太网 / 串口 / 其他 │
└─────────────────────────────────────────────────────────────┘
3. 核心设计原则
分层架构 (Layered Architecture) - 每层只与相邻层交互 - 上层依赖于下层,下层不依赖上层 - 便于独立测试和替换
依赖注入 (Dependency Injection) - 客户端通过构造函数接收传输层实例 - 降低耦合度,提高可测试性
面向接口编程 (Interface-Oriented Programming) - 使用抽象基类定义接口 - 具体实现可插拔替换
策略模式 (Strategy Pattern) - 不同传输方式作为不同策略 - 运行时切换传输策略
5. 传输层设计
5.1 传输层架构
SyncBaseTransport (ABC)
├── SyncTcpTransport
├── SyncRtuTransport
└── SyncAsciiTransport
AsyncBaseTransport (ABC)
├── AsyncTcpTransport
├── AsyncRtuTransport
└── AsyncAsciiTransport
5.2 核心接口
同步传输接口
class SyncBaseTransport(ABC):
@abstractmethod
def open(self) -> None:
"""打开传输连接"""
@abstractmethod
def close(self) -> None:
"""关闭传输连接"""
@abstractmethod
def is_open(self) -> bool:
"""检查连接状态"""
@abstractmethod
def flush(self) -> int:
"""同步清空接收缓冲区中的所有待处理数据"""
@abstractmethod
def send_and_receive(self, slave_id: int, pdu: bytes, timeout: Optional[float] = None) -> bytes:
"""发送PDU并接收响应"""
异步传输接口
class AsyncBaseTransport(ABC):
@abstractmethod
async def open(self) -> None:
"""异步打开传输连接"""
@abstractmethod
async def close(self) -> None:
"""异步关闭传输连接"""
@abstractmethod
def is_open(self) -> bool:
"""检查连接状态"""
@abstractmethod
async def flush(self) -> int:
"""异步清空接收缓冲区中的所有待处理数据"""
@abstractmethod
async def send_and_receive(self, slave_id: int, pdu: bytes, timeout: Optional[float] = None) -> bytes:
"""异步发送PDU并接收响应"""
5.3 传输实现细节
TCP传输 (Modbus TCP)
MBAP Header (7 bytes) + PDU
┌──────┬──────┬──────┬──────┬──────────┬─────────────┐
│ TID │ PID │ Length │ Unit ID │ Function │
│ (2) │ (2) │ (2) │ (1) │ Code + Data │
└──────┴──────┴──────┴──────┴──────────┴─────────────┘
TID: 事务标识符,用于匹配请求响应
PID: 协议标识符,Modbus为0
Length: 后续字节长度
Unit ID: 单元标识符(从站地址)
RTU传输 (Modbus RTU)
RTU Frame
┌─────────┬─────────────┬─────────────┐
│ Address │ Function │ CRC-16 │
│ (1) │ Code + Data │ (2) │
└─────────┴─────────────┴─────────────┘
Address: 从站地址 (1-247)
CRC-16: 循环冗余校验,使用Modbus多项式
帧间隔: 至少3.5个字符时间
ASCII传输 (Modbus ASCII)
ASCII Frame
┌───┬─────────┬─────────────┬─────┬─────┬───┐
│ : │ Address │ Function │ LRC │ CR │ LF│
│ │ (2) │ Code + Data │ (2) │ │ │
└───┴─────────┴─────────────┴─────┴─────┴───┘
Start: 冒号字符 ':'
LRC: 纵向冗余校验
End: 回车换行 (CR LF)
6. 客户端层设计
6.1 客户端架构
┌─────────────────────────────────────┐
│ 高级数据类型API │
│ read_float32, write_string, etc. │
└─────────────────────────────────────┘
│
┌─────────────────────────────────────┐
│ 标准Modbus API │
│ read_holding_registers, etc. │
└─────────────────────────────────────┘
│
┌─────────────────────────────────────┐
│ 协议处理层 │
│ PDU构造, 响应解析, 异常处理 │
└─────────────────────────────────────┘
│
┌─────────────────────────────────────┐
│ 传输层接口 │
│SyncBaseTransport/AsyncBaseTransport │
└─────────────────────────────────────┘
6.2 设计模式应用
模板方法模式
class SyncModbusClient:
def _execute_request(self, slave_id: int, function_code: int,
data: bytes) -> bytes:
"""模板方法:定义请求执行流程"""
# 1. 构造PDU
pdu = self._build_pdu(function_code, data)
# 2. 发送并接收
response = self._transport.send_and_receive(pdu)
# 3. 验证响应
self._validate_response(response, function_code)
# 4. 解析数据
return self._parse_response(response)
装饰器模式
def connection_required(func):
"""确保连接存在的装饰器"""
def wrapper(self, *args, **kwargs):
if not self._transport.is_connected():
raise ConnectionError("Not connected")
return func(self, *args, **kwargs)
return wrapper
7. 服务器层设计
7.1 服务器架构
┌─────────────────────────────────────┐
│ 协议处理器 │
│ 请求解析、响应构造、异常处理 │
└─────────────────────────────────────┘
│
┌─────────────────────────────────────┐
│ 数据存储层 │
│ 线圈、寄存器的读写操作 │
└─────────────────────────────────────┘
│
┌─────────────────────────────────────┐
│ 传输服务器 │
│ TCP/RTU/ASCII服务器实现 │
└─────────────────────────────────────┘
7.2 数据存储设计
class ModbusDataStore:
"""线程安全的Modbus数据存储"""
def __init__(self, coils_size: int = 65536,
discrete_inputs_size: int = 65536,
holding_registers_size: int = 65536,
input_registers_size: int = 65536):
self._coils = [False] * coils_size
self._discrete_inputs = [False] * discrete_inputs_size
self._holding_registers = [0] * holding_registers_size
self._input_registers = [0] * input_registers_size
self._lock = threading.RLock() # 可重入锁
8. 工具层设计
8.1 工具组件
CRC计算器 - 实现ModbusRTU标准CRC-16算法 - 支持表查找优化
LRC计算器 - 实现ModbusASCII标准LRC算法 - 支持验证数据
数据编码器 - 大小端字节序转换 - 各种数据类型编解码
9. 错误处理架构
9.1 异常层次结构
ModbusLinkError (基础异常基类)
├── CommunicationError (通信错误基类)
│ ├── ConnectError (连接错误异常)
│ └── TimeOutError (超时错误异常)
├── ValidationError (数据校验错误基类)
│ ├── CrcError (CRC校验错误异常)
│ ├── LrcError (LRC校验错误异常)
│ └── InvalidReplyError (无效响应错误异常)
└── ModbusException (协议错误)
9.2 异常处理策略
传输层异常: 网络、串口通信错误
协议层异常: Modbus协议格式错误
应用层异常: 业务逻辑错误
10. 性能优化设计
10.1 连接池
class ModbusConnectionPool:
"""Modbus连接池,支持连接复用"""
def __init__(self, max_connections: int = 10):
self._pool = asyncio.Queue(maxsize=max_connections)
self._connections = set()
async def acquire(self) -> AsyncModbusClient:
"""获取连接"""
async def release(self, client: AsyncModbusClient):
"""释放连接"""
10.2 批量操作优化
async def batch_read_registers(self, requests: List[ReadRequest]) -> List[List[int]]:
"""批量读取寄存器,自动优化为最少请求数"""
# 合并连续地址的请求
optimized_requests = self._optimize_requests(requests)
# 并发执行
tasks = [self._read_registers(**req) for req in optimized_requests]
return await asyncio.gather(*tasks)
11. 扩展性设计
11.1 插件架构
class ModbusPlugin(ABC):
"""Modbus插件基类"""
@abstractmethod
def on_request(self, request: SyncModbusRequest) -> SyncModbusRequest:
"""请求预处理"""
@abstractmethod
def on_response(self, response: SyncModbusResponse) -> SyncModbusResponse:
"""响应后处理"""
11.2 自定义传输层
class WebSocketTransport(AsyncBaseTransport):
"""WebSocket传输层示例"""
async def connect(self):
self._websocket = await websockets.connect(self._uri)
async def send_and_receive(self, data: bytes) -> bytes:
await self._websocket.send(data)
response = await self._websocket.recv()
return response
12. 总结
ModbusLink的架构设计具有以下优势:
清晰的分层结构,每层职责明确
高度的可扩展性,支持自定义传输层和协议扩展
优秀的性能表现,支持异步和连接池
完善的错误处理,提供详细的异常信息
良好的可测试性,每层都可以独立测试
这种设计确保了ModbusLink既能满足当前需求,又具备未来扩展的灵活性。