美文网首页物联网与android
Android使用MQTT通讯

Android使用MQTT通讯

作者: MaybeSix | 来源:发表于2019-07-22 11:45 被阅读0次

    前言

    主要讲下Android如何使用MQTT通讯。用到的软件或者框架有:

    EMQ:https://www.emqx.io/cn/
    org.eclipse.paho的MQTT通讯框架:https://github.com/eclipse/paho.mqtt.android

    如果已经有MQTT相关服务,可以跳过第一项,从第二项开始看。

    一、安装EMQ及启动EMQ

    1.安装所需要的依赖包

    $ sudo yum install -y yum-utils device-mapper-persistent-data lvm2
    

    2.使用以下命令设置稳定存储库,以 CentOS7 为例

    $ sudo yum-config-manager --add-repo https://repos.emqx.io/emqx-ce/redhat/centos/7/emqx-ce.repo
    

    3.安装最新版本的 EMQ X

    $ sudo yum install emqx
    

    4.安装特定版本的 EMQ X

    • 查询可用版本
    $ yum list emqx --showduplicates | sort -r
    
    emqx.x86_64                     3.1.0-1.el7                        emqx-stable
    emqx.x86_64                     3.0.1-1.el7                        emqx-stable
    emqx.x86_64                     3.0.0-1.el7                        emqx-stable
    
    • 根据第二列中的版本字符串安装特定版本,例如 3.1.0
    $ sudo yum install emqx-3.1.0
    

    5.启动 EMQ X

    • 直接启动
    $ emqx start
    emqx 3.1.0 is started successfully!
    
    $ emqx_ctl status
    Node 'emqx@127.0.0.1' is started
    emqx v3.1.0 is running
    
    • systemctl 启动
    $ sudo systemctl start emqx
    
    • service 启动
    $ sudo service emqx start
    

    EMQ管理后台

    地址:xxx.xxx.xxx:18083,地址为服务器ip或者域名,端口为18083端口


    image.png

    二、Android使用MQTT

    1.在Android中导入依赖

        implementation 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
        implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'
    

    项目地址:https://github.com/eclipse/paho.mqtt.android
    2.创建MQTT连接的一个Service

    public class MqttMessageService extends Service {
     
        /**
         * 订阅的主题
         */
        private String subTopic;
        @Override
        public void onCreate() {
            super.onCreate();
            //初始化mqtt配置
            initMqtt();
            //连接mqtt
            connectMqtt();
    
        }
    
    
    
        /**
         * 连接mqtt
         */
        private void connectMqtt() {
    
            try {
                mqttAndroidClient.connect(mqttConnectOptions, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
    
                    }
    
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        LogUtils.e("MQTT连接失败!!!!" + exception.getCause());
                        new Thread(new Runnable() {
                            @Override
                            public void run() {
                                LogUtils.e("30s后,尝试重新连接" );
                                try {
                                    Thread.sleep(1000*30);
                                } catch (InterruptedException e) {
                                    e.printStackTrace();
                                }
                                connectMqtt();
                            }
                        }).start();
                    }
                });
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
    
        /**
         * mqtt初始化
         */
        private void initMqtt() {
            try {
    
               //写上自己的url
                String uri = "";
                subTopic = "topic_test" ;
                mqttAndroidClient = new MqttAndroidClient(
                        getApplicationContext(),
                        uri, "test");
                mqttAndroidClient.setCallback(new MqttCallbackExtended() {
                    @Override
                    public void connectComplete(boolean reconnect, String serverURI) {
                        if (reconnect) {
                            LogUtils.i("MQTT重新连接成功!serverURI:" + serverURI);
                        } else {
                            LogUtils.i("MQTT连接成功!serverURI:" + serverURI);
                        }
                        subscribeAllTopics();
                    }
    
                    @Override
                    public void connectionLost(Throwable cause) {
                        LogUtils.i("MQTT连接断开!" + cause.getCause());
                    }
    
                    @Override
                    public void messageArrived(String topic, MqttMessage message) {
                        LogUtils.i("收到了消息:"+topic+ message.toString()+ message.getQos());
                    }
    
                    @Override
                    public void deliveryComplete(IMqttDeliveryToken token) {
                        try {
     //                   MqttMessage mqttMessage = token.getMessage();
    //                    LogUtils.i("消息发送成功:" + mqttMessage.toString());
                        } catch (MqttException e) {
                            e.printStackTrace();
                        }
                    }
                });
    
                // 新建连接设置
                mqttConnectOptions = new MqttConnectOptions();
                mqttConnectOptions.setUserName(MqttConfig.EMQ_USERNAME);
                mqttConnectOptions.setPassword(MqttConfig.EMQ_PASSWORD.toCharArray());
                //断开后,是否自动连接
                mqttConnectOptions.setAutomaticReconnect(true);
                //是否清空客户端的连接记录。若为true,则断开后,broker将自动清除该客户端连接信息
                mqttConnectOptions.setCleanSession(true);
                //设置超时时间,单位为秒
                mqttConnectOptions.setConnectionTimeout(15);
                //心跳时间,单位为秒。即多长时间确认一次Client端是否在线
                mqttConnectOptions.setKeepAliveInterval(30);
                //允许同时发送几条消息(未收到broker确认信息)
                mqttConnectOptions.setMaxInflight(30);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
        /**
         * 订阅所有主题
         */
        private void subscribeAllTopics() {
            //订阅主消息主题和更新消息主题
            subscribeToTopic(subTopic , 2);
        }
    
    
        /**
         * 订阅一个主主题
         *
         * @param subTopic 主题名称
         */
        private void subscribeToTopic(String subTopic, int qos) {
            try {
                mqttAndroidClient.subscribe(subTopic, qos, null, new IMqttActionListener() {
                    @Override
                    public void onSuccess(IMqttToken asyncActionToken) {
                        LogUtils.i("MQTT订阅消息成功:" + subTopic);
                    }
    
                    @Override
                    public void onFailure(IMqttToken asyncActionToken, Throwable exception) {
                        LogUtils.d("MQTT订阅消息失败!" + subTopic);
                    }
                });
            } catch (MqttException ex) {
                LogUtils.d("subscribeToTopic: Exception whilst subscribing");
                ex.printStackTrace();
            }
        }
    
        /**
         * 发布主题
         *
         * @param topic 主题
         * @param msg   内容
         * @param qos   qos
         */
        public void publishMessage(String topic, String msg, int qos) {
            if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
                try {
                    LogUtils.d("publishMessage: 发送" + msg);
                    mqttAndroidClient.publish(topic, msg.getBytes(), qos, false);
                } catch (Exception e) {
                    LogUtils.e("publishMessage: Error Publishing: " + e.getMessage());
                    e.printStackTrace();
                }
            } else {
                LogUtils.e("publishMessage失败,MQTT未连接 ");
            }
        }
    
        public void publishMessage(String topic, String msg) {
            publishMessage(topic, msg, 2);
        }
    
        @Override
        public int onStartCommand(Intent intent, int flags, int startId) {
            return Service.START_NOT_STICKY;
        }
    
        @Override
        public void onDestroy() {
            LogUtils.e("关闭MQTT");
            //断开mqtt连接
            try {
                if (mqttAndroidClient != null && mqttAndroidClient.isConnected()) {
                    mqttAndroidClient.disconnect();
                    mqttAndroidClient.unregisterResources();
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
      
            super.onDestroy();
        }
    }
    
    

    相关文章

      网友评论

        本文标题:Android使用MQTT通讯

        本文链接:https://www.haomeiwen.com/subject/ktmilctx.html