美文网首页Android常用功能
Android——MQTT推送

Android——MQTT推送

作者: 海晨忆 | 来源:发表于2018-05-11 16:23 被阅读17次

    个人博客:haichenyi.com。感谢关注

    整体介绍

      最近公司用到的推送MQTT。不想过多的介绍背景什么的,我就直接讲怎么实现这个功能。

      他这个原理长连接,这个不用多讲,用法类似于EventBus,需要先订阅,然后通过topic再发送消息。topic是什么呢?我先来讲讲整体流程:

    1. 先连接服务器,要先建立长连接

    2. 然后需要订阅topic,连接之后才能订阅topic

    3. 最后就是通过topic推送消息,接收消息

    一步一步讲:

    第一步,与服务器建立连接

      先丢代码,然后看注释:

    private void initPush() {
            // 服务器地址(协议+地址+端口号)
            String uri = host;
            client = new MqttAndroidClient(this, uri, clientId);
            // 设置MQTT监听并且接受消息
            client.setCallback(mqttCallback);
            //Mqtt的一些设置
            conOpt = new MqttConnectOptions();
            conOpt.setAutomaticReconnect(true);
            // 清除缓存
            conOpt.setCleanSession(true);
            // 设置超时时间,单位:秒
            conOpt.setConnectionTimeout(10);
            // 心跳包发送间隔,单位:秒
            conOpt.setKeepAliveInterval(20);
            myTopic = String.format(TOPIC_SUB, mDeviceId);
            Log.e(TAG,"myTopic_________"+myTopic);
            doClientConnection();
        }
    

      上面的这些参数,我碰到了两个问题。

      上面的这些参数,我碰到了两个问题。

      上面的这些参数,我碰到了两个问题。

    1. 第一个问题,与服务器建立连接,你得先有一个服务器吧?我根据网上的步骤,创建了一个apache-apollo服务器,并且启动了,也启动成功了,我建立连接的时候,总是失败。然后,找啊找,找啊找。问题没有解决,但是,我找到了一个可以用的服务器,也就是这里的uri,不要设置MqttConnectOptions的用户名和密码,设置了他会拒绝
    private String host = "tcp://test.mosquitto.org:1883";
    
    1. 第二个问题,我连接成功之后,不一会,他就会自动断开连接,或者,推送完消息之后,他就会断开连接。然后,网上搜原因,找啊找,诶,我找到了。MqttAndroidClient的构造方法:
    /**
         * Constructor - create an MqttAndroidClient that can be used to communicate with an MQTT server on android
         * 
         * @param context 
         *            object used to pass context to the callback. 
         * @param serverURI
         *            specifies the protocol, host name and port to be used to
         *            connect to an MQTT server
         * @param clientId
         *            specifies the name by which this connection should be
         *            identified to the server
         */
        public MqttAndroidClient(Context context, String serverURI,
                String clientId) {
            this(context, serverURI, clientId, null, Ack.AUTO_ACK);
        }
    

    看第三个参数,clientId,指定一个名字,用来连接服务器的身份标识。就是说,你设置的这个值,是你在服务器的唯一标识,不能跟其他用户的相同。我把这个clientId直接用uuid生成,就没问题了。

    第二步,订阅topic

      回到上面,接着往下面走,

    /**
         * 连接MQTT服务器
         */
        private void doClientConnection() {
            if (!client.isConnected() && isConnectIsNormal()) {
                try {
                    client.connect(conOpt, null, iMqttActionListener);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
    
        }
        
        /**
         * 判断网络是否连接
         */
        private boolean isConnectIsNormal() {
            ConnectivityManager connectivityManager = (ConnectivityManager) this.getApplicationContext()
                    .getSystemService(Context.CONNECTIVITY_SERVICE);
            if (connectivityManager != null) {
                NetworkInfo info = connectivityManager.getActiveNetworkInfo();
                if (info != null && info.isAvailable()) {
                    String name = info.getTypeName();
                    Log.e(TAG, "MQTT当前网络名称:" + name);
                    return true;
                } else {
                    Log.e(TAG, "MQTT 没有可用网络");
                    return false;
                }
            } else {
                return false;
            }
        }
    

      这个方法就是用来连接服务器的,首先判断是否正在连接,后面那个是判断当前有没有网络。再就是这个iMqttActionListener监听了

    // MQTT是否连接成功
        private IMqttActionListener iMqttActionListener = new IMqttActionListener() {
    
            @Override
            public void onSuccess(IMqttToken arg0) {
                Log.e(TAG, "连接成功 ");
                try {
                    // 订阅myTopic话题
                    client.subscribe(myTopic, 0);
                } catch (MqttException e) {
                    e.printStackTrace();
                }
            }
    
            @Override
            public void onFailure(IMqttToken arg0, Throwable arg1) {
                Log.e(TAG, "连接失败");
                arg1.printStackTrace();
                // 连接失败,重连
            }
        };
    

      讷,就是这里,你如果服务器有问题,他一直走onFailure方法。服务器连接成功之后,就是订阅topic。我来说说这个

    client.subscribe(myTopic, 0);
    

      首先,这个主题,是你自己跟服务器商量好的,随便什么都可以。为什么要订阅主题呢?我提前给你瞅瞅推送消息是怎么推送的

      第二个参数,消息的类型qos,有三种:0、1、2

    1. 0代表“至多一次”,消息发布完全依赖底层 TCP/IP 协议。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送
    2. 1代表“至少一次”,确保消息到达,但消息重复可能会发生
    3. 2代表“只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。

      简单说明下,如果发送的是临时的消息,例如给某topic所有在线的设备发送一条消息,丢失的话也无所谓,0就可以了。如果需要客户端保证能接收消息,需要指定QoS为1

    client.publish(topic, new MqttMessage(msg.getBytes()));
    

      讷,推送消息,是根据topic推送的,第二个参数,就是你要推送的具体消息。我个人认为,你可以理解成就类似于键值对的形式,

    不同的用户可以订阅相同的主题

    不同的用户可以订阅相同的主题

    不同的用户可以订阅相同的主题

      这个就是跟其他长连接不同的地方,底层,其实都一样,虽然我没有看底层的代码。想也想的到,服务器肯定是根据这个主题,去找对应的用户,然后推送消息。而其他的长连接就是直接指定用户。跑题了,跑题了。

    第三步,推送、接收消息

      当你连接服务器成功之后,就要推送消息了,我用的EventBus发的

    private void publishData(String msg) {
            String topic = myTopic;
            try {
                Log.e(TAG,"给__"+topic+"__topic发送的消息为:"+msg);
                client.publish(topic, new MqttMessage(msg.getBytes()));
            } catch (MqttException e) {
                e.printStackTrace();
            }
        }
        
        // MQTT监听并且接受消息
        private MqttCallback mqttCallback = new MqttCallback() {
    
            @Override
            public void messageArrived(String topic, MqttMessage message) throws Exception {
                Log.e(TAG,"接受到__"+topic+"__topic的消息为:"+new String(message.getPayload()));
            }
    
            @Override
            public void deliveryComplete(IMqttDeliveryToken arg0) {
                Log.e(TAG,"deliveryComplete");
            }
    
            @Override
            public void connectionLost(Throwable arg0) {
                // 失去连接,重连
                Log.e(TAG,"失去连接");
            }
        };
    

      当你的clientId重复的时候,他就会一直走connectionLost方法。到这里,基本上就讲完了,要注意的是,退出的时候,记得要释放资源

    @Override
        public void onDestroy() {
            try {
                if (client != null && client.isConnected()) {
                    client.disconnect();
                }
            } catch (MqttException e) {
                e.printStackTrace();
            }
            EventBus.getDefault().unregister(this);
            super.onDestroy();
        }
    

    网上很多都是直接讲整体流程,重来不讲中间碰到的问题。难受

    项目链接

    相关文章

      网友评论

        本文标题:Android——MQTT推送

        本文链接:https://www.haomeiwen.com/subject/kqofdftx.html