美文网首页kafka
Kafka消费者总结

Kafka消费者总结

作者: HannahLi_9f1c | 来源:发表于2020-10-31 22:47 被阅读0次
  1. 提交偏移量的方式,如何保证消息不丢失,不重复消费

    1. 自动提交偏移量,会导致消息重复消费和消息丢失。重复消费是因为,当一个消费者提交偏移量之后拉取消息到本地之后消费者崩溃了,消息就没法提交,等他恢复之后又从上次的偏移量开始消费,造成消费重复消费。消息丢失的情况:一个线程A拉取消息到缓存之后提交偏移量,另一个线程B从缓存中读取消息,如果线程B发生异常
    2. 手动提交偏移量:coomitSync、commitAsync
  2. 多线程消费,KafkaConsumer是非线程安全的

    1. 线程封闭。每个Partiton新建一个线程,每个线程实例化一个KafkaConsumer,一个线程处理一个或多个分区。缺点是并发度受限与分区个数,比如说只有一个分区,那么只有一个消费者线程在处理。而且这样每个线程都要占用一个TCP连接,系统开销比较大

      image.png
    2. 每个分区多个消费线程,但是这种需要处理偏移量,比较麻烦。一般不用这种方式处理

    3. 处理消息模块使用多线程


      image.png
  3. 重要的参数

    1. bookstrap.servers:Host:port,Host1:port1的形式
    2. group.id:一般设置为有业务意义的名称
    3. key.deserializer和value.deserializer和生产者的key.serializer和value.serializer相对应
    4. client.id,客户端的id,如果没设定,会由kafka生成
    5. enable.auto.commit,是否自动移交偏移量
    6. fetch.min.bytes,Kafka收到消费者的拉取请求后,如果需要返回的数据量少于这个参数大小,那么需要等待,这个参数默认1B,是吞吐量和延迟的平衡
    7. fetch.max.bytes,默认50B,Consumer再一次拉取请求从Kafka获取的最大数据量
    8. fetch.max.wait.ms。默认500Ms,如果Kafka的消息不满足fetch.min.bytes,最多等待这个参数的配置时间
    9. request.timeout.ms,Consumer等待的最长时间,默认30000ms
  4. @KafkaListener、ConcurrentKafkaListenerContainerFactory、ConsumerFactory、KafkaProperties、poll-timeout
    a. @KafkaListener
    解析:
    由KafkaListenerAnnotationBeanPostProcessor完成,实现了BeanPostProcessor,在Spring初始化之前会遍历BeanPostProcessor的实现类,执行PostProcessorAfterInitialization方法。
    解析出注解了@KafkaListener的所有方法

ConfluenceInstalledFont, monospace;">Collection<KafkaListener> classLevelListeners = this.findListenerAnnotations(targetClass);

之后把KafkaListener的信息封装到MethodKafkaListenerEndpoint,再调用this.registrar.registerEndpoint(endpoint, factory);注册:KafkaListenerEndpointRegistrar实现了BeanFactoryAware, InitializingBean两个接口,因此在Spring初始化Bean的时候会遍历InitializingBean的所有实现类,并执行afterPropertiesSet方法

public void afterPropertiesSet() {
        this.registerAllEndpoints();
    }

    protected void registerAllEndpoints() {
        synchronized(this.endpointDescriptors) {
            Iterator var2 = this.endpointDescriptors.iterator();

            while(var2.hasNext()) {
                KafkaListenerEndpointRegistrar.KafkaListenerEndpointDescriptor descriptor = (KafkaListenerEndpointRegistrar.KafkaListenerEndpointDescriptor)var2.next();
                this.endpointRegistry.registerListenerContainer(descriptor.endpoint, this.resolveContainerFactory(descriptor));
            }

            this.startImmediately = true;
        }
    }

registerAllEndpoints方法将解析的KafkaListener封装到KafkaListenerEndpointDescriptor,然后注册到list里。registerListenerContainer为每一个KafkaListenerEndpointDescriptor生成一个MessageListenerContainer

  MessageListenerContainer container = this.createListenerContainer(endpoint, factory);
    this.listenerContainers.put(id, container);

KafkaMessageListenerContainer最终继承了Lifecycle,Spring在遍历所有的LifeStyle,执行start方法时KafkaMessageListenerContainer的dostart方法会被调用,实例化了KafkaListenerConsumer对象,ListenerConsumer实现了
Runnable,所以可以放入线程池中,这样可以并发执行,但是这里有个问题,就是getConsumerTaskExecutor如果没有配置线程池,默认的线程池是什么?

GenericMessageListener<?> listener = (GenericMessageListener)messageListener;
    ListenerType listenerType = this.deteremineListenerType(listener);
    this.listenerConsumer = new KafkaMessageListenerContainer.ListenerConsumer(listener, listenerType);
    this.setRunning(true);
    this.listenerConsumerFuture = containerProperties.getConsumerTaskExecutor().submitListenable(this.listenerConsumer);

执行:
ListenerConsumer实现了Runnable,所以最终由run方法调用的poll()来拉取消息。

public void run() {
        this.consumerThread = Thread.currentThread();
        if (this.genericListener instanceof ConsumerSeekAware) {
            ((ConsumerSeekAware)this.genericListener).registerSeekCallback(this);
        }

        if (this.transactionManager != null) {
            ProducerFactoryUtils.setConsumerGroupId(this.consumerGroupId);
        }

        this.count = 0;
        this.last = System.currentTimeMillis();
        this.initAsignedPartitions();

        while(KafkaMessageListenerContainer.this.isRunning()) {
            try {
                this.pollAndInvoke();
            } catch (WakeupException var3) {
            } catch (NoOffsetForPartitionException var4) {
                this.fatalError = true;
                this.logger.error("No offset and no reset policy", var4);
                break;
            } catch (Exception var5) {
                this.handleConsumerException(var5);
            } catch (Error var6) {
                Runnable runnable = KafkaMessageListenerContainer.this.emergencyStop;
                if (runnable != null) {
                    runnable.run();
                }

                this.logger.error("Stopping container due to an Error", var6);
                this.wrapUp();
                throw var6;
            }
        }

        this.wrapUp();
    }

总结:
KafkaListener内部也是多线程消费,并且是多线程消费的第一种,一个线程实例化一个KafkaConsumer实例

this.consumer = KafkaMessageListenerContainer.this.consumerFactory.createConsumer(this.consumerGroupId, this.containerProperties.getClientId(), KafkaMessageListenerContainer.this.clientIdSuffix);
image.png
b. ConcurrentKafkaListenerContainerFactory指定concurrent为4,那么就会有4个KafkaMessageListenerContainer->实例化4个LinstenerConsumer->4个KafkaConsumer
5.  topic,partition,group,customer之间的联系
主题可以以业务维度区分,一个topic有多个分区,具体怎么有多少个分区合适?一个group的消费者消费一个topic。每个分区只能由一个消费者组中的消费者消费,如果想一个分区多次消费,可以另外新建消费者组。如果想消费指定的分区,可以指定key

相关文章

网友评论

    本文标题:Kafka消费者总结

    本文链接:https://www.haomeiwen.com/subject/hcbevktx.html