美文网首页springbootJavaJava 程序员
Spring Boot MQTT Too many publis

Spring Boot MQTT Too many publis

作者: 马小莫QAQ | 来源:发表于2022-07-02 15:14 被阅读0次

    背景

    最近项目中需要与andorid端进行交互,采用了MQTT消息进行通信,生产环境中偶尔会出现Too many publishesin progress(32202)的错误,严重的影响了正常功能的使用。

    前言

    项目中采用的是Spring Boot2.0集成的MQTT引入的版本为1.2.0,消息发送用的是MessagingGateway的方式实现

    原因分析

    出现此问题的原因跟MQTT的Qos的设置有关,所以需要简单的介绍下Qos相关值的含义

    1. 最多一次的传输

    发布者发送消息到服务器,没有确认消息,也不知道对方是否收到。

    1. 至少一次的传输

    发布者发布消息保存消息,服务器(broker)接收到消息,服务器(broker)发送消息到订阅者,服务器(broker)回一个PUBACK信息到发布者让删除消息,然后订阅者接收消息后PUBACK给服务器让删除消息。如果失败了,在一段时间确认信息没有收到,发送方都会将消息头的DUP设置为1,然后再次发送消息,消息最少一次到达服务。例如网络延迟等问题,发布者重复发送消息,订阅者多次订阅会产生重复消息.

    1. 只有一次的传输

    Qos为2只是在1的基础上做了改进,在发布者发送到服务器之后多了消息的确认以及多了消息msgID的缓存,重复信息的去重。在服务器发送到订阅者之后也多了消息的确认。

    项目中使用了MQTT发送消息的地方比较多,且一般都是以Qos为0,那么为什么Qos为0,在并发量比较大的情况下就会出现Too many publishesin progress(32202)的错误,报错的内容的源码如下:

    当actualInFlight超出设置的maxInflight最大值时就会报此错误,那么具体是什么原因导致的呢?我们需要通过源码来分析此问题的原因。

    源码分析

    关于源码的阅读我们需要整理主线思路,MQTT发送消息主线分为消息push到缓存中和异步发送两部分。

    MQTT的Push消息到缓存中时序图

    MqttPahoMessageHandler的publish方法

    说明:checkConection检查连接后,在发送消息。

    MqttAsyncClient的publish方法
    ClientComms的internalSend方法
    ClientState的send方法

    MqttPublish消息类型,继承了父类MqttWireMessage,而在MqttWireMessage的构造方法中将消息id设置为0

    SaveToken的源码实现如下:

    通过前面这几步的操作,消息已经放入到HashTable缓存中,准备异步发送。

    异步发送消息时序图

    说明:MqttAsyncClient的connect为客户端建立连接,兴趣的可以看下源码。

    ClientComms的conncect方法

    ConnectBG的run方法

    CommsSender的run方法

    1. 从clientState中获取消息
    2. 通过消息id去hashtable中获取缓存消息
    3. 消息不为空,执行消息发送
    4. 调用notifySent方法删除消息,且actualInFlight执行递减操作。

    CommsSender的notifySent方法

    小结

    在高并发的场景下,pendingMessage可能会添加多条数据,Qos设置为0的时候,存入tokens(Hashtable)中的key一直是0,当执行tokenStore.getToken在发送方法之后会remove所有数据,由于tokenStore中已经不存在值,因为已经被上一次已经全部remove了,当再次getToken的消息时获取会为空,不在发送信息,使得actualInFlight没有递减,所以才经过一段时间后actualInFlight就会超出设置最大值,从而报错。

    //存放待发送消息的Vector数组
    volatile private Vector pendingMessages;
    

    解决方案

    方案1:发送消息时设置为Qos=1

    此方案虽然可以解决此问题,但存在如下的缺点:

    1. 网络延迟时会发送重复消息问题,导致消费者重复消费,关于重复的消息解决需要进行相关的幂等性操作,增加了修改的复杂度和成本。
    2. 发送消息需要进行消息确认,网络资源消耗过大。

    方案2: 修改maxInflight的默认值,例如将其修改为50

      MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
      mqttConnectOptions.setMaxInflight(50);
    

    此方案虽然修改比较简单,但是并没有从根本上解决问题,只是缓解了出现错误的时间,如果项目中并发量比较低,可以采用此方案解决。

    方案3:将消息配置为多客户端模式

    由于mqttMessageHandler只会引用一个paho客户端,所以才会想到增加客户端模式来提高并发量.需要重写MqttPahoMessageHandler类的相关方法。虽然可以解决此问题,如果对MQTT的源码不是很了解,不建议采用此方案,不利于后续的版本升级。

    方案4:升级mqtt版本为1.2.1

    在1.2.1的版本中官方已经进行了相关的修改,当qos=0已经不存入tokenStore了,每次发送完之后就会删除掉token以及释放id,所以就不会出现Too many publishes in progress的问题。

    引入1.2.1的版本会带来https验证问题,因为在Mqtt的1.2.1版本中,增加了https的验证需要添加相关配置,否则启动时会报安全认证错误。

    解决方案:如果项目中没有开启https认证,需要设置HttpsHostnameVerificationEnabled为false即可。

    mqttConnectOptions.setHttpsHostnameVerificationEnabled(false);
    

    总结

    本文通过定位MQTT错误,详细的讲解了MQtt消息的发送流程,解决的方案虽然有多种,我们需要结合实际的业务情况来解决问题

    作者:剑圣无痕
    链接:https://juejin.cn/post/7115304423286374407
    来源:稀土掘金

    相关文章

      网友评论

        本文标题:Spring Boot MQTT Too many publis

        本文链接:https://www.haomeiwen.com/subject/jzkkbrtx.html