MQTT基于安卓的使用

作者: Heweii | 来源:发表于2019-07-28 21:41 被阅读0次

    前言

    上一篇提到了MQTT的通用方式,由于智能家居TV的项目网络波动频繁,通用的方式已经无法满足需求,经常会出现重复订阅导致收到多条消息,那就只能另辟蹊径了,最终找到了梦寐以求的MqttAndroidClient。

    1.集成

    集成方式和上一篇的MQTT简介和使用要新增配置,build.gradle新增

    implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    

    manifest文件里面需要注册服务

    <!-- Mqtt Service -->
            <service android:name="org.eclipse.paho.android.service.MqttService" />
    

    2.MqttAndroidClient重要源码解析

    MqttAndroidClient是专门对MQTTClient的再封装拓展类,包含了订阅、连接以及多线程的处理,直接看MqttAndroidClient对于连接的封装源码,非关键代码已省略

        public IMqttToken connect(MqttConnectOptions options, Object userContext,
                IMqttActionListener callback) throws MqttException {
            if (mqttService == null) { 
            }
            else {
                pool.execute(new Runnable() {
                    @Override
                    public void run() {
                        doConnect();
                    }
                });
            }
            return token;
        }
    

    doConnect()连接操作放在子线程,有效避免网络波动连接时间过长阻塞主线程

    private void doConnect() {
            ...
                mqttService.connect(clientHandle, connectOptions, null,
                        activityToken);
            ...
        }
    

    阿里专门针对安卓客户端写了一个MQTTService,方便统一管理,除了连接操作,重连,断开连接都是在MQTTService中完成。

    public void connect(String clientHandle, MqttConnectOptions connectOptions,
          String invocationContext, String activityToken)throws MqttSecurityException, MqttException {
            MqttConnection client = getConnection(clientHandle);
            client.connect(connectOptions, null, activityToken);
      }
    
    public void connect(MqttConnectOptions options, String invocationContext,
                String activityToken) {
                ...
                if (myClient != null) {
                    if (isConnecting ) {
                    }else if(!disconnected){
                    }
                    else {                  
                        service.traceDebug(TAG, "myClient != null and the client is not connected");
                        service.traceDebug(TAG,"Do Real connect!");
                        setConnectingState(true);
                        myClient.connect(connectOptions, invocationContext, listener);
                    }
                }
                ...
        }
    
    public IMqttToken connect(MqttConnectOptions options, Object userContext, IMqttActionListener callback)throws MqttException, MqttSecurityException {
            final String methodName = "connect";
            if (comms.isConnected()) {
                throw ExceptionHelper.createMqttException(MqttException.REASON_CODE_CLIENT_CONNECTED);
            }
            if (comms.isConnecting()) {
                throw new MqttException(MqttException.REASON_CODE_CONNECT_IN_PROGRESS);
            }
            if (comms.isDisconnecting()) {
                throw new MqttException(MqttException.REASON_CODE_CLIENT_DISCONNECTING);
            }
            if (comms.isClosed()) {
                throw new MqttException(MqttException.REASON_CODE_CLIENT_CLOSED);
            }
            ...
            connectActionListener.connect();
            return userToken;
        }
    

    源码自身对于isConnected、isConnecting、isDisconnecting、isClosed做了异常处理,避免正在连接或者断开连接时连接造成重复连接。后面的源码就没贴的必要了,就是开启一个连接线程。

    3.MqttAndroidClient的使用

    一行代码完成MQTT的连接

    mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);
    

    当然,这样是远远不够的,在实际应用中发现有一个问题,断网一段时间后重连网络MQTT不会自动重连,所以还得我们来做手动优化。思路很简单,在断开的节点开启重连线程,连接成功后关闭重连线程。以下贴的是完整代码,主要注意重连机制和订阅前记得取消订阅再订阅。(部分包含自己的代码,可以忽略,注释很详细)

    public class MQTTManager {
    
        private Context mContext;
        private MqttAndroidClient mqttAndroidClient;
        private String clientId;//自定义
    
        private MqttConnectOptions mqttConnectOptions;
    
        private ScheduledExecutorService reconnectPool;//重连线程池
    
        public MQTTManager(Context mContext) {
            this.mContext = mContext;
        }
    
        public void buildClient() {
            closeMQTT();//先关闭上一个连接
    
            buildMQTTClient();
        }
    
        private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
            @Override
            public void onSuccess(IMqttToken asyncActionToken) {
                TVLog.i("connect-"+"onSuccess");
                closeReconnectTask();
                subscribeToTopic();
            }
    
            @Override
            public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                //connect-onFailure-MqttException (0) - java.net.UnknownHostException
                TVLog.i("connect-"+ "onFailure-"+exception);
                startReconnectTask();
            }
        };
    
        private MqttCallback mqttCallback = new MqttCallback() {
            @Override
            public void connectionLost(Throwable cause) {
                //close-connectionLost-等待来自服务器的响应时超时 (32000)
                //close-connectionLost-已断开连接 (32109)
                TVLog.i("close-"+"connectionLost-"+cause);
                if (cause != null) {//null表示被关闭
                    startReconnectTask();
                }
            }
    
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                String body = new String(message.getPayload());
                TVLog.i("messageArrived-"+message.getId()+"-"+body);
            }
    
            @Override
            public void deliveryComplete(IMqttDeliveryToken token) {
                try {
                    TVLog.i("deliveryComplete-"+token.getMessage().toString());
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        };
    
        private void buildMQTTClient(){
            mqttAndroidClient = new MqttAndroidClient(mContext, MQTTCons.Broker, clientId);
            mqttAndroidClient.setCallback(mqttCallback);
    
            mqttConnectOptions = new MqttConnectOptions();
            mqttConnectOptions.setConnectionTimeout(10);
            mqttConnectOptions.setKeepAliveInterval(20);
            mqttConnectOptions.setCleanSession(true);
            try {
                mqttConnectOptions.setUserName("Signature|" + MQTTCons.AcessKey + "|" + MQTTCons.instanceId);
                mqttConnectOptions.setPassword(MacSignature.macAndSignature(clientId, MQTTCons.SecretKey).toCharArray());
            } catch (Exception e) {
            }
            doClientConnection();
        }
    
        private synchronized void startReconnectTask(){
            if (reconnectPool != null)return;
            reconnectPool = Executors.newScheduledThreadPool(1);
            reconnectPool.scheduleAtFixedRate(new Runnable() {
                @Override
                public void run() {
                    doClientConnection();
                }
            } , 0 , 5*1000 , TimeUnit.MILLISECONDS);
        }
    
        private synchronized void closeReconnectTask(){
            if (reconnectPool != null) {
                reconnectPool.shutdownNow();
                reconnectPool = null;
            }
        }
    
        /**
         * 连接MQTT服务器
         */
        private synchronized void doClientConnection() {
            if (!mqttAndroidClient.isConnected()) {
                try {
                    mqttAndroidClient.connect(mqttConnectOptions, null, iMqttActionListener);
                    TVLog.d("mqttAndroidClient-connecting-"+mqttAndroidClient.getClientId());
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    
        private void subscribeToTopic() {//订阅之前会取消订阅,避免重连导致重复订阅
            try {
                String registerTopic = "";//自定义
                String controlTopic = "";//自定义
                String[] topicFilter=new String[]{registerTopic , controlTopic };
                int[] qos={0,0};
                mqttAndroidClient.unsubscribe(topicFilter, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        TVLog.i("unsubscribe-"+"success");
                    }
    
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        TVLog.i("unsubscribe-"+"failed-"+exception);
                    }
                });
                mqttAndroidClient.subscribe(topicFilter, qos, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {//订阅成功
                        TVLog.i("subscribe-"+"success");
                    }
    
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    //                    startReconnectTask();
                        TVLog.i("subscribe-"+"failed-"+exception);
                    }
                });
    
            } catch (MqttException ex) {
            }
        }
    
        public void sendMQTT(String topicSep, String msg) {
            try {
                if (mqttAndroidClient == null)return;
                MqttMessage message = new MqttMessage();
                message.setPayload(msg.getBytes());
                String topic = "";//自定义
                mqttAndroidClient.publish(topic, message, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
    //                    TVLog.i("sendMQTT-"+"success:" + msg);
                    }
    
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
    //                    startReconnectTask();
                        TVLog.i("sendMQTT-"+"failed:" + msg);
                    }
                });
            } catch (MqttException e) {
            }
        }
    
        public void closeMQTT(){
            closeReconnectTask();
            if (mqttAndroidClient != null){
                try {
                    mqttAndroidClient.unregisterResources();
                    mqttAndroidClient.disconnect();
                    TVLog.i("closeMQTT-"+mqttAndroidClient.getClientId());
                    mqttAndroidClient = null;
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
        }
    
    }
    

    调用方式如下,想要做线程安全的单例的可以自己封装

    if (mqttManager == null)
                mqttManager = new MQTTManager(getApplicationContext());
    mqttManager.buildClientId();
    

    结语

    基于安卓部分也算是完结了,里面也夹杂着一些源码解释,后续会写更多关于源码的解析。

    相关文章

      网友评论

        本文标题:MQTT基于安卓的使用

        本文链接:https://www.haomeiwen.com/subject/rctkrctx.html