美文网首页
SpringRabbitmq-MessageListenerCo

SpringRabbitmq-MessageListenerCo

作者: lazyguy | 来源:发表于2018-09-02 20:00 被阅读0次

    MessageListenerContainer

    定义2个方法:

    void setupMessageListener(Object messageListener); 设置messageListener
    MessageConverter getMessageConverter(); 获取MessageConverter(用于转换接收到的Message)
    

    AbstractMessageListenerContainer

        
        static final int DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL = 5000;
            
        public static final boolean DEFAULT_DEBATCHING_ENABLED = true;
    
        public static final int DEFAULT_PREFETCH_COUNT = 250;
    
        /**
         * The default recovery interval: 5000 ms = 5 seconds.
         */
        public static final long DEFAULT_RECOVERY_INTERVAL = 5000;
    
        public static final long DEFAULT_SHUTDOWN_TIMEOUT = 5000;
    
        private final ContainerDelegate delegate = this::actualInvokeListener;
    
        protected final Object consumersMonitor = new Object(); //NOSONAR
    
        private final Map<String, Object> consumerArgs = new HashMap<String, Object>();
    
        private ContainerDelegate proxy = this.delegate;
    
        private long shutdownTimeout = DEFAULT_SHUTDOWN_TIMEOUT;
    
        private ApplicationEventPublisher applicationEventPublisher;
    
        private PlatformTransactionManager transactionManager;
    
        private TransactionAttribute transactionAttribute = new DefaultTransactionAttribute();
    
        private String beanName;
    
        private Executor taskExecutor = new SimpleAsyncTaskExecutor();
    
        private boolean taskExecutorSet;
    
        private BackOff recoveryBackOff = new FixedBackOff(DEFAULT_RECOVERY_INTERVAL, FixedBackOff.UNLIMITED_ATTEMPTS);
    
        private MessagePropertiesConverter messagePropertiesConverter = new DefaultMessagePropertiesConverter();
    
        private RabbitAdmin rabbitAdmin;
    
        private boolean missingQueuesFatal = true;
    
        private boolean missingQueuesFatalSet;
    
        private boolean possibleAuthenticationFailureFatal = true;
    
        private boolean possibleAuthenticationFailureFatalSet;
    
        private boolean autoDeclare = true;
    
        private boolean mismatchedQueuesFatal = false;
    
        private long failedDeclarationRetryInterval = DEFAULT_FAILED_DECLARATION_RETRY_INTERVAL;
    
        private boolean autoStartup = true;
    
        private int phase = Integer.MAX_VALUE;
    
        private volatile boolean active = false;
    
        private volatile boolean running = false;
    
        private final Object lifecycleMonitor = new Object();
    
        private volatile List<String> queueNames = new CopyOnWriteArrayList<String>();
    
        private ErrorHandler errorHandler = new ConditionalRejectingErrorHandler();
    
        private MessageConverter messageConverter;
            是否暴露channel的listener给已经注册的ChannelAwareMessageListener?
        private boolean exposeListenerChannel = true;
    
        private volatile Object messageListener;
    
        private volatile AcknowledgeMode acknowledgeMode = AcknowledgeMode.AUTO;
    
        private volatile boolean deBatchingEnabled = DEFAULT_DEBATCHING_ENABLED;
    
        private volatile boolean initialized;
    
        private Collection<MessagePostProcessor> afterReceivePostProcessors;
    
        private volatile ApplicationContext applicationContext;
    
        private String listenerId;
    
        private Advice[] adviceChain = new Advice[0];
    
        private ConsumerTagStrategy consumerTagStrategy;
    
        private volatile boolean exclusive;
    
        private volatile boolean noLocal;
    
        private volatile boolean defaultRequeueRejected = true;
    
        private volatile int prefetchCount = DEFAULT_PREFETCH_COUNT;
    
        private long idleEventInterval;
    
        private volatile long lastReceive = System.currentTimeMillis();
    
        private boolean statefulRetryFatalWithNullMessageId = true;
    
        private ConditionalExceptionLogger exclusiveConsumerExceptionLogger = new DefaultExclusiveConsumerLogger();
    
        private boolean alwaysRequeueWithTxManagerRollback;
    
        private String lookupKeyQualifier = "";
    
        private boolean forceCloseChannel = true
    

    方法

    checkMessageListener 检查messageListener的类型必须是MessageListener或ChannelAwareMessageListener
    @Override
        public final void afterPropertiesSet() {
            super.afterPropertiesSet();父类检查ConnectionFactory存在
            Assert.state(
                    this.exposeListenerChannel || !getAcknowledgeMode().isManual(),
                    "You cannot acknowledge messages manually if the channel is not exposed to the listener "
                            + "(please check your configuration and set exposeListenerChannel=true or acknowledgeMode!=MANUAL)");
    检查
            Assert.state(
                    !(getAcknowledgeMode().isAutoAck() && isChannelTransacted()),
                    "The acknowledgeMode is NONE (autoack in Rabbit terms) which is not consistent with having a "
                            + "transactional channel. Either use a different AcknowledgeMode or make sure channelTransacted=false");
            validateConfiguration();
            initialize();
        }
    
        // -------------------------------------------------------------------------
        // Lifecycle methods for starting and stopping the container
        // -------------------------------------------------------------------------
    
        /**
         * Initialize this container.
         * <p>
         * Notifies all threads waiting on the lifecycle monitor, passes the delegate to
         * {@code initializeProxy}, runs the two "fatal from property" checks, calls
         * {@link #doInitialize()}, and finally applies the task-executor and
         * transaction-manager defaults. Any exception is rethrown after conversion by
         * {@code convertRabbitAccessException}.
         */
        public void initialize() {
            try {
                // Acquire the lifecycle monitor and wake every thread waiting on it.
                synchronized (this.lifecycleMonitor) {
                    this.lifecycleMonitor.notifyAll();
                }

                // Article note: builds the proxy from the delegate and the configured adviceChain.
                initializeProxy(this.delegate);
                checkMissingQueuesFatalFromProperty();
                checkPossibleAuthenticationFailureFatalFromProperty();

                // Hook left for subclasses.
                doInitialize();

                if (this.transactionManager != null && !this.isExposeListenerChannel()) {
                    logger.warn("exposeListenerChannel=false is ignored when using a TransactionManager");
                }

                // No executor flagged as set yet: create one whose name prefix is the
                // bean name, and mark it as set.
                if (!this.taskExecutorSet) {
                    if (StringUtils.hasText(this.getBeanName())) {
                        this.taskExecutor = new SimpleAsyncTaskExecutor(this.getBeanName() + "-");
                        this.taskExecutorSet = true;
                    }
                }

                // A transaction manager forces the channel to be transacted.
                if (this.transactionManager != null && !isChannelTransacted()) {
                    logger.debug("The 'channelTransacted' is coerced to 'true', when 'transactionManager' is provided");
                    setChannelTransacted(true);
                }
            }
            catch (Exception e) {
                throw convertRabbitAccessException(e);
            }
        }
    
    shutdown方法,设置标志位。doShutdown方法留给子类
    
        @Override
        public void start() {
            // Already running: nothing to do.
            if (isRunning()) {
                return;
            }

            // Run afterPropertiesSet() if it has not been run yet; the flag is
            // re-checked while holding the lifecycle monitor.
            if (!this.initialized) {
                synchronized (this.lifecycleMonitor) {
                    if (!this.initialized) {
                        afterPropertiesSet();
                        this.initialized = true;
                    }
                }
            }

            try {
                if (logger.isDebugEnabled()) {
                    logger.debug("Starting Rabbit listener container.");
                }
                // Admin setup and queue checks run before the subclass start hook.
                configureAdminIfNeeded();
                checkMismatchedQueues();
                doStart();
            }
            catch (Exception e) {
                throw convertRabbitAccessException(e);
            }
        }
    
    

    SimpleMessageListenerContainer

        /**
         * Re-initializes this container's Rabbit message consumers, if not initialized already.
         * Then submits each consumer to this container's task executor.
         * @throws Exception Any Exception.
         */
        @Override
        protected void doStart() throws Exception {
            // Step 1: when the listener declares the queues it expects, they must match the
            // container's queues: same count, and every container queue must be expected.
            if (getMessageListener() instanceof ListenerContainerAware) {
                Collection<String> expectedQueueNames = ((ListenerContainerAware) getMessageListener()).expectedQueueNames();
                if (expectedQueueNames != null) {
                    String[] queueNames = getQueueNames();
                    Assert.state(expectedQueueNames.size() == queueNames.length,
                            "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                                    + Arrays.asList(queueNames));
                    // An empty queue array never counts as a match.
                    boolean allExpected = queueNames.length > 0
                            && Arrays.stream(queueNames).allMatch(expectedQueueNames::contains);
                    Assert.state(allExpected, () -> "Listener expects us to be listening on '" + expectedQueueNames + "'; our queues: "
                            + Arrays.asList(queueNames));
                }
            }

            // Step 2: delegate to the parent implementation.
            super.doStart();

            // Step 3: under the consumers monitor, initialize the consumers, wrap each one in an
            // AsyncMessageProcessingConsumer, hand it to the task executor, and then check
            // every processor for a startup exception.
            synchronized (this.consumersMonitor) {
                if (this.consumers != null) {
                    throw new IllegalStateException("A stopped container should not have consumers");
                }
                int created = initializeConsumers();
                if (this.consumers == null) {
                    logger.info("Consumers were initialized and then cleared " +
                            "(presumably the container was stopped concurrently)");
                    return;
                }
                if (created <= 0) {
                    if (logger.isInfoEnabled()) {
                        logger.info("Consumers are already running");
                    }
                    return;
                }

                Set<AsyncMessageProcessingConsumer> launched = new HashSet<>();
                for (BlockingQueueConsumer queueConsumer : this.consumers) {
                    AsyncMessageProcessingConsumer task = new AsyncMessageProcessingConsumer(queueConsumer);
                    launched.add(task);
                    getTaskExecutor().execute(task);
                    if (getApplicationEventPublisher() != null) {
                        getApplicationEventPublisher().publishEvent(new AsyncConsumerStartedEvent(this, queueConsumer));
                    }
                }

                // Surface the first fatal startup failure, if any.
                for (AsyncMessageProcessingConsumer task : launched) {
                    FatalListenerStartupException failure = task.getStartupException();
                    if (failure != null) {
                        throw new AmqpIllegalStateException("Fatal exception on listener startup", failure);
                    }
                }
            }
        }
    
    

    BlockingQueueConsumer

        public BlockingQueueConsumer(ConnectionFactory connectionFactory,
                MessagePropertiesConverter messagePropertiesConverter,
                ActiveObjectCounter<BlockingQueueConsumer> activeObjectCounter, AcknowledgeMode acknowledgeMode,
                boolean transactional, int prefetchCount, boolean defaultRequeueRejected,
                Map<String, Object> consumerArgs, boolean noLocal, boolean exclusive, String... queues) {
            this.connectionFactory = connectionFactory;
            this.messagePropertiesConverter = messagePropertiesConverter;
            this.activeObjectCounter = activeObjectCounter;
            this.acknowledgeMode = acknowledgeMode;
            this.transactional = transactional;
            this.prefetchCount = prefetchCount;
            this.defaultRequeueRejected = defaultRequeueRejected;
            if (consumerArgs != null && consumerArgs.size() > 0) {
                this.consumerArgs.putAll(consumerArgs);
            }
            this.noLocal = noLocal;
            this.exclusive = exclusive;
            this.queues = Arrays.copyOf(queues, queues.length);
            this.queue = new LinkedBlockingQueue<Delivery>(prefetchCount);
        }
    
        /**
         * Starts this consumer on the calling thread: obtains the channel, passively declares
         * the configured queues (retrying up to {@code declarationRetries} times), applies the
         * prefetch count when not auto-acking, and then starts consuming from every queue that
         * is not recorded as missing.
         * <p>
         * Between failed declaration attempts the thread sleeps for
         * {@code failedDeclarationRetryInterval}, independent of the logger configuration.
         *
         * @throws AmqpException on failure; in particular a {@code FatalListenerStartupException}
         * is thrown on authentication failure and a {@code QueuesNotAvailableException} when no
         * retry applies and none of the queues could be declared.
         */
        public void start() throws AmqpException {
            if (logger.isDebugEnabled()) {
                logger.debug("Starting consumer " + this);
            }

            // Remember the thread that started this consumer.
            this.thread = Thread.currentThread();

            try {
                // Step 1: obtain the resource holder and its channel. Article note: if the channel
                // is actually an AutorecoveringChannel, addRecoveryListener() registers this
                // object as its recovery listener - confirm against addRecoveryListener().
                this.resourceHolder = ConnectionFactoryUtils.getTransactionalResourceHolder(this.connectionFactory,
                        this.transactional);
                this.channel = this.resourceHolder.getChannel();
                addRecoveryListener();
            }
            catch (AmqpAuthenticationException e) {
                throw new FatalListenerStartupException("Authentication failure", e);
            }

            // Step 2: create the internal consumer for the channel, reset the delivery tags
            // and register this consumer with the active-object counter.
            this.consumer = new InternalConsumer(this.channel);
            this.deliveryTags.clear();
            this.activeObjectCounter.add(this);

            // mirrored queue might be being moved
            int passiveDeclareRetries = this.declarationRetries;
            this.declaring = true;
            do {
                if (cancelled()) {
                    break;
                }
                try {
                    attemptPassiveDeclarations();
                    if (passiveDeclareRetries < this.declarationRetries && logger.isInfoEnabled()) {
                        logger.info("Queue declaration succeeded after retrying");
                    }
                    // Success: zero the counter so the loop condition ends the loop.
                    passiveDeclareRetries = 0;
                }
                catch (DeclarationException e) {
                    if (passiveDeclareRetries > 0 && this.channel.isOpen()) {
                        if (logger.isWarnEnabled()) {
                            logger.warn("Queue declaration failed; retries left=" + (passiveDeclareRetries), e);
                        }
                        // Back off before the next attempt, whether or not WARN logging is enabled.
                        try {
                            Thread.sleep(this.failedDeclarationRetryInterval);
                        }
                        catch (InterruptedException e1) {
                            this.declaring = false;
                            // Restore the interrupt flag for callers further up the stack.
                            Thread.currentThread().interrupt();
                            this.activeObjectCounter.release(this);
                            throw RabbitExceptionTranslator.convertRabbitAccessException(e1);
                        }
                    }
                    else if (e.getFailedQueues().size() < this.queues.length) {
                        // Only some queues failed: continue with the available ones and
                        // record the missing ones.
                        if (logger.isWarnEnabled()) {
                            logger.warn("Not all queues are available; only listening on those that are - configured: "
                                    + Arrays.asList(this.queues) + "; not available: " + e.getFailedQueues());
                        }
                        this.missingQueues.addAll(e.getFailedQueues());
                        this.lastRetryDeclaration = System.currentTimeMillis();
                    }
                    else {
                        // Every queue failed and no retry applies: give up.
                        this.declaring = false;
                        this.activeObjectCounter.release(this);
                        throw new QueuesNotAvailableException("Cannot prepare queue for listener. "
                                + "Either the queue doesn't exist or the broker will not allow us to use it.", e);
                    }
                }
            }
            while (passiveDeclareRetries-- > 0 && !cancelled());
            this.declaring = false;

            if (!this.acknowledgeMode.isAutoAck() && !cancelled()) {
                // Set basicQos before calling basicConsume (otherwise if we are not acking the broker
                // will send blocks of 100 messages)
                try {
                    this.channel.basicQos(this.prefetchCount);
                }
                catch (IOException e) {
                    this.activeObjectCounter.release(this);
                    throw new AmqpIOException(e);
                }
            }

            try {
                if (!cancelled()) {
                    // Consume from every configured queue that was not recorded as missing.
                    for (String queueName : this.queues) {
                        if (!this.missingQueues.contains(queueName)) {
                            consumeFromQueue(queueName);
                        }
                    }
                }
            }
            catch (IOException e) {
                throw RabbitExceptionTranslator.convertRabbitAccessException(e);
            }
        }
    

    相关文章

      网友评论

          本文标题:SpringRabbitmq-MessageListenerCo

          本文链接:https://www.haomeiwen.com/subject/twejcftx.html