A Look at How KafkaListener Is Implemented

Author: go4it | Published: 2023-10-21 23:55

    This article looks at how KafkaListener is implemented.

    KafkaListener

    org/springframework/kafka/annotation/KafkaListener.java

    @Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
    @Retention(RetentionPolicy.RUNTIME)
    @MessageMapping
    @Documented
    @Repeatable(KafkaListeners.class)
    public @interface KafkaListener {
        String id() default "";
        String containerFactory() default "";
        String[] topics() default {};
        String topicPattern() default "";
        TopicPartition[] topicPartitions() default {};
        String containerGroup() default "";
        String errorHandler() default "";
        String groupId() default "";
        boolean idIsGroup() default true;
        String clientIdPrefix() default "";
        String beanRef() default "__listener";
        String concurrency() default "";
        String autoStartup() default "";
        String[] properties() default {};
    }
    

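    The KafkaListener annotation defines attributes such as id, topics and groupId. It has RUNTIME retention and is repeatable through KafkaListeners, so one method can carry several of them.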
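
    To make the role of RUNTIME retention and @Repeatable concrete, here is a minimal JDK-only sketch. DemoListener, DemoListeners, OrderHandler and AnnotationDemo are hypothetical names invented for illustration; they are not part of spring-kafka, and the real post processor uses Spring's annotation utilities rather than plain reflection.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Repeatable;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;

class AnnotationDemo {
    // Sums the topics declared by every @DemoListener found on the type's methods.
    static int countTopics(Class<?> type) {
        int total = 0;
        for (Method method : type.getDeclaredMethods()) {
            // getAnnotationsByType looks through the @DemoListeners container as well
            for (DemoListener listener : method.getAnnotationsByType(DemoListener.class)) {
                total += listener.topics().length;
            }
        }
        return total;
    }

    public static void main(String[] args) {
        System.out.println("topics declared: " + countTopics(OrderHandler.class));
    }
}

// Hypothetical stand-in for @KafkaListener; not part of spring-kafka.
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME) // RUNTIME retention keeps it visible to reflection
@Repeatable(DemoListeners.class)
@interface DemoListener {
    String id() default "";
    String[] topics() default {};
    String groupId() default "";
}

// Container annotation required by @Repeatable (the role @KafkaListeners plays).
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@interface DemoListeners {
    DemoListener[] value();
}

class OrderHandler {
    @DemoListener(id = "a", topics = "orders", groupId = "g1")
    @DemoListener(id = "b", topics = {"audit", "orders-dlq"})
    public void handle(String payload) {
        // the listener body is irrelevant for this demo
    }
}
```

    Because the annotation is repeatable, the handle method above yields two listener definitions, which mirrors why the post processor works with a Set of KafkaListener per method.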

    KafkaListenerAnnotationBeanPostProcessor

    org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java

    public class KafkaListenerAnnotationBeanPostProcessor<K, V>
            implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
    
        private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();  
    
        @Override
        public int getOrder() {
            return LOWEST_PRECEDENCE;
        }
    
        @Override
        public void setBeanFactory(BeanFactory beanFactory) {
            this.beanFactory = beanFactory;
            if (beanFactory instanceof ConfigurableListableBeanFactory) {
                this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
                this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
                        this.listenerScope);
            }
        }
    
        @Override
        public void afterSingletonsInstantiated() {
            this.registrar.setBeanFactory(this.beanFactory);
    
            if (this.beanFactory instanceof ListableBeanFactory) {
                Map<String, KafkaListenerConfigurer> instances =
                        ((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
                for (KafkaListenerConfigurer configurer : instances.values()) {
                    configurer.configureKafkaListeners(this.registrar);
                }
            }
    
            if (this.registrar.getEndpointRegistry() == null) {
                if (this.endpointRegistry == null) {
                    Assert.state(this.beanFactory != null,
                            "BeanFactory must be set to find endpoint registry by bean name");
                    this.endpointRegistry = this.beanFactory.getBean(
                            KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
                            KafkaListenerEndpointRegistry.class);
                }
                this.registrar.setEndpointRegistry(this.endpointRegistry);
            }
    
            if (this.defaultContainerFactoryBeanName != null) {
                this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
            }
    
            // Set the custom handler method factory once resolved by the configurer
            MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
            if (handlerMethodFactory != null) {
                this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
            }
            else {
                addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
            }
    
            // Actually register all listeners
            this.registrar.afterPropertiesSet();
        }
    
        @Override
        public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
            if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
                Class<?> targetClass = AopUtils.getTargetClass(bean);
                Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
                final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
                final List<Method> multiMethods = new ArrayList<>();
                Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
                        (MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
                            Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
                            return (!listenerMethods.isEmpty() ? listenerMethods : null);
                        });
                if (hasClassLevelListeners) {
                    Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
                            (ReflectionUtils.MethodFilter) method ->
                                    AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
                    multiMethods.addAll(methodsWithHandler);
                }
                if (annotatedMethods.isEmpty()) {
                    this.nonAnnotatedClasses.add(bean.getClass());
                    if (this.logger.isTraceEnabled()) {
                        this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
                    }
                }
                else {
                    // Non-empty set of methods
                    for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
                        Method method = entry.getKey();
                        for (KafkaListener listener : entry.getValue()) {
                            processKafkaListener(listener, method, bean, beanName);
                        }
                    }
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
                                + beanName + "': " + annotatedMethods);
                    }
                }
                if (hasClassLevelListeners) {
                    processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
                }
            }
            return bean;
        }       
    }       
    

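    KafkaListenerAnnotationBeanPostProcessor implements the BeanPostProcessor, Ordered, BeanFactoryAware and SmartInitializingSingleton interfaces; its getOrder returns LOWEST_PRECEDENCE.
    Its afterSingletonsInstantiated method (from SmartInitializingSingleton) first looks up all KafkaListenerConfigurer beans and passes the registrar to each one's configureKafkaListeners, then sets the endpoint registry and the handler method factory, and finally calls registrar.afterPropertiesSet().
    Its postProcessAfterInitialization method (from BeanPostProcessor) collects the bean's methods annotated with KafkaListener and calls processKafkaListener for each annotation on each method; when the class itself carries KafkaListener, its KafkaHandler methods go through processMultiMethodListeners instead.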
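
    The two callbacks form a collect-then-register pattern. The following is a simplified JDK-only model of it, not the Spring implementation: Listen, GreetingBean, MiniPostProcessor and TwoPhaseDemo are hypothetical names, and endpoints are reduced to plain strings.

```java
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
import java.lang.reflect.Method;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

class TwoPhaseDemo {
    public static void main(String[] args) {
        MiniPostProcessor processor = new MiniPostProcessor();
        // Phase 1: the container hands over every bean after initialization.
        processor.postProcessAfterInitialization(new GreetingBean(), "greetingBean");
        processor.postProcessAfterInitialization("a plain string bean", "text");
        System.out.println("before callback: " + processor.registered());
        // Phase 2: called once, after all singletons exist.
        processor.afterSingletonsInstantiated();
        System.out.println("after callback:  " + processor.registered());
    }
}

// Hypothetical marker annotation standing in for @KafkaListener.
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.METHOD)
@interface Listen {
    String topic();
}

class GreetingBean {
    @Listen(topic = "greetings")
    public void onGreeting(String message) {
    }
}

class MiniPostProcessor {
    private final Set<Class<?>> nonAnnotatedClasses = new HashSet<>();
    private final List<String> pending = new ArrayList<>();
    private final List<String> registered = new ArrayList<>();

    Object postProcessAfterInitialization(Object bean, String beanName) {
        Class<?> type = bean.getClass();
        if (!nonAnnotatedClasses.contains(type)) {
            boolean found = false;
            for (Method method : type.getDeclaredMethods()) {
                Listen listen = method.getAnnotation(Listen.class);
                if (listen != null) {
                    found = true;
                    pending.add(beanName + "#" + method.getName() + " -> " + listen.topic());
                }
            }
            if (!found) {
                // Remember the class so later beans of this type skip the scan.
                nonAnnotatedClasses.add(type);
            }
        }
        return bean;
    }

    void afterSingletonsInstantiated() {
        // Only now do the collected endpoints become registered.
        registered.addAll(pending);
        pending.clear();
    }

    List<String> registered() {
        return new ArrayList<>(registered);
    }

    boolean isKnownNonAnnotated(Class<?> type) {
        return nonAnnotatedClasses.contains(type);
    }
}
```

    Deferring the second phase until all singletons exist is what lets configurers and the endpoint registry be looked up from the bean factory before any container is created.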

    processKafkaListener

        protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
            Method methodToUse = checkProxy(method, bean);
            MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
            endpoint.setMethod(methodToUse);
            processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
        }
    
        protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
                Object bean, Object adminTarget, String beanName) {
    
            String beanRef = kafkaListener.beanRef();
            if (StringUtils.hasText(beanRef)) {
                this.listenerScope.addListener(beanRef, bean);
            }
            endpoint.setBean(bean);
            endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
            endpoint.setId(getEndpointId(kafkaListener));
            endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
            endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
            endpoint.setTopics(resolveTopics(kafkaListener));
            endpoint.setTopicPattern(resolvePattern(kafkaListener));
            endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
            String group = kafkaListener.containerGroup();
            if (StringUtils.hasText(group)) {
                Object resolvedGroup = resolveExpression(group);
                if (resolvedGroup instanceof String) {
                    endpoint.setGroup((String) resolvedGroup);
                }
            }
            String concurrency = kafkaListener.concurrency();
            if (StringUtils.hasText(concurrency)) {
                endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
            }
            String autoStartup = kafkaListener.autoStartup();
            if (StringUtils.hasText(autoStartup)) {
                endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
            }
            resolveKafkaProperties(endpoint, kafkaListener.properties());
    
            KafkaListenerContainerFactory<?> factory = null;
            String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
            if (StringUtils.hasText(containerFactoryBeanName)) {
                Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
                try {
                    factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
                }
                catch (NoSuchBeanDefinitionException ex) {
                    throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
                            + "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
                            + " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
                }
            }
    
            endpoint.setBeanFactory(this.beanFactory);
            String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
            if (StringUtils.hasText(errorHandlerBeanName)) {
                endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
            }
            this.registrar.registerEndpoint(endpoint, factory);
            if (StringUtils.hasText(beanRef)) {
                this.listenerScope.removeListener(beanRef);
            }
        }   
    

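    processKafkaListener wraps the method in a MethodKafkaListenerEndpoint and then calls processListener, which mainly copies the KafkaListener annotation's attributes onto the endpoint, resolves the KafkaListenerContainerFactory (it stays null when containerFactory is empty), and finally calls registrar.registerEndpoint(endpoint, factory).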
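
    Several attributes (concurrency, autoStartup) are declared as String on the annotation and are only applied when they contain text. A rough sketch of that guard, assuming plain literal values: MiniEndpoint, EndpointPopulator and EndpointAttributesDemo are hypothetical names, and the real resolveExpressionAsInteger / resolveExpressionAsBoolean helpers also resolve expressions, which this sketch does not attempt.

```java
class EndpointAttributesDemo {
    public static void main(String[] args) {
        MiniEndpoint explicit = EndpointPopulator.populate("3", "false");
        MiniEndpoint untouched = EndpointPopulator.populate("", "");
        System.out.println(explicit.concurrency + " / " + explicit.autoStartup);   // 3 / false
        System.out.println(untouched.concurrency + " / " + untouched.autoStartup); // null / null
    }
}

// Hypothetical, heavily reduced endpoint: null means "attribute was left empty".
class MiniEndpoint {
    Integer concurrency;
    Boolean autoStartup;
}

class EndpointPopulator {
    // Same idea as StringUtils.hasText: non-null and not only whitespace.
    static boolean hasText(String value) {
        return value != null && !value.trim().isEmpty();
    }

    static MiniEndpoint populate(String concurrency, String autoStartup) {
        MiniEndpoint endpoint = new MiniEndpoint();
        if (hasText(concurrency)) {
            endpoint.concurrency = Integer.valueOf(concurrency.trim());
        }
        if (hasText(autoStartup)) {
            endpoint.autoStartup = Boolean.valueOf(autoStartup.trim());
        }
        return endpoint;
    }
}
```

    Keeping the empty case as null rather than a default value lets whoever builds the container later tell "not specified" apart from an explicit setting.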

    registrar.registerEndpoint

    org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java

        /**
         * Register a new {@link KafkaListenerEndpoint} alongside the
         * {@link KafkaListenerContainerFactory} to use to create the underlying container.
         * <p>The {@code factory} may be {@code null} if the default factory has to be
         * used for that endpoint.
         * @param endpoint the {@link KafkaListenerEndpoint} instance to register.
         * @param factory the {@link KafkaListenerContainerFactory} to use.
         */
        public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
            Assert.notNull(endpoint, "Endpoint must be set");
            Assert.hasText(endpoint.getId(), "Endpoint id must be set");
            // Factory may be null, we defer the resolution right before actually creating the container
            KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
            synchronized (this.endpointDescriptors) {
                if (this.startImmediately) { // Register and start immediately
                    this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
                            resolveContainerFactory(descriptor), true);
                }
                else {
                    this.endpointDescriptors.add(descriptor);
                }
            }
        }
    

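    KafkaListenerEndpointRegistrar's registerEndpoint creates a KafkaListenerEndpointDescriptor. If startImmediately is set, it calls endpointRegistry.registerListenerContainer right away; otherwise it only adds the descriptor to endpointDescriptors, and the actual registration happens later in registrar.afterPropertiesSet().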
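
    The deferred-versus-immediate branch can be modelled in a few lines. This is a sketch under assumptions: MiniRegistrar and DeferredRegistrationDemo are hypothetical, endpoints are plain strings, and flipping startImmediately inside afterPropertiesSet is my reading of how the real registrar behaves, since that part of the source is not quoted above.

```java
import java.util.ArrayList;
import java.util.List;

class DeferredRegistrationDemo {
    public static void main(String[] args) {
        MiniRegistrar registrar = new MiniRegistrar();
        registrar.registerEndpoint("a");
        registrar.registerEndpoint("b");
        System.out.println(registrar.registered()); // [] : both are still only descriptors
        registrar.afterPropertiesSet();
        registrar.registerEndpoint("late");
        System.out.println(registrar.registered()); // [a, b, late]
    }
}

class MiniRegistrar {
    private final List<String> endpointDescriptors = new ArrayList<>();
    private final List<String> registry = new ArrayList<>();
    private boolean startImmediately;

    void registerEndpoint(String endpointId) {
        synchronized (endpointDescriptors) {
            if (startImmediately) {
                registry.add(endpointId);            // register right away
            }
            else {
                endpointDescriptors.add(endpointId); // park it until afterPropertiesSet
            }
        }
    }

    void afterPropertiesSet() {
        synchronized (endpointDescriptors) {
            registry.addAll(endpointDescriptors);
            // Assumption: from here on, late registrations bypass the descriptor list.
            startImmediately = true;
        }
    }

    List<String> registered() {
        synchronized (endpointDescriptors) {
            return new ArrayList<>(registry);
        }
    }
}
```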

    endpointRegistry.registerListenerContainer

    org/springframework/kafka/config/KafkaListenerEndpointRegistry.java

        public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
                boolean startImmediately) {
            Assert.notNull(endpoint, "Endpoint must not be null");
            Assert.notNull(factory, "Factory must not be null");
    
            String id = endpoint.getId();
            Assert.hasText(id, "Endpoint id must not be empty");
            synchronized (this.listenerContainers) {
                Assert.state(!this.listenerContainers.containsKey(id),
                        "Another endpoint is already registered with id '" + id + "'");
                MessageListenerContainer container = createListenerContainer(endpoint, factory);
                this.listenerContainers.put(id, container);
                if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
                    List<MessageListenerContainer> containerGroup;
                    if (this.applicationContext.containsBean(endpoint.getGroup())) {
                        containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
                    }
                    else {
                        containerGroup = new ArrayList<MessageListenerContainer>();
                        this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
                    }
                    containerGroup.add(container);
                }
                if (startImmediately) {
                    startIfNecessary(container);
                }
            }
        }
    
        /**
         * Start the specified {@link MessageListenerContainer} if it should be started
         * on startup.
         * @param listenerContainer the listener container to start.
         * @see MessageListenerContainer#isAutoStartup()
         */
        private void startIfNecessary(MessageListenerContainer listenerContainer) {
            if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
                listenerContainer.start();
            }
        }   
    

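    KafkaListenerEndpointRegistry's registerListenerContainer creates a MessageListenerContainer from the endpoint and factory and stores it in listenerContainers, rejecting an id that is already registered. When startImmediately is true it calls startIfNecessary, which calls listenerContainer.start() if the context has already been refreshed or the container's isAutoStartup() returns true.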
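
    A reduced model of the registry's two rules (unique ids, conditional start) may help. MiniRegistry, MiniContainer and RegistryDemo are hypothetical names for this sketch; container groups and the application context are left out.

```java
import java.util.LinkedHashMap;
import java.util.Map;

class RegistryDemo {
    public static void main(String[] args) {
        MiniRegistry registry = new MiniRegistry();
        MiniContainer auto = new MiniContainer(true);
        MiniContainer manual = new MiniContainer(false);
        registry.register("auto", auto, true);
        registry.register("manual", manual, true);
        System.out.println(auto.running + " " + manual.running); // true false
    }
}

class MiniContainer {
    final boolean autoStartup;
    boolean running;

    MiniContainer(boolean autoStartup) {
        this.autoStartup = autoStartup;
    }

    void start() {
        running = true;
    }
}

class MiniRegistry {
    private final Map<String, MiniContainer> listenerContainers = new LinkedHashMap<>();
    private boolean contextRefreshed;

    void register(String id, MiniContainer container, boolean startImmediately) {
        synchronized (listenerContainers) {
            if (listenerContainers.containsKey(id)) {
                throw new IllegalStateException(
                        "Another endpoint is already registered with id '" + id + "'");
            }
            listenerContainers.put(id, container);
            if (startImmediately) {
                startIfNecessary(container);
            }
        }
    }

    // Start when the context is already refreshed or the container opts in.
    private void startIfNecessary(MiniContainer container) {
        if (contextRefreshed || container.autoStartup) {
            container.start();
        }
    }

    void markContextRefreshed() {
        contextRefreshed = true;
    }
}
```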

    MessageListenerContainer

    org/springframework/kafka/listener/MessageListenerContainer.java

    public interface MessageListenerContainer extends SmartLifecycle {
        void setupMessageListener(Object messageListener);
        Map<String, Map<MetricName, ? extends Metric>> metrics();
        default ContainerProperties getContainerProperties() {
            throw new UnsupportedOperationException("This container doesn't support retrieving its properties");
        }
        default Collection<TopicPartition> getAssignedPartitions() {
            throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
        }
        default void pause() {
            throw new UnsupportedOperationException("This container doesn't support pause");
        }
        default void resume() {
            throw new UnsupportedOperationException("This container doesn't support resume");
        }
        default boolean isPauseRequested() {
            throw new UnsupportedOperationException("This container doesn't support pause/resume");
        }
        default boolean isContainerPaused() {
            throw new UnsupportedOperationException("This container doesn't support pause/resume");
        }
        default void setAutoStartup(boolean autoStartup) {
            // empty
        }
        default String getGroupId() {
            throw new UnsupportedOperationException("This container does not support retrieving the group id");
        }
        @Nullable
        default String getListenerId() {
            throw new UnsupportedOperationException("This container does not support retrieving the listener id");
        }
    }
    

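    MessageListenerContainer extends the SmartLifecycle interface. It has a generic sub-interface, GenericMessageListenerContainer, which is implemented by the abstract class AbstractMessageListenerContainer; that class has two subclasses, KafkaMessageListenerContainer and ConcurrentMessageListenerContainer.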

    AbstractMessageListenerContainer

    public abstract class AbstractMessageListenerContainer<K, V>
            implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {
    
        @Override
        public final void start() {
            checkGroupId();
            synchronized (this.lifecycleMonitor) {
                if (!isRunning()) {
                    Assert.isTrue(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
                            () -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
                    doStart();
                }
            }
        }
    
        @Override
        public final void stop() {
            synchronized (this.lifecycleMonitor) {
                if (isRunning()) {
                    final CountDownLatch latch = new CountDownLatch(1);
                    doStop(latch::countDown);
                    try {
                        latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
                        publishContainerStoppedEvent();
                    }
                    catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                }
            }
        }
        //......
    }       
    

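    AbstractMessageListenerContainer's start method delegates to the subclass's doStart method. Its stop method delegates to the subclass's doStop method and then waits, for at most the configured shutdownTimeout, until the callback passed to doStop has been invoked.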
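
    This is the template method pattern combined with a CountDownLatch for a bounded wait on shutdown. A self-contained sketch follows; TemplateContainer, ThreadContainer and LifecycleDemo are hypothetical names, and the one-second timeout is an arbitrary value chosen for the example.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;

class LifecycleDemo {
    public static void main(String[] args) {
        ThreadContainer container = new ThreadContainer();
        container.start();
        System.out.println("running: " + container.isRunning());
        container.stop();
        System.out.println("running: " + container.isRunning() + ", worker finished: " + container.finished);
    }
}

abstract class TemplateContainer {
    private final Object lifecycleMonitor = new Object();
    private volatile boolean running;

    protected void setRunning(boolean running) {
        this.running = running;
    }

    public boolean isRunning() {
        return running;
    }

    public final void start() {
        synchronized (lifecycleMonitor) {
            if (!isRunning()) {
                doStart();
            }
        }
    }

    public final void stop() {
        synchronized (lifecycleMonitor) {
            if (isRunning()) {
                CountDownLatch latch = new CountDownLatch(1);
                doStop(latch::countDown);
                try {
                    // Bounded wait: give the worker up to one second to finish.
                    latch.await(1000, TimeUnit.MILLISECONDS);
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            }
        }
    }

    protected abstract void doStart();

    protected abstract void doStop(Runnable callback);
}

class ThreadContainer extends TemplateContainer {
    private volatile Runnable stopCallback;
    volatile boolean finished;

    @Override
    protected void doStart() {
        setRunning(true);
        Thread worker = new Thread(() -> {
            while (isRunning()) {
                try {
                    Thread.sleep(10); // stands in for one unit of work
                }
                catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    break;
                }
            }
            finished = true;
            Runnable callback = stopCallback;
            if (callback != null) {
                callback.run(); // releases the latch in stop()
            }
        });
        worker.start();
    }

    @Override
    protected void doStop(Runnable callback) {
        stopCallback = callback; // publish the callback before flipping the flag
        setRunning(false);
    }
}
```

    Keeping start and stop final in the base class means every subclass gets the same locking and timeout behaviour and only has to supply doStart and doStop.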

    KafkaMessageListenerContainer

    org/springframework/kafka/listener/KafkaMessageListenerContainer.java

    public class KafkaMessageListenerContainer<K, V> // NOSONAR comment density
            extends AbstractMessageListenerContainer<K, V> {
    
        @Override
        protected void doStart() {
            if (isRunning()) {
                return;
            }
            if (this.clientIdSuffix == null) { // stand-alone container
                checkTopics();
            }
            ContainerProperties containerProperties = getContainerProperties();
            checkAckMode(containerProperties);
    
            Object messageListener = containerProperties.getMessageListener();
            Assert.state(messageListener != null, "A MessageListener is required");
            if (containerProperties.getConsumerTaskExecutor() == null) {
                SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
                        (getBeanName() == null ? "" : getBeanName()) + "-C-");
                containerProperties.setConsumerTaskExecutor(consumerExecutor);
            }
            Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
            GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
            ListenerType listenerType = deteremineListenerType(listener);
            this.listenerConsumer = new ListenerConsumer(listener, listenerType);
            setRunning(true);
            this.listenerConsumerFuture = containerProperties
                    .getConsumerTaskExecutor()
                    .submitListenable(this.listenerConsumer);
        }
    
        //......
    }       
    

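    KafkaMessageListenerContainer's doStart method obtains the messageListener, creates a ListenerConsumer for it, and submits that to the consumerTaskExecutor (a SimpleAsyncTaskExecutor is created when none is configured).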

    ConcurrentMessageListenerContainer

    org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java

    public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
    
        @Override
        protected void doStart() {
            if (!isRunning()) {
                checkTopics();
                ContainerProperties containerProperties = getContainerProperties();
                TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
                if (topicPartitions != null && this.concurrency > topicPartitions.length) {
                    this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
                            + "equal to the number of partitions; reduced from " + this.concurrency + " to "
                            + topicPartitions.length);
                    this.concurrency = topicPartitions.length;
                }
                setRunning(true);
    
                for (int i = 0; i < this.concurrency; i++) {
                    KafkaMessageListenerContainer<K, V> container;
                    if (topicPartitions == null) {
                        container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);
                    }
                    else {
                        container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
                                containerProperties, partitionSubset(containerProperties, i));
                    }
                    String beanName = getBeanName();
                    container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
                    if (getApplicationEventPublisher() != null) {
                        container.setApplicationEventPublisher(getApplicationEventPublisher());
                    }
                    container.setClientIdSuffix("-" + i);
                    container.setGenericErrorHandler(getGenericErrorHandler());
                    container.setAfterRollbackProcessor(getAfterRollbackProcessor());
                    container.setRecordInterceptor(getRecordInterceptor());
                    container.setEmergencyStop(() -> {
                        stop(() -> {
                            // NOSONAR
                        });
                        publishContainerStoppedEvent();
                    });
                    if (isPaused()) {
                        container.pause();
                    }
                    container.start();
                    this.containers.add(container);
                }
            }
        }
    
        //......    
    }
    

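    ConcurrentMessageListenerContainer's doStart creates as many KafkaMessageListenerContainer instances as the concurrency value says (reduced to the number of partitions when explicit partitions are configured and concurrency exceeds them) and calls start on each one.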
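
    The capping and naming rules can be sketched without Kafka. ConcurrencyPlanner, ChildPlan and ConcurrencyPlanDemo are hypothetical names; the round-robin split of partitions is an assumption made for illustration, since partitionSubset is not shown above and may distribute partitions differently.

```java
import java.util.ArrayList;
import java.util.List;

class ConcurrencyPlanDemo {
    public static void main(String[] args) {
        for (ChildPlan child : ConcurrencyPlanner.plan("orders", 5, new int[] {0, 1, 2})) {
            System.out.println(child.beanName + " " + child.partitions);
        }
    }
}

class ChildPlan {
    final String beanName;
    final List<Integer> partitions = new ArrayList<>();

    ChildPlan(String beanName) {
        this.beanName = beanName;
    }
}

class ConcurrencyPlanner {
    // partitions == null models "no explicit partitions were configured".
    static List<ChildPlan> plan(String beanName, int concurrency, int[] partitions) {
        if (concurrency < 1) {
            throw new IllegalArgumentException("concurrency must be at least 1");
        }
        int effective = concurrency;
        if (partitions != null && effective > partitions.length) {
            effective = partitions.length; // same cap as in doStart
        }
        List<ChildPlan> children = new ArrayList<>();
        for (int i = 0; i < effective; i++) {
            children.add(new ChildPlan((beanName != null ? beanName : "consumer") + "-" + i));
        }
        if (partitions != null) {
            for (int p = 0; p < partitions.length; p++) {
                // Assumed round-robin split; not necessarily what partitionSubset does.
                children.get(p % effective).partitions.add(partitions[p]);
            }
        }
        return children;
    }
}
```

    With a requested concurrency of 5 and three explicit partitions, the plan contains three children named orders-0 to orders-2, one partition each.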

    ListenerConsumer

    org/springframework/kafka/listener/KafkaMessageListenerContainer.java

    private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
    
            @Override
            public void run() {
                this.consumerThread = Thread.currentThread();
                if (this.genericListener instanceof ConsumerSeekAware) {
                    ((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
                }
                if (this.transactionManager != null) {
                    ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
                }
                this.count = 0;
                this.last = System.currentTimeMillis();
                initAsignedPartitions();
                while (isRunning()) {
                    try {
                        pollAndInvoke();
                    }
                    catch (@SuppressWarnings(UNUSED) WakeupException e) {
                        // Ignore, we're stopping or applying immediate foreign acks
                    }
                    catch (NoOffsetForPartitionException nofpe) {
                        this.fatalError = true;
                        ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
                        break;
                    }
                    catch (Exception e) {
                        handleConsumerException(e);
                    }
                    catch (Error e) { // NOSONAR - rethrown
                        Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                        if (runnable != null) {
                            runnable.run();
                        }
                        this.logger.error("Stopping container due to an Error", e);
                        wrapUp();
                        throw e;
                    }
                }
                wrapUp();
            }
    
            protected void pollAndInvoke() {
                if (!this.autoCommit && !this.isRecordAck) {
                    processCommits();
                }
                processSeeks();
                checkPaused();
                ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
                this.lastPoll = System.currentTimeMillis();
                checkResumed();
                debugRecords(records);
                if (records != null && records.count() > 0) {
                    if (this.containerProperties.getIdleEventInterval() != null) {
                        this.lastReceive = System.currentTimeMillis();
                    }
                    invokeListener(records);
                }
                else {
                    checkIdle();
                }
            }
    
            private void invokeListener(final ConsumerRecords<K, V> records) {
                if (this.isBatchListener) {
                    invokeBatchListener(records);
                }
                else {
                    invokeRecordListener(records);
                }
            }
    
            private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
                    List<ConsumerRecord<K, V>> recordList) {
                switch (this.listenerType) {
                    case ACKNOWLEDGING_CONSUMER_AWARE:
                        this.batchListener.onMessage(recordList,
                                this.isAnyManualAck
                                        ? new ConsumerBatchAcknowledgment(records)
                                        : null, this.consumer);
                        break;
                    case ACKNOWLEDGING:
                        this.batchListener.onMessage(recordList,
                                this.isAnyManualAck
                                        ? new ConsumerBatchAcknowledgment(records)
                                        : null);
                        break;
                    case CONSUMER_AWARE:
                        this.batchListener.onMessage(recordList, this.consumer);
                        break;
                    case SIMPLE:
                        this.batchListener.onMessage(recordList);
                        break;
                }
            }
    
            private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
                ConsumerRecord<K, V> record = recordArg;
                if (this.recordInterceptor != null) {
                    record = this.recordInterceptor.intercept(record);
                }
                if (record == null) {
                    if (this.logger.isDebugEnabled()) {
                        this.logger.debug("RecordInterceptor returned null, skipping: " + recordArg);
                    }
                }
                else {
                    switch (this.listenerType) {
                        case ACKNOWLEDGING_CONSUMER_AWARE:
                            this.listener.onMessage(record,
                                    this.isAnyManualAck
                                            ? new ConsumerAcknowledgment(record)
                                            : null, this.consumer);
                            break;
                        case CONSUMER_AWARE:
                            this.listener.onMessage(record, this.consumer);
                            break;
                        case ACKNOWLEDGING:
                            this.listener.onMessage(record,
                                    this.isAnyManualAck
                                            ? new ConsumerAcknowledgment(record)
                                            : null);
                            break;
                        case SIMPLE:
                            this.listener.onMessage(record);
                            break;
                    }
                }
            }                                   
        //......    
    }
    

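    ListenerConsumer implements org.springframework.scheduling.SchedulingAwareRunnable (which extends Runnable) and org.springframework.kafka.listener.ConsumerSeekCallback.
    Its run method calls initAsignedPartitions and then calls pollAndInvoke in a loop: a WakeupException is ignored, a NoOffsetForPartitionException breaks out of the loop, any other Exception is passed to handleConsumerException, and an Error triggers emergencyStop and wrapUp before being rethrown.
    pollAndInvoke calls consumer.poll() and, if any records came back, hands them to invokeListener(records), which eventually reaches doInvokeBatchOnMessage or doInvokeOnMessage and, from there, the listener's onMessage method.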
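
    The shape of that loop can be reproduced with a BlockingQueue standing in for the Kafka consumer. This is a simplified model, not the real class: MiniListenerConsumer and PollLoopDemo are hypothetical names, there are no offsets or commits, and a listener exception here simply drops the rest of the current batch.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

class PollLoopDemo {
    public static void main(String[] args) throws InterruptedException {
        BlockingQueue<String> source = new LinkedBlockingQueue<>();
        MiniListenerConsumer consumer =
                new MiniListenerConsumer(source, record -> System.out.println("onMessage: " + record));
        Thread thread = new Thread(consumer, "demo-C-1");
        thread.start();
        source.add("a");
        source.add("b");
        Thread.sleep(200); // give the loop time to pick the records up
        consumer.stop();
        thread.join();
    }
}

class MiniListenerConsumer implements Runnable {
    private final BlockingQueue<String> source;
    private final Consumer<String> listener;
    private volatile boolean running = true;
    private volatile int handledErrors;

    MiniListenerConsumer(BlockingQueue<String> source, Consumer<String> listener) {
        this.source = source;
        this.listener = listener;
    }

    @Override
    public void run() {
        while (running) {
            try {
                pollAndInvoke();
            }
            catch (RuntimeException e) {
                handledErrors++; // plays the role of handleConsumerException
            }
        }
    }

    void pollAndInvoke() {
        List<String> records = poll(50);
        if (!records.isEmpty()) {
            for (String record : records) {
                listener.accept(record); // the onMessage callback
            }
        }
    }

    // Waits up to timeoutMillis for one record, then drains whatever else is queued.
    private List<String> poll(long timeoutMillis) {
        List<String> batch = new ArrayList<>();
        try {
            String first = source.poll(timeoutMillis, TimeUnit.MILLISECONDS);
            if (first != null) {
                batch.add(first);
                source.drainTo(batch);
            }
        }
        catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            running = false;
        }
        return batch;
    }

    void stop() {
        running = false;
    }

    int handledErrors() {
        return handledErrors;
    }
}
```

    The poll timeout doubles as the upper bound on how long stop() takes to be noticed, which is one reason the loop polls with a timeout instead of blocking indefinitely.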

    Summary

    KafkaListenerAnnotationBeanPostProcessor collects the bean methods annotated with KafkaListener and calls processKafkaListener for each of them; processKafkaListener wraps the method in a MethodKafkaListenerEndpoint and calls registrar.registerEndpoint(endpoint, factory).
    KafkaListenerEndpointRegistry's registerListenerContainer creates a MessageListenerContainer from the endpoint and factory and stores it in listenerContainers; when startImmediately is true it calls startIfNecessary, which mainly calls listenerContainer.start().
    MessageListenerContainer has two main implementations, KafkaMessageListenerContainer and ConcurrentMessageListenerContainer. The latter's start creates as many KafkaMessageListenerContainer instances as concurrency requires, so in both cases the work ends up in KafkaMessageListenerContainer's start, which creates a ListenerConsumer and submits it to the task executor. ListenerConsumer mainly runs pollAndInvoke: it polls for records and then calls back into the listener's onMessage method.
    The overall chain is KafkaListenerAnnotationBeanPostProcessor --> KafkaListenerEndpointRegistry --> MessageListenerContainer --> GenericMessageListener.onMessage

    Article link: https://www.haomeiwen.com/subject/xhsridtx.html