MQTT简析

作者: 一只好奇的茂 | 来源:发表于2017-06-16 17:04 被阅读411次

    由于部门做的设备需要使用到MQTT,在此总结下。

    基本介绍

    MQTT(Message Queue Telemetry Transport),遥测传输协议,提供订阅/发布模式,更为简约、轻量,易于使用,针对受限环境(带宽低、网络延迟高、网络通信不稳定),可以简单概括为物联网打造,官方总结特点如下:

    1. 使用发布/订阅消息模式,提供一对多的消息发布,解除应用程序耦合。
    1. 对负载内容屏蔽的消息传输。
    2. 使用 TCP/IP 提供网络连接。
    3. 有三种消息发布服务质量:
      “至多一次”,消息发布完全依赖底层 TCP/IP 网络。会发生消息丢失或重复。这一级别可用于如下情况,环境传感器数据,丢失一次读记录无所谓,因为不久后还会有第二次发送。
      “至少一次”,确保消息到达,但消息重复可能会发生。
      “只有一次”,确保消息到达一次。这一级别可用于如下情况,在计费系统中,消息重复或丢失会导致不正确的结果。
    4. 小型传输,开销很小(固定长度的头部是 2 字节),协议交换最小化,以降低网络流量。
    5. 使用 Last Will 和 Testament 特性通知有关各方客户端异常中断的机制。

    XMPP和MQTT对比

    XMPP
    优点:

    • 要涉及到上层业务,设计到用户分群分组,用户层次关系的维护,xmpp已经为你做了很多很多,后期的开发会很省心;
    • 协议成熟、强大、可扩展性强、目前主要应用于许多聊天系统中,且已有开源的Java版的开发实例androidpn;
    • 协议成熟,强大,可扩展性强,并且有成熟的开源方案。
    • 分布式:任何人都可以运行自己的XMPP服务器,它没有主服务器
    • 安全性高:使用TLS等技术
    • 跨平台

    缺点:

    • 协议虽然完整扩展性虽然好,它耗费网络流量很大,交互此说太多,跑起来比MQTT慢很多;另外有高达70%的流量是耗费在XMPP本身的标签和编解码上面。
    • 协议较复杂、冗余(基于XML)、费流量、费电,部署硬件成本高;

    MQTT
    优点:

    • 专门针对移动互联网开发的轻量级传输协议,二进制、协议简洁、小巧、可扩展性强、省流量、省电,适合做大量节点弱网络差的场景,非常适合现在移动互联网的基础设施
    • 开源的协议和实现,扩展方便且轻量级;
    • MQTT代码量也比XMPP小很多,使用容易。
    • 支持的设备从智能硬件到智能手机无所不包。

    MQTT快速示例

    市面上有相当多的高质量MQTT代理,其中mosquitto是一个开源的轻量级的C实现,完全兼容了MQTT 3.1和MQTT 3.1.1。下面我们就以mosquitto为例演示一下MQTT的使用。

    • 安装mosquitto以及搭配的客户端:
    apt-get install mosquitto
    apt-get install mosquitto-clients
    
    • 订阅一个主题:
    mosquitto_sub -d -t baidu/chatroom
    Received CONNACK
    Received SUBACK
    Subscribed (mid: 1): 0
    
    • 另外打开一个SSH连接然后在这个主题里面打个招呼:
    mosquitto_pub -d -t baidu/chatroom -m "Hello World"
    Received CONNACK
    Sending PUBLISH (d0, q0, r0, m1, 'baidu/chatroom', ... (11 bytes))
    
    • 此时回到第一个SSH客户端可以看到信息已经接收到了,之后便是心跳消息:
    Received PUBLISH (d0, q0, r0, m0, 'baidu/chatroom', ... (11 bytes))
    Hello World
    Sending PINGREQ
    Received PINGRESP
    
    • 需要注意的是mosquitto客户端默认使用QoS 0,下面我们使用QoS 2订阅这个主题:
    mosquitto_sub -d -q 2 -t baidu/chatroom
    Received CONNACK
    Received SUBACK
    Subscribed (mid: 1): 2
    
    • 切换到另外SSH连接然后在这个主题里面打个招呼:
    mosquitto_pub -d -q 2 -t baidu/chatroom -m "Hello World"
    Received CONNACK
    Sending PUBLISH (d0, q2, r0, m1, 'baidu/chatroom', ... (11 bytes))
    Received PUBREC (Mid: 1)
    Sending PUBREL (Mid: 1)
    Received PUBCOMP (Mid: 1)
    
    • 此时回到第一个SSH客户端可以看到信息已经接收到了,以及相应的多次握手消息:
    Received PUBLISH (d0, q2, r0, m1, 'baidu/chatroom', ... (11 bytes))
    Sending PUBREC (Mid: 1)
    Received PUBREL (Mid: 1)
    Hello World
    Sending PUBCOMP (Mid: 1)
    

    MQTT Java客户端实现

    使用开源项目https://www.eclipse.org/paho/提供的MQTT服务端tcp://iot.eclipse.org:1883,进行如下实验:

    import org.eclipse.paho.client.mqttv3.MqttClient;
    import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
    import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttMessage;
    import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
    
    
    /**
     *@Description:
     *@author lx
     *@date 2017-1-12 下午1:19:42
     */
    public class TestMQTT {
    
        public static void main(String args[]){
    
            //消息的类型
              String topic        = "TOPIC MQTT Examples";
              //消息内容
              String content      = "XX发布了消息";
              //消息发送的模式   选择消息发送的次数,依据不同的使用环境使用不同的模式
              int qos             = 2;
              //服务器地址
              String broker       = "tcp://iot.eclipse.org:1883";
              //客户端的唯一标识
              String clientId     = "CLIENTID JavaSample";
              //消息缓存的方式  内存缓存
              MemoryPersistence persistence = new MemoryPersistence();
    
              try {
                  //创建以恶搞MQTT客户端
                  MqttClient sampleClient = new MqttClient(broker, clientId, persistence);
                  //消息的配置参数
                  MqttConnectOptions connOpts = new MqttConnectOptions();
                  //不记忆上一次会话
                  connOpts.setCleanSession(true);
                  System.out.println("Connecting to broker: "+broker);
                  //链接服务器
                  sampleClient.connect(connOpts);
                  System.out.println("Connected");
                  System.out.println("Publishing message: "+content);
                  //创建消息
                  MqttMessage message = new MqttMessage(content.getBytes());
                  //给消息设置发送的模式
                  message.setQos(qos);
                  //发布消息到服务器
                  sampleClient.publish(topic, message);
                  System.out.println("Message published");
                  //断开链接
                  sampleClient.disconnect();
                  System.out.println("Disconnected");
                  System.exit(0);
              } catch(MqttException me) {
                  System.out.println("reason "+me.getReasonCode());
                  System.out.println("msg "+me.getMessage());
                  System.out.println("loc "+me.getLocalizedMessage());
                  System.out.println("cause "+me.getCause());
                  System.out.println("excep "+me);
                  me.printStackTrace();
              }
    
        }
    }
    

    参考https://github.com/eclipse/paho.mqtt.android

    服务端添加SSL加密

    这里采用了Moqutte推荐的SSL加密方式(http://andsel.github.io/moquette/),属于SSL加密中的单向加密。

    1. 生成服务器的keystore
      执行命令:
    $JAVA_HOME/bin/keytool -keystore serverkeystore.jks -alias testserver -genkey -keyalg RSA
    

    过程中提示输入名字时(CN),必须填写服务器的域名,本地调试时可填写localhost。
    然后修改工程里的 /config/moquette.conf 中的jks_path 对应值为serverkeystore.jks 的路径,把同时serverkeystore.jks复制到工程根目录下。

    1. 生成客户端的keystore
      1)导出服务器keystore的证书
      执行命令:
    $JAVA_HOME/bin/keytool -export -alias testserver -keystore serverkeystore.jks -file testserver.crt
    

    2)生成客户端的keystore
    执行命令:

    $JAVA_HOME/bin/keytool -keystore clientkeystore.jks -genkey -keyalg RSA
    

    3)向客户端的keystore 导入服务器keystore的证书,使客户端信任证书。
    执行命令:

    $JAVA_HOME/bin/keytool -keystore clientkeystore.jks -import -alias testserver -file testserver.crt -trustcacerts
    

    4)客户端启动时加载clientkeystore.jks然后再与服务器的SSL端口进行连接即可。
    客户端代码参考:sslSimplePublisher.groovy

    客户端添加ssl加密

    1. 生成 .bks文件:
      a、根据上一节,拿到服务器生成的 .jks证书,
      b、到官网下载 bcprov-ext-jdk15on-146.jar ,将该文件放到jdk1.6.0_03\jre\lib\ext目录下.
      c、配置bcprov
      在 jdk_home\jre\lib\security\目录中找到 java.security 在内容增加一行(数字可以自己定义)
    security.provider.11=org.bouncycastle.jce.provider.BouncyCastleProvider
    

    d、生成android平台的证书:

    keytool -exportcert -alias testserver -file test.cert -keystore  local_clientkeystore.jks
    keytool -importcert -keystore test.bks -file test.cert -storetype BKS -provider org.bouncycastle.jce.provider.BouncyCastleProvider
    
    1. 在Android工程中加载证书:
      a、生成SSLSocketFactory对象
    public class SslUtil {
        public static SSLSocketFactory createSocketFactory(Context context) {
            SSLContext sslContext;
            try {
                KeyStore ks = KeyStore.getInstance("BKS");
                ks.load(context.getResources().openRawResource(R.raw.peer),
                        "123456".toCharArray());                           //该字符串应随机生成,保证每次session唯一;
                KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory.getDefaultAlgorithm());
                //    kmf.init(ks, "passw0rd".toCharArray());
                kmf.init(ks, "123456".toCharArray());
                TrustManagerFactory tmf = TrustManagerFactory
                        .getInstance("X509");
                tmf.init(ks);
                TrustManager[] tm = tmf.getTrustManagers();
                sslContext = SSLContext.getInstance("TLS");
    
                sslContext.init(kmf.getKeyManagers(), tm, null);
                // SocketFactory factory= SSLSocketFactory.getDefault();
    
                // Socket socket =factory.createSocket("localhost", 10000);
                SSLSocketFactory ssf = sslContext.getSocketFactory();
                return  ssf;
            }catch (Exception e){
                e.printStackTrace();
                return  null;
            }
        }
    }
    

    b、在MqttOptions中添加SSL配置:

    mOptions = new MqttConnectOptions();
    mOptions.setCleanSession(true);
    SSLSocketFactory socketFactory = SslUtil.createSocketFactory(getApplicationContext());
    if (socketFactory != null) {
        LogUtil.d("socketFactory is not null");
        mOptions.setSocketFactory(socketFactory);
    }
    
    1. 修改url地址前缀:
      修改url地址由:
      tcp://yoursite.com:8402

      ssl://yoursite.com:2883
    2. 使用wireshark抓包验证,加密成功!

    参考文章

    MQTT协议笔记之头部信息
    MQTT快速入门

    相关文章

      网友评论

        本文标题:MQTT简析

        本文链接:https://www.haomeiwen.com/subject/iczhxxtx.html