MQTT简介和使用

作者: Heweii | 来源:发表于2019-07-14 20:25 被阅读0次

    前言

    最近在做的智能家居项目中有用到MQTT做消息的推送,主要是为了实现低流量下的智能家居控制(我们用到的是劳沃协议),在使用的时候也是遇到很多坑(特别是重连),这里讲讲自己的个人经验和解决问题的方式。

    一.MQTT介绍

    1.简介

    MQTT(message queuing telemetry transport)是IBM开发的即时通讯协议,是一种发布/订阅极其轻量级的消息传输协议,专门为网络受限设备、低宽带以及高延迟和不可靠的网络而设计的。由于以上轻量级的特点,是实现智能家居的首选传输协议,相比于XMPP,更加轻量级而且占用宽带低。

    2.特点

    a.由于采用发布/订阅的消息模式,可以提供一对多的消息发布
    b.轻量级,网络开销小
    c.对负载内容会有屏蔽的消息传输
    d.有三种消息发布质量(Qos):
    qos=0:“至多一次”,这一级别会发生消息丢失或重复,消息发布依赖于TCP/IP网络
    qos=1:“至少一次”,确保消息到达,但消息重复可能会发生
    qos=2:“只有一次”,确保消息到达一次
    e.通知机制,异常中断时会通知双方

    3.原理

    14523188625918865.png

    MQTT协议有三种身份:发布者、代理、订阅者,发布者和订阅者都为客户端,代理为服务器,同时消息的发布者也可以是订阅者(为了节约内存和流量发布者和订阅者一般都会定义在一起)。
    MQTT传输的消息分为主题(Topic,可理解为消息的类型,订阅者订阅后,就会收到该主题的消息内容(payload))和负载(payload,可以理解为消息的内容)两部分。

    二.MQTT通用使用

    通用的使用方式的重连机制在安卓系统中需要自己去编写,下一篇会详细讲解阿里专门针对Android客户端的实现方式MqttAndroidClient

    1.集成

    1.build.grade中导入

    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
    

    2.添加权限

    <uses-permission android:name="android.permission.INTERNET" />
    <uses-permission android:name="android.permission.ACCESS_NETWORK_STATE" />
    <uses-permission android:name="android.permission.WAKE_LOCK" />
    

    2.使用

    直接上代码,以下就是发布者和订阅者一体的实现方式,自己添加了重连机制,对应的注释也比较清晰,如果对于MQTT入门最好看下阿里MQTT的官方文档

    public class MQTTManager {
    
        private String broker = "";//固定配置
        private String secretKey = "";//固定配置
        private String acessKey = "";//固定配置
    
        private String topic = "";//自己定义
        private String groupId = "";//自己定义
        private String clientId = "";//自己定义
    
        private MqttClient mqttClient;
        private volatile static MQTTManager manager;
    
        private int[] qos = {0, 0};//订阅个数就是数组的长度
    
        private ScheduledExecutorService reconnectPool;//重连线程池
    
        public static MQTTManager getInstance() {
            if (manager == null) {
                synchronized (MQTTManager.class) {
                    if (manager == null)
                        manager = new MQTTManager();
                }
            }
            return manager;
        }
    
        public MQTTManager() {
            clientId = String.format("%s@@@%s", groupId, MQTTCons.Sep_SEND);//这是根据自己需求定义的clientId
        }
    
        /**
         * 发送信息
         * @param msg
         */
        public void sendMessage(String msg) {
            MqttMessage message = new MqttMessage(msg.getBytes());
            message.setQos(0);
            try {
                if (mqttClient != null)
                    mqttClient.publish(topic, message);
            } catch (MqttException e) {
                e.printStackTrace();
                TVLog.i("MqttException-sendMQTT-" + e);
            }
        }
    
        /**
         * 开启MQTT连接和订阅
         */
        public void startSendMQTT() {
            try {
                closeMQTT();//断开和关闭连接的操作,由于我们需求需要,切换用户要重新创建新的连接,一般应用中基本都会始终订阅一条
                MemoryPersistence persistence = new MemoryPersistence();
                mqttClient = new MqttClient(broker, clientId, persistence);
                final MqttConnectOptions connOpts = new MqttConnectOptions();
                String sign = MacSignature.macSignature(clientId.split("@@@")[0], secretKey);
                connOpts.setUserName(acessKey);
                connOpts.setServerURIs(new String[]{broker});
                connOpts.setPassword(sign.toCharArray());
                connOpts.setCleanSession(true);
                connOpts.setKeepAliveInterval(20);
                connOpts.setConnectionTimeout(10);
                connOpts.setMqttVersion(MQTT_VERSION_3_1_1);
                connOpts.setAutomaticReconnect(false);//禁用自带重连机制,用于TV端会出现不稳定性,所以自己写了重连
                mqttClient.setCallback(new MqttCallbackExtended() {
                    public void connectComplete(boolean reconnect, String serverURI) {
                        TVLog.i("Send connect success" + topic);
                        closeReconnectTask();
                        subscribeFilter();//mqtt每次连接成功都得订阅Topic
                    }
    
                    public void connectionLost(Throwable throwable) {
                        TVLog.i("mqtt connection lost");
                        startReconnectTask();
                    }
    
                    public void messageArrived(String topic, MqttMessage mqttMessage) throws Exception {
                        TVLog.i("messageArrived:" + topic + "------" + new String(mqttMessage.getPayload()));
                    }
    
                    public void deliveryComplete(IMqttDeliveryToken iMqttDeliveryToken) {
                        try {
                            TVLog.i("deliveryComplete:" + iMqttDeliveryToken.getMessage().toString());
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
                mqttClient.connect(connOpts);
            } catch (Exception me) {
                me.printStackTrace();
            }
        }
    
        /**
         * 订阅Topic
         */
        private void subscribeFilter() {
            String registerTopic = "";//自定义
            String controlTopic = "";//自定义,作为示例订阅了两个
            String[] topicFilters = new String[]{registerTopic, controlTopic};
    
            try {
                mqttClient.subscribe(topicFilters, qos);
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        private synchronized void startReconnectTask() {//开启重连任务
            if (reconnectPool != null) return;
            reconnectPool = Executors.newScheduledThreadPool(1);
            reconnectPool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    try {
                        if (mqttClient == null || mqttClient.isConnected()) return;
                        mqttClient.reconnect();
                        TVLog.d("reconnectSendMQTT" + topic);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            }, 0, 5 * 1000, TimeUnit.MILLISECONDS);
        }
    
        public synchronized void closeReconnectTask() {//程序销毁的时候也记得关闭
            if (reconnectPool != null) {
                reconnectPool.shutdownNow();
                reconnectPool = null;
            }
        }
    
        public void closeMQTT() {//在程序销毁的时候也记得调用
            try {
                closeReconnectTask();
                if (mqttClient != null) {
                    mqttClient.disconnect();
                    mqttClient.close();
                    mqttClient = null;
                    TVLog.d("closeSendMQTT" + topic);
                }
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
    }
    

    注意:mqttClient.disconnect()和mqttClient.connect(connOpts)都为耗时操作,网络差的时候会阻塞主线程,可以新开线程或者直接用线程池ScheduledExecutorService管理

    结语

    以上就是MQTT的简介、以及由官方Java的通用实现Demo改写后,在Android客户端的实现方式,下一篇会详细讲解阿里专门针对Android客户端的实现方式MqttAndroidClient

    相关文章

      网友评论

        本文标题:MQTT简介和使用

        本文链接:https://www.haomeiwen.com/subject/fwbekctx.html