目录
- 服务器模拟
- 1. 环境准备
- 2. 服务器设置
- 3. 服务器初始化
- 4. 节点操作
- 5. 读取CSV文件
- 6. 运行服务器
- 查看服务器
- 客户端
- 总结
在工业自动化和物联网(IoT)领域,OPC UA(开放平台通信统一架构)已经成为一种广泛采用的数据交换标准。它提供了一种安全、可靠且独立于平台的方式来访问实时数据。在本文中,我们将探讨如何使用Python和OPC UA库来创建一个高效的数据服务器,该服务器能够从CSV文件读取数据,并允许OPC UA客户端访问这些数据。
服务器模拟
1. 环境准备
确保您的Python环境中已安装opcua
库。如果未安装,请使用以下命令进行安装:
pip install opcua
导入python库
import argparseimport csvimport timefrom typing import Listfrom opcua import Server, Node, ua
2. 服务器设置
首先,我们定义命令行参数来配置服务器地址、端口和数据源路径。
parser = argparse.ArgumentParser(description="OPCUA模拟服务器")# 添加参数parser.add_argument("--ip", "-i", default="0.0.0.0", help="服务器地址")parser.add_argument("--port", "-p", default=4840, type=int, help="服务器端口")parser.add_argument("--source", "-s", default="opcua_data.csv", help="数据源路径")parser.add_argument("--type", "-t", default="fanuc", help="模拟类型")# 解析命令行参数args = parser.parse_args()CSV_FILE = args.sourceSERVER_URL = "opc.tcp://" + str(args.ip) + ":" + str(args.port)SERVER_TYPE: str = args.type
参数:
3. 服务器初始化
接着,我们初始化OPC UA服务器,并设置命名空间。
# 创建服务器实例server = Server()# 设置服务器端口server.set_endpoint(SERVER_URL)# 创建一个命名空间uri = "http://opcua.simulator.com"idx = server.register_namespace(uri)
4. 节点操作
定义函数来处理节点信息,并添加或更新节点。
# 获取对象节点,它通常是根节点的第一个孩子objects = server.get_objects_node()node_cache_dict = {}def get_node_dict(node_info: str): pairs = node_info.split(";") if len(pairs) <= 1: return {"ns": idx, "s": node_info} data = {} for pair in pairs: key, value = pair.split("=") data[key] = value return datadef add_node(parent: Node, node_name, value, node_dict: dict, node_type="obj"): node_path = node_dict.get("s") node_ns = node_dict.get("ns") node_id = node_dict.get("i") # 检查节点是否已经存在 children: List[Node] = parent.get_children() for child in children: browse_name: ua.QualifiedName = child.get_browse_name() if browse_name.Name == node_name: return child # 返回已存在的节点 # 如果节点不存在,创建它 if node_type == "var": if SERVER_TYPE.upper() == "FANUC": node_idx = f"ns={node_ns};i={node_id}" if node_id else idx else: node_idx = f"ns={node_ns};s={node_path}" new_node = parent.add_variable(node_idx, node_name, str(value)) else: new_node: Node = parent.add_object(idx, node_name) # new_node.set_writable() # 设置变量为可写 return new_nodedef set_node_value(node_info: str, value): node_dict = get_node_dict(node_info) node_path: str = node_dict.get("s") if node_path.startswith("/Server/"): return try: last_node: Node = node_cache_dict.get(node_path) if last_node is None: # 递归查找或创建节点 parent = objects parts = node_path.split("/") for part in parts[:-1]: # 遍历除了最后一个节点的所有节点 if part == "": continue parent = add_node( parent, part, value, node_dict ) # 创建节点(如果不存在) # 设置或更新最后一个节点的值 last_node_name = parts[-1] last_node = add_node(parent, last_node_name, value, node_dict, "var") node_cache_dict[node_path] = last_node last_node.set_value(str(value)) except Exception as e: print(f"Error setting node value: {e}")
5. 读取CSV文件
使用csv
模块读取CSV文件,并更新OPC UA节点的值。
csv.field_size_limit(500 * 1024 * 1024)try: with open(CSV_FILE, "r") as csvfile: csvreader = csv.reader(csvfile) header = next(csvreader) row = next(csvreader) for i in range(len(header)): set_node_value(header[i], row[i]) # 首次创建节点 server.start() # 启动服务器(先创建结构再启动server) print(f"Server started at {SERVER_URL}") # 每秒读取csv中的一行,更新节点值 while True: try: row = next(csvreader) for i in range(len(header)): set_node_value(header[i], row[i]) time.sleep(1) except StopIteration: csvfile.seek(0) # 回到文件开头 next(csvreader) # 跳过表头finally: server.stop() if server else None
6. 运行服务器
执行脚本,服务器将启动,并开始从CSV文件读取数据更新OPC UA节点。
python opcua_simulator.py --ip 0.0.0.0 --port 4840 --source opcua_data.csv
查看服务器
可以使用 UA Sample Client
(下载链接)软件连接到创建的opcua服务器查看结构和数据。
数据监控:
客户端
【Python】OPC UA 服务器与客户端的实现
总结
本文介绍了如何使用Python和OPC UA库创建一个模拟服务器。通过读取CSV文件,服务器能够动态更新OPC UA节点的值,这对于开发和测试OPC UA客户端应用程序非常有用。您可以按照上述步骤进行实践,并根据具体需求调整代码。