在工业自动化领域,OPC UA已经成为了一种广泛采用的通信协议。它提供了一种标准化、跨平台的通信方式,允许不同厂商的设备和系统之间进行数据交换。本文将介绍如何使用Python的opcua库来创建一个简单的OPC UA服务器和客户端。
OPC UA服务器
首先,我们创建一个OPC UA服务器。这个服务器会在本地端口4840上监听连接请求。它定义了一个名为“Sinumerik”的对象,并在该对象下添加了三个变量(RandomValue1、RandomValue2和RandomValue3)。这些变量的值会每5秒更新一次,取值为0到100的随机整数。
from opcua import Serverimport randomimport time# 创建并配置服务器server = Server()server.set_endpoint("opc.tcp://0.0.0.0:4840")idx = server.register_namespace("http://examples.sinumerik")# 添加对象和变量objects_node = server.get_objects_node()myobj = objects_node.add_object(idx, "Sinumerik")var1 = myobj.add_variable(idx, "RandomValue1", 0)var2 = myobj.add_variable(idx, "RandomValue2", 0)obj = myobj.add_object(idx, "Test")var3 = obj.add_variable(idx, "RandomValue3", 0)# 启动服务器server.start()print("Server started at opc.tcp://0.0.0.0:4840")# 无限循环,更新变量值try: while True: var1.set_value(random.randint(0, 100)) var2.set_value(random.randint(0, 100)) var3.set_value(random.randint(0, 100)) print("Variables updated") time.sleep(5)except KeyboardInterrupt: server.stop()
OPC UA客户端
接下来,我们实现一个OPC UA客户端。这个客户端会连接到我们刚刚创建的服务器,并读取“Sinumerik”对象下的所有变量。读取到的数据会被写入到一个CSV文件中,每5秒更新一次。
import csvfrom datetime import datetimeimport osimport timefrom opcua import Client, ua# 创建客户端并连接到服务器client = Client("opc.tcp://localhost:4840/")client.connect()# 获取服务器对象节点objects = client.get_objects_node()sinumerik_node = objects.get_child(["2:Sinumerik"])# 遍历所有子节点并返回变量节点的路径和数值def get_variables(node, path=""): variables = {} children = node.get_children() for child in children: new_path = f"{path}/{child.get_browse_name().Name}" if child.get_node_class() == ua.NodeClass.Variable: value = child.get_value() variables[new_path] = value else: variables.update(get_variables(child, new_path)) return variables# 处理CSV文件filename = 'opcua_data.csv'if os.path.exists(filename): modification_time = os.path.getmtime(filename) modification_time_str = datetime.fromtimestamp(modification_time).strftime('%Y%m%d%H%M%S') new_filename = f"{filename}_{modification_time_str}" os.rename(filename, new_filename)# 写入数据到CSV文件with open(filename, mode='w', newline='') as csvfile: first_run = True try: while True: variables = get_variables(sinumerik_node) if first_run: fieldnames = variables.keys() writer = csv.DictWriter(csvfile, fieldnames=fieldnames) writer.writeheader() first_run = False if writer: writer.writerow(variables) csvfile.flush() time.sleep(5) except KeyboardInterrupt: client.disconnect()
通过以上代码,我们创建了一个简单的OPC UA服务器和客户端。服务器定期更新变量的值,而客户端则定期读取这些值并保存到CSV文件中。这只是一个起点,你可以根据实际需求对代码进行扩展和优化。希望这篇文章能帮助你快速入门OPC UA的Python编程。