美文网首页
MQTT使用说明及EMQ-X部署

MQTT使用说明及EMQ-X部署

作者: Mr_Michael | 来源:发表于2019-11-26 16:32 被阅读0次

    一、MQTT 简介

    MQTT协议(Message Queuing Telemetry Transport),叫做遥信消息队列传输。 是工作在TCP/IP协议族上的一种轻量级的发布/订阅协议,需要最小的占用空间和带宽来连接 IoT 设备。与 HTTP 的请求/响应范式不同,MQTT 是事件驱动的,可以将消息推送到客户端。这种类型的架构将客户端彼此解耦,以实现高度可扩展的解决方案,而数据生产者和数据消费者之间没有依赖关系。

    1.MQTT 的主要优点

    • 开放消息协议,简单易实现
    • 发布订阅模式,一对多消息发布
    • 基于TCP/IP网络连接
    • 1字节固定报头,2字节心跳报文,报文结构紧凑
    • 消息QoS支持,可靠传输保证

    2.MQTT 的基本概念

    MQTT 的核心是MQTT 代理MQTT 客户端。代理负责在发送方和合法接收方之间分发消息。MQTT 客户端向代理发布消息,其他客户端可以订阅该代理以接收消息。每个 MQTT 消息都包含一个主题。客户端向特定主题发布消息,MQTT 客户端订阅他们想要接收的主题。MQTT 代理使用主题和订阅者列表将消息分派到适当的客户端。

    image

    MQTT传输的消息分为:主题(Topic)和负载(payload)两部分:

    • Topic,可以理解为消息的类型,订阅者订阅(Subscribe)后,就会收到该主题的消息内容(payload);

    • payload,可以理解为消息的内容,是指订阅者具体要使用的内容,最大支持256MB。

    MQTT 代理能够缓冲无法分派到未连接的 MQTT 客户端的消息。这对于网络连接不可靠的情况非常有用。为了支持可靠的消息传递,该协议支持 3 种不同类型的服务质量消息:0 - 最多一次,1 - 至少一次,2 - 恰好一次。

    • Qos0消息发布订阅

      image
    • Qos1消息发布订阅

      image
    • Qos2消息发布订阅

      image

    1)MQTT 客户端

    一个使用MQTT协议的应用程序或者设备,它总是建立到服务器的网络连接。

    MQTT 客户端生命周期

    • 建立连接
      • 指定 MQTT Broker 基本信息接入地址与端口
      • 指定传输类型是 TCP 还是 MQTT over WebSocket
      • 如果启用 TLS 需要选择协议版本并携带相应的的证书
      • Broker 启用了认证鉴权则客户端需要携带相应的 MQTT Username Password 信息
      • 配置客户端参数如 keepalive 时长、clean session 回话保留标志位、MQTT 协议版本、遗嘱消息(LWT)等
    • 订阅主题:连接建立成功后可以订阅主题,需要指定主题信息
      • 指定主题过滤器 Topic,订阅的时候支持主题通配符 +# 的使用
      • 指定 QoS,根据客户端库和 Broker 的实现可选 Qos 0 1 2
      • 订阅主题可能因为网络问题、Broker 端 ACL 规则限制而失败
    • 接收消息并处理
      • 一般是在连接时指定处理函数,依据客户端库与平台的网络编程模型不同此部分处理方式略有不同
    • 发布消息:向指定主题发布消息
      • 指定目标主题,注意该主题不能包含通配符 +#,若主题中包含通配符可能会导致消息发布失败、客户端断开等情况(视 Broker 与客户端库实现方式)
      • 指定消息 QoS 级别
      • 指定消息体内容,消息体内容大小不能超出 Broker 设置最大消息大小
      • 指定消息 Retain 保留消息标志位
    • 取消订阅
      • 指定目标主题即可
    • 断开连接
      • 客户端主动断开连接,服务器端将发布遗嘱消息(LWT)

    Client libraries

    C

    • Eclipse Paho C
    • Eclipse Paho Embedded C
    • libmosquitto
    • libemqtt - an embedded C client
    • MQTT-C - A portable MQTT C client for embedded systems and PCs alike.
    • wolfMQTT - Embedded C client
    • MQTT over lwIP - MQTT C client for embedded systems using FreeRTOS, lwIP and mbedtls
    • libsmartfactory - easy to use library for different Smart Factory/Industry 4.0 technologies including a MQTT client implementation
    • libumqtt - A Lightweight and fully asynchronous MQTT client C library based on libev

    C++

    • Eclipse Paho C++
    • libmosquittopp
    • Eclipse Paho Embedded C++
    • mqtt_cpp - MQTT client and server library based on C++14 and Boost.Asio. It supports MQTT v3.1.1 and v5.

    Go

    • Eclipse Paho Go
    • mqtt by jeffallen
    • MQTT

    Python

    • Eclipse Paho Python - originally the mosquitto Python client
    • gmqtt
    • nyamuk
    • MQTT for twisted python
    • HBMQTT
    • mqttools

    2)MQTT 代理

    MQTT服务器以称为"消息代理"(Broker),可以是一个应用程序或一台设备。它是位于消息发布者和订阅者之间。

    MQTT代理可以:

    (1)接受来自客户的网络连接;

    (2)接受客户发布的应用信息;

    (3)处理来自客户端的订阅和退订请求;

    (4)向订阅的客户转发应用程序消息。

    如果需要下载MQTT服务器,可以直接去MQTT Software下载MQTT协议衍生出来的各个不同版本。

    常用Servers/ Brokers

    • EMQ X

      EMQ X Broker是一个完全开源、高扩展、高可用的分布式 MQTT 消息代理,适用于物联网、M2M 和移动应用程序,可处理数千万并发客户端。

      从 3.0 版本开始,EMQ X broker 全面支持 MQTT V5.0 协议规范,向下兼容 MQTT V3.1 和 V3.1.1,以及其他通信协议,如 MQTT-SN、CoAP、LwM2M、WebSocket 和 STOMP。EMQ X Broker 3.0 版本可以在一个集群上扩展到 10+ 百万并发 MQTT 连接。

    • HIVEMQ

      HiveMQ 是一个 MQTT 代理,它从头开始构建,考虑了最大的可扩展性和企业级安全性。它带有原生 Web 套接字支持和开源插件 SDK,以扩展其功能或将其与其他组件集成。

    • Mosquitto

      Mosquitto 是一个开源 MQTT 服务器。还提供公共托管测试服务器。

    • RabbitMQ

      RabbitMQ 是一个 AMQP 消息代理,带有一个MQTT 插件,还提供公共测试服务器

    3)MQTT协议中的订阅、主题、会话

    • 订阅(Subscription)

      订阅包含主题筛选器(Topic Filter)和最大服务质量(QoS)。订阅会与一个会话(Session)关联。一个会话可以包含多个订阅。每一个会话中的每个订阅都有一个不同的主题筛选器。

    • 会话(Session)

      每个客户端与服务器建立连接后就是一个会话,客户端和服务器之间有状态交互。会话存在于一个网络之间,也可能在客户端和服务器之间跨越多个连续的网络连接。

    • 主题名(Topic Name)

      连接到一个应用程序消息的标签,该标签与服务器的订阅相匹配。服务器会将消息发送给订阅所匹配标签的每个客户端。

    • 主题筛选器(Topic Filter)

      一个对主题名通配符筛选器,在订阅表达式中使用,表示订阅所匹配到的多个主题。

    • 负载(Payload)

      消息订阅者所具体接收的内容。

    4)连接方式

    MQTT客户端与代理可以基于MQTT-TCP或MQTT-WebSocket协议建立连接。

    • WebSocket协议是基于TCP的一种应用层网络协议。WebSocket使得客户端和服务器之间的数据交换变得更加简单,允许服务端主动向客户端推送数据。在WebSocket API中,浏览器和服务器只需要完成一次握手,两者之间就可以创建持久性的连接,并进行双向数据传输。
    • WebSocket协议支持Web浏览器(或其他客户端应用程序)与Web服务器之间的交互,具有较低的开销,便于实现客户端与服务器的实时数据传输。通信通过TCP端口80或443完成,这在防火墙阻止非Web网络连接的环境下是有益的。
    image

    二、EMQ X Broker使用

    1.安装EMQ X

    # 1.Shell 脚本一键安装 (Linux)
    wget https://repos.emqx.io/install_emqx.sh 
    sudo bash install_emqx.sh
    
    # 2.或者手动安装
    # 安装所需要的依赖包
    $ sudo apt update && sudo apt install -y \
        apt-transport-https \
        ca-certificates \
        curl \
        gnupg-agent \
        software-properties-common
    # 添加 EMQ X 的官方 GPG 密钥
    $ curl -fsSL https://repos.emqx.io/gpg.pub | sudo apt-key add -
    # 验证密钥
    $ sudo apt-key fingerprint 3E640D53
    pub   rsa2048 2019-04-10 [SC]
        FC84 1BA6 3775 5CA8 487B  1E3C C0B4 0946 3E64 0D53
    uid           [ unknown] emqx team <support@emqx.io>
    # 添加存储库
    $ sudo add-apt-repository \
        "deb [arch=amd64] https://repos.emqx.io/emqx-ce/deb/ubuntu/ \
        ./$(lsb_release -cs) \
        stable"
    $ sudo apt update
    $ sudo apt install emqx
    

    安装目录说明

    $ tree /var/lib/emqx/
    /var/lib/emqx/
    ├── configs
    │   ├── app.2021.09.16.19.06.05.config
    │   ├── app.2021.09.16.20.07.27.config
    │   ├── vm.2021.09.16.19.06.05.args
    │   └── vm.2021.09.16.20.07.27.args
    ├── emqx_erl_pipes
    │   └── emqx@127.0.0.1
    │       ├── erlang.pipe.1.r
    │       └── erlang.pipe.1.w
    ├── loaded_modules
    ├── loaded_plugins
    ├── mnesia
    │   └── emqx@127.0.0.1
    │       ├── DECISION_TAB.LOG
    │       ├── emqx_activated_alarm.DCD
    │       ├── emqx_activated_alarm.DCL
    │       ├── emqx_banned.DCD
    │       ├── emqx_deactivated_alarm.DCD
    │       ├── emqx_mod_delayed.DCD
    │       ├── emqx_resource.DCD
    │       ├── emqx_rule.DCD
    │       ├── emqx_telemetry.DCD
    │       ├── emqx_telemetry.DCL
    │       ├── LATEST.LOG
    │       ├── mqtt_admin.DCD
    │       ├── mqtt_admin.DCL
    │       ├── mqtt_app.DCD
    │       ├── mqtt_app.DCL
    │       └── schema.DAT
    ├── patches
    └── scripts
    
    $ tree /usr/lib/emqx/ -L 1
    /usr/lib/emqx/
    ├── bin
    ├── erts-11.1.8
    ├── lib
    └── releases
    
    $ tree /etc/emqx/ -L 1
    /etc/emqx/
    ├── acl.conf        # EMQ X 默认 ACL 规则配置文件
    ├── certs
    ├── emqx.conf    # EMQ X 配置文件
    ├── lwm2m_xml
    ├── plugins     # EMQ X 扩展插件配置文件
    ├── psk.txt
    ├── ssl_dist.conf
    └── vm.args
    

    2.使用EMQ X Broker

    1)启用EMQ X

    # 直接启动
    $ sudo emqx start
    EMQ X Broker 4.3.5 is started successfully!
    # 或
    $ sudo systemctl start emqx
    
    # 查看 EMQ X 的状态
    $ sudo emqx_ctl status
    Node 'emqx@127.0.0.1' 4.3.5 is started
    
    # 停止 EMQ X Broker
    $ emqx stop
    ok
    

    2)EMQ X TCP服务器

    基于/etc/emqx/emqx.conf可以,emqx tcp服务默认监听0.0.0.0:1883,WebSocket服务监听8083。参考详细配置说明

    ##--------------------------------------------------------------------
    ## MQTT/TCP - External TCP Listener for MQTT Protocol
    
    ## listener.tcp.$name is the IP address and port that the MQTT/TCP
    ## listener will bind.
    ##
    ## Value: IP:Port | Port
    ##
    ## Examples: 1883, 127.0.0.1:1883, ::1:1883
    listener.tcp.external = 0.0.0.0:1883
    
    ##--------------------------------------------------------------------
    ## External WebSocket listener for MQTT protocol
    
    ## listener.ws.$name is the IP address and port that the MQTT/WebSocket
    ## listener will bind.
    ##
    ## Value: IP:Port | Port
    ##
    ## Examples: 8083, 127.0.0.1:8083, ::1:8083
    listener.ws.external = 8083
    

    3)Dashboard

    访问 http://localhost:18083 。默认用户名是 admin,密码是 public。

    导航项目 说明
    MONITORING 提供了服务端与客户端监控信息的展示页面
    RULE ENGINE 提供了规则引擎的可视化操作页面
    MANAGEMENT 提供了扩展插件与应用的管理页面
    TOOLS 提供了 WebSocket 客户端工具以及 HTTP API 速查页面
    ADMIN 提供了 Dashboard 用户管理和显示设置等页面

    3.EMQ X 客户端

    不同语言可以使用各自的的libraries来实现。

    1)MQTT Python 客户端

    sudo pip install paho-mqtt
    

    基于TCP连接emqx broker,发布emqtt,订阅testtopic

    import paho.mqtt.client as mqtt
    
    # 连接成功回调
    def on_connect(client, userdata, flags, rc):
        print('Connected with result code '+str(rc))
        client.subscribe('testtopic/#')
    
    # 消息接收回调
    def on_message(client, userdata, msg):
        print(msg.topic+" "+str(msg.payload))
    
    client = mqtt.Client()
    
    # 指定回调函数
    client.on_connect = on_connect
    client.on_message = on_message
    
    # 建立连接
    client.connect('broker.emqx.io', 1883, 60)
    # 发布消息
    client.publish('emqtt',payload='Hello World',qos=0)
    
    client.loop_forever()
    

    执行

    python3 emqx-test.py
    

    相关文章

      网友评论

          本文标题:MQTT使用说明及EMQ-X部署

          本文链接:https://www.haomeiwen.com/subject/dchywctx.html