【RabbitMQ-9】Custom thread pool configuration (insufficient thread pool resources - MQ init…

Author: 小胖学编程 | Published: 2020-11-16 18:40

    1. Source code notes

    Source 1: starting the consumers

    @Override 
    protected void doStart() {
        checkListenerContainerAware();
        super.doStart();
        synchronized(this.consumersMonitor) {
            if (this.consumers != null) {
                throw new IllegalStateException("A stopped container should not have consumers");
            }
            int newConsumers = initializeConsumers();
            if (this.consumers == null) {
                logger.info("Consumers were initialized and then cleared " + "(presumably the container was stopped concurrently)");
                return;
            }
            if (newConsumers <= 0) {
                if (logger.isInfoEnabled()) {
                    logger.info("Consumers are already running");
                }
                return;
            }
            Set < AsyncMessageProcessingConsumer > processors = new HashSet < AsyncMessageProcessingConsumer > ();
            // One processor (and therefore one pool thread) per consumer; there are concurrentConsumers consumers
            for (BlockingQueueConsumer consumer: this.consumers) {
                AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                processors.add(processor);
                // Start the processor on the configured task executor
                // (this runs run(); once the consumer has started, run() counts down the processor's CountDownLatch)
                getTaskExecutor().execute(processor);
                if (getApplicationEventPublisher() != null) {
                    getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                }
            }
            // Check that every processor has executed run() and started its consumer; if not, block until all consumers have started (or timed out).
            waitForConsumersToStart(processors);
        }
    }
    

    If all core threads are occupied while consumers are still waiting to start, the startup thread blocks in waitForConsumersToStart.

    Source 2: serial blocking

    private void waitForConsumersToStart(Set < AsyncMessageProcessingConsumer > processors) {
        for (AsyncMessageProcessingConsumer processor: processors) {
            FatalListenerStartupException startupException = null;
            try {
                startupException = processor.getStartupException();
            } catch(TimeoutException e) {
                throw RabbitExceptionTranslator.convertRabbitAccessException(e);
            } catch(InterruptedException e) {
                Thread.currentThread().interrupt();
                throw RabbitExceptionTranslator.convertRabbitAccessException(e);
            }
            if (startupException != null) {
                throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
            }
        }
    }
    

    Source 3: blocking on a CountDownLatch

    private FatalListenerStartupException getStartupException() throws TimeoutException,
    InterruptedException {
        if (!this.start.await(SimpleMessageListenerContainer.this.consumerStartTimeout, TimeUnit.MILLISECONDS)) {
            logger.error("Consumer failed to start in " + SimpleMessageListenerContainer.this.consumerStartTimeout + " milliseconds; does the task executor have enough threads to support the container " + "concurrency?");
        }
        return this.startupException;
    }
    

    The wait for the started threads is serial: each consumer is waited for in turn.

    Example: the thread pool has only 1 thread, but the consumers of one queue need 10 threads.

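    1. Create the consumers;
    2. Start each consumer on the configured thread pool;
    3. Publish the consumer-started event;
    4. Block serially, checking in turn whether each consumer has started (60 s by default for each one);
    5. In theory startup therefore waits 9 × 60 s (one timeout for each consumer that never got a thread), and only a single consumer is actually running.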
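
The steps above can be sketched with plain JDK classes. This is only a simulation under stated assumptions, not Spring AMQP code: `SerialStartupDemo` and `simulate` are hypothetical names, the "consumers" are tasks that park on a latch, and the timeout is shortened from 60 s to a few hundred milliseconds.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SerialStartupDemo {

    /** Returns {startedCount, timedOutCount} for the given pool size and consumer count. */
    static int[] simulate(int poolThreads, int consumers, long startTimeoutMillis) {
        // Fixed pool with an unbounded queue: surplus tasks are queued, not rejected.
        ExecutorService pool = Executors.newFixedThreadPool(poolThreads);
        CountDownLatch stop = new CountDownLatch(1);
        List<CountDownLatch> startLatches = new ArrayList<>();
        try {
            for (int i = 0; i < consumers; i++) {
                CountDownLatch started = new CountDownLatch(1);
                startLatches.add(started);
                pool.execute(() -> {
                    started.countDown();      // stands in for "consumer started"
                    try {
                        stop.await();         // stands in for the endless consume loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            int startedCount = 0;
            int timedOut = 0;
            // Serial wait: every consumer that never got a thread costs one full timeout.
            for (CountDownLatch latch : startLatches) {
                if (latch.await(startTimeoutMillis, TimeUnit.MILLISECONDS)) {
                    startedCount++;
                } else {
                    timedOut++;
                }
            }
            return new int[] {startedCount, timedOut};
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            stop.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        long begin = System.nanoTime();
        int[] result = simulate(1, 4, 200);
        long elapsedMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - begin);
        System.out.println("started=" + result[0] + ", timedOut=" + result[1]
                + ", waited about " + elapsedMs + " ms");
    }
}
```

With one pool thread and four consumers, three waits run to their full timeout one after another, which is the same arithmetic as the 9 × 60 s figure in the list.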

    Notes:

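    1. Queues claim pool threads in the order in which they are initialized, so the queue initialized first takes threads first. If there are not enough threads, the log shows: Consumer failed to start in 60000 milliseconds; does the task executor have enough threads to support the container concurrency?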

    2. Once a consumer occupies a pool thread, the thread is never released: a while loop keeps listening for MQ messages.

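    The thread pool used for MQ should not be configured with a blocking queue. getTaskExecutor().execute(processor); starts consumers through the pool, and once the core threads are all busy the pool puts further tasks into its blocking queue. A consumer that sits in that queue is never started.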
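
The effect of the work queue can be seen with a plain `java.util.concurrent.ThreadPoolExecutor`. The following is a minimal sketch, assuming never-ending tasks as stand-ins for consumers; `QueueVsHandoffDemo` and `runningTasks` are hypothetical names used only for illustration.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class QueueVsHandoffDemo {

    /** Submits never-ending tasks to a core=1, max=4 pool and returns how many got a thread. */
    static int runningTasks(BlockingQueue<Runnable> workQueue, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(1, 4, 60, TimeUnit.SECONDS, workQueue);
        CountDownLatch stop = new CountDownLatch(1);
        try {
            for (int i = 0; i < tasks; i++) {
                pool.execute(() -> {
                    try {
                        stop.await();   // stands in for the endless consume loop
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            // Every worker thread is busy forever, so pool size == running tasks.
            return pool.getPoolSize();
        } finally {
            stop.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        // With a real queue, tasks beyond the core size wait in the queue.
        System.out.println("LinkedBlockingQueue: "
                + runningTasks(new LinkedBlockingQueue<>(10), 3) + " running");
        // With a direct hand-off, the pool grows toward max instead of queueing.
        System.out.println("SynchronousQueue: "
                + runningTasks(new SynchronousQueue<>(), 3) + " running");
    }
}
```

A `ThreadPoolExecutor` only creates threads beyond the core size when the queue refuses a task, which is why a queue with spare capacity leaves the extra "consumers" parked indefinitely.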

    2. Implementation

    Configuring the thread pool:

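    import java.util.concurrent.ThreadPoolExecutor;
    import lombok.extern.slf4j.Slf4j;
    import org.springframework.amqp.core.AcknowledgeMode;
    import org.springframework.amqp.rabbit.config.SimpleRabbitListenerContainerFactory;
    import org.springframework.amqp.rabbit.connection.CachingConnectionFactory;
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.context.annotation.Bean;
    import org.springframework.context.annotation.Configuration;
    import org.springframework.context.annotation.Primary;
    import org.springframework.core.task.TaskExecutor;
    import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
    
    @Slf4j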
    @Configuration
    public class RabbitConfig {
    
        @Autowired
        private CachingConnectionFactory connectionFactory;
    
        @Bean(name = "singleListenerContainer")
        public SimpleRabbitListenerContainerFactory listenerContainerFactory() {
            SimpleRabbitListenerContainerFactory factory = new SimpleRabbitListenerContainerFactory();
            // setConnectionFactory: sets the spring-amqp ConnectionFactory.
            factory.setConnectionFactory(connectionFactory);
            factory.setConcurrentConsumers(2);
            factory.setPrefetchCount(1);
            factory.setDefaultRequeueRejected(true);
            // Start the consumers on the custom thread pool.
            factory.setTaskExecutor(taskExecutor());
            factory.setAcknowledgeMode(AcknowledgeMode.AUTO);
            return factory;
        }
    
    
        @Bean("correctTaskExecutor")
        @Primary
        public TaskExecutor taskExecutor() {
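            // MyThreadPoolTaskExecutor is a custom ThreadPoolTaskExecutor subclass that is not shown in this article
            ThreadPoolTaskExecutor executor = new MyThreadPoolTaskExecutor();
            // Core pool size
            executor.setCorePoolSize(100);
            // Maximum pool size
            executor.setMaxPoolSize(100);
            // Queue capacity: 0, so that no consumer task is ever queued
            executor.setQueueCapacity(0);
            // Keep-alive time for idle threads, in seconds
            executor.setKeepAliveSeconds(300);
            // Thread name prefix
            executor.setThreadNamePrefix("thread-xx-");
            // Rejection policy: once the pool has reached max size, the rejected task is discarded silently
            executor.setRejectedExecutionHandler(new ThreadPoolExecutor.DiscardPolicy());
            // Wait for all tasks to finish before shutting the pool down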
            executor.setWaitForTasksToCompleteOnShutdown(true);
            return executor;
        }
    
    }
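
One consequence of the rejection policy chosen above is worth seeing in isolation. The sketch below uses only JDK classes and hypothetical names (`RejectionPolicyDemo`, `submitToFullPool`); it assumes a saturated one-thread pool with no queue capacity and compares `DiscardPolicy` with `AbortPolicy`.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

class RejectionPolicyDemo {

    /** Saturates a 1-thread pool, submits one more task, and reports what happened to it. */
    static String submitToFullPool(RejectedExecutionHandler handler) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60, TimeUnit.SECONDS, new SynchronousQueue<>(), handler);
        CountDownLatch stop = new CountDownLatch(1);
        Runnable endless = () -> {
            try {
                stop.await();   // stands in for the endless consume loop
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        try {
            pool.execute(endless);       // occupies the only thread
            try {
                pool.execute(endless);   // no free thread and no queue slot
                return "silently dropped";
            } catch (RejectedExecutionException e) {
                return "rejected with exception";
            }
        } finally {
            stop.countDown();
            pool.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println("DiscardPolicy: "
                + submitToFullPool(new ThreadPoolExecutor.DiscardPolicy()));
        System.out.println("AbortPolicy:   "
                + submitToFullPool(new ThreadPoolExecutor.AbortPolicy()));
    }
}
```

With `DiscardPolicy` the caller gets no signal that the task never ran, so a discarded consumer would only show up later through the start-timeout log message. Whether a louder policy is preferable depends on how the application wants to fail.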
    

    3. Impact on automatic MQ scaling

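    As described above, when MQ creates consumers at startup and the thread pool runs short of threads, startup blocks (which affects message consumption for that queue).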

    So what happens if the code sets factory.setMaxConcurrentConsumers(2); and the pool has no free thread when the container tries to scale up?

    3.1 Source analysis

    1. The consumer thread consumes messages in a loop

    Source location: org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.AsyncMessageProcessingConsumer#run

    @Override // NOSONAR - complexity - many catch blocks
    public void run() { // NOSONAR - line count
        if (!isActive()) {
            return;
        }
        ...
        try {
            initialize();
            // Each consumer thread loops, fetching messages
            while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
                mainLoop();
            }
        } ...
    }
    
    2. What the loop body does

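    Note that the return value of receiveAndExecute() is the argument passed to checkAdjust(), so to understand MQ dynamic scaling you first have to understand the logic of receiveAndExecute() and what its return value means.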

    private void mainLoop() throws Exception { // NOSONAR Exception
        try {
            // Fetches a message and runs the business logic (sending ACK or NACK to MQ). true: a message was consumed; false: no message was received
            boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
            // If maxConcurrentConsumers is configured, decide whether to scale dynamically
            if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
                checkAdjust(receivedOk);
            }
            ...
        }
    }
    

    2.1 receiveAndExecute: business logic

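    This method runs the business logic and sends ACK or NACK to MQ, which completes the consumption of one message.
    Even after the ACK is sent, however, execution is still inside mainLoop(), and the remaining logic has to finish before the next message is consumed. (Note: the next message is not consumed immediately after the ACK or NACK is sent to MQ!)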

    private boolean receiveAndExecute(final BlockingQueueConsumer consumer) throws Exception { // NOSONAR
        PlatformTransactionManager transactionManager = getTransactionManager();
        if (transactionManager != null) {
            ... // transaction handling, not relevant here
        }
        // Receive a message and process it
        return doReceiveAndExecute(consumer);
    }
    

    If nextMessage() returns no message, the loop breaks, and the receiveAndExecute() call above ends up returning false. That return value in turn decides whether the container scales dynamically.

    private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Exception { //NOSONAR
        Channel channel = consumer.getChannel();
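        // txSize defaults to 1
        for (int i = 0; i < this.txSize; i++) {
    
            // Take a message from the in-memory queue
            Message message = consumer.nextMessage(this.receiveTimeout);
            // No message received: leave the loop
            if (message == null) {
                break;
            }
            try {
                // Run the business logic
                executeListener(channel, message);
            }
            ... // catch blocks omitted, not relevant here
        }
        // If no message was received, this call returns false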
        return consumer.commitIfNecessary(isChannelLocallyTransacted());
    }
    
    public boolean commitIfNecessary(boolean locallyTransacted) throws IOException {
        // Returns false right here when no delivery tag is pending
        if (this.deliveryTags.isEmpty()) {
            return false;
        }
        ...
        return true;
    }
    

    Fetching messages from memory

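    Because setPrefetchCount is configured, the client prefetches that many messages from MQ and puts them into a local BlockingQueue.
    For the configuration details, see:
    【RabbitMQ-2】RabbitMQ concurrency parameters (concurrency and prefetch)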

    When no message is received

    public Message nextMessage(long timeout) throws InterruptedException,
    ShutdownSignalException {
        if (logger.isTraceEnabled()) {
            logger.trace("Retrieving delivery for " + this);
        }
        checkShutdown();
        if (this.missingQueues.size() > 0) {
            checkMissingQueues();
        }
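        // poll, per its API documentation: retrieves and removes the head of this queue, waiting up to the specified wait time if necessary for an element to become available.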
        Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
        // cancelled is false by default, so this branch is not taken
        if (message == null && this.cancelled.get()) {
            throw new ConsumerCancelledException();
        }
        // Returns null when no message was received.
        return message;
    }
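
The timed `poll` that nextMessage relies on is the standard `java.util.concurrent.BlockingQueue` call. A minimal sketch of its behaviour, assuming a plain `LinkedBlockingQueue` of strings as a stand-in for the local delivery queue (`PollTimeoutDemo` and `next` are hypothetical names):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

class PollTimeoutDemo {

    /** Waits up to timeoutMillis for an element; returns null if none arrives in time. */
    static String next(BlockingQueue<String> queue, long timeoutMillis) {
        try {
            return queue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return null;
        }
    }

    public static void main(String[] args) {
        BlockingQueue<String> queue = new LinkedBlockingQueue<>();
        // Empty queue: the call blocks for the timeout, then yields null ("idle" iteration).
        System.out.println("empty queue -> " + next(queue, 100));
        queue.offer("message-1");
        // Non-empty queue: the head is returned immediately.
        System.out.println("after offer -> " + next(queue, 100));
    }
}
```

A null result here corresponds to the "no message" case that makes the loop break and receivedOk come back false.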
    

    2.2 checkAdjust(): dynamic scaling logic

    From the source above: if no message was received, receivedOk is false. (Note: if a message was received but NACKed, receivedOk is still true.)

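    How does the container know that receives, or idle polls, are consecutive?
    Answer: mainLoop() runs continuously, and every iteration polls the local queue (blocking for at most 1 s). If nine polls in a row return nothing and the tenth returns a message, consecutiveIdles is reset to 0.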

    private void checkAdjust(boolean receivedOk) {
        // A message was received
        if (receivedOk) {
            if (isActive(this.consumer)) {
                // Reset the consecutive-idle counter
                this.consecutiveIdles = 0;
                // consecutiveActiveTrigger defaults to 10
                if (this.consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                    // Consider starting one more consumer thread
                    considerAddingAConsumer();
                    // Reset the consecutive-message counter
                    this.consecutiveMessages = 0;
                }
                }
            }
        } else {
            this.consecutiveMessages = 0;
            if (this.consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
                considerStoppingAConsumer(this.consumer);
                this.consecutiveIdles = 0;
            }
        }
    }
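
The counter logic of checkAdjust can be tried out on its own. The following sketch copies only the two counters and their comparisons from the listing above; the class `ScalingCounters`, the method `onLoop` and the string results are hypothetical, and the isActive check is left out.

```java
class ScalingCounters {
    private final int activeTrigger;
    private final int idleTrigger;
    private int consecutiveMessages;
    private int consecutiveIdles;

    ScalingCounters(int activeTrigger, int idleTrigger) {
        this.activeTrigger = activeTrigger;
        this.idleTrigger = idleTrigger;
    }

    /** One loop iteration: returns "ADD", "STOP" or "NONE". */
    String onLoop(boolean receivedOk) {
        if (receivedOk) {
            consecutiveIdles = 0;                        // any message breaks an idle streak
            if (consecutiveMessages++ > activeTrigger) { // same post-increment comparison as above
                consecutiveMessages = 0;
                return "ADD";
            }
        } else {
            consecutiveMessages = 0;                     // any idle poll breaks a busy streak
            if (consecutiveIdles++ > idleTrigger) {
                consecutiveIdles = 0;
                return "STOP";
            }
        }
        return "NONE";
    }

    /** Number of consecutive messages needed before the first "ADD". */
    static int firstAddAt(int trigger) {
        ScalingCounters counters = new ScalingCounters(trigger, trigger);
        int loops = 0;
        while (true) {
            loops++;
            if ("ADD".equals(counters.onLoop(true))) {
                return loops;
            }
        }
    }

    public static void main(String[] args) {
        System.out.println("first ADD after " + firstAddAt(10) + " consecutive messages");
    }
}
```

Because the comparison is a strict `>` on the value before the increment, a trigger of 10 fires on the 12th consecutive iteration in this sketch, and a single iteration of the other kind resets the streak.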
    

    Starting one more consumer thread:

    private void considerAddingAConsumer() {
        // Take the lock
        synchronized(this.consumersMonitor) {
            // Only if the current number of consumers is below the configured maxConcurrentConsumers
            if (this.consumers != null && this.maxConcurrentConsumers != null && this.consumers.size() < this.maxConcurrentConsumers) {
                long now = System.currentTimeMillis();
                // A minimum interval must pass between consumer starts
                if (this.lastConsumerStarted + this.startConsumerMinInterval < now) {
                    // Add one consumer.
                    this.addAndStartConsumers(1);
                    this.lastConsumerStarted = now;
                }
            }
        }
    }
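
The interval check in considerAddingAConsumer is a simple time gate. A minimal sketch of the same comparison, with hypothetical names (`StartThrottle`, `tryStart`) and an illustrative 10-second interval that is not taken from the article:

```java
class StartThrottle {
    private final long minIntervalMillis;
    private long lastStarted;   // 0 until the first start

    StartThrottle(long minIntervalMillis) {
        this.minIntervalMillis = minIntervalMillis;
    }

    /** Returns true (and records the time) only if the minimum interval has fully elapsed. */
    boolean tryStart(long nowMillis) {
        if (lastStarted + minIntervalMillis < nowMillis) {
            lastStarted = nowMillis;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        StartThrottle throttle = new StartThrottle(10_000);   // illustrative interval
        System.out.println(throttle.tryStart(20_000));   // first request
        System.out.println(throttle.tryStart(25_000));   // 5 s later
        System.out.println(throttle.tryStart(30_001));   // just over 10 s after the first
    }
}
```

The gate means a burst of busy iterations can add at most one consumer per interval, so scaling up is deliberately gradual.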
    

    Starting the consumer

    protected void addAndStartConsumers(int delta) {
        synchronized(this.consumersMonitor) {
            if (this.consumers != null) {
                // Each iteration creates one consumer
                for (int i = 0; i < delta; i++) {
                    // Check whether another consumer may be created
                    if (this.maxConcurrentConsumers != null && this.consumers.size() >= this.maxConcurrentConsumers) {
                        break;
                    }
                    // Create the consumer
                    BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                    // (Key point) the consumers collection grows by one
                    this.consumers.add(consumer);
                    AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Starting a new consumer: " + consumer);
                    }
                    // Run the processor on the container's task executor
                    getTaskExecutor().execute(processor);
                    // Publish the consumer-started event
                    if (this.getApplicationEventPublisher() != null) {
                        this.getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                    }
                    try {
                        // Once run() has started the consumer, the processor's CountDownLatch is counted down.
                        // If the pool has no free thread, this call blocks here (60 s by default).
                        // After the wait, startupException is null, so the consumer is treated as created.
                        FatalListenerStartupException startupException = processor.getStartupException();
                        // If the pool is out of threads, null is returned and the branch below is not taken.
                        if (startupException != null) {
                            this.consumers.remove(consumer);
                            throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                        }
                    } catch(InterruptedException ie) {
                        Thread.currentThread().interrupt();
                    } catch(Exception e) {
                        consumer.stop();
                        logger.error("Error starting new consumer", e);
                        this.cancellationLock.release(consumer);
                        this.consumers.remove(consumer);
                    }
                }
            }
        }
    }
    
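    1. In the code above, a consumer is created synchronously: one of the consumer threads takes the lock and creates it. If the pool has no free thread at that moment, that consumer thread is blocked.
    2. If the pool has no free thread, then once the wait is over the code only logs an error and does not throw. At that point memory holds n+1 consumers, but only n threads can consume messages.
    3. After about 10 consecutive idle polls (consecutiveIdles passes consecutiveIdleTrigger), with n+1 consumers registered, the container reclaims a consumer to undo the temporary scale-up:
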
    private void considerStoppingAConsumer(BlockingQueueConsumer consumer) {
        synchronized(this.consumersMonitor) {
            if (this.consumers != null && this.consumers.size() > this.concurrentConsumers) {
                long now = System.currentTimeMillis();
                if (this.lastConsumerStopped + this.stopConsumerMinInterval < now) {
                    // The key step in reclaiming a consumer
                    consumer.basicCancel(true);
                    // Remove the consumer from the local consumer collection
                    this.consumers.remove(consumer);
                    if (logger.isDebugEnabled()) {
                        logger.debug("Idle consumer terminating: " + consumer);
                    }
                    this.lastConsumerStopped = now;
                }
            }
        }
    }
    

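    As noted above, memory holds n+1 consumers but only n of them are effective. When a consumer is reclaimed, it is an effective one that is removed, which leaves n consumers in memory and n-1 effective consumers.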

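    If the thread pool is short of threads and dynamic consumer scaling is configured, the number of effective consumers eventually drops to 0, causing a large message backlog!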
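
The argument above can be written down as a toy model. It is not Spring AMQP code: `PhantomConsumerModel` and `effectiveAfter` are hypothetical names, and the model assumes that no pool thread ever becomes available to a consumer that was registered without one, and that every scale-down removes a consumer that owns a thread.

```java
class PhantomConsumerModel {

    /** Returns how many consumers still own a thread after the given number of scale cycles. */
    static int effectiveAfter(int concurrentConsumers, int maxConcurrentConsumers, int cycles) {
        int effective = concurrentConsumers;    // consumers that actually own a pool thread
        int registered = concurrentConsumers;   // entries in the container's consumer collection
        for (int i = 0; i < cycles && effective > 0; i++) {
            // Scale-up attempt: the new consumer is registered, but the pool has no thread for it.
            if (registered < maxConcurrentConsumers) {
                registered++;
            }
            // Scale-down: an idle consumer that owns a thread cancels itself.
            if (registered > concurrentConsumers) {
                registered--;
                effective--;
            }
        }
        return effective;
    }

    public static void main(String[] args) {
        for (int cycles = 0; cycles <= 3; cycles++) {
            System.out.println("after " + cycles + " cycle(s): "
                    + effectiveAfter(2, 3, cycles) + " effective consumer(s)");
        }
    }
}
```

Under these assumptions each up-then-down cycle costs one working consumer while the registered count returns to its configured value, which is how the effective count can reach 0 without the container noticing.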

    Note: by default the RabbitMQ listener container starts consumer threads with new SimpleAsyncTaskExecutor(), i.e. a new thread is created every time one is needed.

    Summary: a custom thread pool is not recommended. If you do use one, make sure the thread count is adjusted every time a queue is added.
