示例
本节提供了展示ModbusLink各种功能的综合示例。
基础示例
简单TCP客户端
from modbuslink import ModbusClient, TcpTransport
# 创建TCP传输层
transport = TcpTransport(host='192.168.1.100', port=502)
client = ModbusClient(transport)
try:
# 连接到服务器
client.connect()
# 读取保持寄存器
registers = client.read_holding_registers(
slave_id=1,
start_address=0,
quantity=10
)
print(f"寄存器: {registers}")
# 写入单个寄存器
client.write_single_register(
slave_id=1,
address=0,
value=1234
)
finally:
client.disconnect()
简单RTU客户端
from modbuslink import ModbusClient, RtuTransport
# 创建RTU传输层
transport = RtuTransport(
port='COM1',
baudrate=9600,
bytesize=8,
parity='N',
stopbits=1
)
client = ModbusClient(transport)
with client:
# 读取线圈
coils = client.read_coils(
slave_id=1,
start_address=0,
quantity=8
)
print(f"线圈: {coils}")
# 写入多个线圈
client.write_multiple_coils(
slave_id=1,
start_address=0,
values=[True, False, True, False]
)
简单ASCII客户端
from modbuslink import ModbusClient, AsciiTransport
# 创建ASCII传输层
transport = AsciiTransport(
port='COM1',
baudrate=9600,
bytesize=7,
parity='E',
stopbits=1
)
client = ModbusClient(transport)
with client:
# 读取保持寄存器
registers = client.read_holding_registers(
slave_id=1,
start_address=0,
quantity=4
)
print(f"寄存器: {registers}")
# 写入单个寄存器
client.write_single_register(
slave_id=1,
address=0,
value=1234
)
高级示例
异步操作
import asyncio
from modbuslink import AsyncModbusClient, AsyncTcpTransport
async def async_modbus_operations():
transport = AsyncTcpTransport(host='192.168.1.100', port=502)
client = AsyncModbusClient(transport)
async with client:
# 并发读取操作
tasks = [
client.read_holding_registers(slave_id=1, start_address=0, quantity=10),
client.read_holding_registers(slave_id=1, start_address=10, quantity=10),
client.read_holding_registers(slave_id=1, start_address=20, quantity=10)
]
results = await asyncio.gather(*tasks)
for i, registers in enumerate(results):
print(f"块 {i}: {registers}")
# 顺序写入操作
for i in range(10):
await client.write_single_register(
slave_id=1,
address=i,
value=i * 100
)
# 运行异步函数
asyncio.run(async_modbus_operations())
异步RTU操作
import asyncio
from modbuslink import AsyncModbusClient, AsyncRtuTransport
async def async_rtu_operations():
transport = AsyncRtuTransport(
port='COM1',
baudrate=9600,
timeout=3.0
)
client = AsyncModbusClient(transport)
async with client:
# 异步读取保持寄存器
registers = await client.read_holding_registers(
slave_id=1,
start_address=0,
quantity=10
)
print(f"寄存器: {registers}")
# 异步写入多个寄存器
await client.write_multiple_registers(
slave_id=1,
start_address=0,
values=[100, 200, 300, 400]
)
asyncio.run(async_rtu_operations())
异步ASCII操作
import asyncio
from modbuslink import AsyncModbusClient, AsyncAsciiTransport
async def async_ascii_operations():
transport = AsyncAsciiTransport(
port='COM1',
baudrate=9600,
timeout=3.0
)
client = AsyncModbusClient(transport)
async with client:
# 异步读取线圈
coils = await client.read_coils(
slave_id=1,
start_address=0,
quantity=8
)
print(f"线圈: {coils}")
# 异步写入单个线圈
await client.write_single_coil(
slave_id=1,
address=0,
value=True
)
asyncio.run(async_ascii_operations())
回调机制
from modbuslink import AsyncModbusClient, AsyncTcpTransport
import asyncio
def on_data_received(data):
print(f"接收到数据: {data}")
def on_error(error):
print(f"发生错误: {error}")
async def callback_example():
transport = AsyncTcpTransport(host='192.168.1.100', port=502)
client = AsyncModbusClient(transport)
# 设置回调
client.set_data_callback(on_data_received)
client.set_error_callback(on_error)
async with client:
# 操作将触发回调
await client.read_holding_registers(
slave_id=1,
start_address=0,
quantity=10
)
asyncio.run(callback_example())
高级数据类型
from modbuslink import ModbusClient, TcpTransport
transport = TcpTransport(host='192.168.1.100', port=502)
client = ModbusClient(transport)
with client:
# Float32 操作
temperature = 25.6
client.write_float32(
slave_id=1,
start_address=100,
value=temperature
)
read_temp = client.read_float32(
slave_id=1,
start_address=100
)
print(f"温度: {read_temp}°C")
# Int32 操作,自定义字节/字序
counter_value = -123456
client.write_int32(
slave_id=1,
start_address=102,
value=counter_value,
byte_order='little',
word_order='big'
)
read_counter = client.read_int32(
slave_id=1,
start_address=102,
byte_order='little',
word_order='big'
)
print(f"计数器: {read_counter}")
# UInt32 操作
timestamp = 1640995200 # Unix时间戳
client.write_uint32(
slave_id=1,
start_address=104,
value=timestamp
)
read_timestamp = client.read_uint32(
slave_id=1,
start_address=104
)
print(f"时间戳: {read_timestamp}")
性能测试示例
批量操作性能
import time
import asyncio
from modbuslink import AsyncModbusClient, AsyncTcpTransport
async def performance_test():
transport = AsyncTcpTransport(host='192.168.1.100', port=502)
client = AsyncModbusClient(transport)
async with client:
# 测试批量读取性能
start_time = time.time()
# 并发读取多个寄存器块
tasks = []
for i in range(10):
task = client.read_holding_registers(
slave_id=1,
start_address=i*10,
quantity=10
)
tasks.append(task)
results = await asyncio.gather(*tasks)
end_time = time.time()
print(f"读取100个寄存器耗时: {end_time - start_time:.3f}秒")
print(f"平均每个寄存器: {(end_time - start_time)*1000/100:.2f}ms")
asyncio.run(performance_test())
连接池示例
import asyncio
from modbuslink import AsyncModbusClient, AsyncTcpTransport
class ModbusConnectionPool:
def __init__(self, host, port, pool_size=5):
self.host = host
self.port = port
self.pool_size = pool_size
self.connections = asyncio.Queue(maxsize=pool_size)
async def initialize(self):
for _ in range(self.pool_size):
transport = AsyncTcpTransport(host=self.host, port=self.port)
client = AsyncModbusClient(transport)
await client.connect()
await self.connections.put(client)
async def get_connection(self):
return await self.connections.get()
async def return_connection(self, client):
await self.connections.put(client)
async def close_all(self):
while not self.connections.empty():
client = await self.connections.get()
await client.disconnect()
# 使用连接池
async def use_connection_pool():
pool = ModbusConnectionPool('192.168.1.100', 502)
await pool.initialize()
try:
# 获取连接
client = await pool.get_connection()
# 执行操作
registers = await client.read_holding_registers(
slave_id=1, start_address=0, quantity=10
)
print(f"读取结果: {registers}")
# 归还连接
await pool.return_connection(client)
finally:
await pool.close_all()
asyncio.run(use_connection_pool())
错误处理示例
综合错误处理
from modbuslink import ModbusClient, TcpTransport
from modbuslink.common.exceptions import (
ConnectionError, TimeoutError, CRCError,
InvalidResponseError, ModbusException
)
import time
def robust_modbus_client():
transport = TcpTransport(host='192.168.1.100', port=502, timeout=5.0)
client = ModbusClient(transport)
max_retries = 3
retry_delay = 1.0
for attempt in range(max_retries):
try:
client.connect()
# 执行操作
registers = client.read_holding_registers(
slave_id=1,
start_address=0,
quantity=10
)
print(f"成功读取寄存器: {registers}")
break
except ConnectionError as e:
print(f"连接失败(尝试 {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
retry_delay *= 2 # 指数退避
except TimeoutError as e:
print(f"操作超时(尝试 {attempt + 1}): {e}")
if attempt < max_retries - 1:
time.sleep(retry_delay)
except CRCError as e:
print(f"检测到CRC错误: {e}")
# CRC错误通常表示通信问题
break
except InvalidResponseError as e:
print(f"接收到无效响应: {e}")
break
except ModbusException as e:
print(f"Modbus协议错误: {e}")
print(f"异常码: {e.exception_code}")
break
except Exception as e:
print(f"意外错误: {e}")
break
finally:
try:
client.disconnect()
except:
pass
robust_modbus_client()
日志配置
from modbuslink import ModbusClient, TcpTransport
from modbuslink.utils.logger import setup_logger
import logging
# 配置日志
setup_logger(
name='modbuslink',
level=logging.DEBUG,
log_file='modbus_operations.log',
console_output=True
)
# 创建启用日志的客户端
transport = TcpTransport(host='192.168.1.100', port=502)
client = ModbusClient(transport)
with client:
# 所有操作都将被记录
registers = client.read_holding_registers(
slave_id=1,
start_address=0,
quantity=10
)
client.write_single_register(
slave_id=1,
address=0,
value=1234
)
集成示例
数据采集系统
import asyncio
import json
from datetime import datetime
from modbuslink import AsyncModbusClient, AsyncTcpTransport
class DataAcquisitionSystem:
def __init__(self, host, port):
self.transport = AsyncTcpTransport(host=host, port=port)
self.client = AsyncModbusClient(self.transport)
self.data_buffer = []
async def start_acquisition(self, interval=1.0):
async with self.client:
while True:
try:
# 读取多个数据点
temperature = await self.client.read_float32(
slave_id=1, start_address=100
)
pressure = await self.client.read_float32(
slave_id=1, start_address=102
)
flow_rate = await self.client.read_float32(
slave_id=1, start_address=104
)
# 创建数据记录
record = {
'timestamp': datetime.now().isoformat(),
'temperature': temperature,
'pressure': pressure,
'flow_rate': flow_rate
}
self.data_buffer.append(record)
print(f"数据已采集: {record}")
# 定期保存数据
if len(self.data_buffer) >= 10:
await self.save_data()
except Exception as e:
print(f"采集错误: {e}")
await asyncio.sleep(interval)
async def save_data(self):
if self.data_buffer:
filename = f"data_{datetime.now().strftime('%Y%m%d_%H%M%S')}.json"
with open(filename, 'w') as f:
json.dump(self.data_buffer, f, indent=2)
print(f"已保存 {len(self.data_buffer)} 条记录到 {filename}")
self.data_buffer.clear()
# 使用方法
async def main():
daq = DataAcquisitionSystem('192.168.1.100', 502)
await daq.start_acquisition(interval=2.0)
asyncio.run(main())
服务器示例
基础TCP服务器
from modbuslink import AsyncTcpModbusServer, ModbusDataStore
import asyncio
import logging
async def main():
# 设置日志
logging.basicConfig(level=logging.INFO)
# 创建数据存储
data_store = ModbusDataStore(
coils_size=1000,
discrete_inputs_size=1000,
holding_registers_size=1000,
input_registers_size=1000
)
# 设置初始数据
data_store.write_coils(0, [True, False, True, False, True, False, True, False])
data_store.write_holding_registers(0, [100, 200, 300, 400, 500])
data_store.write_input_registers(0, [250, 251, 252, 253, 254])
data_store.write_discrete_inputs(0, [False, True, False, True, False, True, False, True])
# 创建TCP服务器
server = AsyncTcpModbusServer(
host="localhost",
port=5020,
data_store=data_store,
slave_id=1,
max_connections=5
)
print("启动TCP服务器: localhost:5020")
print("从站地址: 1")
try:
# 启动服务器
await server.start()
print("TCP服务器启动成功!")
# 永久运行
await server.serve_forever()
except KeyboardInterrupt:
print("\n收到停止信号")
finally:
print("正在停止服务器...")
await server.stop()
print("服务器已停止")
asyncio.run(main())
RTU服务器示例
from modbuslink import AsyncRtuModbusServer, ModbusDataStore
import asyncio
import random
import math
async def simulate_industrial_data(data_store):
"""模拟工业设备数据"""
cycle = 0
while True:
try:
cycle += 1
# 模拟温度传感器数据
base_temps = [248, 179, 318, 447, 682]
temp_variations = [base + random.randint(-5, 5) + int(3 * math.sin(cycle * 0.1)) for base in base_temps]
data_store.write_input_registers(0, temp_variations)
# 模拟压力传感器数据
base_pressures = [1015, 1027, 996, 1043, 1004]
pressure_variations = [base + random.randint(-3, 3) + int(2 * math.cos(cycle * 0.15)) for base in base_pressures]
data_store.write_input_registers(10, pressure_variations)
# 模拟电机转速变化
current_speeds = data_store.read_holding_registers(0, 5)
new_speeds = [speed + random.randint(-50, 50) for speed in current_speeds]
new_speeds = [max(500, min(4000, speed)) for speed in new_speeds]
data_store.write_holding_registers(0, new_speeds)
if cycle % 20 == 0:
print(f"工业数据更新 #{cycle}")
print(f" 温度: {temp_variations}")
print(f" 压力: {pressure_variations}")
print(f" 电机转速: {new_speeds}")
await asyncio.sleep(2.0)
except Exception as e:
print(f"数据模拟错误: {e}")
await asyncio.sleep(2.0)
async def main():
# 创建数据存储
data_store = ModbusDataStore(
coils_size=1000,
discrete_inputs_size=1000,
holding_registers_size=1000,
input_registers_size=1000
)
# 初始化工业设备数据
data_store.write_coils(0, [True, False, True, True, False, False, True, False]) # 电机状态
data_store.write_coils(8, [False, True, False, True, True, False, False, True]) # 阀门状态
data_store.write_holding_registers(0, [1500, 2800, 3600, 1200, 750]) # 电机参数
data_store.write_input_registers(0, [248, 179, 318, 447, 682]) # 温度传感器
data_store.write_discrete_inputs(0, [True, False, True, True, False, True, False, True]) # 限位开关
# 创建RTU服务器
server = AsyncRtuModbusServer(
port="COM3", # 根据实际情况修改
baudrate=9600,
data_store=data_store,
slave_id=1,
parity="N",
stopbits=1,
bytesize=8,
timeout=1.0
)
print("RTU服务器配置:")
print(" 串口: COM3")
print(" 波特率: 9600")
print(" 数据位: 8, 停止位: 1, 校验位: 无")
print(" 从站地址: 1")
try:
# 启动服务器
await server.start()
print("RTU服务器启动成功!")
# 启动数据模拟任务
simulation_task = asyncio.create_task(simulate_industrial_data(data_store))
server_task = asyncio.create_task(server.serve_forever())
# 等待任务完成
await asyncio.gather(simulation_task, server_task)
except KeyboardInterrupt:
print("\n收到停止信号")
except Exception as e:
print(f"\n服务器运行错误: {e}")
finally:
print("正在停止服务器...")
await server.stop()
print("服务器已停止")
asyncio.run(main())
ASCII服务器示例
from modbuslink import AsyncAsciiModbusServer, ModbusDataStore
import asyncio
import random
import math
async def simulate_laboratory_experiment(data_store):
"""模拟实验室实验过程"""
experiment_time = 0
while True:
try:
experiment_time += 1
# 模拟温度控制过程
target_temps = data_store.read_holding_registers(0, 5)
current_temps = data_store.read_input_registers(0, 5)
# 温度逐渐趋向目标值
new_temps = []
for i, (current, target) in enumerate(zip(current_temps, target_temps)):
diff = target - current
change = diff * 0.1 + random.randint(-2, 2) + math.sin(experiment_time * 0.05) * 1
new_temp = current + change
new_temps.append(int(max(0, min(500, new_temp))))
data_store.write_input_registers(0, new_temps)
# 模拟湿度变化
base_humidity = [45, 52, 38, 48, 55]
humidity_variations = [base + random.randint(-5, 5) + int(2 * math.cos(experiment_time * 0.08)) for base in base_humidity]
humidity_variations = [max(0, min(100, h)) for h in humidity_variations]
data_store.write_input_registers(10, humidity_variations)
# 模拟pH值变化
base_ph = [700, 650, 720, 680, 710]
ph_variations = [base + random.randint(-10, 10) + int(3 * math.sin(experiment_time * 0.03)) for base in base_ph]
ph_variations = [max(0, min(1400, ph)) for ph in ph_variations]
data_store.write_input_registers(20, ph_variations)
if experiment_time % 15 == 0:
print(f"实验过程模拟 #{experiment_time}")
print(f" 温度: {new_temps}")
print(f" 湿度: {humidity_variations}%")
print(f" pH: {[ph/100.0 for ph in ph_variations]}")
await asyncio.sleep(3.0)
except Exception as e:
print(f"实验模拟错误: {e}")
await asyncio.sleep(3.0)
async def main():
# 创建数据存储
data_store = ModbusDataStore(
coils_size=1000,
discrete_inputs_size=1000,
holding_registers_size=1000,
input_registers_size=1000
)
# 初始化实验室设备数据
data_store.write_coils(0, [False, True, False, True, False, False, True, False]) # 加热器控制
data_store.write_coils(8, [True, True, False, False, True, True, False, False]) # 风扇控制
data_store.write_holding_registers(0, [250, 300, 180, 220, 350]) # 目标温度
data_store.write_input_registers(0, [248, 298, 178, 218, 348]) # 实际温度
data_store.write_discrete_inputs(0, [False, True, False, False, True, True, False, True]) # 门开关状态
# 创建ASCII服务器
server = AsyncAsciiModbusServer(
port="COM4", # 根据实际情况修改
baudrate=9600,
data_store=data_store,
slave_id=2,
parity="E",
stopbits=1,
bytesize=7,
timeout=2.0
)
print("ASCII服务器配置:")
print(" 串口: COM4")
print(" 波特率: 9600")
print(" 数据位: 7, 停止位: 1, 校验位: 偶校验")
print(" 从站地址: 2")
try:
# 启动服务器
await server.start()
print("ASCII服务器启动成功!")
# 启动实验模拟任务
simulation_task = asyncio.create_task(simulate_laboratory_experiment(data_store))
server_task = asyncio.create_task(server.serve_forever())
# 等待任务完成
await asyncio.gather(simulation_task, server_task)
except KeyboardInterrupt:
print("\n收到停止信号")
except Exception as e:
print(f"\n服务器运行错误: {e}")
finally:
print("正在停止服务器...")
await server.stop()
print("服务器已停止")
asyncio.run(main())
多服务器示例
from modbuslink import (
AsyncTcpModbusServer,
AsyncRtuModbusServer,
AsyncAsciiModbusServer,
ModbusDataStore
)
import asyncio
import random
class MultiServerManager:
"""多服务器管理器"""
def __init__(self):
self.servers = {}
self.data_stores = {}
self.running = False
async def setup_servers(self):
"""设置所有服务器"""
# TCP服务器
tcp_data_store = ModbusDataStore(coils_size=1000, discrete_inputs_size=1000,
holding_registers_size=1000, input_registers_size=1000)
tcp_data_store.write_coils(0, [True, False, True, False] * 10)
tcp_data_store.write_holding_registers(0, list(range(100, 150)))
tcp_server = AsyncTcpModbusServer(
host="localhost", port=5020, data_store=tcp_data_store, slave_id=1
)
self.servers["tcp"] = tcp_server
self.data_stores["tcp"] = tcp_data_store
# RTU服务器
rtu_data_store = ModbusDataStore(coils_size=1000, discrete_inputs_size=1000,
holding_registers_size=1000, input_registers_size=1000)
rtu_data_store.write_coils(0, [False, True, False, True] * 8)
rtu_data_store.write_holding_registers(0, [1500, 2800, 3600, 1200, 750])
rtu_server = AsyncRtuModbusServer(
port="COM3", baudrate=9600, data_store=rtu_data_store, slave_id=2
)
self.servers["rtu"] = rtu_server
self.data_stores["rtu"] = rtu_data_store
# ASCII服务器
ascii_data_store = ModbusDataStore(coils_size=1000, discrete_inputs_size=1000,
holding_registers_size=1000, input_registers_size=1000)
ascii_data_store.write_coils(0, [True, True, False, False] * 8)
ascii_data_store.write_holding_registers(0, [250, 300, 180, 220, 350])
ascii_server = AsyncAsciiModbusServer(
port="COM4", baudrate=9600, data_store=ascii_data_store, slave_id=3,
parity="E", stopbits=1, bytesize=7
)
self.servers["ascii"] = ascii_server
self.data_stores["ascii"] = ascii_data_store
async def start_all_servers(self):
"""启动所有服务器"""
print("启动所有服务器...")
for server_type, server in self.servers.items():
try:
await server.start()
print(f"{server_type.upper()}服务器启动成功")
except Exception as e:
print(f"{server_type.upper()}服务器启动失败: {e}")
self.running = True
async def stop_all_servers(self):
"""停止所有服务器"""
print("停止所有服务器...")
for server_type, server in self.servers.items():
try:
await server.stop()
print(f"{server_type.upper()}服务器已停止")
except Exception as e:
print(f"{server_type.upper()}服务器停止失败: {e}")
self.running = False
async def simulate_data_changes(self):
"""模拟数据变化"""
cycle = 0
while self.running:
try:
cycle += 1
# 更新各服务器数据
for server_type, store in self.data_stores.items():
if server_type == "tcp":
# 网络监控数据
traffic_data = [random.randint(100, 1000) for _ in range(10)]
store.write_input_registers(50, traffic_data)
elif server_type == "rtu":
# 工业过程数据
temp_data = [random.randint(200, 400) for _ in range(5)]
store.write_input_registers(0, temp_data)
elif server_type == "ascii":
# 实验室数据
lab_data = [random.randint(180, 350) for _ in range(5)]
store.write_input_registers(0, lab_data)
# 更新计数器
store.write_holding_registers(999, [cycle])
if cycle % 20 == 0:
print(f"数据模拟周期 #{cycle}")
await asyncio.sleep(2.0)
except Exception as e:
print(f"数据模拟错误: {e}")
await asyncio.sleep(2.0)
async def serve_forever(self):
"""永久运行"""
# 启动数据模拟任务
simulation_task = asyncio.create_task(self.simulate_data_changes())
# 启动所有服务器的serve_forever任务
server_tasks = []
for server_type, server in self.servers.items():
try:
if await server.is_running():
server_tasks.append(asyncio.create_task(server.serve_forever()))
except Exception as e:
print(f"{server_type.upper()}服务器serve_forever启动失败: {e}")
# 等待所有任务完成
all_tasks = [simulation_task] + server_tasks
await asyncio.gather(*all_tasks, return_exceptions=True)
async def main():
manager = MultiServerManager()
try:
# 配置和启动服务器
await manager.setup_servers()
await manager.start_all_servers()
print("\n连接信息:")
print(" TCP服务器: localhost:5020 (从站地址 1)")
print(" RTU服务器: COM3@9600,8,N,1 (从站地址 2)")
print(" ASCII服务器: COM4@9600,7,E,1 (从站地址 3)")
print("\n按 Ctrl+C 停止所有服务器")
# 永久运行
await manager.serve_forever()
except KeyboardInterrupt:
print("\n收到停止信号")
except Exception as e:
print(f"\n多服务器运行错误: {e}")
finally:
await manager.stop_all_servers()
asyncio.run(main())
过程控制系统
import asyncio
from modbuslink import AsyncModbusClient, AsyncTcpTransport
class ProcessController:
def __init__(self, host, port):
self.transport = AsyncTcpTransport(host=host, port=port)
self.client = AsyncModbusClient(self.transport)
self.setpoints = {
'temperature': 25.0,
'pressure': 1013.25
}
async def control_loop(self):
async with self.client:
while True:
try:
# 读取过程变量
current_temp = await self.client.read_float32(
slave_id=1, start_address=100
)
current_pressure = await self.client.read_float32(
slave_id=1, start_address=102
)
# 简单比例控制
temp_error = self.setpoints['temperature'] - current_temp
pressure_error = self.setpoints['pressure'] - current_pressure
# 计算控制输出
heater_output = max(0, min(100, 50 + temp_error * 10))
pump_output = max(0, min(100, 50 + pressure_error * 5))
# 写入控制输出
await self.client.write_float32(
slave_id=1, start_address=200, value=heater_output
)
await self.client.write_float32(
slave_id=1, start_address=202, value=pump_output
)
print(f"温度: {current_temp:.2f}°C (设定值: {self.setpoints['temperature']}°C), "
f"加热器: {heater_output:.1f}%")
print(f"压力: {current_pressure:.2f} mbar (设定值: {self.setpoints['pressure']} mbar), "
f"泵: {pump_output:.1f}%")
except Exception as e:
print(f"控制错误: {e}")
await asyncio.sleep(1.0) # 1秒控制循环
def set_temperature_setpoint(self, value):
self.setpoints['temperature'] = value
def set_pressure_setpoint(self, value):
self.setpoints['pressure'] = value
# 使用方法
async def main():
controller = ProcessController('192.168.1.100', 502)
# 启动控制循环
control_task = asyncio.create_task(controller.control_loop())
# 模拟设定值变化
await asyncio.sleep(10)
controller.set_temperature_setpoint(30.0)
await asyncio.sleep(10)
controller.set_pressure_setpoint(1020.0)
# 运行一段时间
await asyncio.sleep(30)
control_task.cancel()
asyncio.run(main())