美文网首页
python 实现OPCUA server

python 实现OPCUA server

作者: 时尚灬IT男 | 来源:发表于2022-03-07 11:20 被阅读0次

    需要用到的库
    pip install opcua loguru

    直接上代码

    # encoding=utf-8
    
    from opcua import ua, Server
    from loguru import logger
    from json_config import JsonConf
    from datetime import datetime
    
    class myObject():
        """One named object in the server address space, holding grouped variables.

        On construction the object adds a node called ``name`` under the
        server's objects node, one child object per entry of ``groups`` and,
        inside every child, ``quantity`` writable variables named "0", "1", ...
        with an initial value of 0.

        Attributes:
            server: the server the nodes are created on.
            name: browse name of the top-level object.
            groups: iterable of group identifiers (converted with ``str``).
            quantity: number of variables per group, as an int.
            idx: namespace index used for every node, or None for the legacy
                module-level ``idx``.
            Nodes: dict mapping ``str(group id)`` to the list of variable
                nodes created for that group, in creation order.
        """

        def __init__(self, server, name, groups, quantity, idx=None):
            """Store the settings and immediately build the node tree.

            Args:
                server: object exposing ``get_objects_node()``.
                name: name of the top-level object node.
                groups: iterable of group identifiers.
                quantity: variables per group; anything ``int()`` accepts.
                idx: namespace index for the created nodes. When omitted, the
                    module-level ``idx`` is used so existing callers keep
                    working.
            """
            self.server = server
            self.name = name
            self.groups = groups
            self.quantity = int(quantity)
            self.idx = idx
            self.Nodes = {}
            self.create_Node()

        def create_Node(self):
            """Create the object, its group objects and their variables."""
            # Legacy fallback: older callers define ``idx`` at module level
            # instead of passing it in; the global is only read when needed.
            ns_index = self.idx if self.idx is not None else idx  # noqa: F821

            # Use the server handed to the constructor, not a module global.
            root = self.server.get_objects_node().add_object(ns_index, self.name)
            for group_id in self.groups:
                group_key = str(group_id)
                group_node = root.add_object(ns_index, group_key)
                variables = []
                for addr in range(self.quantity):
                    variable = group_node.add_variable(ns_index, str(addr), 0)
                    variable.set_writable()  # allow clients to write the value
                    variables.append(variable)
                self.Nodes[group_key] = variables
    
    if __name__ == "__main__":
        # Create the server and set the endpoint it listens on.
        server = Server()
        server.set_endpoint("opc.tcp://0.0.0.0:48408/freeopcua/server/")

        # Register our own namespace. NOTE: ``server`` and ``idx`` must stay
        # module-level names; myObject looks ``idx`` up from module scope.
        namespace_uri = "WBOPC_UA"
        idx = server.register_namespace(namespace_uri)

        # Build one object tree per entry listed under "OPC_UA_Server".
        settings = JsonConf.load()
        for entry in settings['OPC_UA_Server']:
            my_obj = myObject(server, entry["name"], entry["groups"], entry["quantity"])

        server.start()
    
    
    

    为了方便配置测点的结构和数量,这里补充一个 json_config.py:

    #!/usr/bin/env python3
    # -*- coding: utf-8 -*-
    '''
    JSON configuration helper. Usage:
    data_dict = {"a":"1", "b":"2"}
    JsonConf.set(data_dict)
    This writes the settings to a JSON file, config.json, in the current directory.
    '''
    import json
    import os
    
    class JsonConf:
        """Read and write the JSON settings kept in ``config.json``.

        The file name is relative, so it is resolved against the current
        working directory.
        """

        @staticmethod
        def store(data):
            """Overwrite ``config.json`` with *data* serialized as indented JSON."""
            # Written as UTF-8 so it matches the encoding load() reads with.
            with open("config.json", 'w', encoding='utf-8') as json_file:
                json_file.write(json.dumps(data, indent=4))

        @staticmethod
        def load():
            """Return the parsed content of ``config.json``.

            Returns:
                The decoded JSON value. An empty dict is returned when the
                file is missing or blank; when the content cannot be decoded
                the error is printed and an empty dict is returned.
            """
            try:
                json_file = open('config.json', encoding='utf-8')
            except FileNotFoundError:
                # Reading must not create a (syntactically invalid) empty file.
                return {}
            with json_file:
                try:
                    text = json_file.read()
                    # A blank file simply means "no settings yet".
                    return json.loads(text) if text.strip() else {}
                except ValueError as e:
                    # Covers json.JSONDecodeError and UnicodeDecodeError.
                    print(e)
                    return {}

        @staticmethod
        def set(data_dict):
            """Merge *data_dict* into the stored settings and save the result.

            Keys already present in the file are overwritten; other stored
            keys are kept.
            """
            json_obj = JsonConf.load()
            json_obj.update(data_dict)
            JsonConf.store(json_obj)
    
    
    # if __name__ == "__main__":
    #     # data = {"a": "1", "f": "100", "b": "3000"}
    #     # JsonConf.set(data)
    #     print(JsonConf.load())
    
    

    附上测试配置文件 config.json:

    {
        "OPC_UA_Server": [
            {"name": "test","groups": [1,2,3,4,5,6,7,8,9,10],"quantity": 100},
            {"name": "user1","groups": [1,2,3,4,5,6,7,8,9,10],"quantity": 100},
            {"name": "user2","groups": [1,2,3,4,5,6,7,8,9,10],"quantity": 100},
            {"name": "user3","groups": [1,2,3,4,5,6,7,8,9,10],"quantity": 100}
        ]
    }
    
    

    相关文章

      网友评论

          本文标题:python 实现OPCUA server

          本文链接:https://www.haomeiwen.com/subject/dxogrrtx.html