需要用到的库
pip install opcua loguru
直接上代码
# encoding=utf-8
from opcua import ua, Server
from loguru import logger
from json_config import JsonConf
from datetime import datetime
class myObject():
    """Populate an OPC UA address space with one object tree.

    The tree has three levels: a top-level object called ``name`` under the
    server's Objects node, one child object per entry in ``groups``, and
    ``quantity`` variables (named ``"0"`` .. ``str(quantity - 1)``, initial
    value 0) under each group object.

    Args:
        server: Server whose ``get_objects_node()`` gives the parent node.
        name: Browse name of the top-level object.
        groups: Iterable of group identifiers; each is converted with ``str``.
        quantity: Number of variables per group (converted with ``int``).
        namespace_idx: Namespace index used for every created node. When
            omitted, the module-level ``idx`` is used, which keeps existing
            four-argument callers working.

    Attributes:
        Nodes: Maps ``str(group id)`` to the list of variable nodes created
            for that group, in address order.
    """

    def __init__(self, server, name, groups, quantity, namespace_idx=None):
        self.server = server
        self.name = name
        self.groups = groups
        self.quantity = int(quantity)
        # Only touch the global ``idx`` when the caller did not supply an index.
        self.namespace_idx = idx if namespace_idx is None else namespace_idx
        self.Nodes = {}
        self.create_Node()

    def create_Node(self):
        """Create the object/group/variable nodes and record them in ``Nodes``."""
        # Use the server handed to the constructor rather than a module global.
        objects = self.server.get_objects_node()
        ns = self.namespace_idx
        # populating our address space
        top_obj = objects.add_object(ns, self.name)
        for group_id in self.groups:
            key = str(group_id)
            variables = self.Nodes[key] = []
            group_obj = top_obj.add_object(ns, key)
            for addr in range(self.quantity):
                var = group_obj.add_variable(ns, str(addr), 0)
                var.set_writable()  # allow the variable to be written
                variables.append(var)
# myvar = myobj.add_variable(idx, "MyVariable", 6.7)
# myvar.set_writable() # Set MyVariable to be writable by clients
#t1 = threading.Thread(target=self.set_val) # 创建线程
#t1.setDaemon(True) # 设置为后台线程,这里默认是False,设置为True之后则主线程不用等待子线程
#t1.start() # 开启线程
# for ID in self.groups:
# try:
# for addr in range(0, self.quantity):
#
# datavalue = ua.DataValue(0)
# date_time = datetime.strptime('2021-01-01 00:00:00', '%Y-%m-%d %H:%M:%S')
# datavalue.SourceTimestamp = date_time
# # tag.set_value(datavalue)
# self.Nodes[str(ID)][addr].set_value(datavalue)
# self.Nodes[str(ID)][addr].set_value(0)
# except Exception as e:
# logger.info('{},{},{}'.format(self.name, ID, e))
if __name__ == "__main__":
    import time

    # Set up the server and the endpoint it listens on.
    server = Server()
    server.set_endpoint("opc.tcp://0.0.0.0:48408/freeopcua/server/")

    # Register our own namespace. ``server`` and ``idx`` are kept as
    # module-level names because myObject may read them as globals.
    uri = "WBOPC_UA"
    idx = server.register_namespace(uri)

    # JsonConf.load() returns {} when the config is missing or invalid, so the
    # key may be absent; start with an empty address space instead of crashing.
    config_json = JsonConf.load()
    users = config_json.get('OPC_UA_Server', [])
    if not users:
        logger.warning("No 'OPC_UA_Server' entries found in config.json")
    for user in users:
        my_obj = myObject(server, user["name"], user["groups"], user["quantity"])

    server.start()
    try:
        # Keep the main thread alive so Ctrl+C reaches the cleanup below.
        while True:
            time.sleep(1)
    except KeyboardInterrupt:
        logger.info("Interrupted, stopping OPC UA server")
    finally:
        # Always release the endpoint, whatever ended the wait loop.
        server.stop()
为了方便配置测点的结构和数量,这里补充 json_config.py:
#!/usr/bin/env python3
# -*- coding: utf-8 -*-
'''
json配置文件类,调用方法
data_dict = {"a":"1", "b":"2"}
JsonConf.set(data_dict)
即可在当前目录下生成json文件:config.json
'''
import json
import os
class JsonConf:
    """Read and write the JSON configuration file ``config.json``.

    All methods are static and operate on ``CONFIG_PATH``, which is resolved
    against the current working directory.
    """

    # Single place for the file name used by store() and load().
    CONFIG_PATH = "config.json"

    @staticmethod
    def store(data):
        """Overwrite the configuration file with ``data`` as indented JSON.

        Args:
            data: Any JSON-serialisable object.

        Raises:
            TypeError: If ``data`` cannot be serialised; the existing file is
                left untouched in that case.
        """
        # Serialise before opening: opening with 'w' truncates the file, so a
        # serialisation failure must happen first.
        text = json.dumps(data, indent=4, ensure_ascii=False)
        # Same encoding as load(), so non-ASCII text round-trips.
        with open(JsonConf.CONFIG_PATH, 'w', encoding='utf-8') as json_file:
            json_file.write(text)

    @staticmethod
    def load():
        """Return the parsed configuration, or ``{}`` when it is unavailable.

        A missing file yields ``{}`` without creating anything on disk. A file
        that cannot be decoded or parsed also yields ``{}``, and the error is
        printed.
        """
        try:
            with open(JsonConf.CONFIG_PATH, encoding='utf-8') as json_file:
                return json.load(json_file)
        except FileNotFoundError:
            return {}
        except ValueError as e:
            # Covers json.JSONDecodeError and UnicodeDecodeError.
            print(e)
            return {}

    @staticmethod
    def set(data_dict):
        """Merge ``data_dict`` into the stored configuration and save it.

        Keys already present in the file are overwritten; other keys are kept.
        """
        json_obj = JsonConf.load()
        json_obj.update(data_dict)
        JsonConf.store(json_obj)
# if __name__ == "__main__":
# # data = {"a": "1", "f": "100", "b": "3000"}
# # JsonConf.set(data)
# print(JsonConf.load())
附上测试配置文件 config.json:
{
"OPC_UA_Server": [
{"name": "test","groups": [1,2,3,4,5,6,7,8,9,10],"quantity": 100},
{"name": "user1","groups": [1,2,3,4,5,6,7,8,9,10],"quantity": 100},
{"name": "user2","groups": [1,2,3,4,5,6,7,8,9,10],"quantity": 100},
{"name": "user3","groups": [1,2,3,4,5,6,7,8,9,10],"quantity": 100}
]
}
网友评论