美文网首页AndroidAndroid 复习学习使用
Eclipse Paho 实现Android推送

Eclipse Paho 实现Android推送

作者: 贝贝beibei96 | 来源:发表于2018-01-03 16:05 被阅读384次

    Eclipse Paho:是Eclipse提供的一个访问MQTT服务器的一种开源客户端库。

    MQTT Client 对照表

    Eclipse目前提供十种不同语言平台的客户端类库,
    对于Java平台而言和MQTT服务器交互的开源框架还有很多, 例如:
    Eclipse Paho Java、 Xenqtt、 MeQanTT、 Fusesource mqtt -client、 moquette 等等...
    但是, 根据GIthub上使用次数来讲Eclipse Paho无疑是主流, 就个人使用而已, Eclipse Paho集成非常方便、简单。
    对MQTT协议不是很了解的可以看一下
    Eclipse Paho 官网
    Paho.client.mqttv3 在线API

    注意:

    QoS:服务质量,其作用是当网络过载或拥塞时,QoS 能确保重要业务量不受延迟,同时保证网络的高效运行

    • 服务质量0 - 表示一条消息最多应该发送一次(零次或一次)。该消息不会持久保存到磁盘,并且不会通过网络进行确认。这种QoS是最快的,但只能用于无价值的消息
    • 服务质量1 - 表示一条消息应至少传递一次(一次或多次)。该消息只能在持久存在的情况下才能被安全地传递,因此应用程序必须提供一种持久化方法。如果未指定持久性机制,则在发生客户端故障时不会传递消息。该消息将通过网络得到确认。这是默认的QoS。
    • 服务质量2 - 表示应该传递一次消息。该消息将被保存到磁盘,并且将受到整个网络的两阶段确认。该消息只能在持久存在的情况下才能被安全地传递,因此应用程序必须提供一种持久化方法。如果未指定持久性机制,则在发生客户端故障时不会传递消息。

    如果未配置持久性,则在发生网络或服务器问题时,仍会传送QoS 1和2消息,因为客户端将在内存中保持状态。如果MQTT客户端关闭或失败,并且未配置持久性,则由于客户端状态将丢失,因此无法保持QoS 1和2消息的传输。

    publish: 向服务器上的主题发布消息, 主要用于即时通讯(IoT、 聊天...)

    subscribe: 订阅主题, 服务器通过你订阅的主题向你发布消息, 这样就能实现服务器向APP推送信息了

    ClientId: 客户端ID应该是唯一的, 多个用户同时使用同一个ID创建连接会不断挤线互踢导致连接不断中断, 服务器也是通过ID来区分不同用户的操作


    开始使用Eclipse Paho



    1. 配置maven库

    
    repositories {
    
    ...
    
        maven {
    
            url "https://repo.eclipse.org/content/repositories/paho-snapshots/"
    
        }
    
    ...
    
    }
    
    



    2.添加依赖

    implementation 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0'  // MQTT Eclipse paho
    



    3.添加权限

    <uses-permission android:name="android.permission.WAKE_LOCK"/>
    <uses-permission android:name="android.permission.WRITE_EXTERNAL_STORAGE"/>
    <uses-permission android:name="android.permission.INTERNET"/>
    



    4.编写逻辑代码

    4.1新建一个MqttManager类, 用于MQTT操作的管理

    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MqttDefaultFilePersistence;
    
    /**
     * @author Ai(陈祥林)
     * @date 2018/1/3  10:37
     * @email Webb@starcc.cc
     */
    public class MqttManager {
    
        private static MqttManager mInstance = null;
        /**
         * Mqtt回调
         */
        private MqttCallback mCallback;
        /**
         * Mqtt客户端
         */
        private static MqttClient client;
        /**
         * Mqtt连接选项
         */
        private MqttConnectOptions conOpt;
    
        private MqttManager() {
            mCallback = new MqttCallbackBus();
        }
    
        public static MqttManager getInstance() {
            if (null == mInstance) {
                synchronized (MqttManager.class) {
                    if (mInstance == null) {
                        mInstance = new MqttManager();
                    }
                }
            }
            return mInstance;
        }
    
        /**
         * 释放单例, 及其所引用的资源
         */
        public static void release() {
            try {
                if (mInstance != null) {
                    disConnect();
                    mInstance = null;
                }
            } catch (Exception e) {
                Log.e("MqttManager", "release : " + e.toString());
            }
        }
    
        /**
         * 创建Mqtt 连接
         *
         * @param brokerUrl Mqtt服务器地址(tcp://xxxx:1863)
         * @param userName  用户名
         * @param password  密码
         * @param clientId  客户端Id
         */
        public void creatConnect(String brokerUrl, String userName,
                                 String password, String clientId, String topic) {
            // 获取默认的临时文件路径
            String tmpDir = System.getProperty("java.io.tmpdir");
    
            /*
             * MqttDefaultFilePersistence:
             * 将数据包保存到持久化文件中,
             * 在数据发送过程中无论程序是否奔溃、 网络好坏
             * 只要发送的数据包客户端没有收到,
             * 这个数据包会一直保存在文件中,
             * 直到发送成功为止。
             */
            // Mqtt的默认文件持久化
            MqttDefaultFilePersistence dataStore = new MqttDefaultFilePersistence(tmpDir);
            try {
                // 构建包含连接参数的连接选择对象
                conOpt = new MqttConnectOptions();
                // 设置Mqtt版本
                conOpt.setMqttVersion(MqttConnectOptions.MQTT_VERSION_3_1_1);
                // 设置清空Session,false表示服务器会保留客户端的连接记录,true表示每次以新的身份连接到服务器
                conOpt.setCleanSession(false);
                // 设置会话心跳时间,单位为秒
                // 客户端每隔10秒向服务端发送心跳包判断客户端是否在线
                conOpt.setKeepAliveInterval(10);
                // 设置账号
                if (userName != null) {
                    conOpt.setUserName(userName);
                }
                // 设置密码
                if (password != null) {
                    conOpt.setPassword(password.toCharArray());
                }
                // 最后的遗言(连接断开时, 发动"close"给订阅了topic该主题的用户告知连接已中断)
                conOpt.setWill(topic, "close".getBytes(), 2, true);
                // 客户端是否自动尝试重新连接到服务器 
                conOpt.setAutomaticReconnect(true);
                // 创建MQTT客户端
                client = new MqttClient(brokerUrl, clientId, dataStore);
                // 设置回调
                client.setCallback(mCallback);
                // 连接
                doConnect();
         
            } catch (MqttException e) {
                Log.e("MqttManager", "creatConnect : " + e.toString());
    
            }
    
        }
    
    
        /**
         * 建立连接
         */
        public void doConnect() {
            if (client != null) {
                try {
                    client.connect(conOpt);
                } catch (Exception e) {
                    Log.e("MqttManager", "doConnect : " + e.toString());
                }
            }
        }
    
    
        /**
         * 发布消息
         *
         * @param topicName 主题名称
         * @param qos       质量(0,1,2)
         * @param payload   发送的内容
         */
        public void publish(String topicName, int qos, byte[] payload) {
            if (client != null && client.isConnected()) {
                // 创建和配置一个消息
                MqttMessage message = new MqttMessage(payload);
                message.setPayload(payload);
                message.setQos(qos);
                try {
                    client.publish(topicName, message);
                } catch (MqttException e) {
                    Log.e("MqttManager", "publish : " + e.toString());
                }
            }
        }
    
    
         public void publish(String topicName, int qos, String payload) {
            if (client != null && client.isConnected()) {
                // 创建和配置一个消息
                MqttMessage message = new MqttMessage(payload.getBytes);
                message.setPayload(payload.getBytes);
                message.setQos(qos);
                try {
                    client.publish(topicName, message);
                } catch (MqttException e) {
                    Log.e("MqttManager", "publish : " + e.toString());
                }
            }
        }
    
    
        /**
         * 订阅主题
         *
         * @param topicName 主题名称
         * @param qos      消息质量
         */
        public void subscribe(String topicName, int qos) {
            if (client != null && client.isConnected()) {
                try {
                    client.subscribe(topicName, qos);
                } catch (MqttException e) {
                    Log.e("MqttManager", "subscribe : " + e.toString());
                }
            }
        }
    
    
        /**
         * 取消连接
         */
        public static void disConnect() throws MqttException {
            if (client != null && client.isConnected()) {
                client.disconnect();
            }
        }
    
    
        /**
         * 判断是否连接
         */
        public static boolean isConnected() {
            return client != null && client.isConnected();
        }
    }
    

    4.2新建一个MqttCallbackBus类用于MQTT异步回调
    我是使用自己封装的EventBus发送粘性事件的方式来处理回调的内容
    对EventBus不熟悉可以使用Handler来处理

    import com.zhanyun.key.eventbus.EventModel;
    import com.zhanyun.key.eventbus.EventBusUtil;
    
    import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
    import org.eclipse.paho.client.mqttv3.MqttCallback;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    
    /**
     * @author Ai(陈祥林)
     * @date 2018/1/3  10:45
     * @email Webb@starcc.cc
     */
    public class MqttCallbackBus implements MqttCallback {
    
        /**
         * 连接中断
         */
        @Override
        public void connectionLost(Throwable cause) {
            Log.e("MqttManager", "cause : " + cause.toString());
            // 可在此方法内写重连的逻辑 
        }
    
    
        /**
         * 消息送达
         */
        @Override
        public void messageArrived(String topic, MqttMessage message) throws Exception {
            Log.e("MqttManager", "topic : " + topic + "\t MqttMessage : " + message.toString());
            EventBusUtil.sendStickyEvent(new EventModel(10001, topic));
            EventBusUtil.sendStickyEvent(new EventModel(10010, message));
        }
    
    
        /**
         * 交互完成
         */
        @Override
        public void deliveryComplete(IMqttDeliveryToken token) {
            Log.e("MqttManager", "token : " + token.toString());
        }
    }
    



    5.使用
    在Activity中调用

    /**
    * 第一个参数是服务器地址
    * 第二个参数是用户名
    * 第三个参数是密码
    * 第四个参数是客户端ID
    * 第五个参数是主题
    **/
    MqttManager
            .getInstance()
            .creatConnect(
                  brokerUrl,
                  userName,
                  password,
                  clientId, 
                  topic);
    
    
    
    运行后打印服务端返回的数据


    6.总结
    MQTT是基于TCP/IP的, 手机锁屏时会阻塞TCP, 导致MQTT中断, MqttConnectOptions设置isAutomaticReconnect()为true时可自动重连, 但多个相同的ClientID同时创建连接时会无限的连接中断和自动连接(注意)

    相关文章

      网友评论

      • yoyochoo:需要加这个吗?implementation'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
        贝贝beibei96:@yoyochoo 里面暂时没用到, 可以不加
      • guguyu:不错
        贝贝beibei96:@guguyu 还有优化的, 还没有时间修改, hhhhh
      • vvengzt:好文好文

      本文标题:Eclipse Paho 实现Android推送

      本文链接:https://www.haomeiwen.com/subject/dxghnxtx.html