美文网首页
spring-boot-starter-amqp部分源码分析

spring-boot-starter-amqp部分源码分析

作者: 无聊之园 | 来源:发表于2019-06-15 17:59 被阅读0次

    事情起因:
    1、rabbitmq原生的好像不支持多线程消费,而spring boot封装的starter-amqp是支持的。然后,我使用的时候,配置了多线程,但是碰到几个坑:
    1、服务启动的时候,消费者sleep无限秒,但是,还是只接收到了一条消息,还让我以为自己配置错了。
    2、后来,发现,服务启动后,再发送消息,是起到多线程效果了。然后就感到很奇怪了,来后,就打断点分析一下相关源码。

    spring:
      rabbitmq:
        listener:
          simple:
            concurrency: 5        #最小消息监听线程数
            max-concurrency: 5    #最大消息监听线程数
    
     public void processMessage1(String msg) {
            System.out.println("thread name:" + Thread.currentThread().getName());
            System.out.println(Thread.currentThread().getName() + " 接收到来自hello.queue1队列的消息:" + msg);
          try {
                Thread.sleep(10000000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            return;
        }
    

    这是spring ioc初始化的refresh方法,是在最后一个行,finishRefresh调用的。

    @Override
        public void refresh() throws BeansException, IllegalStateException {
            synchronized (this.startupShutdownMonitor) {
            ...
                    // Last step: publish corresponding event.
                    finishRefresh();
            ...
        }
    
    protected void finishRefresh() {
            // Clear context-level resource caches (such as ASM metadata from scanning).
            clearResourceCaches();
            // Initialize lifecycle processor for this context.
    //  初始化声明周期processor
            initLifecycleProcessor();
    
            //  就是这里调用的
            getLifecycleProcessor().onRefresh();
    
            // Publish the final event.
            publishEvent(new ContextRefreshedEvent(this));
    
            // Participate in LiveBeansView MBean, if active.
            LiveBeansView.registerApplicationContext(this);
        }
    
    protected void initLifecycleProcessor() {
            ConfigurableListableBeanFactory beanFactory = getBeanFactory();
            if (beanFactory.containsLocalBean(LIFECYCLE_PROCESSOR_BEAN_NAME)) {
                this.lifecycleProcessor =
                        beanFactory.getBean(LIFECYCLE_PROCESSOR_BEAN_NAME, LifecycleProcessor.class);
                if (logger.isDebugEnabled()) {
                    logger.debug("Using LifecycleProcessor [" + this.lifecycleProcessor + "]");
                }
            }
            else {
    // 这里提供了一个默认的生命周期processor
                DefaultLifecycleProcessor defaultProcessor = new DefaultLifecycleProcessor();
                defaultProcessor.setBeanFactory(beanFactory);
    // 把这个默认生命周期保存在到applicationcontext中
                this.lifecycleProcessor = defaultProcessor;
                beanFactory.registerSingleton(LIFECYCLE_PROCESSOR_BEAN_NAME, this.lifecycleProcessor);
                if (logger.isDebugEnabled()) {
                    logger.debug("Unable to locate LifecycleProcessor with name '" +
                            LIFECYCLE_PROCESSOR_BEAN_NAME +
                            "': using default [" + this.lifecycleProcessor + "]");
                }
            }
        }
    

    LifecycleProcessor接口就两个方法

    public interface LifecycleProcessor extends Lifecycle {
    
        /**
         * Notification of context refresh, e.g. for auto-starting components.
         */
            // spring refresh的时候,自动调用这个方法,启动组件
        void onRefresh();
        /**
         * Notification of context close phase, e.g. for auto-stopping components.
         */
        // 同理
        void onClose();
    
    }
    

    默认的生命周期process,实现了lifecycleProcess的这个方法,前面的spring的finish调用了这里。

    @Override
        public void onRefresh() {
            startBeans(true);
            this.running = true;
        }
    
    private void startBeans(boolean autoStartupOnly) {
    // 获取实现了lifecycle的所有的类,注意是lifecycle,不是lifecycleProcess
    // 这里有两个,rabbitTemplate和internalRabbitListenerEndpointRegistry这个
    // rabbit监听器注册器,这个监听器注册器很重要
            Map<String, Lifecycle> lifecycleBeans = getLifecycleBeans();
            Map<Integer, LifecycleGroup> phases = new HashMap<>();
            lifecycleBeans.forEach((beanName, bean) -> {
                if (!autoStartupOnly || (bean instanceof SmartLifecycle && ((SmartLifecycle) bean).isAutoStartup())) {
                    int phase = getPhase(bean);
                    LifecycleGroup group = phases.get(phase);
                    if (group == null) {
    // defaultLifecycleProcess维护了一个group组,然后这里把这个每个lifecycleProcess封装一下,放进去
    // 这里只有rabbitmq监听器注册器,会放进来
                        group = new LifecycleGroup(phase, this.timeoutPerShutdownPhase, lifecycleBeans, autoStartupOnly);
                        phases.put(phase, group);
                    }
                    group.add(beanName, bean);
                }
            });
            if (!phases.isEmpty()) {
                List<Integer> keys = new ArrayList<>(phases.keySet());
                Collections.sort(keys);
                for (Integer key : keys) {
                                    // 关键在这里,调用LifecycleGroup的start方法,这个LifecycleGroup封装了rabbitmq的监听器,注册器
                    phases.get(key).start();
                }
            }
        }
    
    public void start() {
                if (this.members.isEmpty()) {
                    return;
                }
                if (logger.isInfoEnabled()) {
                    logger.info("Starting beans in phase " + this.phase);
                }
                Collections.sort(this.members);
                for (LifecycleGroupMember member : this.members) {
                    if (this.lifecycleBeans.containsKey(member.name)) {
                                            // 这里调用defaultLifeCycleProcess的dostart方法
                        doStart(this.lifecycleBeans, member.name, this.autoStartupOnly);
                    }
                }
            }
    
    private void doStart(Map<String, ? extends Lifecycle> lifecycleBeans, String beanName, boolean autoStartupOnly) {
                    // 这里获得rabbit监听器注册器
            Lifecycle bean = lifecycleBeans.remove(beanName);
            if (bean != null && bean != this) {
                          // 获取rabbitmq监听器依赖的bean,这里没有
                String[] dependenciesForBean = getBeanFactory().getDependenciesForBean(beanName);
                for (String dependency : dependenciesForBean) {
                    doStart(lifecycleBeans, dependency, autoStartupOnly);
                }
                if (!bean.isRunning() &&
                        (!autoStartupOnly || !(bean instanceof SmartLifecycle) || ((SmartLifecycle) bean).isAutoStartup())) {
                    if (logger.isDebugEnabled()) {
                        logger.debug("Starting bean '" + beanName + "' of type [" + bean.getClass() + "]");
                    }
                    try {
                                      // 这里调用rabbitmq监听器的start方法,
                      // 这里终于进入重点了
                        bean.start();
                    }
                    catch (Throwable ex) {
                        throw new ApplicationContextException("Failed to start bean '" + beanName + "'", ex);
                    }
                    if (logger.isDebugEnabled()) {
                        logger.debug("Successfully started bean '" + beanName + "'");
                    }
                }
            }
        }
    

    再进入重点之前,总结一下lifecycle:
    spring的finishRefresh方法,会生成一个默认的lifecycleProcess,并注册到applicationContext中变量中去,然后,调用这个lifecycleProcess的onRefresh方法,然后lifecycleProcess会寻找所有的实现了lifecycle的类,然后调用他们的start方法。
    所以,lifecycle就是提供给spring组件,初始化的。

    这里进入了重点:
    调用rabbit监听器注册器的start方法

    @Override
        public void start() {
                      // getListenerContainers获取所有的消费者监听器
                    // 然后遍历
            for (MessageListenerContainer listenerContainer : getListenerContainers()) {
                        // 启动监听器,重点
                startIfNecessary(listenerContainer);
            }
        }
    
    @Override
        protected void doStart() throws Exception {
            if (getMessageListener() instanceof ListenerContainerAware) {
                Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
                if (expectedQueueNames != null) {
                    String[] queueNames = getQueueNames();
                    Assert.state(expectedQueueNames.size() == queueNames.length,
                            "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                                    + Arrays.asList(queueNames));
                    boolean found = false;
                    for (String queueName : queueNames) {
                        if (expectedQueueNames.contains(queueName)) {
                            found = true;
                        }
                        else {
                            found = false;
                            break;
                        }
                    }
                    Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                            + Arrays.asList(queueNames));
                }
            }
            super.doStart();
            synchronized (this.consumersMonitor) {
                if (this.consumers != null) {
                    throw new IllegalStateException("A stopped container should not have consumers");
                }
    // 这里,会初始化线程池,重点,往下看
                int newConsumers = initializeConsumers();
                if (this.consumers == null) {
                    logger.info("Consumers were initialized and then cleared " +
                            "(presumably the container was stopped concurrently)");
                    return;
                }
                if (newConsumers <= 0) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Consumers are already running");
                    }
                    return;
                }
                Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
                for (BlockingQueueConsumer consumer : this.consumers) {
                    AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                    processors.add(processor);
                    getTaskExecutor().execute(processor);
                    if (getApplicationEventPublisher() != null) {
                        getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                    }
                }
                for (AsyncMessageProcessingConsumer processor : processors) {
                    FatalListenerStartupException startupException = processor.getStartupException();
                    if (startupException != null) {
                        throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                    }
                }
            }
        }
    
    protected int initializeConsumers() {
            int count = 0;
            synchronized (this.consumersMonitor) {
                if (this.consumers == null) {
                    this.cancellationLock.reset();
    // this.concurrentConsumers是我们配置的concurrency线程数
    // 可以看到,有多个线程数就有多少个,堵塞队列消费者BlockingQueueConsumer的set
    // BlockingQueueConsumer就是真正消费消息的
                    this.consumers = new HashSet<BlockingQueueConsumer>(this.concurrentConsumers);
    // 有多少个线程,就创建多少个堵塞队列消费者BlockingQueueConsumer
                    for (int i = 0; i < this.concurrentConsumers; i++) {
                        BlockingQueueConsumer consumer = createBlockingQueueConsumer();
                        this.consumers.add(consumer);
                        count++;
                    }
                }
            }
            return count;
        }
    

    这是堵塞队列消费者的构造方法

    public BlockingQueueConsumer(ConnectionFactory connectionFactory,
                MessagePropertiesConverter messagePropertiesConverter,
                ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
                boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
                Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
            this.connectionFactory = connectionFactory;
            this.messagePropertiesConverter = messagePropertiesConverter;
            this.activeObjectCounter = activeObjectCounter;
            this.acknowledgeMode = acknowledgeMode;
            this.transactional = transactional;
            this.prefetchCount = prefetchCount;
            this.defaultRequeueRejected = defaultRequeueRejected;
            if (consumerArgs != null && consumerArgs.size() > 0) {
                this.consumerArgs.putAll(consumerArgs);
            }
            this.noLocal = noLocal;
            this.exclusive = exclusive;
            this.queues = Arrays.copyOf(queues, queues.length);
                    // 这里看到,堵塞队列的大小就是prefetchCount设定的,
                    // prefetchCount默认是250,也就是这个消费者,会堵塞250个消息
            this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
        }
    

    其实,这里就已经发现了我前面的问题。
    解答是:服务一启动的时候,消费者的一个线程,就会一下拉取最大250个消息,所以一下把rabbitmq服务器的消息拉取完了。(其实不是拉取,是有一个线程往这个队列里循环放最大250个消息)
    所以,这个消费者线程堵塞了,其他消息者线程也拿不到消息,所以也就没有并发执行。
    而,服务启动后,生产者再发送一条消息,则会随机取一个线程执行,也就达到并发的效果。

    回到这个dostart方法

    protected void doStart() throws Exception {
            if (getMessageListener() instanceof ListenerContainerAware) {
                Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
                if (expectedQueueNames != null) {
                    String[] queueNames = getQueueNames();
                    Assert.state(expectedQueueNames.size() == queueNames.length,
                            "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                                    + Arrays.asList(queueNames));
                    boolean found = false;
                    for (String queueName : queueNames) {
                        if (expectedQueueNames.contains(queueName)) {
                            found = true;
                        }
                        else {
                            found = false;
                            break;
                        }
                    }
                    Assert.state(found, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                            + Arrays.asList(queueNames));
                }
            }
            super.doStart();
            synchronized (this.consumersMonitor) {
                if (this.consumers != null) {
                    throw new IllegalStateException("A stopped container should not have consumers");
                }
    // 这里初始化了所有的堵塞队列消费者
                int newConsumers = initializeConsumers();
                if (this.consumers == null) {
                    logger.info("Consumers were initialized and then cleared " +
                            "(presumably the container was stopped concurrently)");
                    return;
                }
                if (newConsumers <= 0) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Consumers are already running");
                    }
                    return;
                }
    
                Set<AsyncMessageProcessingConsumer> processors = new HashSet<AsyncMessageProcessingConsumer>();
    // 这里启动所有的堵塞队列消费者
                for (BlockingQueueConsumer consumer : this.consumers) {
                    AsyncMessageProcessingConsumer processor = new AsyncMessageProcessingConsumer(consumer);
                    processors.add(processor);
    // getTaskExecutor()获取单线程线程池,然后启动线程,执行process的run方法
                    getTaskExecutor().execute(processor);
                    if (getApplicationEventPublisher() != null) {
                        getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, consumer));
                    }
                }
                for (AsyncMessageProcessingConsumer processor : processors) {
                    FatalListenerStartupException startupException = processor.getStartupException();
                    if (startupException != null) {
                        throw new AmqpIllegalStateException("Fatal exception on listener startup", startupException);
                    }
                }
            }
        }
    

    总结,设置了多少个concurrency最大监听器线程数,就会有多少个堵塞队列消费者,然后每个堵塞队列消费者,启动一个线程。

    @Override
            public void run() {
            ...
    // 轮询,查看是否来了消息
                    while (isActive(this.consumer) || this.consumer.hasDelivery() || !this.consumer.cancelled()) {
                        try {
    // 这里重点,有消息则true,没消息则false
                            boolean receivedOk = receiveAndExecute(this.consumer); // At least one message received
                            if (SimpleMessageListenerContainer.this.maxConcurrentConsumers != null) {
                                if (receivedOk) {
                                    if (isActive(this.consumer)) {
                                        consecutiveIdles = 0;
                                        if (consecutiveMessages++ > SimpleMessageListenerContainer.this.consecutiveActiveTrigger) {
                                            considerAddingAConsumer();
                                            consecutiveMessages = 0;
                                        }
                                    }
                                }
                                else {
                                    consecutiveMessages = 0;
                                    if (consecutiveIdles++ > SimpleMessageListenerContainer.this.consecutiveIdleTrigger) {
                                        considerStoppingAConsumer(this.consumer);
                                        consecutiveIdles = 0;
                                    }
                                }
                            }
                            long idleEventInterval = getIdleEventInterval();
                            if (idleEventInterval > 0) {
                                if (receivedOk) {
                                    updateLastReceive();
                                }
                                else {
                                    long now = System.currentTimeMillis();
                                    long lastAlertAt = SimpleMessageListenerContainer.this.lastNoMessageAlert.get();
                                    long lastReceive = getLastReceive();
                                    if (now > lastReceive + idleEventInterval
                                            && now > lastAlertAt + idleEventInterval
                                            && SimpleMessageListenerContainer.this.lastNoMessageAlert
                                            .compareAndSet(lastAlertAt, now)) {
                                        publishIdleContainerEvent(now - lastReceive);
                                    }
                                }
                            }
                        }
                        
            }
    

    最后,到这里,

    private boolean doReceiveAndExecute(BlockingQueueConsumer consumer) throws Throwable { //NOSONAR
    
            Channel channel = consumer.getChannel();
    
            for (int i = 0; i < this.txSize; i++) {
    
                logger.trace("Waiting for message from consumer.");
                Message message = consumer.nextMessage(this.receiveTimeout);
                if (message == null) {
                    break;
                }
                try {
    // 有消息,则反射调用我们定义的消费者
                    executeListener(channel, message);
                }
            ...
            }
    

    然后到这里

    public Message nextMessage(long timeout) throws InterruptedException, ShutdownSignalException {
            if (logger.isTraceEnabled()) {
                logger.trace("Retrieving delivery for " + this);
            }
            checkShutdown();
            if (this.missingQueues.size() > 0) {
                checkMissingQueues();
            }
    // 从堵塞队列中poll出元素,没有元素,则堵塞,一秒,这个时间receiveTimeout是可以设置的
            Message message = handle(this.queue.poll(timeout, TimeUnit.MILLISECONDS));
            if (message == null && this.cancelled.get()) {
                throw new ConsumerCancelledException();
            }
            return message;
        }
    
    

    至此:整个消费者的过程,分析完了。
    总结:每个消费者线程,轮询查看是否有消息,默认每1秒轮询一次。有消息,则调用我们定义的消费者。

    关于消费者线程:
    假设,有两个消费者,然后消费者线程定义为,则总共有10个消费者线程。

    消费者发送消息,会来到这里,然后往我们的堵塞队列消费者,添加消息

    @Override
            public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
                    throws IOException {
                if (logger.isDebugEnabled()) {
                    logger.debug("Storing delivery for consumerTag: '"
                            + consumerTag + "' with deliveryTag: '" + envelope.getDeliveryTag() + "' in "
                            + BlockingQueueConsumer.this);
                }
                try {
                    if (BlockingQueueConsumer.this.abortStarted > 0) {
                        if (!BlockingQueueConsumer.this.queue.offer(new Delivery(consumerTag, envelope, properties, body),
                                BlockingQueueConsumer.this.shutdownTimeout, TimeUnit.MILLISECONDS)) {
                            RabbitUtils.setPhysicalCloseRequired(getChannel(), true);
                            // Defensive - should never happen
                            BlockingQueueConsumer.this.queue.clear();
                            getChannel().basicNack(envelope.getDeliveryTag(), true, true);
                            getChannel().basicCancel(consumerTag);
                            try {
                                getChannel().close();
                            }
                            catch (TimeoutException e) {
                                // no-op
                            }
                        }
                    }
                    else {
    // 消费者发送消息,会来到这里,然后往我们的堵塞队列消费者,添加消息
                        BlockingQueueConsumer.this.queue.put(new Delivery(consumerTag, envelope, properties, body));
                    }
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
    

    调试过程遇到的问题:
    BlockingQueueConsumer的queue,看到发送消息的时候,offer进去了,但是while遍历的时候,queue的size都是0。然后queue的任何地方,打断点都没有停。
    结果:这是多线程导致的问题,blockQueueConsumer有多少线程拥有了这个对象,然后看到的是不同对象的BlockingQueueConsumer的queue。

    相关文章

      网友评论

          本文标题:spring-boot-starter-amqp部分源码分析

          本文链接:https://www.haomeiwen.com/subject/sjibfctx.html