美文网首页
基于python实现mqtt连接类

基于python实现mqtt连接类

作者: 不上进的码农 | 来源:发表于2021-04-08 15:06 被阅读0次

本篇文章介绍mqtt基于python语言在生产环境的使用

paho-mqtt的安装

pip install paho-mqtt #可以指定版本paho-mqtt==1.5.0

import re
import json
from inspect import isfunction
import paho.mqtt.client as mqtt
from settings import Config


class EMQClient(object):
    '''
    EMQx Mqtt client class
    usage:
        client = EMQClient(config=config)
    '''
    # MQTT client
    client = None
    # MQTT message callback handlers
    message_handlers = {'common': []}
    # MQTT config dict
    config = None
    # callback handler
    handlers = {}

    def __init__(self, config, **kwargs):
        self.config = {kv[0].lower(): kv[1] for kv in config.items()}
        if 'handlers' in kwargs and isinstance(kwargs['handlers'], dict):
            self.handlers = kwargs['handlers']

        self.client = mqtt.Client(
            client_id=self.config['client_id'] if 'client_id' in self.config else None)
        self.client.on_connect = self.on_connect
        self.client.on_disconnect = self.on_disconnect
        self.client.on_message = self.on_message
        self.client.on_subscribe = self.on_subscribe

    def connect(self):
        '''connect to the mqtt server. '''
        self.client.username_pw_set(
            self.config['username'] if 'username' in self.config else None, self.config['password'])
        # 'admin', '123456')
        self.client.connect(
            self.config['host'], self.config['port'],
            keepalive=self.config['keepalive'] if 'keepalive' in self.config else 60)
        self.client.loop_start()

    def disconnect(self):
        self.client.loop_stop()

    def set_handler(self, handler, func):
        '''
        add callback hanlder
        :param type: handler type
        :param handler: handle message function
        '''
        if handler not in ['connect_handler', 'disconnect_handler', 'subscribe_handler']:
            return False

        self.handlers[handler] = func

    def add_message_handler(self, handler, topic='common'):
        '''
        add message hanlder
        :param handler: handle message function
        :param topic: handle the assigned topic message, `common` means will handle all messages
        '''
        if topic not in self.message_handlers:
            self.message_handlers[topic] = []

        self.message_handlers[topic].append(handler)

    def on_connect(self, client, userdata, flags, rc):
        '''
        callback func when client connected to the mqtt server.
        rc  0:连接成功
            1:连接被拒绝-协议版本不正确
            2:连接被拒绝-无效的客户端标识符
            3:连接被拒绝-服务器不可用
            4:连接被拒绝-用户名或密码错误
            5:连接被拒绝-未经授权
            6-255:当前未使用。
        '''
        print(client, userdata, flags, rc)
        try:
            'connect_handler' in self.handlers and isfunction(self.handlers['connect_handler']) and self.handlers[
                'connect_handler'](self)
        except Exception as e:
            print(e)

    def on_disconnect(self, client, userdata, rc):
        'disconnect_handler' in self.handlers and isfunction(self.handlers['disconnect_handler']) and self.handlers[
            'disconnect_handler'](userdata=userdata, rc=rc)

    def on_subscribe(self, client, userdata, mid, granted_qos):
        'subscribe_handler' in self.handlers and isfunction(self.handlers['subscribe_handler']) and self.handlers[
            'subscribe_handler'](userdata=userdata, mid=mid, granted_qos=granted_qos)

    def on_message(self, client, userdata, message):
        '''
        handle the message when a PUBLISH message is received from the server
        handlers should not set blocked
        '''
        # print(message.payload)
        for topic in self.message_handlers.keys():
            if topic == 'common':
                continue

            regex = re.compile(
                '^{}$'.format(topic.replace('+', '[^\/\s]+').replace('#', '\S+').replace('/', '\/').replace('$', '\$')))
            if regex.match(message.topic):
                for handler in self.message_handlers.get(topic, []):
                    handler(message.payload)

        # call common topic handlers
        for handler in self.message_handlers['common']:
            handler(message)

    def subscribe(self, topic, **kwargs):
        '''
        subscribe topic message
        default set qos to 0
        '''
        print(topic)
        return self.client.subscribe(
            topic, **kwargs)

    def publish(self, topic, message, **kwargs):
        '''publish message through topic'''
        return self.client.publish(topic, json.dumps(message, ensure_ascii=False), **kwargs)

    def unsubscribe(self, topic, properties=None):
        '''unsubscribe topic'''
        return self.client.unsubscribe(topic, properties=properties)

emq_client = EMQClient(config=Config.MQTT_CONFIG)
emq_client.set_handler('connect_handler', connect_handler)
emq_client.connect()

相关文章

网友评论

      本文标题:基于python实现mqtt连接类

      本文链接:https://www.haomeiwen.com/subject/hxqlbktx.html