美文网首页MQRabbitMQ
RabbitMQ 高可用优化

RabbitMQ 高可用优化

作者: xiaolyuh | 来源:发表于2018-07-22 17:38 被阅读346次

    RabbitMQ的主要作用基本上可以用8个字概括,削峰填谷异步解耦。但是引入MQ我们也不得不考虑引入MQ后带来的一些问题,如消息丢失。

    在一些业务场景不一样,处理方式也就不一样,比如发短信,日志收集我们主要看吞吐量所以对消息丢失容忍度较高,这类场景基本上不用花太多时间在消息丢失问题上。另外一种,如我们用MQ来做分布式事务,续保计算,提成的计算,这类业务对消息丢失容忍度较底,所以我们一定要考虑消息丢失的问题。这次分享的内容是怎么来最大限制的防止消息丢失,顺带提一下消息的重发和重复消费。

    RabbitMQ 模型图

    RabbitMQ 模型.jpg

    ConfirmCallback和ReturnCallback

    在这个里我们主要实现了ConfirmCallback和ReturnCallback两个接口。这两个接口主要是用来发送消息后回调的。因为rabbit发送消息是只管发,至于发没发成功,发送方法不管。

    • ConfirmCallback:当消息成功到达exchange的时候触发的ack回调。
    • ReturnCallback:当消息成功到达exchange,但是没有队列与之绑定的时候触发的ack回调。发生网络分区会出现这种情况。

    在这里一定要把这两个开关打开, publisher-confirms="true" publisher-returns="true"。

    生产者端使用ConfirmCallback和ReturnCallback回调机制,最大限度的保证消息不丢失,对原有CorrelationData类进行扩展,来实现消息的重发,具体请看源码。

    消息的日志链路跟踪

    使用MQ来解耦服务,异步化处理一些复杂耗时逻辑,但是也带来了一个问题。由于异步化以后,排查问题就很不方便了,根本不知道这个消息什么时候消费,消费的日志也很不好排查。所以引入了Slf4j MDC机制将主线程的日志链路和消息的日志链路连起来,方便MQ问题的排查。

    RabbitSender

    import com.alibaba.fastjson.JSON;
    import com.wlqq.insurance.common.enums.MetricNameEnum;
    import com.wlqq.insurance.common.enums.SystemTypeEnum;
    import com.wlqq.insurance.common.log.core.FisLoggerFactory;
    import com.wlqq.insurance.common.mq.CorrelationData;
    import com.wlqq.insurance.common.service.AlertService;
    import org.slf4j.Logger;
    import org.slf4j.MDC;
    import org.springframework.amqp.AmqpException;
    import org.springframework.amqp.core.Message;
    import org.springframework.amqp.rabbit.core.RabbitTemplate;
    import org.springframework.beans.factory.InitializingBean;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.beans.factory.annotation.Value;
    import org.springframework.util.Assert;
    import org.springframework.util.StringUtils;
    
    import java.util.UUID;
    
    /**
     * Rabbit 发送消息
     *
     * @author yuhao.wang
     */
    public class RabbitSender implements RabbitTemplate.ConfirmCallback, RabbitTemplate.ReturnCallback, InitializingBean {
        private final Logger logger = FisLoggerFactory.getLogger(RabbitSender.class);
    
        @Value("${mq.retry.count}")
        private int mqRetryCount;
    
        /**
         * 告警服务
         */
        @Autowired
        private AlertService alertService;
    
        /**
         * Rabbit MQ 客户端
         */
        private RabbitTemplate rabbitTemplate;
    
        /**
         * 发送MQ消息,异步
         *
         * @param exchangeName 交换机名称
         * @param routingKey   路由名称
         * @param message      发送消息体
         */
        public void sendMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
            Assert.notNull(message, "message 消息体不能为NULL");
            Assert.notNull(exchangeName, "exchangeName 不能为NULL");
            Assert.notNull(routingKey, "routingKey 不能为NULL");
            // 获取CorrelationData对象
            CorrelationData correlationData = this.correlationData(message, message.getMessageId());
            correlationData.setExchange(exchangeName);
            correlationData.setRoutingKey(routingKey);
            correlationData.setMessage(message);
    
            logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
            // 发送消息
            this.convertAndSend(exchangeName, routingKey, message, correlationData);
        }
    
        /**
         * RPC方式,发送MQ消息
         *
         * @param exchangeName 交换机名称
         * @param routingKey   路由名称
         * @param message      发送消息体
         */
        public void sendAndReceiveMessage(String exchangeName, String routingKey, com.wlqq.insurance.common.mq.message.Message message) {
            Assert.notNull(message, "message 消息体不能为NULL");
            Assert.notNull(exchangeName, "exchangeName 不能为NULL");
            Assert.notNull(routingKey, "routingKey 不能为NULL");
            // 获取CorrelationData对象
            CorrelationData correlationData = this.correlationData(message, message.getMessageId());
            correlationData.setExchange(exchangeName);
            correlationData.setRoutingKey(routingKey);
            correlationData.setMessage(message);
    
            logger.info("发送MQ消息,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                    correlationData.getId(), JSON.toJSONString(message), exchangeName, routingKey);
    
            rabbitTemplate.convertSendAndReceive(exchangeName, routingKey, message);
        }
    
        /**
         * 用于实现消息发送到RabbitMQ交换器后接收ack回调。
         * 如果消息发送确认失败就进行重试。
         *
         * @param correlationData
         * @param ack
         * @param cause
         */
        @Override
        public void confirm(org.springframework.amqp.rabbit.support.CorrelationData correlationData, boolean ack, String cause) {
            CorrelationData correlationDataExtends = null;
            if (correlationData instanceof CorrelationData) {
                correlationDataExtends = (CorrelationData) correlationData;
                if (correlationDataExtends.getMdcContainer() != null) {
                    // 日志链路跟踪
                    MDC.setContextMap(correlationDataExtends.getMdcContainer());
                }
            }
    
            // 消息回调确认失败处理
            if (!ack) {
                if (correlationDataExtends != null) {
                    //消息发送失败,就进行重试,重试过后还不能成功就记录到数据库
                    if (correlationDataExtends.getRetryCount() < mqRetryCount) {
                        logger.info("MQ消息发送失败,消息重发,消息ID:{},重发次数:{},消息体:{}", correlationDataExtends.getId(),
                                correlationDataExtends.getRetryCount(), JSON.toJSONString(correlationDataExtends.getMessage()));
    
                        // 将重试次数加一
                        correlationDataExtends.setRetryCount(correlationDataExtends.getRetryCount() + 1);
    
                        // 重发发消息
                        this.convertAndSend(correlationDataExtends.getExchange(), correlationDataExtends.getRoutingKey(),
                                correlationDataExtends.getMessage(), correlationDataExtends);
                    } else {
                        //消息重试发送失败,将消息放到数据库等待补发
                        logger.error("MQ消息重发失败,消息ID:{},消息体:{}", correlationData.getId(),
                                JSON.toJSONString(correlationDataExtends.getMessage()));
    
                        alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(),
                                correlationDataExtends.getExchange(), null);
                    }
                }
            } else {
                logger.info("消息发送成功,消息ID:{}", correlationData.getId());
            }
        }
    
        /**
         * 用于实现消息发送到RabbitMQ交换器,但无相应队列与交换器绑定时的回调。
         * 在脑裂的情况下会出现这种情况。
         */
        @Override
        public void returnedMessage(Message message, int replyCode, String replyText, String exchange, String routingKey) {
            // 反序列化消息
            Object msg = rabbitTemplate.getMessageConverter().fromMessage(message);
            if (msg instanceof com.wlqq.insurance.common.mq.message.Message) {
                // 日志链路跟踪
                MDC.setContextMap(((com.wlqq.insurance.common.mq.message.Message) msg).getMdcContainer());
            }
    
            logger.error("MQ消息发送失败,replyCode:{}, replyText:{},exchange:{},routingKey:{},消息体:{}",
                    replyCode, replyText, exchange, routingKey, JSON.toJSONString(msg));
    
            alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
        }
    
        /**
         * 消息相关数据(消息ID)
         *
         * @param message   消息体
         * @param messageId 消息ID
         * @return
         */
        private CorrelationData correlationData(Object message, String messageId) {
            // 消息ID默认使用UUID
            if (StringUtils.isEmpty(messageId)) {
                messageId = UUID.randomUUID().toString();
            }
            return new CorrelationData(messageId, message);
        }
    
        /**
         * 发送消息
         *
         * @param exchange        交换机名称
         * @param routingKey      路由key
         * @param message         消息内容
         * @param correlationData 消息相关数据(消息ID)
         * @throws AmqpException
         */
        private void convertAndSend(String exchange, String routingKey, final Object message, CorrelationData correlationData) {
            try {
                rabbitTemplate.convertAndSend(exchange, routingKey, message, correlationData);
            } catch (Exception e) {
                logger.error("MQ消息发送异常,消息ID:{},消息体:{}, exchangeName:{}, routingKey:{}",
                        correlationData.getId(), JSON.toJSONString(message), exchange, routingKey, e);
    
                alertService.postAlert(MetricNameEnum.SYSTEM_INTERNAL_EXCEPTION, SystemTypeEnum.MQ.name(), exchange, null);
            }
        }
    
        @Override
        public void afterPropertiesSet() throws Exception {
            rabbitTemplate.setConfirmCallback(this);
            rabbitTemplate.setReturnCallback(this);
        }
    
        public void setRabbitTemplate(RabbitTemplate rabbitTemplate) {
            this.rabbitTemplate = rabbitTemplate;
        }
    }
    

    CorrelationData

    import lombok.Data;
    import org.slf4j.MDC;
    
    import java.util.Map;
    
    /**
     * 发送消息的相关数据
     *
     * @author yuhao.wang
     */
    @Data
    public class CorrelationData extends org.springframework.amqp.rabbit.support.CorrelationData {
    
    
        /**
         * MDC容器
         * 获取父线程MDC中的内容,做日志链路
         */
        private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
    
        /**
         * 消息体
         */
        private volatile Object message;
    
        /**
         * 交换机名称
         */
        private String exchange;
    
        /**
         * 路由key
         */
        private String routingKey;
    
        /**
         * 重试次数
         */
        private int retryCount = 0;
    
        public CorrelationData(String id) {
            super(id);
        }
    
        public CorrelationData(String id, Object data) {
            this(id);
            this.message = data;
        }
    }
    
    

    Message

    /**
     * MQ消息的父类消息体
     *
     * @author yuhao.wang
     */
    @Data
    public class Message implements Serializable {
        private static final long serialVersionUID = -4731326195678504565L;
    
        /**
         * MDC容器
         * 获取父线程MDC中的内容,做日志链路
         */
        private Map<String, String> mdcContainer = MDC.getCopyOfContextMap();
    
        /**
         * 消息ID(消息的唯一标示)
         */
        private String messageId;
    }
    

    AbstractConsumer

    /**
     * 默认消费者
     *
     * @author yuhao.wang3
     */
    public abstract class AbstractConsumer implements MessageListener {
        private static final Logger LOGGER = FisLoggerFactory.getLogger(AbstractConsumer.class);
    
        @Override
        public void onMessage(Message msg) {
            String body = null;
    
            try {
                // 日志链路跟踪逻辑
                body = new String(msg.getBody(), "utf-8");
                DefaultMessage message = JSON.parseObject(body, DefaultMessage.class);
                Map<String, String> container = message.getMdcContainer();
                if (container != null) {
                    // 日志链路跟踪
                    MDC.setContextMap(message.getMdcContainer());
                }
            } catch (Exception e) {
                LOGGER.warn("没有找到MQ消息日志链路数据,无法做日志链路追踪");
            }
    
            try {
                // 处理消息逻辑
                doMessage(msg);
                LOGGER.info("成功处理MQ消息, 消息体:{}", body);
            } catch (Exception e) {
                LOGGER.error("处理MQ消息异常 {}, 消息体:{}", JSON.toJSONString(msg), body, e);
            }
        }
    
        /**
         * 处理消息的实现方法
         *
         * @param msg
         */
        public abstract void doMessage(Message msg);
    }
    

    源码

    https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases

    spring-boot-student-rabbitmq 工程

    相关文章

      网友评论

      • Kate_7a5d:给个git地址呗
        xiaolyuh:@Kate_7a5d https://github.com/wyh-spring-ecosystem-student/spring-boot-student/tree/releases?from=%E6%96%87%E7%AB%A0%E9%A1%B5%E5%86%85%E9%93%BE%E6%8E%A5
      • kg_eaf9:生产者重发发消息无效,确定这代码可用?
        xiaolyuh:@kg_eaf9 ConfirmCallback:当消息成功到达exchange的时候触发的ack回调。
        ReturnCallback:当消息成功到达exchange,但是没有队列与之绑定的时候触发的ack回调。基本上来说线上不可能出现这种情况,除非手动将已经存在的队列删掉,否则在测试阶段肯定能测试出
        kg_eaf9:// 获取CorrelationData对象
        CorrelationData correlationData = this.correlationData(message);
        correlationData.setExchange(exchangeName);
        correlationData.setRoutingKey(routingKey);
        correlationData.setMessage(message);
        // 发送消息,设置不存在的交换机和队列名称
        this.convertAndSend("notExist", "notExist", message, correlationData);
        如上,在发送消息时设置不存在的交换机和队列,但是correlationData中设置的是正确存在的,但是不会重发,RabbitMQ的可视化管理界面上也没有显示收到消息,消费者代码也没有运行。
        运行环境:RabbitMQ和项目都是在本地跑的。
        不知道原因,求博主指点
        xiaolyuh:这里确实有点问题,我忘记了吧exchange和Routingkey设置到CorrelationData里面了。现在已经改了 你可以看一下最新代码。谢谢了的宝贵意见

      本文标题:RabbitMQ 高可用优化

      本文链接:https://www.haomeiwen.com/subject/iiglmftx.html