美文网首页Android打包MQTT
Android实现MQTT客户端

Android实现MQTT客户端

作者: itfitness | 来源:发表于2022-01-28 10:59 被阅读0次

    目录

    MQTT简介

    MQTT是机器对机器(M2M)/物联网(IoT)连接协议。它被设计为一个极其轻量级的发布/订阅消息传输协议。对于需要较小代码占用空间和/或网络带宽非常宝贵的远程连接非常有用,是专为受限设备和低带宽、高延迟或不可靠的网络而设计。这些原则也使该协议成为新兴的“机器到机器”(M2M)或物联网(IoT)世界的连接设备,以及带宽和电池功率非常高的移动应用的理想选择。例如,它已被用于通过卫星链路与代理通信的传感器、与医疗服务提供者的拨号连接,以及一系列家庭自动化和小型设备场景。它也是移动应用的理想选择,因为它体积小,功耗低,数据包最小,并且可以有效地将信息分配给一个或多个接收器。

    效果演示

    这是我连接的MQTT中文网的公共服务器

    基础知识

    这里开发客户端需要的知识不多,paho的核心库都封装好了,我们只需要了解下基础的知识就行了。

    1.连接

    这里我使用的MQTT3.1.1协议,服务器地址是以tcp开头的末尾加上端口号1883

    val server = "tcp://mqtt.p2hp.com:1883" //服务端地址
    

    其他的一些参数如下:
    clientId:(作为客户端的标识),这里我使用的是AndroidID,获取不到的话就使用生成的一个UUID
    CleanSession:设置不持久化的话,每次都是一次新会话
    keepAliveInterval:发送心跳包的时间

     val androidId = DeviceUtils.getAndroidID()
            val clientId = if(!TextUtils.isEmpty(androidId)){
                androidId
            }else{
                UUID.randomUUID().toString()
            }
            mqttClient = MqttAndroidClient(context,serverUrl,clientId)
            connectOptions = MqttConnectOptions().apply {
                isCleanSession = false //是否会话持久化
                connectionTimeout = 30 //连接超时时间
                keepAliveInterval = 10 //发送心跳时间
                userName = name //如果设置了认证,填的用户名
                password = pass.toCharArray() //用户密码
            }
    
    2.订阅和发布

    (1) 订阅
    这里的概念就好像你微博关注了一个博主,然后当博主发布新的动态,你这就可以收到,而这里的订阅就是类似关注,订阅的主题格式跟文件路径差不多,比如订阅一个topic/1,当然这里也有带通配符的订阅方式比如topic/##的意思就是匹配所有,也就是当你订阅了topic/#的主题你就可以收到所有topic开头的主题消息,像topic/1topic/2topic/3等。
    (2) 发布
    发布消息的时候也需要指定一个主题,比如topic/1,但是不能指定带通配符的主题。
    (3) QOS
    另外还有一个比较重要的概念就是QOS(服务质量),这里有3个值(0,1,2),代表的意义如下:
    0:代表,Sender 发送的一条消息,Receiver 最多能收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,也就算了。
    1:代表,Sender 发送的一条消息,Receiver 至少能收到一次,也就是说 Sender 向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,但是因为重传的原因,Receiver 有可能会收到重复的消息。
    2:代表,Sender 发送的一条消息,Receiver 确保能收到而且只收到一次,也就是说 Sender 尽力向 Receiver 发送消息,如果发送失败,会继续重试,直到 Receiver 收到消息为止,同时保证 Receiver 不会因为消息重传而收到重复的消息。

    实现步骤

    1.引入依赖

    这里本来我是用的这个库https://github.com/eclipse/paho.mqtt.android,但是这个库不适配Android12因此我下载了源码调整了下,重新自己封装了一个,如下:

    allprojects {
            repositories {
                ...
                maven { url 'https://jitpack.io' }
            }
        }
    
    
    dependencies {
                implementation 'com.github.itfitness:MQTTAndroid:1.0.0'
        }
    
    2.封装方法

    我对这个库进行了一些封装,如下:

    class MQTTHelper{
        private val mqttClient: MqttAndroidClient
        private val connectOptions: MqttConnectOptions
        private var mqttActionListener: IMqttActionListener? = null
        constructor(context: Context, serverUrl:String, name:String, pass:String){
            val macAddress = DeviceUtils.getAndroidID()
            val clientId = if(!TextUtils.isEmpty(macAddress)){
                macAddress
            }else{
                UUID.randomUUID().toString()
            }
            mqttClient = MqttAndroidClient(context,serverUrl,clientId)
            connectOptions = MqttConnectOptions().apply {
                isCleanSession = false
                connectionTimeout = 30
                keepAliveInterval = 10
                userName = name
                password = pass.toCharArray()
            }
        }
    
        /**
         * 连接
         * @param mqttCallback 接到订阅的消息的回调
         * @param isFailRetry 失败是否重新连接
         */
        fun connect(topic: Topic, qos: Qos, isFailRetry:Boolean, mqttCallback: MqttCallback){
            mqttClient.setCallback(mqttCallback)
            if(mqttActionListener == null){
                mqttActionListener = object :IMqttActionListener{
                    override fun onSuccess(asyncActionToken: IMqttToken?) {
                        LogUtils.eTag("连接","连接成功")
                        subscribe(topic,qos)
                    }
                    override fun onFailure(asyncActionToken: IMqttToken?, exception: Throwable?) {
                        //失败重连
                        LogUtils.eTag("连接","连接失败重试${exception?.message}")
                        if (isFailRetry){
                            mqttClient.connect(connectOptions,null,mqttActionListener)
                        }
                    }
                }
            }
            mqttClient.connect(connectOptions,null,mqttActionListener)
        }
    
        /**
         * 订阅
         */
        private fun subscribe(topic: Topic,qos:Qos){
            mqttClient.subscribe(topic.value(),qos.value())
        }
    
        /**
         * 发布
         */
        fun publish(topic:Topic,message:String,qos:Qos){
            val msg = MqttMessage()
            msg.isRetained = false
            msg.payload = message.toByteArray()
            msg.qos = qos.value()
            mqttClient.publish(topic.value(),msg)
        }
    
        /**
         * 断开连接
         */
        fun disconnect(){
            mqttClient.disconnect()
        }
    }
    
    enum class Qos{
        QOS_ZERO{
            override fun value():Int{
                return 0
            }
        },
        QOS_ONE{
            override fun value():Int{
                return 1
            }
        },
        QOS_TWO{
            override fun value():Int{
                return 2
            }
        };
        abstract fun value(): Int
    }
    
    enum class Topic{
        //订阅主题
        TOPIC_MSG{
            override fun value():String{
                return "testtopic/#"
            }
        },
        //发布主题
        TOPIC_SEND{
            override fun value():String{
                return "testtopic/1"
            }
        };
        abstract fun value(): String
    }
    
    3.使用

    在Activity中的使用如下:

    class MainActivity : AppCompatActivity() {
        @RequiresApi(Build.VERSION_CODES.N)
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_main)
            val server = "tcp://mqtt.p2hp.com:1883" //服务端地址
            val mqttHelper = MQTTHelper(this,server,"123","123")
            mqttHelper.connect(Topic.TOPIC_MSG, Qos.QOS_TWO,false,object : MqttCallback {
                override fun connectionLost(cause: Throwable?) {
    
                }
    
                override fun messageArrived(topic: String?, message: MqttMessage?) {
                    //收到消息
                    message?.payload?.let { ToastUtils.showShort(String(it)) }
                    LogUtils.eTag("消息", message?.payload?.let { String(it) })
                }
    
                override fun deliveryComplete(token: IMqttDeliveryToken?) {
    
    
    
                }
            })
            val etMsg = findViewById<EditText>(R.id.et_msg)
            findViewById<Button>(R.id.tv_send).setOnClickListener {
                //发送消息
                mqttHelper.publish(Topic.TOPIC_SEND,etMsg.text.toString(),Qos.QOS_TWO)
            }
        }
    }
    

    案例源码

    https://github.com/itfitness/MQTTAndroid

    相关文章

      网友评论

        本文标题:Android实现MQTT客户端

        本文链接:https://www.haomeiwen.com/subject/ozgzhrtx.html