美文网首页移动互联网Android开发mqtt
Android上简化使用Mqtt client

Android上简化使用Mqtt client

作者: LichFaker | 来源:发表于2016-03-28 00:49 被阅读10627次

    关于Mqtt

    Mqtt3.1.1中文文档
    这是一个客户端服务端架构的发布/订阅模式的消息传输协议;轻巧、开放、简单、规范,易于实现,国内很多关于Mqtt的文章都是讲推送的,其实Mqtt远远不止这个功能,具体详情可自行文档查阅。

    场景说明

    在Android上,往往具体的业务逻辑要回到Activity中,而Mqtt连接则是建立在service中(考虑到内存资源及退出应用后续操作),同时Mqtt的消息透传回调也是在service中。
    通常做法是要写很多类(通常是Boardcast)来实现确保service同Activity组件之间的通信,就像Eclipse paho的Android service一样。

    改进方案

    本文基于Eclipse的paho框架的java client端,在Android上引入EventBus来做事件分发,简化了代码量,同时方便实现。
    以下是我写的一个简单的工具操作类:

    public class MqttManager {
        // 单例
        private static MqttManager mInstance = null;
        // 回调
        private MqttCallback mCallback;
        // Private instance variables
        private MqttClient client;
        private MqttConnectOptions conOpt;
        private boolean clean = true; 
        private MqttManager() {
            mCallback = new MqttCallbackBus();
        }
        public static MqttManager getInstance() {
            if (null == mInstance) {
                mInstance = new MqttManager();
            }
            return mInstance;
        }
        /**     * 释放单例, 及其所引用的资源     */
        public static void release() {
            try {
                if (mInstance != null) {
                    mInstance.disConnect();
                    mInstance = null;
                }
            } catch (Exception e) {
            }
        }
        /**
         * 创建Mqtt 连接
         *
         * @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863)
         * @param userName  用户名
         * @param password  密码
         * @param clientId  clientId
         * @return
         */
        public boolean creatConnect(String brokerUrl, String userName, String password, String clientId) {
            boolean flag = false;
            String tmpDir = System.getProperty("java.io.tmpdir");
            MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
            try {
                // Construct the connection options object that contains connection parameters
                // such as cleanSession and LWT
                conOpt = new MqttConnectOptions();
                conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
                conOpt.setCleanSession(clean);
                if (password != null) {
                    conOpt.setPassword(password.toCharArray());
                }
                if (userName != null) {
                    conOpt.setUserName(userName);
                }
                // Construct an MQTT blocking mode client
                client = new MqttClient(brokerUrl, clientId, dataStore);
                // Set this wrapper as the callback handler
                client.setCallback(mCallback);
                flag = doConnect();
            } catch (MqttException e) {
                Logger.e(e.getMessage());
            }
            return flag;
        }
        /**
         * 建立连接
         *
         * @return
         */
        public boolean doConnect() {
            boolean flag = false;
            if (client != null) {
                try {
                    client.connect(conOpt);
                    Logger.d("Connected to " + client.getServerURI() + " with client ID " + client.getClientId());
                    flag = true;
                } catch (Exception e) {
                }
            }
            return flag;
        }
        /**
         * Publish / send a message to an MQTT server
         *
         * @param topicName the name of the topic to publish to
         * @param qos       the quality of service to delivery the message at (0,1,2)
         * @param payload   the set of bytes to send to the MQTT server
         * @return boolean
         */
        public boolean publish(String topicName, int qos, byte[] payload) {
            boolean flag = false;
            if (client != null && client.isConnected()) {
                Logger.d("Publishing to topic \"" + topicName + "\" qos " + qos); 
               // Create and configure a message
                MqttMessage message = new MqttMessage(payload);            message.setQos(qos);
                // Send the message to the server, control is not returned until
                // it has been delivered to the server meeting the specified
                // quality of service.
                try {
                    client.publish(topicName, message);
                    flag = true;
                } catch (MqttException e) {
                }
            }
            return flag;
        }
        /**
         * Subscribe to a topic on an MQTT server
         * Once subscribed this method waits for the messages to arrive from the server
         * that match the subscription. It continues listening for messages until the enter key is
         * pressed.
         *
         * @param topicName to subscribe to (can be wild carded)
         * @param qos       the maximum quality of service to receive messages at for this subscription
         * @return boolean
         */
        public boolean subscribe(String topicName, int qos) {
            boolean flag = false;
            if (client != null && client.isConnected()) {
                // Subscribe to the requested topic
                // The QoS specified is the maximum level that messages will be sent to the client at.
                // For instance if QoS 1 is specified, any messages originally published at QoS 2 will
                // be downgraded to 1 when delivering to the client but messages published at 1 and 0
                // will be received at the same level they were published at.
                Logger.d("Subscribing to topic \"" + topicName + "\" qos " + qos);
                try {
                    client.subscribe(topicName, qos);
                    flag = true;
                } catch (MqttException e) {
                }
            }
            return flag;
        }
        /**
         * 取消连接
         *
         * @throws MqttException
         */
        public void disConnect() throws MqttException {
            if (client != null && client.isConnected()) {  
              client.disconnect();
            }
        }
    }
    

    在Mqtt Callback方法中发送Evenet事件:

    public class MqttCallbackBus implements MqttCallback {
        @Override
        public void connectionLost(Throwable cause) {
            Logger.e(cause.getMessage());
        }
        @Override
        public void messageArrived(String topic, MqttMessage message) {
            Logger.d(topic + "====" + message.toString());
            EventBus.getDefault().post(message);
        }
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
        }
    }
    

    问题

    1. 由于EventBus不支持跨进程通信,所以当在service是独立进程时,无法在主进程中接收到EventBus分发的事件。
      可以考虑通过广播做中转,即在Mqtt的回调中发送一个广播,在广播中进行事件分发。
    • 连接等操作是较费时的,为了避免ANR,应在子线程中完成操作,也可以考虑使用RxJava来简化。

    Demo

    https://github.com/LichFaker/MqttClientAndroid

    相关文章

      网友评论

      • tmyzh:服务器要自己搭建吗 有没有可以测试的服务器地址
      • 710a3cdd1bef:楼主,如果要做加密,该怎么写
        710a3cdd1bef:try {

        //取得SSL的SSLContext实例
        SSLContext sslContext = SSLContext.getInstance("TLS");
        //取得KeyManagerFactory和TrustManagerFactory的X509密钥管理器实例
        TrustManagerFactory trustManager = TrustManagerFactory.getInstance("X509");
        //取得BKS密库实例
        KeyStore tks = KeyStore.getInstance("BKS");
        //加客户端载证书和私钥,通过读取资源文件的方式读取密钥和信任证书
        tks.load(MyApplication.getInstance()
        .getResources()
        .openRawResource(R.raw.client),"123456".toCharArray());
        //初始化密钥管理器
        trustManager.init(tks);
        //初始化SSLContext
        sslContext.init(null,trustManager.getTrustManagers(),null);
        //生成SSLSocket
        factory = sslContext.getSocketFactory();

        } catch (Exception e) {
        e.printStackTrace();
        Log.e("SSL",e.getMessage());
        }
        factory是这样写的吗?
        710a3cdd1bef:@LichFaker options.setSocketFactory(factory);是加上这么一句吗?然后factory自己写
        LichFaker:@炽雨焰流 如果是ssl,paho本身是支持的。
      • 天空之城破晓:咨询一个问题:
        客户端和服务器订阅相同的主题 然后客户端向这个主题发布消息 客户端会连接中断:connectionLost
        LichFaker:贴下具体服务器的日志
      • 丨灬柳夢Oo:你好 请问一下 安卓端收不到 服务端发送的 queue 消息 是安卓端不支持吗?
      • juwuguo1988:您好,你这demo链接成功之后,大约两秒之后MqttCallbackBus: 已断开连接,不知道是哪的问题,谢谢
      • Jonas_aaa9:我这里测试后,发现apollo作为mqtt服务器,不支持离线消息。请问下LichFaker,你那里可以接受离线消息么?即A向B发送消息,当前B离线状态,当B上线的时候并不能收到A之前发送的消息
      • 37bb1b7550b5:连接时提示 不可识别的包 是什么意思啊
      • 学霸_你好:请问下,我自己创建了Apollo服务器,client.connect(conOpt);好像有异常,不知道是什么原因呢,是我的网络问题吗,我连的是无线网
      • flu鲁少:老哥请教你下,,我在用mqtt向服务器发消息时,如果发送一条消息,没有收到ACK的话,会出现ANR的情况,请问你有什么好的办法吗
        flu鲁少:@LichFaker 这个是因为我之前用的java的那个client,,后来client 换成androidclient了,问题就没得了
        LichFaker:网络io不是在子线程中进行的吗,没有阻塞主线程,怎么会ANR?
      • leohan1992:我就想说,为什么没人写 断线重连的操作。。。。
        很容易出现 代理程序不可用 (3)的情况,或者reconnect很久没有反应
        LichFaker:@leohan1992 具体什么问题?
        leohan1992:@LichFaker 当服务器用activemq的时候会出问题,在Android端不建议直接用Java的client,我自己封装的它Android的那个
        LichFaker:断线重连自己写啊,策略按实际需求来,在断开连接的那个回调里。 这部分一般会涉及到具体业务接口, 没有放demo上
      • Doctor_ZN:您好看了下您的demo我想咨询下问题:
        1为什么倒包指引入org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0',我看很多项目都引入了compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1';
        2如果反复断网重连的话有什么好的方法重连么,释放资源还需要client.unresoucse之后在disconnect 么;
        3.这种方式连接订阅没有用到回调方式,那么我们不用管是么,成功了怎么知道成功呢。。
        只有一个MqttCallback,不用ConnectCallBackHandler么,

        LichFaker:@Doctor_ZN 1. android.service 是官方针对android写的方便连接调用的库吧,本质上用的就是java client。。。 我没有用那个, 觉得EventBus的方式更方便
        2. 连接状态变更有回调的, 在那处理就可以了, paho本身并不支持重连
        3. 剩下的不知道怎么回答, 看不懂你的提问 :)
        Doctor_ZN:补充一下:看到了本文用的Eclipse的paho框架的java client端,那个有个Android的那个包的那个不好吗?,这个的话会不会有问题呢,那个引入service的,我在连接断开的时候弄蒙圈了
      • c753625a0595:05-24 06:04:02.285 2363-2400/com.lichfaker.mqttclientandroid E/EGL_emulation: tid 2400: eglSurfaceAttrib(1174): error 0x3009 (EGL_BAD_MATCH)
        05-24 06:04:02.285 2363-2400/com.lichfaker.mqttclientandroid W/OpenGLRenderer: Failed to set EGL_SWAP_BEHAVIOR on surface 0xaf81c120, error=EGL_BAD_MATCH
        05-24 06:04:02.301 2363-2363/com.lichfaker.mqttclientandroid W/art: Before Android 4.1, method int android.support.v7.widget.ListViewCompat.lookForSelectablePosition(int, boolean) would have incorrectly overridden the package-private method in android.widget.ListView
        05-24 06:04:50.580 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ╔════════════════════════════════════════════════════════════════════════════════════════
        05-24 06:04:50.580 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ║[ (MainActivity.java:40)#Run ]
        05-24 06:04:50.580 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ╟────────────────────────────────────────────────────────────────────────────────────────
        05-24 06:04:50.586 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ║ isConnected: false
        05-24 06:04:50.587 2363-3050/com.lichfaker.mqttclientandroid D/MainActivity.java: ╚══════════════════════════════════════════════════════
        这是啥情况?就是连接不了
        LichFaker:@岁月长同你我 跟你的环境有关, 自行搜索 `error 0x3009 (EGL_BAD_MATCH)`, 谢谢
      • a4d0aa3f910a:我也在用mqtt,一直想找connect成功的接口一直没找到,能不能指点一下
        LichFaker:@请叫我汪二 没有, 正常来说连接是在新的线程中操作的,不会阻塞主线程
        0af299bc448b:不知楼主有没有遇到MQTT 连接 ANR问题。从有网到没网的状态下,几次后就出现ANR问题
        LichFaker:试试这个:q.emqtt.com:1883,地址:http://emqtt.com/。 也可以自己搭建一个

      本文标题:Android上简化使用Mqtt client

      本文链接:https://www.haomeiwen.com/subject/qwkjlttx.html