Spring Boot RabbitMQ实践

作者: xiaolyuh | 来源:发表于2018-02-07 13:43 被阅读1226次

    背景

    我们现在有两个主要的系统一个是活动系统一个是奖品系统,活动系统会调用奖品系统发放奖励。

    最开始两个之间只通过http直接调用,优点:开发成本低,没有多余组件引入;发放奖励实时返回;活动系统不需要管奖品是否还有剩余库存。缺点:这样就导致上游活动系统强依赖于下游的奖品系统,如果一旦奖品系统挂掉,我们活动系统也就不可用了;这里还有个bug,在调用奖品系统发放奖励,奖品系统发放成功了,但是活动系统请求超时了,就会导致提示客户的没有奖品了,但是实际奖品又发放了。

    访问量上来后发直接走http肯定是不行的,所以引入了MQ将将两个系统隔离开,优点:所有发放流程异步执行,活动系统响应更快了;这两个系统就变成弱引用关系,即使奖品系统挂掉,活动系统仍能正常运行;不会出现上面说的bug了;缺点:发放奖励将会有延迟;引入MQ增加了项目复杂度,我们必须去考虑消息的丢失,重复消费等问题;活动系统需要知道奖品的库存情况。

    解决方案

    针对上面使用MQ发放奖励会遇到的问题,我们可以通过面的方案来解决。

    消息的丢失问题

    在数据库创建一张异常消息表。

    • 在发消息的时候如果出现异常,直接将消息记录到异常消息表,等待后台跑批,进行补偿发放。
    • 在发消息的时候,如果发送消息的ack回调没没有发送成功,将进行消息重发,如果重发3次还是失败,该消息就记录到异常消息表,等待后台跑批,进行补偿发放。消息的重复发送可以使用RabbitMQ的ConfirmCallback、ReturnCallback机制来实现。
    • 在消费端处理消息(调用奖品系统发放奖励)的时候,如果出现异常也将消息放到异常消息表中,等待后台跑批,进行补偿发放。如果将异常消息保存到数据库时发生了异常,则将消息放到死信队列,等待后台跑批,进行补偿发放。

    这样子虽然还是不能完全杜绝消息丢失,但是绝大部分情况下是没有问题的。

    重复消费问题

    为每个消息生成业务流水号,将流水号和发放里的参数一起发送到奖品系统,奖品系统在发放奖励的时候先判断这个流水号是否存在,存在就表示该奖品已经发过来直接返回发放成功,如果没有就进行发放奖励操作。

    活动系统需要知道奖品的库存情况。

    我们在配置活动的时候会将奖品的库存放到我们活动系统,在发MQ消息之前回去判断是否有剩余库存,如果没有直接返回奖励领完了,如果有才回去发MQ消息。扣减库存可以参考基于redis实现的扣减库存

    活动流程图

    下面是引入MQ过后我们系统的流程图


    MQ解耦系统间的依赖关系 生产者.png 消费者.png

    生产者端实现

    /**
     * Rabbit 发送消息
     *
     * @author yuhao.wang
     */
    @Service
    public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
        private final Logger logger = LoggerFactory.getLogger(RabbitSender.class);
    
        /**
         * Rabbit MQ 客户端
         */
        @Autowired
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 系统配置
         */
        @Autowired
        private SystemConfig systemConfig;
    
        /**
         * 发送MQ消息
         *
         * @param exchangeName 交换机名称
         * @param routingKey   路由名称
         * @param message      发送消息体
         */
        public void sendMessage(String exchangeName, String routingKey, Object message) {
            Assert.notNull(message, "message 消息体不能为NULL");
            Assert.notNull(exchangeName, "exchangeName 不能为NULL");
            Assert.notNull(routingKey, "routingKey 不能为NULL");
    
            // 获取CorrelationData对象
            CorrelationData correlationData = this.correlationData(message);
            correlationData.setExchange(exchangeName);
            correlationData.setRoutingKey(routingKey);
            correlationData.setMessage(message);
    
            logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
            // 发送消息
            this.convertAndSend(exchangeName, routingKey, message, correlationData);
        }
    
        /**
         * 用于实现消息发送到RabbitMQ交换器后接收ack回调。
         * 如果消息发送确认失败就进行重试。
         *
         * @param correlationData
         * @param ack
         * @param cause
         */
        @Override
        public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
            // 消息回调确认失败处理
            if (!ack && correlationData instanceof CorrelationData) {
                CorrelationData correlationDataExtends = (CorrelationData) correlationData;
    
                //消息发送失败,就进行重试,重试过后还不能成功就记录到数据库
                if (correlationDataExtends.getRetryCount() < systemConfig.getMqRetryCount()) {
                    logger.info("MQ消息发送失败,消息重发,消息ID:{},重发次数:{},消息体:{}", correlationDataExtends.getId(),
                            correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));
    
                    // 将重试次数加一
                    correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);
    
                    // 重发发消息
                    this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
                            correlationDataExtends.getMessage(), correlationDataExtends);
                } else {
                    //消息重试发送失败,将消息放到数据库等待补发
                    logger.warn("MQ消息重发失败,消息入库,消息ID:{},消息体:{}", correlationData.getId(),
                            JSON.toJSONString(correlationDataExtends.getMessage()));
    
                    // TODO 保存消息到数据库
                }
            } else {
                logger.info("消息发送成功,消息ID:{}", correlationData.getId());
            }
        }
    
        /**
         * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            logger.error("MQ消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}",
                    replyCode, replyText, exchange, routingKey, JSON.toJSONString(message.getBody()));
    
            // TODO 保存消息到数据库
        }
    
        /**
         * 消息相关数据(消息ID)
         *
         * @param message
         * @return
         */
        private CorrelationData correlationData(Object message) {
    
            return new CorrelationData(UUID.randomUUID().toString(), message);
        }
    
        /**
         * 发送消息
         *
         * @param exchange        交换机名称
         * @param routingKey      路由key
         * @param message         消息内容
         * @param correlationData 消息相关数据(消息ID)
         * @throws AmqpException
         */
        private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) throws AmqpException {
            try {
                rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            } catch (Exception e) {
                logger.error("MQ消息发送异常,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                        correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);
    
                // TODO 保存消息到数据库
            }
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
        }
    }
    

    生产者端使用ConfirmCallback和ReturnCallback回调机制,最大限度的保证消息不丢失,对原有CorrelationData类进行扩展,来实现消息的重发,具体请看源码。

    消费者端实现

    
    /**
     * 发放优惠券的MQ处理
     *
     * @author yuhao.wang
     */
    @Service
    @ConditionalOnClass({RabbitTemplate.class})
    public class SendMessageListener {
    
        private final Logger logger = LoggerFactory.getLogger(SendMessageListener.class);
    
        @RabbitListener(queues = RabbitConstants.QUEUE_NAME_SEND_COUPON)
        public void process(SendMessage sendMessage, Channel channel, Message message) throws Exception {
            logger.info("[{}]处理发放优惠券奖励消息队列接收数据,消息ID:{},消息体:{}", RabbitConstants.QUEUE_NAME_SEND_COUPON,
                    message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage));
    
            try {
                // 参数校验
                Assert.notNull(sendMessage, "sendMessage 消息体不能为NULL");
    
                // TODO 处理消息
    
                // 确认消息已经消费成功
                channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
            } catch (Exception e) {
                logger.error("MQ消息处理异常,消息ID:{},消息体:{}", message.getMessageProperties().getCorrelationIdString(),
                        JSON.toJSONString(sendMessage), e);
    
                try {
                    // TODO 保存消息到数据库
    
                    // 确认消息已经消费成功
                    channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
                } catch (Exception e1) {
                    logger.error("保存异常MQ消息到数据库异常,放到死性队列,消息ID:{}", message.getMessageProperties().getCorrelationIdString());
                    // 确认消息将消息放到死信队列
                    channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
                }
            }
        }
    }
    

    消费者端主要做了消息消费失败的容错处理。

    源码

    https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

    spring-boot-student-rabbitmq 工程

    相关文章

      网友评论

      • 薄飞:emmm。。关于producer的ack,在提出一点问题:
        当消息到达exchange,却没到达queue时,ack为true,楼主逻辑判断为“发送成功",实际上却发送失败了
        应该ack为true,且returnback没调用时,才能确定消息已经到达队列并持久化
      • 薄飞:还有这里
        catch (Exception dbe) {
        logger.error("保存异常MQ消息到数据库异常,放到死性队列,消息体:{}", JSON.toJSONString(sendMessage), dbe);
        // 确认消息将消息放到死信队列
        channel.basicNack(message.getMessageProperties().getDeliveryTag(), false, false);
        }
        确认了消息,消息怎么会发到死信队列。。
        薄飞:这里我弄错了, basicNack(), requeue=false时会发到死信,但3楼那个basicAck没有这个功能
      • 薄飞:这里有问题啊
        catch (Exception e) {
        logger.error("MQ消息处理异常,消息体:{}", message.getMessageProperties().getCorrelationIdString(), JSON.toJSONString(sendMessage), e);

        // 确认消息已经消费消费失败,将消息发给下一个消费者
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), true);
        }
        确认了消息。消息根本不会发给下一个消费者了啊
      • a6954041faee:重发消息的时候使用correlationDataExtends获取了exchange和routingKey,但是下发消息的时候只构造了CorrelationData的ID和Message呢?
        xiaolyuh:这里确实有点问题,我忘记了吧exchange和Routingkey设置到CorrelationData里面了。现在已经改了 你可以看一下最新代码。谢谢了的宝贵意见

      本文标题:Spring Boot RabbitMQ实践

      本文链接:https://www.haomeiwen.com/subject/mdgozxtx.html