本篇文章研究ActiveMQ的Redelivery Policy中的各个配置含义.
(注: 文中的源码基于ActiveMQ5.15.9版本,不同版本细节上可能会有所不同)
重试策略参数.
配置DEMO.
ActiveMQ重试原理
重试策略参数
公共配置
Property | Default Value | Descrtption |
---|---|---|
initialRedeliveryDelay | 1000 | 第一次重试延时X毫秒 |
redeliveryDelay | 1000 | 第一次消息重试之后,每次重试延长X毫秒 |
maximumRedeliveries | 6 | 最大重试次数 |
如果理解起来比较困难,可以参考一下MQ源码,应该比较好理解了.
// org.apache.activemq.ActiveMQMessageConsumer#rollback
int currentRedeliveryCount = lastMd.getMessage().getRedeliveryCounter();
if (currentRedeliveryCount > 0) {
// 如果不是第一次重试,按redeliverDelay值为基础来取值.
this.redeliveryDelay = this.redeliveryPolicy.getNextRedeliveryDelay(this.redeliveryDelay);
} else {
// 如果是第一次重试,取InitialRedeliveryDelay的值
this.redeliveryDelay = this.redeliveryPolicy.getInitialRedeliveryDelay();
}
碰撞躲避机制
Property | Default Value | Descrtption |
---|---|---|
collisionAvoidanceFactor | 0.15 | 设置防止冲突范围的正负百分比,只有启用useCollisionAvoidance参数时才生效。也就是在延迟时间上再加一个时间波动范围。 |
useCollisionAvoidance | false | 默认不启用 |
延时递增配置
Property | Default Value | Descrtption |
---|---|---|
backOffMultiplier | 5 | 重连时间间隔递增倍数,只有值大于1和启用useExponentialBackOff参数时才生效。 |
maximumRedeliveryDelay | 1000 | 最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 |
useExponentialBackOff | false | 启用指数倍数递增的方式增加延迟时间。 |
碰撞躲避机制 和 延时递增配置的源码可以参考下面
public long getNextRedeliveryDelay(long previousDelay) {
// 如果不启用延时递增和碰撞躲避策略的话,第二次之后重试的间隔就等于redeliveryDelay
long nextDelay = this.redeliveryDelay;
if (previousDelay > 0L && this.useExponentialBackOff && this.backOffMultiplier > 1.0D) {
// 使用延时递增策略的条件是useExponentialBackOff=true并且backOffMultiplier>1
// 然后nextdelay取上一次的延时 * backOffMultiplier
nextDelay = (long)((double)previousDelay * this.backOffMultiplier);
if (this.maximumRedeliveryDelay != -1L && nextDelay > this.maximumRedeliveryDelay) {
nextDelay = Math.max(this.maximumRedeliveryDelay, this.redeliveryDelay);
}
}
if (this.useCollisionAvoidance) {
// 如果采用碰撞躲避机制,以之前计算好的延时为基础,再加上一个随机的延时.
Random random = getRandomNumberGenerator();
double variance = (random.nextBoolean() ? this.collisionAvoidanceFactor : -this.collisionAvoidanceFactor) * random.nextDouble();
nextDelay = (long)((double)nextDelay + (double)nextDelay * variance);
}
return nextDelay;
}
配置DEMO
<bean id="delayRedeliveryPolicy" class="org.apache.activemq.RedeliveryPolicy">
<!--重发次数,默认为6次 这里设置为1次 -->
<property name="maximumRedeliveries" value="3"></property>
<!--重发时间间隔,第一次500毫秒 -->
<property name="initialRedeliveryDelay" value="500"></property>
<!--重发时间间隔,第二次及以后1000毫秒 -->
<property name="redeliveryDelay" value="1000"></property>
<!--是否在每次尝试重新发送失败后,增长这个(redeliveryDelay)等待时间 -->
<property name="useExponentialBackOff" value="true"></property>
<!--第一次失败后重新发送之前等待500毫秒,第二次失败再等待500 * 2毫秒,这里的2就是value -->
<property name="backOffMultiplier" value="2"></property>
<!--最大传送延迟,只在useExponentialBackOff为true时有效(V5.5),假设首次重连间隔为10ms,倍数为2,那么第二次重连时间间隔为 20ms,第三次重连时间间隔为40ms,当重连时间间隔大的最大重连时间间隔时,以后每次重连时间间隔都为最大重连时间间隔。 -->
<property name="maximumRedeliveryDelay" value="-1"></property>
</bean>
ActiveMQ重试原理
看完上面的策略之后,不免好奇ActiveMQ是怎么去实现消息重试的呢?是由Broker控制的吗?带着疑问去翻了一下Consumer的代码,发现其实不然. 原来ActiveMQ的重试是在客户端去处理的,并且是由第一次消费了这条消息的Consumer去重试,而且重试的机制也很简单,MQ利用了一个定时器,延时到了之后再去重新处理消息.
// org.apache.activemq.ActiveMQMessageConsumer#rollback
// Start up the delivery again a little later.
session.getScheduler().executeAfterDelay(new Runnable() {
@Override
public void run() {
try {
if (!unconsumedMessages.isClosed()) {
for(MessageDispatch dispatch : pendingRedeliveries) {
session.dispatch(dispatch);
}
}
} catch (Exception e) {
session.connection.onAsyncException(e);
}
}
}, redeliveryDelay);
举一个小栗子,如果有queue,重试3次,一条消息被某一个客户端消费了,重试了2次,这个时候这个客户端down了,然后重新启动,又重新消费这个消息,如果消费失败了,会重试几次呢? 可以下面留言哦
参考资料
网友评论