美文网首页
Android Mqtt 订阅接收消息

Android Mqtt 订阅接收消息

作者: satisfying | 来源:发表于2019-01-10 10:40 被阅读0次

    添加引用

            compile 'org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.1.0'
            compile 'org.eclipse.paho:org.eclipse.paho.android.service:1.1.1'
    

    功能封装,我这里订阅的topic 和clientId一样,有需要的可以自行修改

      class PushClient : MqttCallback {
    
        private val TAG = "PushClient"
    
    
        //mqtt连接client
        var mClient: MqttAndroidClient? = null
        //mqtt连接参数设置
        var options: MqttConnectOptions? = null;
        /*连接成功之后设置连接断开的缓冲配置*/
        var disconnectedBufferOptions: DisconnectedBufferOptions? = null
        var context: Context? = null
        var countDownTimer: CountDownTimer? = null//倒计时重连
        var exit = false//是否退出
    
        constructor(host: String, port: Int, context: Context, clientId: String) {
    
            this.context = context;
            exit = false
            var uri = "tcp://$host:$port"
    //        var uri = "ssl://$host:$port"
            mClient = MqttAndroidClient(context, uri, clientId)
    
            mClient?.setCallback(this)
    
            //mqtt连接参数设置
            options = MqttConnectOptions();
            //设置自动重连
            options?.isAutomaticReconnect = false;
            // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录
            // 这里设置为true表示每次连接到服务器都以新的身份连接
            options?.isCleanSession = false;
            // 设置超时时间 单位为秒
            options?.connectionTimeout = 5;
            // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制
            options?.keepAliveInterval = 20;
    
    
            /*连接成功之后设置连接断开的缓冲配置*/
            disconnectedBufferOptions = DisconnectedBufferOptions();
            //开启
            disconnectedBufferOptions?.isBufferEnabled = true;
            //离线后最多缓存100调
            disconnectedBufferOptions?.bufferSize = 100;
            //不一直持续留存
            disconnectedBufferOptions?.isPersistBuffer = false;
            //删除旧消息
            disconnectedBufferOptions?.isDeleteOldestMessages = false;
        }
    
        companion object {
            fun create(host: String, port: Int, context: Context, clientId: String): PushClient {
                return PushClient(host = host, port = port, context = context, clientId = clientId)
            }
        }
    
        /**
         * 链接
         */
        fun connect() {
    
            Log.e(TAG, "链接${mClient?.serverURI}")
            mClient?.connect(options, null, object : IMqttActionListener {
                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.e(TAG, "链接失败")
                    resetConnect()
                }
    
                override fun onSuccess(asyncActionToken: IMqttToken?) {
    
                    Log.e(TAG, "链接成功")
                    if (countDownTimer != null) {
                        countDownTimer?.cancel()
                        countDownTimer = null
                    }
                    subscribeToTopic()
                }
            })
        }
        
    
        /**
         * 3s后重新链接
         */
        fun resetConnect() {
            countDownTimer = object : CountDownTimer(3000, 1000) {
                override fun onTick(millisUntilFinished: Long) {
                    Log.e(TAG, "等待重新链接  $millisUntilFinished")
                }
    
                override fun onFinish() {
                    Log.e(TAG, "重新链接")
                    connect()
                }
            }.start()
        }
    
        override fun messageArrived(topic: String?, message: MqttMessage?) {
            Log.e(TAG, "messageArrived")
        }
    
        /**
         * @desc 连接断开回调
         * 可在这里做一些重连等操作
         */
        override fun connectionLost(cause: Throwable?) {
            Log.e(TAG, "connectionLost")
            if (!exit) {
                resetConnect()
            }
        }
    
        override fun deliveryComplete(token: IMqttDeliveryToken?) {
            Log.e(TAG, "deliveryComplete")
        }
    
    
        /**
         * 断开链接
         */
        fun disConnect() {
            exit = true
            if (countDownTimer != null) {
                countDownTimer?.cancel()
                countDownTimer = null
            }
            Log.e(TAG, "断开链接")
            mClient?.disconnect(null, object : IMqttActionListener {
                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.e(TAG, "断开链接失败")
    
                }
    
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.e(TAG, "断开链接成功")
                }
    
            })
        }
    
    
        /**
         * 订阅主题
         */
        var set = false
    
        fun subscribeToTopic() {
            Log.e(TAG, "消息订阅${mClient?.clientId}")
    
            mClient?.setBufferOpts(disconnectedBufferOptions)
    
            //主题、QOS、context,订阅监听,消息监听
            var token = mClient?.subscribe(mClient?.clientId, 2, null, object : IMqttActionListener {
                override fun onSuccess(asyncActionToken: IMqttToken?) {
                    Log.e(TAG, "订阅成功")
                }
    
                override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                    Log.e(TAG, "订阅失败")
                }
            }) { topic, message ->
    
                Log.e(TAG, "$topic  消息到达 $message")
                showMessage(message = message.toString())
            }
            Log.e(TAG, "token ${token?.isComplete}")
        }
    
        /**
         * 是否链接
         */
        fun isConnected(): Boolean {
            return if (mClient == null) {
                false
            } else {
                mClient!!.isConnected
            }
        }
    
        private fun showMessage(message: String) {
    //项目之前有极光推送,为了方便就直接极光本地推送
            val jsonObject = JSONObject(message)
            val ln = JPushLocalNotification()
            ln.extras = message
            ln.builderId = 0
            ln.content = jsonObject.getString("content")
    //        ln.extras = " {\"content\":\"您收到一个案件处理任务,请及时处理\",\"messageId\":215,\"realId\":285,\"summary\":\"\",\"title\":\"处理案件提醒\",\"type\":200,\"url\":\"\"}";
            ln.title = jsonObject.getString("title")
            ln.notificationId = 11111111
            ln.broadcastTime = System.currentTimeMillis() + 1000 * 1
            JPushInterface.addLocalNotification(context, ln)
        }
    
    }
    

    相关文章

      网友评论

          本文标题:Android Mqtt 订阅接收消息

          本文链接:https://www.haomeiwen.com/subject/amuorqtx.html