美文网首页
【RabbitMQ-13】惊!线上的RabbitMQ消费者自己k

【RabbitMQ-13】惊!线上的RabbitMQ消费者自己k

作者: 小胖学编程 | 来源:发表于2021-08-10 17:27 被阅读0次

    在很早之前,项目出现了一个问题,即有一个消费队列。其中数据处理的过程中出现了OOM,然后导致了RabbitMQ消费者全部挂掉,最终导致消息大量堆积。

    在排查这个问题的时候,我深入了解了下RabbitMq源码,最终定位并解决了这个问题。

    版本信息:SpringBoot 2.0.4.RELEASE
    因为SpringBoot版本不同,RabbitMq源码有一些改动。所以要确定版本号。

    1. 问题定位

    某个消息出现error级别的异常后,对应的消费者会关闭,且返回unack。消息会回到Mq并转发给下一个消费者去消费。

    当然,这个消息在下一个消费者中很大程度上也会出现error异常。故一个error级别的异常,会将整个队列所有的消息者全关闭

    1.1 源码分析

    每创建一个消息者,则在线程池中获取一个线程。去执行AsyncMessageProcessingConsumer

    创建消费者.png

    然后消费者将“死循环”式的监听消费消息。
    org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer

    消息队列是死循环去消费数据.png

    当出现error异常时,如代码所示,会stop掉消费者。

    stop消费者.png

    2. 解决方案

    2.1 方式一:监听事件,重启消费者

    在上图可知,当消费者被终止后,会发送Event事件,那么监听事件后重启消费者即可。

    @Slf4j
    public class ListenerContainerConsumerFailedEventListener implements ApplicationListener<ListenerContainerConsumerFailedEvent> {
    
        /**
         * 重启消费者失败的消费者方法
         */
        private Consumer<RestartConsumerFailEvent> restartConsumerFailEventConsumer;
    
        /**
         * 开启自动重启的回调方法
         * true:表示将要重启消费者
         */
        private Function<ListenerContainerConsumerFailedEvent, Boolean> failedEventListenerBooleanFunction;
    
    
        public void setRestartConsumerFailEventConsumer(Consumer<RestartConsumerFailEvent> restartConsumerFailEventConsumer) {
            this.restartConsumerFailEventConsumer = restartConsumerFailEventConsumer;
        }
    
        public void setFailedEventListenerBooleanFunction(Function<ListenerContainerConsumerFailedEvent, Boolean> failedEventListenerBooleanFunction) {
            this.failedEventListenerBooleanFunction = failedEventListenerBooleanFunction;
        }
    
        @Override
        public void onApplicationEvent(ListenerContainerConsumerFailedEvent event) {
            log.error("消费者失败事件发生:{}", event);
            //判断是否需要进行重试
            Boolean restart = false;
            if (failedEventListenerBooleanFunction != null) {
                restart = failedEventListenerBooleanFunction.apply(event);
            }
            if (restart) {
                log.error(String.format("Stopping container from aborted consumer. Reason::%s.",
                        event.getReason()), event.getThrowable());
                //获取到容器
                SimpleMessageListenerContainer container = (SimpleMessageListenerContainer) event.getSource();
                String queueNames = Arrays.toString(container.getQueueNames());
                // 重启
                try {
                    restart(container);
                    log.info("重启队列{}的监听成功!", queueNames);
                } catch (Exception e) {
                    log.error(String.format("重启队列%s的监听失败!", queueNames), e);
                    //发布事件
                    if (restartConsumerFailEventConsumer != null) {
                        RestartConsumerFailEvent restartConsumerFailEvent = new RestartConsumerFailEvent(event.getSource());
                        restartConsumerFailEvent.setThrowable(e);
                        restartConsumerFailEventConsumer.accept(restartConsumerFailEvent);
                    }
                }
            }
    
        }
    
        /**
         * 重启消费者
         *
         * @param container 容器对象
         */
        private void restart(SimpleMessageListenerContainer container) {
            try {
                Thread.sleep(30000);
            } catch (InterruptedException e) {
                log.error("", e);
            }
            Assert.state(!container.isRunning(), String.format("监听容器%s正在运行!", container));
            //重启
            container.start();
        }
    
    }
    

    缺点是:触发error异常的消息依旧没有被消费掉,依旧可能会将重启的消费者给kill掉。

    2.2 方式二:捕获Error异常

    首先实现拦截器,当出现Error异常后,捕获处理异常,并向外抛出事件通知。

    @Slf4j
    public class ErrorHandlerInterceptor implements MethodInterceptor, Serializable {
    
        private ApplicationContext applicationContext;
    
        public void setApplicationContext(ApplicationContext applicationContext) {
            this.applicationContext = applicationContext;
        }
    
        /**
         * 当抛出Error异常后,处理方案。
         */
        @Override
        public Object invoke(MethodInvocation invocation) throws Throwable {
            Object proceed = null;
            try {
                proceed = invocation.proceed();
            } catch (Error e) {
                log.error("", e);
                //遇到Error异常后的处理方案
                MessageErrorEvent messageErrorEvent = new MessageErrorEvent(applicationContext, RabbitUtil.getMessage(invocation), e);
                applicationContext.publishEvent(messageErrorEvent);
            }
            return proceed;
        }
    }
    

    事件信息:

    public class MessageErrorEvent extends ApplicationContextEvent {
    
        /**
         * 消息对象
         */
        private Message message;
    
        /**
         * Error异常信息
         */
        private Error error;
    
        /**
         * Create a new ContextStartedEvent.
         *
         * @param source the {@code ApplicationContext} that the event is raised for
         *               (must not be {@code null})
         */
        public MessageErrorEvent(ApplicationContext source) {
            super(source);
        }
    
        public MessageErrorEvent(ApplicationContext source, Message message, Error error) {
            super(source);
            this.message = message;
            this.error = error;
        }
    
        public Message getMessage() {
            return message;
        }
    
        public void setMessage(Message message) {
            this.message = message;
        }
    
        public Error getError() {
            return error;
        }
    
        public void setError(Error error) {
            this.error = error;
        }
    }
    

    关键点:将拦截器设置到SimpleRabbitListenerContainerFactory中:

    @EnableRabbit
    @Configuration
    public class RabbitConfiguration {
    
        @Autowired
        private ObjectProvider<ErrorHandlerInterceptor> errorHandlerInterceptorObjectProvider;
    
        @Bean(name = "sealListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainerFactory(CachingConnectionFactory connectionFactory) {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(1);
            factory.setMaxConcurrentConsumers(10);
            factory.setPrefetchCount(250);
            /* 设置当rabbitmq收到nack/reject确认信息时的处理方式,设为true,扔回queue头部,设为false,丢弃。 */
            factory.setDefaultRequeueRejected(true);
            //慢消息触发事件通知
            List<Advice> adviceList = new ArrayList<>();
      
            ErrorHandlerInterceptor errorHandlerInterceptor = errorHandlerInterceptorObjectProvider.getIfAvailable();
            if (errorHandlerInterceptor != null) {
                adviceList.add(errorHandlerInterceptor);
            }
            //加入拦截器配置
            List<MessageInterceptor> messageInterceptors = messageInterceptorsObjectProvider.getIfAvailable();
            if (!CollectionUtils.isEmpty(messageInterceptors)) {
                for (MessageInterceptor messageInterceptor : messageInterceptors) {
                    if (messageInterceptor != null) {
                        adviceList.add(messageInterceptor);
                    }
                }
            }
            factory.setAdviceChain(adviceList.toArray(new Advice[adviceList.size()]));
            //自动确认
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            return factory;
        }
    }
    

    由此,可以将通过责任链的方式,进行AOP处理,使得消费者不对外抛出Error级别的异常。

    相关文章

      网友评论

          本文标题:【RabbitMQ-13】惊!线上的RabbitMQ消费者自己k

          本文链接:https://www.haomeiwen.com/subject/vcxwiktx.html