"""
ModbusLink 异步客户端实现
提供用户友好的异步Modbus客户端API。
ModbusLink Asynchronous Client Implementation
Provides user-friendly asynchronous Modbus client API.
"""
import struct
from typing import List, Optional, Callable, Any
from typing_extensions import Self
import asyncio
from ..transport.async_base import AsyncBaseTransport
from ..common.exceptions import InvalidResponseError
from ..utils.coder import PayloadCoder
from ..utils.logging import get_logger
[文档]
class AsyncModbusClient:
"""
异步Modbus客户端
提供简洁、用户友好的异步Modbus操作接口。通过依赖注入的方式接收异步传输层实例,支持异步TCP等传输方式。
所有方法都使用Python原生数据类型(int, list等),将底层的字节操作完全封装,并支持回调机制。
Asynchronous Modbus Client
Provides a concise, user-friendly asynchronous Modbus operation interface. Receives
async transport layer instances through dependency injection, supporting async
transport methods such as async TCP.
All methods use Python native data types (int, list, etc.),
completely encapsulating underlying byte operations, and support callback mechanisms.
"""
[文档]
def __init__(self, transport: AsyncBaseTransport):
"""
初始化异步Modbus客户端
Initialize Async Modbus Client
Args:
transport: 异步传输层实例(AsyncTcpTransport等) | Async transport layer instance (AsyncTcpTransport, etc.)
"""
self.transport = transport
self._logger = get_logger("client.async")
[文档]
async def read_coils(
self,
slave_id: int,
start_address: int,
quantity: int,
callback: Optional[Callable[[List[bool]], None]] = None,
) -> List[bool]:
"""
异步读取线圈状态(功能码0x01) | Async Read Coil Status (Function Code 0x01)
Args:
slave_id: 从站地址 | Slave address
start_address: 起始地址 | Starting address
quantity: 读取数量(1-2000) | Quantity to read (1-2000)
callback: 可选的回调函数,在收到响应后调用 | Optional callback function, called after receiving response
Returns:
线圈状态列表,True表示ON,False表示OFF
List of coil status, True for ON, False for OFF
Example:
>>> coils = await client.read_coils(1, 0, 8)
[True, False, True, False, False, False, True, False]
"""
if not (1 <= quantity <= 2000):
raise ValueError(
"线圈数量必须在1-2000之间 | Coil quantity must be between 1-2000"
)
# 构建PDU:功能码 + 起始地址 + 数量 | Build PDU: function code + starting address + quantity
pdu = struct.pack(">BHH", 0x01, start_address, quantity)
# 异步发送请求并接收响应 | Async send request and receive response
response_pdu = await self.transport.send_and_receive(slave_id, pdu)
# 解析响应:功能码 + 字节数 + 数据 | Parse response: function code + byte count + data
if len(response_pdu) < 2:
raise InvalidResponseError(
"响应PDU长度不足 | Response PDU length insufficient"
)
function_code = response_pdu[0]
byte_count = response_pdu[1]
if function_code != 0x01:
raise InvalidResponseError(
f"功能码不匹配: 期望 0x01, 收到 0x{function_code:02X} | Function code mismatch: expected 0x01, received 0x{function_code:02X}"
)
if len(response_pdu) != 2 + byte_count:
raise InvalidResponseError(
"响应数据长度不匹配 | Response data length mismatch"
)
# 解析线圈数据 | Parse coil data
coil_data = response_pdu[2:]
coils: list[bool] = []
for byte_idx, byte_val in enumerate(coil_data):
for bit_idx in range(8):
if (
len(coils) >= quantity
): # 只返回请求的数量 | Only return requested quantity
break
coils.append(bool(byte_val & (1 << bit_idx)))
result = coils[:quantity]
# 如果提供了回调函数,在后台任务中调用 | If callback is provided, call it in background task
if callback:
asyncio.create_task(self._call_callback(callback, result))
return result
[文档]
async def read_holding_registers(
self,
slave_id: int,
start_address: int,
quantity: int,
callback: Optional[Callable[[List[int]], None]] = None,
) -> List[int]:
"""
异步读取保持寄存器(功能码0x03) | Async Read Holding Registers (Function Code 0x03)
Args:
slave_id: 从站地址 | Slave address
start_address: 起始地址 | Starting address
quantity: 读取数量(1-125) | Quantity to read (1-125)
callback: 可选的回调函数,在收到响应后调用 | Optional callback function, called after receiving response
Returns:
寄存器值列表,每个值为16位无符号整数(0-65535)
List of register values, each value is a 16-bit unsigned integer (0-65535)
Example:
>>> registers = await client.read_holding_registers(1, 0, 4)
[1234, 5678, 9012, 3456]
"""
if not (1 <= quantity <= 125):
raise ValueError(
"寄存器数量必须在1-125之间 | Register quantity must be between 1-125"
)
# 构建PDU:功能码 + 起始地址 + 数量 | Build PDU: function code + starting address + quantity
pdu = struct.pack(">BHH", 0x03, start_address, quantity)
# 异步发送请求并接收响应 | Async send request and receive response
response_pdu = await self.transport.send_and_receive(slave_id, pdu)
# 解析响应:功能码 + 字节数 + 数据 | Parse response: function code + byte count + data
if len(response_pdu) < 2:
raise InvalidResponseError(
"响应PDU长度不足 | Response PDU length insufficient"
)
function_code = response_pdu[0]
byte_count = response_pdu[1]
if function_code != 0x03:
raise InvalidResponseError(
f"功能码不匹配: 期望 0x03, 收到 0x{function_code:02X} | Function code mismatch: expected 0x03, received 0x{function_code:02X}"
)
expected_byte_count = quantity * 2
if byte_count != expected_byte_count:
raise InvalidResponseError(
f"字节数不匹配: 期望 {expected_byte_count}, 收到 {byte_count} | Byte count mismatch: expected {expected_byte_count}, received {byte_count}"
)
if len(response_pdu) != 2 + byte_count:
raise InvalidResponseError(
"响应数据长度不匹配 | Response data length mismatch"
)
# 解析寄存器数据 | Parse register data
register_data = response_pdu[2:]
registers = []
for i in range(0, len(register_data), 2):
register_value = struct.unpack(">H", register_data[i : i + 2])[0]
registers.append(register_value)
# 如果提供了回调函数,在后台任务中调用 | If callback is provided, call it in background task
if callback:
asyncio.create_task(self._call_callback(callback, registers))
return registers
[文档]
async def write_single_coil(
self,
slave_id: int,
address: int,
value: bool,
callback: Optional[Callable[[], None]] = None,
) -> None:
"""
异步写单个线圈(功能码0x05) | Async Write Single Coil (Function Code 0x05)
Args:
slave_id: 从站地址 | Slave address
address: 线圈地址 | Coil address
value: 线圈值,True表示ON,False表示OFF | Coil value, True for ON, False for OFF
callback: 可选的回调函数,在操作完成后调用 | Optional callback function, called after operation completes
Example:
>>> await client.write_single_coil(1, 0, True) # 设置线圈0为ON | Set coil 0 to ON
"""
# 构建PDU:功能码 + 地址 + 值 | Build PDU: function code + address + value
coil_value = 0xFF00 if value else 0x0000
pdu = struct.pack(">BHH", 0x05, address, coil_value)
# 异步发送请求并接收响应 | Async send request and receive response
response_pdu = await self.transport.send_and_receive(slave_id, pdu)
# 验证响应(应该与请求相同) | Verify response (should be same as request)
if response_pdu != pdu:
raise InvalidResponseError(
"写单个线圈响应不匹配 | Write single coil response mismatch"
)
# 如果提供了回调函数,在后台任务中调用 | If callback is provided, call it in background task
if callback:
asyncio.create_task(self._call_callback(callback, None))
[文档]
async def write_single_register(
self,
slave_id: int,
address: int,
value: int,
callback: Optional[Callable[[], None]] = None,
) -> None:
"""
异步写单个寄存器(功能码0x06) | Async Write Single Register (Function Code 0x06)
Args:
slave_id: 从站地址 | Slave address
address: 寄存器地址 | Register address
value: 寄存器值(0-65535) | Register value (0-65535)
callback: 可选的回调函数,在操作完成后调用 | Optional callback function, called after operation completes
Example:
>>> await client.write_single_register(1, 0, 1234)
"""
if not (0 <= value <= 65535):
raise ValueError(
"寄存器值必须在0-65535之间 | Register value must be between 0-65535"
)
# 构建PDU:功能码 + 地址 + 值 | Build PDU: function code + address + value
pdu = struct.pack(">BHH", 0x06, address, value)
# 异步发送请求并接收响应 | Async send request and receive response
response_pdu = await self.transport.send_and_receive(slave_id, pdu)
# 验证响应(应该与请求相同) | Verify response (should be same as request)
if response_pdu != pdu:
raise InvalidResponseError(
"写单个寄存器响应不匹配 | Write single register response mismatch"
)
# 如果提供了回调函数,在后台任务中调用 | If callback is provided, call it in background task
if callback:
asyncio.create_task(self._call_callback(callback, None))
[文档]
async def write_multiple_coils(
self,
slave_id: int,
start_address: int,
values: List[bool],
callback: Optional[Callable[[], None]] = None,
) -> None:
"""
异步写多个线圈(功能码0x0F) | Async Write Multiple Coils (Function Code 0x0F)
Args:
slave_id: 从站地址 | Slave address
start_address: 起始地址 | Starting address
values: 线圈值列表,True表示ON,False表示OFF | List of coil values, True for ON, False for OFF
callback: 可选的回调函数,在操作完成后调用 | Optional callback function, called after operation completes
Example:
>>> await client.write_multiple_coils(1, 0, [True, False, True, False])
"""
quantity = len(values)
if not (1 <= quantity <= 1968):
raise ValueError(
"线圈数量必须在1-1968之间 | Coil quantity must be between 1-1968"
)
# 计算需要的字节数 | Calculate required byte count
byte_count = (quantity + 7) // 8
# 将布尔值列表转换为字节数据 | Convert boolean list to byte data
coil_bytes = []
for byte_idx in range(byte_count):
byte_val = 0
for bit_idx in range(8):
value_idx = byte_idx * 8 + bit_idx
if value_idx < quantity and values[value_idx]:
byte_val |= 1 << bit_idx
coil_bytes.append(byte_val)
# 构建PDU:功能码 + 起始地址 + 数量 + 字节数 + 数据 | Build PDU: function code + starting address + quantity + byte count + data
pdu = struct.pack(">BHHB", 0x0F, start_address, quantity, byte_count)
pdu += bytes(coil_bytes)
# 异步发送请求并接收响应 | Async send request and receive response
response_pdu = await self.transport.send_and_receive(slave_id, pdu)
# 验证响应:功能码 + 起始地址 + 数量 | Verify response: function code + starting address + quantity
expected_response = struct.pack(">BHH", 0x0F, start_address, quantity)
if response_pdu != expected_response:
raise InvalidResponseError(
"写多个线圈响应不匹配 | Write multiple coils response mismatch"
)
# 如果提供了回调函数,在后台任务中调用 | If callback is provided, call it in background task
if callback:
asyncio.create_task(self._call_callback(callback, None))
[文档]
async def write_multiple_registers(
self,
slave_id: int,
start_address: int,
values: List[int],
callback: Optional[Callable[[], None]] = None,
) -> None:
"""
异步写多个寄存器(功能码0x10) | Async Write Multiple Registers (Function Code 0x10)
Args:
slave_id: 从站地址 | Slave address
start_address: 起始地址 | Starting address
values: 寄存器值列表,每个值为0-65535 | List of register values, each value 0-65535
callback: 可选的回调函数,在操作完成后调用 | Optional callback function, called after operation completes
Example:
>>> await client.write_multiple_registers(1, 0, [1234, 5678, 9012])
"""
quantity = len(values)
if not (1 <= quantity <= 123):
raise ValueError(
"寄存器数量必须在1-123之间 | Register quantity must be between 1-123"
)
# 验证所有值都在有效范围内 | Verify all values are within valid range
for i, value in enumerate(values):
if not (0 <= value <= 65535):
raise ValueError(
f"寄存器值[{i}]必须在0-65535之间: {value} | Register value[{i}] must be between 0-65535: {value}"
)
byte_count = quantity * 2
# 构建PDU:功能码 + 起始地址 + 数量 + 字节数 + 数据 | Build PDU: function code + starting address + quantity + byte count + data
pdu = struct.pack(">BHHB", 0x10, start_address, quantity, byte_count)
# 添加寄存器数据 | Add register data
for value in values:
pdu += struct.pack(">H", value)
# 异步发送请求并接收响应 | Async send request and receive response
response_pdu = await self.transport.send_and_receive(slave_id, pdu)
# 验证响应:功能码 + 起始地址 + 数量 | Verify response: function code + starting address + quantity
expected_response = struct.pack(">BHH", 0x10, start_address, quantity)
if response_pdu != expected_response:
raise InvalidResponseError(
"写多个寄存器响应不匹配 | Write multiple registers response mismatch"
)
# 如果提供了回调函数,在后台任务中调用 | If callback is provided, call it in background task
if callback:
asyncio.create_task(self._call_callback(callback, None))
async def _call_callback(self, callback: Callable, result: Any) -> None:
"""安全地调用回调函数 | Safely call callback function"""
try:
if result is None:
callback()
else:
callback(result)
except Exception as e:
self._logger.error(
f"回调函数执行错误 | Callback function execution error: {e}"
)
# 高级数据类型API | Advanced Data Type APIs
[文档]
async def read_float32(
self,
slave_id: int,
start_address: int,
byte_order: str = "big",
word_order: str = "high",
callback: Optional[Callable[[float], None]] = None,
) -> float:
"""
异步读取32位浮点数(占用2个连续寄存器) | Async Read 32-bit float (occupies 2 consecutive registers)
Args:
slave_id: 从站地址 | Slave address
start_address: 起始寄存器地址 | Starting register address
byte_order: 字节序,'big'或'little' | Byte order, 'big' or 'little'
word_order: 字序,'high'或'low' | Word order, 'high' or 'low'
callback: 可选的回调函数,在收到响应后调用 | Optional callback function, called after receiving response
Returns:
32位浮点数值 | 32-bit float value
"""
registers = await self.read_holding_registers(slave_id, start_address, 2)
result = PayloadCoder.decode_float32(registers, byte_order, word_order)
# 如果提供了回调函数,在后台任务中调用 | If callback is provided, call it in background task
if callback:
asyncio.create_task(self._call_callback(callback, result))
return result
[文档]
async def write_float32(
self,
slave_id: int,
start_address: int,
value: float,
byte_order: str = "big",
word_order: str = "high",
callback: Optional[Callable[[], None]] = None,
) -> None:
"""
异步写入32位浮点数(占用2个连续寄存器) | Async Write 32-bit float (occupies 2 consecutive registers)
Args:
slave_id: 从站地址 | Slave address
start_address: 起始寄存器地址 | Starting register address
value: 要写入的浮点数值 | Float value to write
byte_order: 字节序,'big'或'little' | Byte order, 'big' or 'little'
word_order: 字序,'high'或'low' | Word order, 'high' or 'low'
callback: 可选的回调函数,在操作完成后调用 | Optional callback function, called after operation completes
"""
registers = PayloadCoder.encode_float32(value, byte_order, word_order)
await self.write_multiple_registers(
slave_id, start_address, registers, callback
)
[文档]
async def read_int32(
self,
slave_id: int,
start_address: int,
byte_order: str = "big",
word_order: str = "high",
callback: Optional[Callable[[int], None]] = None,
) -> int:
"""
异步读取32位有符号整数(占用2个连续寄存器) | Async Read 32-bit signed integer (occupies 2 consecutive registers)
Args:
slave_id: 从站地址 | Slave address
start_address: 起始寄存器地址 | Starting register address
byte_order: 字节序,'big'或'little' | Byte order, 'big' or 'little'
word_order: 字序,'high'或'low' | Word order, 'high' or 'low'
callback: 可选的回调函数,在收到响应后调用 | Optional callback function, called after receiving response
Returns:
32位有符号整数值 | 32-bit signed integer value
"""
registers = await self.read_holding_registers(slave_id, start_address, 2)
result = PayloadCoder.decode_int32(registers, byte_order, word_order)
# 如果提供了回调函数,在后台任务中调用 | If callback is provided, call it in background task
if callback:
asyncio.create_task(self._call_callback(callback, result))
return result
[文档]
async def write_int32(
self,
slave_id: int,
start_address: int,
value: int,
byte_order: str = "big",
word_order: str = "high",
callback: Optional[Callable[[], None]] = None,
) -> None:
"""
异步写入32位有符号整数(占用2个连续寄存器) | Async Write 32-bit signed integer (occupies 2 consecutive registers)
Args:
slave_id: 从站地址 | Slave address
start_address: 起始寄存器地址 | Starting register address
value: 要写入的整数值 | Integer value to write
byte_order: 字节序,'big'或'little' | Byte order, 'big' or 'little'
word_order: 字序,'high'或'low' | Word order, 'high' or 'low'
callback: 可选的回调函数,在操作完成后调用 | Optional callback function, called after operation completes
"""
registers = PayloadCoder.encode_int32(value, byte_order, word_order)
await self.write_multiple_registers(
slave_id, start_address, registers, callback
)
[文档]
async def __aenter__(self) -> Self:
"""异步上下文管理器入口 | Async context manager entry"""
await self.transport.open()
return self
[文档]
async def __aexit__(
self,
exc_type: Optional[type],
exc_val: Optional[BaseException],
exc_tb: Optional[Any],
) -> None:
"""异步上下文管理器出口 | Async context manager exit"""
await self.transport.close()