前言
前一节说了如何使用SpringRetry
实现重试,详情见RabbitMQ实现重试次数方法一-SpringRetry,这一节讲使用channel.basicPublish
实现重试。其实事先说明这种方式的缺点,一个消费者消费失败后重试将消息重新发送出去,这样会导致其他已经正常消费的消费者重新接收到该消息。比如 ConsumerA
监听队列AC
,routingKey
为 a
、c
。ConsumerB
监听队列BC
,routingKey
为b
、c
。这时候生产者发送一个message
,routingKey为c
,这个时候ConsumerA
和ConsumerB
都会同时接收到这个消息,假如ConsumerA
消费成功,ConsumerB
消费失败,然后ConsumerB
将消息重新发送出去重试,这个时候原本消费成功的ConsumerA
会重新接收到这个消息。
接受消息进行重试
消费者接收到消息执行以下步骤:
- 调用
channel.basicAck
确认消费。 - 获取消息体中的重试次数。
- 累加重试次数,重新放入消息体中。
- 将消息重新发送至队尾。
public class DefaultMessageRetry {
private RabbitProperties properties;
public void setProperties(RabbitProperties properties) {
this.properties = properties;
}
private Logger logger = LoggerFactory.getLogger(DefaultMessageRetry.class);
@Override
public void retry(Message message, Channel channel) {
if (!channel.isOpen()) {
logger.error("rabbit通道关闭");
return;
}
// 消息确认
try {
channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
} catch (IOException e) {
logger.error("确认消息失败 message = {}, e ={}", new java.lang.String(message.getBody()), e);
throw new BizSystemException(ConsumerErrorInfoEnum.ACK_ERROR);
}
String json = new String(message.getBody());
// 获取重试次数
int retryTimes = getRetryTimes(json);
if (retryTimes >= 100) {
logger.info("重试次数达到最大值 retryTimes = {}, messageId = {}", retryTimes, message.getMessageProperties().getMessageId());
return;
}
// 消息发送到队尾
try {
channel.basicPublish(message.getMessageProperties().getReceivedExchange(),// 交换机
message.getMessageProperties().getReceivedRoutingKey(),// 路由键
converBasicProperties(message.getMessageProperties()), // 属性
newRetryTimes(json, retryTime + 1).getBytes()); // 放入了新重试次数的body
} catch (IOException e) {
logger.error("消息发布失败 message = {}, e = {}", new java.lang.String(message.getBody()), e);
throw new BizSystemException(ConsumerErrorInfoEnum.PUBLISH_ERROR);
}
}
// 获取body中的重试次数
private int getRetryTime(String json) {
Object retryTimes = JSONObject.parseObject(json).get("retryTime");
return retryTimes == null ? 0 : (int) retryTimes;
}
// 将重试次数放入body里面
private String newRetryTimes(String json, int retryTimes) {
return JSONObject.parseObject(json).fluentPut("retryTime", retryTimes).toJSONString();
}
private AMQP.BasicProperties converBasicProperties(MessageProperties messageProperties) {
AMQP.BasicProperties basicProperties = new AMQP.BasicProperties(
messageProperties.getContentType(),
messageProperties.getContentEncoding(),
messageProperties.getHeaders(),
messageProperties.getDelay(),
messageProperties.getPriority(),
messageProperties.getCorrelationIdString(),
messageProperties.getReplyTo(),
messageProperties.getExpiration(),
messageProperties.getMessageId(),
messageProperties.getTimestamp(),
messageProperties.getType(),
messageProperties.getUserId(),
messageProperties.getAppId(),
messageProperties.getClusterId()
);
return basicProperties;
}
}
总结
这种方式会影响到其他正常消费者,所以一般不赞成使用。
网友评论