美文网首页
RabitMQ实现重试次数二-放入消息体

RabitMQ实现重试次数二-放入消息体

作者: 瞿大官人 | 来源:发表于2019-04-05 14:19 被阅读0次

前言

前一节说了如何使用SpringRetry实现重试,详情见RabbitMQ实现重试次数方法一-SpringRetry,这一节讲使用channel.basicPublish实现重试。其实事先说明这种方式的缺点,一个消费者消费失败后重试将消息重新发送出去,这样会导致其他已经正常消费的消费者重新接收到该消息。比如 ConsumerA 监听队列ACroutingKeyacConsumerB监听队列BCroutingKeybc。这时候生产者发送一个message,routingKey为c,这个时候ConsumerAConsumerB都会同时接收到这个消息,假如ConsumerA消费成功,ConsumerB消费失败,然后ConsumerB将消息重新发送出去重试,这个时候原本消费成功的ConsumerA会重新接收到这个消息。

接受消息进行重试

消费者接收到消息执行以下步骤:

  1. 调用channel.basicAck确认消费。
  2. 获取消息体中的重试次数。
  3. 累加重试次数,重新放入消息体中。
  4. 将消息重新发送至队尾。
public class DefaultMessageRetry {

    private RabbitProperties properties;

    public void setProperties(RabbitProperties properties) {
        this.properties = properties;
    }

    private Logger logger = LoggerFactory.getLogger(DefaultMessageRetry.class);

    @Override
    public void retry(Message message, Channel channel) {
        if (!channel.isOpen()) {
            logger.error("rabbit通道关闭");
            return;
        }
        // 消息确认
        try {
            channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
        } catch (IOException e) {
            logger.error("确认消息失败 message = {}, e ={}", new java.lang.String(message.getBody()), e);
            throw new BizSystemException(ConsumerErrorInfoEnum.ACK_ERROR);
        }
        String json = new String(message.getBody());
        // 获取重试次数
        int retryTimes = getRetryTimes(json);
        if (retryTimes >= 100) {
            logger.info("重试次数达到最大值 retryTimes = {}, messageId = {}", retryTimes, message.getMessageProperties().getMessageId());
            return;
        }
        // 消息发送到队尾
        try {
            channel.basicPublish(message.getMessageProperties().getReceivedExchange(),// 交换机
                    message.getMessageProperties().getReceivedRoutingKey(),// 路由键
                    converBasicProperties(message.getMessageProperties()), // 属性
                    newRetryTimes(json, retryTime + 1).getBytes()); // 放入了新重试次数的body
        } catch (IOException e) {
            logger.error("消息发布失败 message = {}, e = {}", new java.lang.String(message.getBody()), e);
            throw new BizSystemException(ConsumerErrorInfoEnum.PUBLISH_ERROR);
        }
    }


   // 获取body中的重试次数
     private int getRetryTime(String json) {
        Object retryTimes =  JSONObject.parseObject(json).get("retryTime");
        return retryTimes == null ? 0 : (int) retryTimes;
    }
  // 将重试次数放入body里面
    private String newRetryTimes(String json, int retryTimes) {
        return JSONObject.parseObject(json).fluentPut("retryTime", retryTimes).toJSONString();
    }

    private AMQP.BasicProperties converBasicProperties(MessageProperties messageProperties) {
        AMQP.BasicProperties basicProperties = new AMQP.BasicProperties(
                messageProperties.getContentType(),
                messageProperties.getContentEncoding(),
                messageProperties.getHeaders(),
                messageProperties.getDelay(),
                messageProperties.getPriority(),
                messageProperties.getCorrelationIdString(),
                messageProperties.getReplyTo(),
                messageProperties.getExpiration(),
                messageProperties.getMessageId(),
                messageProperties.getTimestamp(),
                messageProperties.getType(),
                messageProperties.getUserId(),
                messageProperties.getAppId(),
                messageProperties.getClusterId()
        );
        return basicProperties;
    }
}

总结

这种方式会影响到其他正常消费者,所以一般不赞成使用。

相关文章

网友评论

      本文标题:RabitMQ实现重试次数二-放入消息体

      本文链接:https://www.haomeiwen.com/subject/bxghiqtx.html