美文网首页
MQTT(32202): 正在发布过多的消息

MQTT(32202): 正在发布过多的消息

作者: Noah牛YY | 来源:发表于2018-02-28 23:44 被阅读1688次

    谢评论区 kangsong 指正,文章已更新

    先贴一下异常信息

    02-28 15:07:21.156 1975-2791/com.tencent.mm E/Xposed: [15:07:21]: publish failed, message: aaaa
                                                          正在进行过多的发布 (32202)
                                                              at org.eclipse.paho.client.mqttv3.internal.ClientState.send(ClientState.java:496)
                                                              at org.eclipse.paho.client.mqttv3.internal.ClientComms.internalSend(ClientComms.java:132)
                                                              at org.eclipse.paho.client.mqttv3.internal.ClientComms.sendNoWait(ClientComms.java:156)
                                                              at org.eclipse.paho.client.mqttv3.MqttAsyncClient.publish(MqttAsyncClient.java:1027)
                                                              at org.eclipse.paho.client.mqttv3.MqttClient.publish(MqttClient.java:399)
                                                              at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:171)
                                                              at io.communet.ichater.emq.util.MqttUtil.publishMsg(MqttUtil.java:161)
                                                              at io.communet.ichater.emq.sub.MqttSendMsgEventSubscribe.onEvent(MqttSendMsgEventSubscribe.java:28)
                                                              at java.lang.reflect.Method.invoke(Native Method)
                                                              at java.lang.reflect.Method.invoke(Method.java:372)
                                                              at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:507)
                                                              at org.greenrobot.eventbus.EventBus.invokeSubscriber(EventBus.java:501)
                                                              at org.greenrobot.eventbus.AsyncPoster.run(AsyncPoster.java:46)
                                                              at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1112)
                                                              at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:587)
                                                              at java.lang.Thread.run(Thread.java:818)
    

    原因

    根据堆栈信息找到报错地方

    if (actualInFlight >= this.maxInflight) {
        //@TRACE 613= sending {0} msgs at max inflight window
        log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});
    
        throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
    }
    

    其中 actualInFlight 如下

    // processed until the inflight window has space. 
    if (actualInFlight < this.maxInflight) {
        // The in flight window is not full so process the 
        // first message in the queue
        result = (MqttWireMessage)pendingMessages.elementAt(0);
        pendingMessages.removeElementAt(0);
        actualInFlight++;
        
        //@TRACE 623=+1 actualInFlight={0}
        log.fine(CLASS_NAME,methodName,"623",new Object[]{new Integer(actualInFlight)});
    }
    

    pendingMessages 中取出消息时, actualInFlight 加 1, maxInflight 可以自己设定, 默认值为 10.

    public class ClientState {
      ...
      volatile private Vector pendingMessages;
      ...
    }
    

    ClientState 中:

    public void send(MqttWireMessage message, MqttToken token) throws MqttException {
            ... 
            if (message instanceof MqttPublish) {
                synchronized (queueLock) {
                    if (actualInFlight >= this.maxInflight) {
                        //@TRACE 613= sending {0} msgs at max inflight window
                        log.fine(CLASS_NAME, methodName, "613", new Object[]{new Integer(actualInFlight)});
    
                        throw new MqttException(MqttException.REASON_CODE_MAX_INFLIGHT);
                    }
                    
                    MqttMessage innerMessage = ((MqttPublish) message).getMessage();
                    //@TRACE 628=pending publish key={0} qos={1} message={2}
                    log.fine(CLASS_NAME,methodName,"628", new Object[]{new Integer(message.getMessageId()), new Integer(innerMessage.getQos()), message});
    
                    switch(innerMessage.getQos()) {
                        case 2:
                            outboundQoS2.put(new Integer(message.getMessageId()), message);
                            persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                            break;
                        case 1:
                            outboundQoS1.put(new Integer(message.getMessageId()), message);
                            persistence.put(getSendPersistenceKey(message), (MqttPublish) message);
                            break;
                    }
                    tokenStore.saveToken(token, message);
                    pendingMessages.addElement(message);
                    queueLock.notifyAll();
                }
            } else {
            ...
            }
        }
    

    可以看到 pendingMessages 中添加元素的时候并没有做 qos 类型的判断

    private void decrementInFlight() {
        final String methodName = "decrementInFlight";
        synchronized (queueLock) {
            actualInFlight--;
            //@TRACE 646=-1 actualInFlight={0}
            log.fine(CLASS_NAME,methodName,"646",new Object[]{new Integer(actualInFlight)});
                
            if (!checkQuiesceLock()) {
                queueLock.notifyAll();
            }
        }
    }
    

    当收到消息反馈时 actualInFlight 减 1.

    解决办法

    1. 消息发送发送限流
    2. 用单独的一个线程来完成 MQ 消息的推送

    题外话

    笔者出现这个错误是因为使用 EventBus, 之前使用单独线程的 Handler 是没有问题的, 调查发现, 使用 EventBus 是新建线程运行的, 而 Handler 是单独一个线程.

    所以当发送大量消息的时候, EventBus 几乎是同一个点发出去, 就会造成这个错误

    相关文章

      网友评论

          本文标题:MQTT(32202): 正在发布过多的消息

          本文链接:https://www.haomeiwen.com/subject/eezbxftx.html