美文网首页
基于python实现mqtt连接类

基于python实现mqtt连接类

作者: 不上进的码农 | 来源:发表于2021-04-08 15:06 被阅读0次

    本篇文章介绍如何在生产环境中基于 Python 语言使用 MQTT。

    paho-mqtt的安装

    pip install paho-mqtt  # 可以指定版本,例如:pip install paho-mqtt==1.5.0

    import re
    import json
    from inspect import isfunction
    import paho.mqtt.client as mqtt
    from settings import Config
    
    
    class EMQClient(object):
        '''
        Thin wrapper around a paho-mqtt client for an EMQ X / MQTT broker.

        It wires the paho callbacks to user supplied handlers and dispatches
        incoming messages to handlers registered per topic filter.

        usage:
            client = EMQClient(config=config)
            client.add_message_handler(handler, 'devices/+/status')
            client.connect()
        '''
        # Handler names accepted by set_handler().
        _HANDLER_NAMES = ('connect_handler', 'disconnect_handler', 'subscribe_handler')

        # Class-level fallbacks, kept for backward compatibility. __init__
        # gives every instance its own containers so that handlers registered
        # on one client are not shared with every other client.
        # MQTT client
        client = None
        # MQTT message callback handlers, keyed by topic filter
        message_handlers = {'common': []}
        # MQTT config dict (keys lower-cased)
        config = None
        # connect / disconnect / subscribe callback handlers
        handlers = {}

        def __init__(self, config, **kwargs):
            '''
            :param config: mapping with the connection settings; keys are
                matched case-insensitively. `host` is required, `port`,
                `keepalive`, `client_id`, `username` and `password` are optional.
            :param kwargs: `handlers` may be a dict mapping handler names
                (see set_handler) to callables.
            '''
            self.config = {key.lower(): value for key, value in config.items()}

            # Per-instance state: never mutate the class-level dicts.
            initial_handlers = kwargs.get('handlers')
            self.handlers = initial_handlers if isinstance(initial_handlers, dict) else {}
            self.message_handlers = {'common': []}

            self.client = mqtt.Client(client_id=self.config.get('client_id'))
            self.client.on_connect = self.on_connect
            self.client.on_disconnect = self.on_disconnect
            self.client.on_message = self.on_message
            self.client.on_subscribe = self.on_subscribe

        @staticmethod
        def _topic_matches(topic_filter, topic):
            '''
            Return True when `topic` matches the MQTT `topic_filter`.

            `+` matches exactly one level (which may be empty), a trailing `#`
            matches the parent level and everything below it, and every other
            level is compared literally.
            '''
            filter_levels = topic_filter.split('/')
            topic_levels = topic.split('/')
            last_index = len(filter_levels) - 1

            for index, level in enumerate(filter_levels):
                if level == '#':
                    # A multi-level wildcard is only meaningful as the last level.
                    return index == last_index
                if index >= len(topic_levels):
                    return False
                if level != '+' and level != topic_levels[index]:
                    return False

            return len(filter_levels) == len(topic_levels)

        def _call_handler(self, name, *args, **kwargs):
            '''Invoke the handler registered under `name`, if it is callable.'''
            handler = self.handlers.get(name)
            # callable() rather than inspect.isfunction(): bound methods,
            # functools.partial objects and callable instances are valid too.
            if callable(handler):
                handler(*args, **kwargs)

        def connect(self):
            '''
            Connect to the mqtt server and start the background network loop.

            Missing `username` / `password` are passed as None, a missing
            `port` defaults to 1883 and a missing `keepalive` to 60 seconds.
            '''
            settings = self.config
            self.client.username_pw_set(
                settings.get('username'), settings.get('password'))
            self.client.connect(
                settings['host'], settings.get('port', 1883),
                keepalive=settings.get('keepalive', 60))
            self.client.loop_start()

        def disconnect(self):
            '''Disconnect from the mqtt server and stop the network loop.'''
            # Send DISCONNECT first so the broker sees a clean shutdown, then
            # stop the loop thread started by connect().
            self.client.disconnect()
            self.client.loop_stop()

        def set_handler(self, handler, func):
            '''
            add callback handler
            :param handler: handler name, one of `connect_handler`,
                `disconnect_handler` or `subscribe_handler`
            :param func: callable invoked when the matching event occurs
            :return: True when the handler was stored, False for an unknown name
            '''
            if handler not in self._HANDLER_NAMES:
                return False

            self.handlers[handler] = func
            return True

        def add_message_handler(self, handler, topic='common'):
            '''
            add message handler
            :param handler: handle message function
            :param topic: topic filter whose messages are handled, `common`
                means the handler receives all messages
            '''
            self.message_handlers.setdefault(topic, []).append(handler)

        def on_connect(self, client, userdata, flags, rc):
            '''
            callback func when client connected to the mqtt server.
            The `connect_handler` is called with this EMQClient instance.
            rc  0: connection successful
                1: connection refused - incorrect protocol version
                2: connection refused - invalid client identifier
                3: connection refused - server unavailable
                4: connection refused - bad username or password
                5: connection refused - not authorised
                6-255: currently unused.
            '''
            print(client, userdata, flags, rc)
            try:
                self._call_handler('connect_handler', self)
            except Exception as e:
                # Best effort: a failing connect handler is reported, not raised.
                print(e)

        def on_disconnect(self, client, userdata, rc):
            '''callback func when the client disconnected from the mqtt server.'''
            self._call_handler('disconnect_handler', userdata=userdata, rc=rc)

        def on_subscribe(self, client, userdata, mid, granted_qos):
            '''callback func when the server acknowledged a subscribe request.'''
            self._call_handler(
                'subscribe_handler', userdata=userdata, mid=mid, granted_qos=granted_qos)

        def on_message(self, client, userdata, message):
            '''
            handle the message when a PUBLISH message is received from the server
            handlers should not block.

            Handlers registered for a matching topic filter receive
            `message.payload`; `common` handlers run afterwards and receive
            the whole message object.
            '''
            # Snapshot the registry so a handler registered while dispatching
            # cannot break the iteration.
            registered = list(self.message_handlers.items())
            for topic_filter, topic_handlers in registered:
                if topic_filter == 'common':
                    continue
                if self._topic_matches(topic_filter, message.topic):
                    for handler in list(topic_handlers):
                        handler(message.payload)

            # call common topic handlers
            for handler in list(self.message_handlers.get('common', ())):
                handler(message)

        def subscribe(self, topic, **kwargs):
            '''
            subscribe topic message
            extra keyword arguments (e.g. qos) are passed to the paho client
            :return: the result of the paho client's subscribe()
            '''
            print(topic)
            return self.client.subscribe(topic, **kwargs)

        def publish(self, topic, message, **kwargs):
            '''
            publish message through topic
            `message` is always serialised with json.dumps (non-ASCII kept as is)
            :return: the result of the paho client's publish()
            '''
            payload = json.dumps(message, ensure_ascii=False)
            return self.client.publish(topic, payload, **kwargs)

        def unsubscribe(self, topic, properties=None):
            '''unsubscribe topic'''
            return self.client.unsubscribe(topic, properties=properties)
    
    
    # Build the module-level client from the project's MQTT settings, register
    # the connect callback, then connect (which also starts the network loop).
    # The connect handler is invoked with the EMQClient instance as its only argument.
    # NOTE(review): `connect_handler` is not defined in the visible source; it must
    # be defined or imported before this point, otherwise this raises NameError.
    emq_client = EMQClient(config=Config.MQTT_CONFIG)
    emq_client.set_handler('connect_handler', connect_handler)
    emq_client.connect()
    

    相关文章

      网友评论

          本文标题:基于python实现mqtt连接类

          本文链接:https://www.haomeiwen.com/subject/hxqlbktx.html