序
本文只要研究一下KafkaListener的实现机制
KafkaListener
org/springframework/kafka/annotation/KafkaListener.java
@Target({ ElementType.TYPE, ElementType.METHOD, ElementType.ANNOTATION_TYPE })
@Retention(RetentionPolicy.RUNTIME)
@MessageMapping
@Documented
@Repeatable(KafkaListeners.class)
public @interface KafkaListener {
String id() default "";
String containerFactory() default "";
String[] topics() default {};
String topicPattern() default "";
TopicPartition[] topicPartitions() default {};
String containerGroup() default "";
String errorHandler() default "";
String groupId() default "";
boolean idIsGroup() default true;
String clientIdPrefix() default "";
String beanRef() default "__listener";
String concurrency() default "";
String autoStartup() default "";
String[] properties() default {};
}
KafkaListener注解定义了id、topics、groupId等属性
KafkaListenerAnnotationBeanPostProcessor
org/springframework/kafka/annotation/KafkaListenerAnnotationBeanPostProcessor.java
public class KafkaListenerAnnotationBeanPostProcessor<K, V>
implements BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton {
private final KafkaListenerEndpointRegistrar registrar = new KafkaListenerEndpointRegistrar();
@Override
public int getOrder() {
return LOWEST_PRECEDENCE;
}
@Override
public void setBeanFactory(BeanFactory beanFactory) {
this.beanFactory = beanFactory;
if (beanFactory instanceof ConfigurableListableBeanFactory) {
this.resolver = ((ConfigurableListableBeanFactory) beanFactory).getBeanExpressionResolver();
this.expressionContext = new BeanExpressionContext((ConfigurableListableBeanFactory) beanFactory,
this.listenerScope);
}
}
@Override
public void afterSingletonsInstantiated() {
this.registrar.setBeanFactory(this.beanFactory);
if (this.beanFactory instanceof ListableBeanFactory) {
Map<String, KafkaListenerConfigurer> instances =
((ListableBeanFactory) this.beanFactory).getBeansOfType(KafkaListenerConfigurer.class);
for (KafkaListenerConfigurer configurer : instances.values()) {
configurer.configureKafkaListeners(this.registrar);
}
}
if (this.registrar.getEndpointRegistry() == null) {
if (this.endpointRegistry == null) {
Assert.state(this.beanFactory != null,
"BeanFactory must be set to find endpoint registry by bean name");
this.endpointRegistry = this.beanFactory.getBean(
KafkaListenerConfigUtils.KAFKA_LISTENER_ENDPOINT_REGISTRY_BEAN_NAME,
KafkaListenerEndpointRegistry.class);
}
this.registrar.setEndpointRegistry(this.endpointRegistry);
}
if (this.defaultContainerFactoryBeanName != null) {
this.registrar.setContainerFactoryBeanName(this.defaultContainerFactoryBeanName);
}
// Set the custom handler method factory once resolved by the configurer
MessageHandlerMethodFactory handlerMethodFactory = this.registrar.getMessageHandlerMethodFactory();
if (handlerMethodFactory != null) {
this.messageHandlerMethodFactory.setMessageHandlerMethodFactory(handlerMethodFactory);
}
else {
addFormatters(this.messageHandlerMethodFactory.defaultFormattingConversionService);
}
// Actually register all listeners
this.registrar.afterPropertiesSet();
}
@Override
public Object postProcessAfterInitialization(final Object bean, final String beanName) throws BeansException {
if (!this.nonAnnotatedClasses.contains(bean.getClass())) {
Class<?> targetClass = AopUtils.getTargetClass(bean);
Collection<KafkaListener> classLevelListeners = findListenerAnnotations(targetClass);
final boolean hasClassLevelListeners = classLevelListeners.size() > 0;
final List<Method> multiMethods = new ArrayList<>();
Map<Method, Set<KafkaListener>> annotatedMethods = MethodIntrospector.selectMethods(targetClass,
(MethodIntrospector.MetadataLookup<Set<KafkaListener>>) method -> {
Set<KafkaListener> listenerMethods = findListenerAnnotations(method);
return (!listenerMethods.isEmpty() ? listenerMethods : null);
});
if (hasClassLevelListeners) {
Set<Method> methodsWithHandler = MethodIntrospector.selectMethods(targetClass,
(ReflectionUtils.MethodFilter) method ->
AnnotationUtils.findAnnotation(method, KafkaHandler.class) != null);
multiMethods.addAll(methodsWithHandler);
}
if (annotatedMethods.isEmpty()) {
this.nonAnnotatedClasses.add(bean.getClass());
if (this.logger.isTraceEnabled()) {
this.logger.trace("No @KafkaListener annotations found on bean type: " + bean.getClass());
}
}
else {
// Non-empty set of methods
for (Map.Entry<Method, Set<KafkaListener>> entry : annotatedMethods.entrySet()) {
Method method = entry.getKey();
for (KafkaListener listener : entry.getValue()) {
processKafkaListener(listener, method, bean, beanName);
}
}
if (this.logger.isDebugEnabled()) {
this.logger.debug(annotatedMethods.size() + " @KafkaListener methods processed on bean '"
+ beanName + "': " + annotatedMethods);
}
}
if (hasClassLevelListeners) {
processMultiMethodListeners(classLevelListeners, multiMethods, bean, beanName);
}
}
return bean;
}
}
KafkaListenerAnnotationBeanPostProcessor实现了BeanPostProcessor, Ordered, BeanFactoryAware, SmartInitializingSingleton接口,其getOrder返回LOWEST_PRECEDENCE
其afterSingletonsInstantiated方法(SmartInitializingSingleton接口
)首先获取KafkaListenerConfigurer,然后设置configureKafkaListeners为registrar,最后是执行registrar.afterPropertiesSet()
其postProcessAfterInitialization方法(BeanPostProcessor接口
)会收集标注KafkaListener的bean的方法,然后针对每个方法执行processKafkaListener
processKafkaListener
protected void processKafkaListener(KafkaListener kafkaListener, Method method, Object bean, String beanName) {
Method methodToUse = checkProxy(method, bean);
MethodKafkaListenerEndpoint<K, V> endpoint = new MethodKafkaListenerEndpoint<>();
endpoint.setMethod(methodToUse);
processListener(endpoint, kafkaListener, bean, methodToUse, beanName);
}
protected void processListener(MethodKafkaListenerEndpoint<?, ?> endpoint, KafkaListener kafkaListener,
Object bean, Object adminTarget, String beanName) {
String beanRef = kafkaListener.beanRef();
if (StringUtils.hasText(beanRef)) {
this.listenerScope.addListener(beanRef, bean);
}
endpoint.setBean(bean);
endpoint.setMessageHandlerMethodFactory(this.messageHandlerMethodFactory);
endpoint.setId(getEndpointId(kafkaListener));
endpoint.setGroupId(getEndpointGroupId(kafkaListener, endpoint.getId()));
endpoint.setTopicPartitions(resolveTopicPartitions(kafkaListener));
endpoint.setTopics(resolveTopics(kafkaListener));
endpoint.setTopicPattern(resolvePattern(kafkaListener));
endpoint.setClientIdPrefix(resolveExpressionAsString(kafkaListener.clientIdPrefix(), "clientIdPrefix"));
String group = kafkaListener.containerGroup();
if (StringUtils.hasText(group)) {
Object resolvedGroup = resolveExpression(group);
if (resolvedGroup instanceof String) {
endpoint.setGroup((String) resolvedGroup);
}
}
String concurrency = kafkaListener.concurrency();
if (StringUtils.hasText(concurrency)) {
endpoint.setConcurrency(resolveExpressionAsInteger(concurrency, "concurrency"));
}
String autoStartup = kafkaListener.autoStartup();
if (StringUtils.hasText(autoStartup)) {
endpoint.setAutoStartup(resolveExpressionAsBoolean(autoStartup, "autoStartup"));
}
resolveKafkaProperties(endpoint, kafkaListener.properties());
KafkaListenerContainerFactory<?> factory = null;
String containerFactoryBeanName = resolve(kafkaListener.containerFactory());
if (StringUtils.hasText(containerFactoryBeanName)) {
Assert.state(this.beanFactory != null, "BeanFactory must be set to obtain container factory by bean name");
try {
factory = this.beanFactory.getBean(containerFactoryBeanName, KafkaListenerContainerFactory.class);
}
catch (NoSuchBeanDefinitionException ex) {
throw new BeanInitializationException("Could not register Kafka listener endpoint on [" + adminTarget
+ "] for bean " + beanName + ", no " + KafkaListenerContainerFactory.class.getSimpleName()
+ " with id '" + containerFactoryBeanName + "' was found in the application context", ex);
}
}
endpoint.setBeanFactory(this.beanFactory);
String errorHandlerBeanName = resolveExpressionAsString(kafkaListener.errorHandler(), "errorHandler");
if (StringUtils.hasText(errorHandlerBeanName)) {
endpoint.setErrorHandler(this.beanFactory.getBean(errorHandlerBeanName, KafkaListenerErrorHandler.class));
}
this.registrar.registerEndpoint(endpoint, factory);
if (StringUtils.hasText(beanRef)) {
this.listenerScope.removeListener(beanRef);
}
}
processKafkaListener方法将method转换为MethodKafkaListenerEndpoint,然后执行processListener方法,它主要是将KafkaListener注解的信息填充到MethodKafkaListenerEndpoint上,确定KafkaListenerContainerFactory,最后执行registrar.registerEndpoint(endpoint, factory)
registrar.registerEndpoint
org/springframework/kafka/config/KafkaListenerEndpointRegistrar.java
/**
* Register a new {@link KafkaListenerEndpoint} alongside the
* {@link KafkaListenerContainerFactory} to use to create the underlying container.
* <p>The {@code factory} may be {@code null} if the default factory has to be
* used for that endpoint.
* @param endpoint the {@link KafkaListenerEndpoint} instance to register.
* @param factory the {@link KafkaListenerContainerFactory} to use.
*/
public void registerEndpoint(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory) {
Assert.notNull(endpoint, "Endpoint must be set");
Assert.hasText(endpoint.getId(), "Endpoint id must be set");
// Factory may be null, we defer the resolution right before actually creating the container
KafkaListenerEndpointDescriptor descriptor = new KafkaListenerEndpointDescriptor(endpoint, factory);
synchronized (this.endpointDescriptors) {
if (this.startImmediately) { // Register and start immediately
this.endpointRegistry.registerListenerContainer(descriptor.endpoint,
resolveContainerFactory(descriptor), true);
}
else {
this.endpointDescriptors.add(descriptor);
}
}
}
KafkaListenerEndpointRegistrar的registerEndpoint会创建KafkaListenerEndpointDescriptor,然后执行endpointRegistry.registerListenerContainer
endpointRegistry.registerListenerContainer
org/springframework/kafka/config/KafkaListenerEndpointRegistry.java
public void registerListenerContainer(KafkaListenerEndpoint endpoint, KafkaListenerContainerFactory<?> factory,
boolean startImmediately) {
Assert.notNull(endpoint, "Endpoint must not be null");
Assert.notNull(factory, "Factory must not be null");
String id = endpoint.getId();
Assert.hasText(id, "Endpoint id must not be empty");
synchronized (this.listenerContainers) {
Assert.state(!this.listenerContainers.containsKey(id),
"Another endpoint is already registered with id '" + id + "'");
MessageListenerContainer container = createListenerContainer(endpoint, factory);
this.listenerContainers.put(id, container);
if (StringUtils.hasText(endpoint.getGroup()) && this.applicationContext != null) {
List<MessageListenerContainer> containerGroup;
if (this.applicationContext.containsBean(endpoint.getGroup())) {
containerGroup = this.applicationContext.getBean(endpoint.getGroup(), List.class);
}
else {
containerGroup = new ArrayList<MessageListenerContainer>();
this.applicationContext.getBeanFactory().registerSingleton(endpoint.getGroup(), containerGroup);
}
containerGroup.add(container);
}
if (startImmediately) {
startIfNecessary(container);
}
}
}
/**
* Start the specified {@link MessageListenerContainer} if it should be started
* on startup.
* @param listenerContainer the listener container to start.
* @see MessageListenerContainer#isAutoStartup()
*/
private void startIfNecessary(MessageListenerContainer listenerContainer) {
if (this.contextRefreshed || listenerContainer.isAutoStartup()) {
listenerContainer.start();
}
}
KafkaListenerEndpointRegistry的registerListenerContainer方法会根据endpoint和factory来创建MessageListenerContainer,然后放入到listenerContainers中,对于startImmediately的会执行startIfNecessary,它主要是执行listenerContainer.start()
MessageListenerContainer
org/springframework/kafka/listener/MessageListenerContainer.java
public interface MessageListenerContainer extends SmartLifecycle {
void setupMessageListener(Object messageListener);
Map<String, Map<MetricName, ? extends Metric>> metrics();
default ContainerProperties getContainerProperties() {
throw new UnsupportedOperationException("This container doesn't support retrieving its properties");
}
default Collection<TopicPartition> getAssignedPartitions() {
throw new UnsupportedOperationException("This container doesn't support retrieving its assigned partitions");
}
default void pause() {
throw new UnsupportedOperationException("This container doesn't support pause");
}
default void resume() {
throw new UnsupportedOperationException("This container doesn't support resume");
}
default boolean isPauseRequested() {
throw new UnsupportedOperationException("This container doesn't support pause/resume");
}
default boolean isContainerPaused() {
throw new UnsupportedOperationException("This container doesn't support pause/resume");
}
default void setAutoStartup(boolean autoStartup) {
// empty
}
default String getGroupId() {
throw new UnsupportedOperationException("This container does not support retrieving the group id");
}
@Nullable
default String getListenerId() {
throw new UnsupportedOperationException("This container does not support retrieving the listener id");
}
}
MessageListenerContainer继承了SmartLifecycle接口,它有一个泛型接口为GenericMessageListenerContainer,后者有一个抽象类为AbstractMessageListenerContainer,然后它有两个子类,分别是KafkaMessageListenerContainer与ConcurrentMessageListenerContainer
AbstractMessageListenerContainer
public abstract class AbstractMessageListenerContainer<K, V>
implements GenericMessageListenerContainer<K, V>, BeanNameAware, ApplicationEventPublisherAware {
@Override
public final void start() {
checkGroupId();
synchronized (this.lifecycleMonitor) {
if (!isRunning()) {
Assert.isTrue(this.containerProperties.getMessageListener() instanceof GenericMessageListener,
() -> "A " + GenericMessageListener.class.getName() + " implementation must be provided");
doStart();
}
}
}
@Override
public final void stop() {
synchronized (this.lifecycleMonitor) {
if (isRunning()) {
final CountDownLatch latch = new CountDownLatch(1);
doStop(latch::countDown);
try {
latch.await(this.containerProperties.getShutdownTimeout(), TimeUnit.MILLISECONDS); // NOSONAR
publishContainerStoppedEvent();
}
catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
}
}
//......
}
AbstractMessageListenerContainer的start方法会回调子类的doStart方法,其stop方法会回调子类的doStop方法
KafkaMessageListenerContainer
org/springframework/kafka/listener/KafkaMessageListenerContainer.java
public class KafkaMessageListenerContainer<K, V> // NOSONAR comment density
extends AbstractMessageListenerContainer<K, V> {
@Override
protected void doStart() {
if (isRunning()) {
return;
}
if (this.clientIdSuffix == null) { // stand-alone container
checkTopics();
}
ContainerProperties containerProperties = getContainerProperties();
checkAckMode(containerProperties);
Object messageListener = containerProperties.getMessageListener();
Assert.state(messageListener != null, "A MessageListener is required");
if (containerProperties.getConsumerTaskExecutor() == null) {
SimpleAsyncTaskExecutor consumerExecutor = new SimpleAsyncTaskExecutor(
(getBeanName() == null ? "" : getBeanName()) + "-C-");
containerProperties.setConsumerTaskExecutor(consumerExecutor);
}
Assert.state(messageListener instanceof GenericMessageListener, "Listener must be a GenericListener");
GenericMessageListener<?> listener = (GenericMessageListener<?>) messageListener;
ListenerType listenerType = deteremineListenerType(listener);
this.listenerConsumer = new ListenerConsumer(listener, listenerType);
setRunning(true);
this.listenerConsumerFuture = containerProperties
.getConsumerTaskExecutor()
.submitListenable(this.listenerConsumer);
}
//......
}
KafkaMessageListenerContainer的doStart方法会获取到messageListener,然后创建ListenerConsumer,最后提交到线程池中执行
ConcurrentMessageListenerContainer
org/springframework/kafka/listener/ConcurrentMessageListenerContainer.java
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
@Override
protected void doStart() {
if (!isRunning()) {
checkTopics();
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null && this.concurrency > topicPartitions.length) {
this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
setRunning(true);
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container;
if (topicPartitions == null) {
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory, containerProperties);
}
else {
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
containerProperties, partitionSubset(containerProperties, i));
}
String beanName = getBeanName();
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
container.setGenericErrorHandler(getGenericErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
container.setRecordInterceptor(getRecordInterceptor());
container.setEmergencyStop(() -> {
stop(() -> {
// NOSONAR
});
publishContainerStoppedEvent();
});
if (isPaused()) {
container.pause();
}
container.start();
this.containers.add(container);
}
}
}
//......
}
ConcurrentMessageListenerContainer的doStart会根据concurrency值来创建对应的KafkaMessageListenerContainer,然后执行其start方法
ListenerConsumer
org/springframework/kafka/listener/KafkaMessageListenerContainer.java
private final class ListenerConsumer implements SchedulingAwareRunnable, ConsumerSeekCallback {
@Override
public void run() {
this.consumerThread = Thread.currentThread();
if (this.genericListener instanceof ConsumerSeekAware) {
((ConsumerSeekAware) this.genericListener).registerSeekCallback(this);
}
if (this.transactionManager != null) {
ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
}
this.count = 0;
this.last = System.currentTimeMillis();
initAsignedPartitions();
while (isRunning()) {
try {
pollAndInvoke();
}
catch (@SuppressWarnings(UNUSED) WakeupException e) {
// Ignore, we're stopping or applying immediate foreign acks
}
catch (NoOffsetForPartitionException nofpe) {
this.fatalError = true;
ListenerConsumer.this.logger.error("No offset and no reset policy", nofpe);
break;
}
catch (Exception e) {
handleConsumerException(e);
}
catch (Error e) { // NOSONAR - rethrown
Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
if (runnable != null) {
runnable.run();
}
this.logger.error("Stopping container due to an Error", e);
wrapUp();
throw e;
}
}
wrapUp();
}
protected void pollAndInvoke() {
if (!this.autoCommit && !this.isRecordAck) {
processCommits();
}
processSeeks();
checkPaused();
ConsumerRecords<K, V> records = this.consumer.poll(this.pollTimeout);
this.lastPoll = System.currentTimeMillis();
checkResumed();
debugRecords(records);
if (records != null && records.count() > 0) {
if (this.containerProperties.getIdleEventInterval() != null) {
this.lastReceive = System.currentTimeMillis();
}
invokeListener(records);
}
else {
checkIdle();
}
}
private void invokeListener(final ConsumerRecords<K, V> records) {
if (this.isBatchListener) {
invokeBatchListener(records);
}
else {
invokeRecordListener(records);
}
}
private void doInvokeBatchOnMessage(final ConsumerRecords<K, V> records,
List<ConsumerRecord<K, V>> recordList) {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.batchListener.onMessage(recordList,
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
: null, this.consumer);
break;
case ACKNOWLEDGING:
this.batchListener.onMessage(recordList,
this.isAnyManualAck
? new ConsumerBatchAcknowledgment(records)
: null);
break;
case CONSUMER_AWARE:
this.batchListener.onMessage(recordList, this.consumer);
break;
case SIMPLE:
this.batchListener.onMessage(recordList);
break;
}
}
private void doInvokeOnMessage(final ConsumerRecord<K, V> recordArg) {
ConsumerRecord<K, V> record = recordArg;
if (this.recordInterceptor != null) {
record = this.recordInterceptor.intercept(record);
}
if (record == null) {
if (this.logger.isDebugEnabled()) {
this.logger.debug("RecordInterceptor returned null, skipping: " + recordArg);
}
}
else {
switch (this.listenerType) {
case ACKNOWLEDGING_CONSUMER_AWARE:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null, this.consumer);
break;
case CONSUMER_AWARE:
this.listener.onMessage(record, this.consumer);
break;
case ACKNOWLEDGING:
this.listener.onMessage(record,
this.isAnyManualAck
? new ConsumerAcknowledgment(record)
: null);
break;
case SIMPLE:
this.listener.onMessage(record);
break;
}
}
}
//......
}
ListenerConsumer实现了org.springframework.scheduling.SchedulingAwareRunnable接口(
它继承了Runnable接口
)以及org.springframework.kafka.listener.ConsumerSeekCallback接口
其run方法主要是执行initAsignedPartitions,然后循环执行pollAndInvoke,对于NoOffsetForPartitionException则跳出异常,对于其他Exception则执行handleConsumerException,对于Error执行emergencyStop与wrapUp方法
pollAndInvoke方法主要是执行consumer.poll(),然后通过invokeListener(records)回调,最后是通过doInvokeBatchOnMessage、doInvokeOnMessage去回调listener.onMessage方法
小结
KafkaListenerAnnotationBeanPostProcessor主要是收集标注KafkaListener的bean的方法,然后针对每个方法执行processKafkaListener,processKafkaListener方法将method转换为MethodKafkaListenerEndpoint,执行registrar.registerEndpoint(endpoint, factory)
KafkaListenerEndpointRegistry的registerListenerContainer方法会根据endpoint和factory来创建MessageListenerContainer,然后放入到listenerContainers中,对于startImmediately的会执行startIfNecessary,它主要是执行listenerContainer.start()
MessageListenerContainer有两个主要的实现类分别是KafkaMessageListenerContainer与ConcurrentMessageListenerContainer,后者的start方法主要是根据concurrency创建对应数量的KafkaMessageListenerContainer,最后都是执行KafkaMessageListenerContainer的start方法,它会创建ListenerConsumer,最后提交到线程池中执行;ListenerConsumer主要是执行pollAndInvoke,拉取消息,然后回到listener的onMessage方法
整体的链路就是KafkaListenerAnnotationBeanPostProcessor --> KafkaListenerEndpointRegistry --> MessageListenerContainer --> GenericMessageListener.onMessage
网友评论