美文网首页
ActiveMQ 消息重试策略及原理

ActiveMQ 消息重试策略及原理

作者: 超人也害羞 | 来源:发表于2020-05-09 00:10 被阅读0次

    本篇文章研究ActiveMQ的Redelivery Policy中的各个配置含义.
    (注: 文中的源码基于ActiveMQ5.15.9版本,不同版本细节上可能会有所不同)

    重试策略参数.
    配置DEMO.
    ActiveMQ重试原理

    重试策略参数

    公共配置

    Property Default Value Descrtption
    initialRedeliveryDelay 1000 第一次重试延时X毫秒
    redeliveryDelay 1000 第一次消息重试之后,每次重试延长X毫秒
    maximumRedeliveries 6 最大重试次数

    如果理解起来比较困难,可以参考一下MQ源码,应该比较好理解了.

    // org.apache.activemq.ActiveMQMessageConsumer#rollback
    int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
    if (currentRedeliveryCount > 0) {
        // 如果不是第一次重试,按redeliverDelay值为基础来取值.
        this.redeliveryDelay = this.redeliveryPolicy.getNextRedeliveryDelay(this.redeliveryDelay);
    } else {
        // 如果是第一次重试,取InitialRedeliveryDelay的值
        this.redeliveryDelay = this.redeliveryPolicy.getInitialRedeliveryDelay();
    }
    

    碰撞躲避机制

    Property Default Value Descrtption
    collisionAvoidanceFactor 0.15 设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。
    useCollisionAvoidance false 默认不启用

    延时递增配置

    Property Default Value Descrtption
    backOffMultiplier 5 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。
    maximumRedeliveryDelay 1000 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。
    useExponentialBackOff false 启用指数倍数递增的方式增加延迟时间。

    碰撞躲避机制 和 延时递增配置的源码可以参考下面

    public long getNextRedeliveryDelay(long previousDelay) {
            // 如果不启用延时递增和碰撞躲避策略的话,第二次之后重试的间隔就等于redeliveryDelay
            long nextDelay = this.redeliveryDelay;
            if (previousDelay > 0L && this.useExponentialBackOff && this.backOffMultiplier > 1.0D) {
                // 使用延时递增策略的条件是useExponentialBackOff=true并且backOffMultiplier>1
                // 然后nextdelay取上一次的延时 * backOffMultiplier
                nextDelay = (long)((double)previousDelay * this.backOffMultiplier);
                if (this.maximumRedeliveryDelay != -1L && nextDelay > this.maximumRedeliveryDelay) {
                    nextDelay = Math.max(this.maximumRedeliveryDelay, this.redeliveryDelay);
                }
            }
    
            if (this.useCollisionAvoidance) {
                // 如果采用碰撞躲避机制,以之前计算好的延时为基础,再加上一个随机的延时.
                Random random = getRandomNumberGenerator();
                double variance = (random.nextBoolean() ? this.collisionAvoidanceFactor : -this.collisionAvoidanceFactor) * random.nextDouble();
                nextDelay = (long)((double)nextDelay + (double)nextDelay * variance);
            }
    
            return nextDelay;
        }
    

    配置DEMO

    <bean id="delayRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
            <!--重发次数,默认为6次   这里设置为1次 -->
            <property name="maximumRedeliveries" value="3"></property>
            <!--重发时间间隔,第一次500毫秒 -->
            <property name="initialRedeliveryDelay" value="500"></property>
            <!--重发时间间隔,第二次及以后1000毫秒 -->
            <property name="redeliveryDelay" value="1000"></property>
    
            <!--是否在每次尝试重新发送失败后,增长这个(redeliveryDelay)等待时间 -->
            <property name="useExponentialBackOff" value="true"></property>
            <!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
            <property name="backOffMultiplier" value="2"></property>
            <!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
            <property name="maximumRedeliveryDelay" value="-1"></property>
    </bean>
    

    ActiveMQ重试原理

    看完上面的策略之后,不免好奇ActiveMQ是怎么去实现消息重试的呢?是由Broker控制的吗?带着疑问去翻了一下Consumer的代码,发现其实不然. 原来ActiveMQ的重试是在客户端去处理的,并且是由第一次消费了这条消息的Consumer去重试,而且重试的机制也很简单,MQ利用了一个定时器,延时到了之后再去重新处理消息.

    // org.apache.activemq.ActiveMQMessageConsumer#rollback
    // Start up the delivery again a little later.
    session.getScheduler().executeAfterDelay(new Runnable() {
      @Override
      public void run() {
      try {
        if (!unconsumedMessages.isClosed()) {
          for(MessageDispatch dispatch : pendingRedeliveries) {
            session.dispatch(dispatch);
          }
        }
       } catch (Exception e) {
        session.connection.onAsyncException(e);
       }
      }
    }, redeliveryDelay);
    

    举一个小栗子,如果有queue,重试3次,一条消息被某一个客户端消费了,重试了2次,这个时候这个客户端down了,然后重新启动,又重新消费这个消息,如果消费失败了,会重试几次呢? 可以下面留言哦

    参考资料

    https://activemq.apache.org/redelivery-policy.html

    https://blog.csdn.net/qq_39706128/article/details/80570577

    相关文章

      网友评论

          本文标题:ActiveMQ 消息重试策略及原理

          本文链接:https://www.haomeiwen.com/subject/siicnhtx.html