美文网首页Apache Kafka@IT·大数据消息中间件
总结kafka的consumer消费能力很低的情况下的处理方案

总结kafka的consumer消费能力很低的情况下的处理方案

作者: LOC_Thomas | 来源:发表于2016-10-21 10:52 被阅读32835次

    简介

    由于项目中需要使用kafka作为消息队列,并且项目是基于spring-boot来进行构建的,所以项目采用了spring-kafka作为原生kafka的一个扩展库进行使用。先说明一下版本:

    • spring-boot 的版本是1.4.0.RELEASE
    • kafka 的版本是0.9.0.x 版本
    • spring-kafka 的版本是1.0.3.RELEASE

    用过kafka的人都知道,对于使用kafka来说,producer的使用相对简单一些,只需要把数据按照指定的格式发送给kafka中某一个topic就可以了。本文主要是针对spring-kafka的consumer端上的使用进行简单一些分析和总结。

    kafka的速度是很快,所以一般来说producer的生产消息的逻辑速度都会比consumer的消费消息的逻辑速度快。

    具体案例

    之前在项目中遇到了一个案例是,consumer消费一条数据平均需要200ms的时间,并且在某个时刻,producer会在短时间内产生大量的数据丢进kafka的broker里面(假设平均1s中内丢入了5w条需要消费的消息,这个情况会持续几分钟)。

    对于这种情况,kafka的consumer的行为会是:

    • kafka的consumer会从broker里面取出一批数据,�给消费线程进行消费。
    • 由于取出的一批消息数量太大,consumer在session.timeout.ms时间之内没有消费完成
    • consumer coordinator 会由于没有接受到心跳而挂掉,并且出现一些日志
    [rhllor] Tue Oct 18 21:39:16 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator coordinatorDead 529: kafka-example|NTI|Marking the coordinator 2147483646 dead.
    [rhllor] Tue Oct 18 21:39:16 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendGroupMetadataRequest 465: kafka-example|NTI|Issuing group metadata request to broker 1
    [rhllor] Tue Oct 18 21:39:16 CST 2016 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1
    [rhllor] Tue Oct 18 21:39:16 CST 2016 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onComplete 424: kafka-example|NTI|Auto offset commit failed: Commit cannot be completed due to group rebalance
    [rhllor] Tue Oct 18 21:39:16 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator run 408: kafka-example|NTI|Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handleGroupMetadataResponse 478: kafka-example|NTI|Group metadata response ClientResponse(receivedTimeMs=1476797957072, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1d3d7e6, request=RequestSend(header={api_key=10,api_version=0,correlation_id=20,client_id=consumer-1}, body={group_id=new-message-1}), createdTimeMs=1476797956485, sendTimeMs=1476797956485), responseBody={error_code=0,coordinator={node_id=1,host=10.10.44.124,port=9092}})
    [rhllor] Tue Oct 18 21:39:17 CST 2016 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1
    [rhllor] Tue Oct 18 21:39:17 CST 2016 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator maybeAutoCommitOffsetsSync 445: kafka-example|NTI|Auto offset commit failed: 
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare 247: kafka-example|NTI|Revoking previously assigned partitions [rhllor-log-0, rhllor-log-1, rhllor-log-2]
    [rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.springframework.kafka.listener.KafkaMessageListenerContainer onPartitionsRevoked 244: kafka-example|NTI|partitions revoked:[rhllor-log-0, rhllor-log-1, rhllor-log-2]
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 309: kafka-example|NTI|(Re-)joining group new-message-1
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 318: kafka-example|NTI|Issuing request (JOIN_GROUP: {group_id=new-message-1,session_timeout=15000,member_id=consumer-1-64063d04-9d4e-45af-a927-17ccf31c6ec1,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147483646
    [rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 354: kafka-example|NTI|Attempt to join group new-message-1 failed due to unknown member id, resetting and retrying.
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 309: kafka-example|NTI|(Re-)joining group new-message-1
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 318: kafka-example|NTI|Issuing request (JOIN_GROUP: {group_id=new-message-1,session_timeout=15000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147483646
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 336: kafka-example|NTI|Joined group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,members=[{member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator performAssignment 225: kafka-example|NTI|Performing range assignment for subscriptions {consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@1dbca7d4}
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator performAssignment 229: kafka-example|NTI|Finished assignment: {consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@4826f394}
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onJoinLeader 397: kafka-example|NTI|Issuing leader SyncGroup (SYNC_GROUP: {group_id=new-message-1,generation_id=1,member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,group_assignment=[{member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=38 cap=38]}]}) to coordinator 2147483646
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 423: kafka-example|NTI|Received successful sync group response for group new-message-1: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=38 cap=38]}
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete 191: kafka-example|NTI|Setting newly assigned partitions [rhllor-log-0, rhllor-log-1, rhllor-log-2]
    [rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.springframework.kafka.listener.KafkaMessageListenerContainer onPartitionsAssigned 249: kafka-example|NTI|partitions assigned:[rhllor-log-0, rhllor-log-1, rhllor-log-2]
    [rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator sendOffsetFetchRequest 581: kafka-example|NTI|Fetching committed offsets for partitions: [rhllor-log-0, rhllor-log-1, rhllor-log-2]
    

    日志的意思大概是coordinator挂掉了,然后自动提交offset失败,然后重新分配partition给客户端

    • 由于自动提交offset失败,导致重新分配了partition的客户端又重新消费之前的一批数据
    • 接着consumer重新消费,又出现了消费超时,无限循环下去。

    解决方案

    遇到了这个问题之后, 我们做了一些步骤:

    • 提高了partition的数量,从而提高了consumer的并行能力,从而提高数据的消费能力
    • �对于单partition的消费线程,增加了一个固定长度的阻塞队列和工作线程池进一步提高并行消费的能力
    • �由于使用了spring-kafka,则把kafka-client的enable.auto.commit设置成了false,表示禁止kafka-client自动提交offset,因为就是之前的自动提交失败,导致offset永远没更新,从而转向使用spring-kafka的offset提交机制。并且spring-kafka提供了多种提交策略:
    /**
         * The ack mode to use when auto ack (in the configuration properties) is false.
         * <ul>
         * <li>RECORD: Ack after each record has been passed to the listener.</li>
         * <li>BATCH: Ack after each batch of records received from the consumer has been
         * passed to the listener</li>
         * <li>TIME: Ack after this number of milliseconds; (should be greater than
         * {@code #setPollTimeout(long) pollTimeout}.</li>
         * <li>COUNT: Ack after at least this number of records have been received</li>
         * <li>MANUAL: Listener is responsible for acking - use a
         * {@link AcknowledgingMessageListener}.
         * </ul>
         */
        private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
    

    这些策略保证了在一批消息没有完成消费的情况下,也能提交offset,从而避免了完全提交不上而导致永远重复消费的问题。

    分析

    那么问题来了,为什么spring-kafka的提交offset的策略能够解决spring-kafka的auto-commit的带来的重复消费的问题呢?下面通过分析spring-kafka的关键源码来解析这个问题。

    • 首先来看看spring-kafka的消费线程逻辑
    if (isRunning() && this.definedPartitions != null) { 
          initPartitionsIfNeeded();      
     // we start the invoker here as there will be no rebalance calls to       
    // trigger it, but only if the container is not set to autocommit       
    // otherwise we will process records on a separate thread      
         if (!this.autoCommit) {        
                startInvoker();     
         }
     }
    

    上面可以看到,如果auto.commit关掉的话,spring-kafka会启动一个invoker,这个invoker的目的就是启动一个线程去消费数据,他消费的数据不是直接从kafka里面直接取的,那么他消费的数据从哪里来呢?他是从一个spring-kafka自己创建的阻塞队列里面取的。

    • 然后会进入一个循环,从源代码中可以看到如果auto.commit被关掉的话, 他会先把之前处理过的数据先进行提交offset,然后再去从kafka里面取数据。

    • 然后把取到的数据丢给上面提到的阻塞列队,由上面创建的线程去消费,并且如果阻塞队列满了导致取到的数据塞不进去的话,spring-kafka会调用kafka的pause方法,则consumer会停止从kafka里面继续再拿数据。

    • 接着spring-kafka还会处理一些异常的情况,比如失败之后是不是需要commit offset这样的逻辑。

    最后

    • spring-kafka是一个很好的用来操作kafka的库,并且可以和spring进行完美结合。
    • spring-kafka提供了一些kafka使用上功能的扩展。
    • 相比于使用原生的kafka-client的api的话,使用更加简单,需要编写的码量更少。
    • 最好能够使用最新的kafka(0.10.0)和spring-kafka(1.1.1.RELEASE)的版本

    相关文章

      网友评论

      • 2d7a87b0f412:有没有样例代码呢?
      • 2d7a87b0f412:comsumer能不能一条一条取, 处理之后再 手动commit, 求样例代码
      • 特步男孩儿:作者你好,我现在发现个问题,消费者消费消息后,如果处理异常后没有提交ack,这个消费者就不能消费消息了,而且是有时能消费,有时消费不了,请能指点什么原因么
      • 022fdd51de0b:作者你好,文中所说当设置enable.auto.commit=false后,spring-kafka底层会才用一个队列来预取数据,就算MessageListener中没有进行手动提交,spring-kafka自己也会进行commit。这样一来,MessageListener中的手动提交的意义是什么呢?我也发现了这个问题,当我采用手动提交的时候,onCommit监听中的监听次数总是比实际消费的消息数多一些。比如消费了10条数据,但是onCommit监听了15次commit动作。
      • 锋帅:最后的逻辑写比较混乱,狠容易误导人
        LOC_Thomas:还是建议大家看官方文档,这里的记录其实是对自己的一个总结而已, 如果误导了其他人,深表歉意。
      • 1b1d84d0b266:可以把session.timeout.ms设置成无限时或者不要超时时间么?
      • 4332aa0e610d:请教一下,使用spring-kafka的offset提交机制,代码里怎样自动提交offset呢?
        null_bb28:spring-kafka 和单独使用kafka 都会报错,貌似spring并没有解决问题

        java.io.EOFException
        at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
        at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
        at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
        at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
        at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
        at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
        at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:532)
        at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
        at java.util.concurrent.FutureTask.run(FutureTask.java:266)
        at java.lang.Thread.run(Thread.java:745)
        LOC_Thomas:@钱多多猫 spring-kafka他自己管理了offset的提交机制的,自己的代码里面不用关系offset的提交,提交机制的选择在创建bean的时候可以自行配制,默认的是batch模式。
      • f820dbf78065:enable.auto.commit=false,如果把线程kill掉几次,同样的consumerRecord不会出现了,这个会是什么原因呢?
      • ec5f4c017aec:enable.auto.commit如果设置为true,按照官方文档理解,应该是auto.commit.interval.ms这个周期会自动提交,但我实际测试中发现auto.commit.interval.ms这个周期内如果没有处理完这一批消息,没有自动提交;所以猜测是不是一次处理时间小于auto.commit.interval.ms时,会到这个时间提交,否则只能等待一次处理完才提交
        LOC_Thomas:@Captain盖_迎新 auto.commit.interval.ms时间一般都很短,默认的好像就1000ms还是100ms吧?
        LOC_Thomas:@Captain盖_迎新 这个可以针对三楼的解释,在一定时间内没有处理完并且没有提交ack和auto.commit.interval.ms时间应该是没有关系的
      • 不务正业的coder:你提到spring-kakka本地起了一个阻塞队列,如果你的consumer端APP在重启的时候,队列里还有数据没有被消费怎么办,这个阻塞队里的数据如果只是保存在内存中,重启会造成数据丢失
        不务正业的coder:@浮云1o__o1 3Q,有相关的consumer demo么
        LOC_Thomas:@本宅有心 不是的, 他是开始是保存在内存中,如果消息被消费了, 他会进行commit ack到zk上面,如果这个时候进程被kill了, 内存队列里面有数据,但是没有进行commit ack到zk,所以下次启动, 会根据zk上面offset重新进行消费
      • e4433fda5e7b:针对这句话“consumer在heartbeat.interval.ms时间之内没有消费完成”我觉得是session.timeout.ms 这个参数吧! “heartbeat.interval.ms” 是间隔多少秒生产一个心跳并放入心跳队列,等到下次poll的时候发出,如果没有在session.timeout.ms的时间内发出心态,协调器则认为消费者丢失。从而导致错误。
        LOC_Thomas:@hfa1s2 多谢指出错误
        网上有查到:
        ```
        The amount of time a consumer can be out of contact with the brokers while still considered alive, defaults to 3 seconds. If a consumer goes for more than session.timeout.ms without sending a heartbeat to the group coordinator, it is considered dead and the group coordinator will trigger a rebalance of the consumer group to allocate partitions from the dead consumer to the other consumers in the group. This property is closely related to heartbeat.interval.ms. heartbeat.interval.ms controls how frequently the KafkaConsumer poll() method will send a heartbeat to the group coordinator, while session.timeout.ms controls how long can a consumer go without sending a heartbeat. Therefore, thoese two properties are typically modified together - heatbeat.interval.ms must be lower than session.timeout.ms, and is usually set to a 1/3 of the timeout value. So if session.timeout.ms is 3 seconds, heartbeat.interval.ms should be 1 second. Setting session.timeout.ms lower than default will allow consumer groups to detect and recover from failure sooner, but may also cause unwanted rebalances as result of consumers taking longer to complete the poll loop or garbage collection. Setting session.timeout.ms higher will reduce the chance of accidental rebalance, but also means it will take longer to detect a real failure.
        ```
      • 明翼:是不是可以把心跳时间扩大点,这样保证在这个时间内可以取到,我们用高级API消费也遇到这个问题,数据重复
        LOC_Thomas:@明翼
        我个人觉得单纯把心跳时间扩大,不能从根本上解决,因为不知道一次consumer能拉到多少数据,也不能保证消费端一定在一个时间段内消费完所有拉到的数据,并且进行commit。 我是这么理解的,不能保证是否正确。

      本文标题:总结kafka的consumer消费能力很低的情况下的处理方案

      本文链接:https://www.haomeiwen.com/subject/npwquttx.html