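Introduction
Our project needed Kafka as a message queue, and since the project is built on spring-boot, we used spring-kafka, a library that wraps the native Kafka client. The versions involved:
- spring-boot 1.4.0.RELEASE
- kafka 0.9.0.x
- spring-kafka 1.0.3.RELEASE
Anyone who has used Kafka knows that the producer side is comparatively simple: you just send data in the agreed format to a topic. This article analyzes and summarizes how spring-kafka behaves on the consumer side.
Kafka itself is fast, so in general the producer's logic emits messages faster than the consumer's logic can process them.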
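A concrete case
In one project, the consumer needed about 200 ms on average to process a single record, and at certain moments the producer pushed a large volume of data into the Kafka broker within a short time (say, an average of 50,000 records per second, sustained for several minutes).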
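To put numbers on this scenario, here is a small back-of-the-envelope calculation. It is only a sketch: the 15000 ms session timeout is the `session_timeout` value visible in the JOIN_GROUP log lines of this article, and the class and method names are made up for illustration.

```java
// Rough arithmetic for the scenario: 200 ms per record, 50,000 records/s produced.
// Class and method names are illustrative, not part of any library.
final class BacklogEstimate {

    /** Largest batch a single thread can finish before the session times out. */
    static long maxRecordsPerPoll(long sessionTimeoutMs, long perRecordMs) {
        return sessionTimeoutMs / perRecordMs;
    }

    /** Records one consumer thread can process per second. */
    static double recordsPerSecond(long perRecordMs) {
        return 1000.0 / perRecordMs;
    }

    /** Threads needed to keep pace with the producer while the burst lasts. */
    static long threadsToKeepUp(long producedPerSecond, long perRecordMs) {
        return producedPerSecond * perRecordMs / 1000;
    }

    public static void main(String[] args) {
        System.out.println(maxRecordsPerPoll(15_000, 200)); // 75
        System.out.println(recordsPerSecond(200));          // 5.0
        System.out.println(threadsToKeepUp(50_000, 200));   // 10000
    }
}
```

In other words, a single thread that receives more than 75 records in one batch cannot finish them before a 15 second session timeout, and it drains only 5 records per second while 50,000 arrive.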
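In this situation the Kafka consumer behaves as follows:
- The consumer fetches a batch of records from the broker and hands it to the consuming thread.
- Because the batch is too large, the consumer does not finish processing it within `session.timeout.ms`.
- No heartbeat reaches the `consumer coordinator` in time, so the consumer's session expires, the client marks the coordinator dead, and logs like these appear: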
[rhllor] Tue Oct 18 21:39:16 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator coordinatorDead 529: kafka-example|NTI|Marking the coordinator 2147483646 dead.
[rhllor] Tue Oct 18 21:39:16 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator sendGroupMetadataRequest 465: kafka-example|NTI|Issuing group metadata request to broker 1
[rhllor] Tue Oct 18 21:39:16 CST 2016 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1
[rhllor] Tue Oct 18 21:39:16 CST 2016 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onComplete 424: kafka-example|NTI|Auto offset commit failed: Commit cannot be completed due to group rebalance
[rhllor] Tue Oct 18 21:39:16 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator run 408: kafka-example|NTI|Cannot auto-commit offsets now since the coordinator is unknown, will retry after backoff
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handleGroupMetadataResponse 478: kafka-example|NTI|Group metadata response ClientResponse(receivedTimeMs=1476797957072, disconnected=false, request=ClientRequest(expectResponse=true, callback=org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler@1d3d7e6, request=RequestSend(header={api_key=10,api_version=0,correlation_id=20,client_id=consumer-1}, body={group_id=new-message-1}), createdTimeMs=1476797956485, sendTimeMs=1476797956485), responseBody={error_code=0,coordinator={node_id=1,host=10.10.44.124,port=9092}})
[rhllor] Tue Oct 18 21:39:17 CST 2016 ERROR [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator handle 550: kafka-example|NTI|Error ILLEGAL_GENERATION occurred while committing offsets for group new-message-1
[rhllor] Tue Oct 18 21:39:17 CST 2016 WARN [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator maybeAutoCommitOffsetsSync 445: kafka-example|NTI|Auto offset commit failed:
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinPrepare 247: kafka-example|NTI|Revoking previously assigned partitions [rhllor-log-0, rhllor-log-1, rhllor-log-2]
[rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.springframework.kafka.listener.KafkaMessageListenerContainer onPartitionsRevoked 244: kafka-example|NTI|partitions revoked:[rhllor-log-0, rhllor-log-1, rhllor-log-2]
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 309: kafka-example|NTI|(Re-)joining group new-message-1
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 318: kafka-example|NTI|Issuing request (JOIN_GROUP: {group_id=new-message-1,session_timeout=15000,member_id=consumer-1-64063d04-9d4e-45af-a927-17ccf31c6ec1,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147483646
[rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 354: kafka-example|NTI|Attempt to join group new-message-1 failed due to unknown member id, resetting and retrying.
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 309: kafka-example|NTI|(Re-)joining group new-message-1
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator performGroupJoin 318: kafka-example|NTI|Issuing request (JOIN_GROUP: {group_id=new-message-1,session_timeout=15000,member_id=,protocol_type=consumer,group_protocols=[{protocol_name=range,protocol_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}) to coordinator 2147483646
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 336: kafka-example|NTI|Joined group: {error_code=0,generation_id=1,group_protocol=range,leader_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,members=[{member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_metadata=java.nio.HeapByteBuffer[pos=0 lim=22 cap=22]}]}
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator performAssignment 225: kafka-example|NTI|Performing range assignment for subscriptions {consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Subscription@1dbca7d4}
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator performAssignment 229: kafka-example|NTI|Finished assignment: {consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2=org.apache.kafka.clients.consumer.internals.PartitionAssignor$Assignment@4826f394}
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator onJoinLeader 397: kafka-example|NTI|Issuing leader SyncGroup (SYNC_GROUP: {group_id=new-message-1,generation_id=1,member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,group_assignment=[{member_id=consumer-1-d3f30611-5788-4b81-bf0d-e779a11093d2,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=38 cap=38]}]}) to coordinator 2147483646
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.AbstractCoordinator handle 423: kafka-example|NTI|Received successful sync group response for group new-message-1: {error_code=0,member_assignment=java.nio.HeapByteBuffer[pos=0 lim=38 cap=38]}
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator onJoinComplete 191: kafka-example|NTI|Setting newly assigned partitions [rhllor-log-0, rhllor-log-1, rhllor-log-2]
[rhllor] Tue Oct 18 21:39:17 CST 2016 INFO [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.springframework.kafka.listener.KafkaMessageListenerContainer onPartitionsAssigned 249: kafka-example|NTI|partitions assigned:[rhllor-log-0, rhllor-log-1, rhllor-log-2]
[rhllor] Tue Oct 18 21:39:17 CST 2016 DEBUG [org.springframework.kafka.KafkaListenerEndpointContainer#0-0-kafka-consumer-1] org.apache.kafka.clients.consumer.internals.ConsumerCoordinator sendOffsetFetchRequest 581: kafka-example|NTI|Fetching committed offsets for partitions: [rhllor-log-0, rhllor-log-1, rhllor-log-2]
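Roughly, the logs say that the coordinator was marked dead, the automatic offset commit then failed, and the partitions were reassigned to the client.
- Because the automatic offset commit failed, the client that got the partitions back consumes the same batch again.
- That second attempt also times out, and the cycle repeats indefinitely.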
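The cycle can be illustrated with a toy simulation. This is a sketch under simplifying assumptions, not Kafka code: a commit is modeled as succeeding only when the whole batch is processed within the session timeout, and the batch size of 500 is a hypothetical value chosen for the example.

```java
// Toy model of the re-consumption loop; names and the batch size are hypothetical.
final class RedeliveryLoop {

    /**
     * Returns the committed offset after the given number of poll rounds.
     * A round's commit only succeeds if the batch finishes within the session timeout.
     */
    static long committedAfter(int rounds, int batchSize, long perRecordMs, long sessionTimeoutMs) {
        long committed = 0;
        for (int round = 0; round < rounds; round++) {
            long elapsedMs = batchSize * perRecordMs; // time spent processing one batch
            if (elapsedMs <= sessionTimeoutMs) {
                committed += batchSize;               // commit accepted, position advances
            }
            // Otherwise: rebalance, commit rejected, the same batch is fetched again.
        }
        return committed;
    }

    public static void main(String[] args) {
        // 500 records * 200 ms = 100 s, far beyond a 15 s session timeout: no progress.
        System.out.println(committedAfter(10, 500, 200, 15_000)); // 0
        // 50 records * 200 ms = 10 s, within the timeout: every round commits.
        System.out.println(committedAfter(10, 50, 200, 15_000));  // 500
    }
}
```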
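Solution
After hitting this problem we took the following steps:
- Increased the number of partitions, which raises the consumers' parallelism and therefore the consumption throughput.
- For the consuming thread of each partition, added a fixed-length blocking queue and a worker thread pool to further increase parallel consumption.
- Since we were using spring-kafka, set kafka-client's `enable.auto.commit` to false to stop kafka-client from committing offsets automatically, because it was precisely the failed auto-commits that kept the offset from ever advancing. Offsets are committed through spring-kafka's own mechanism instead, which offers several commit strategies: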
/**
* The ack mode to use when auto ack (in the configuration properties) is false.
* <ul>
* <li>RECORD: Ack after each record has been passed to the listener.</li>
* <li>BATCH: Ack after each batch of records received from the consumer has been
* passed to the listener</li>
* <li>TIME: Ack after this number of milliseconds; (should be greater than
* {@code #setPollTimeout(long) pollTimeout}.</li>
* <li>COUNT: Ack after at least this number of records have been received</li>
* <li>MANUAL: Listener is responsible for acking - use a
* {@link AcknowledgingMessageListener}.
* </ul>
*/
private AbstractMessageListenerContainer.AckMode ackMode = AckMode.BATCH;
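These strategies make it possible to commit offsets even when a batch has not been fully consumed, which avoids the situation where no commit ever succeeds and the same records are consumed forever.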
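The fixed-length blocking queue plus worker pool from the second step could look roughly like the sketch below. It uses only the JDK; the class name `BoundedWorkerPool`, the pool size and the queue capacity are assumptions for illustration, not the project's actual code.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical helper: a fixed-size worker pool fed through a bounded queue.
final class BoundedWorkerPool<T> {
    private final ThreadPoolExecutor executor;
    private final Consumer<T> handler;

    BoundedWorkerPool(int workers, int queueCapacity, Consumer<T> handler) {
        this.handler = handler;
        this.executor = new ThreadPoolExecutor(
                workers, workers, 0L, TimeUnit.MILLISECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                // When the queue is full, the submitting (listener) thread runs the task
                // itself, which throttles intake instead of dropping records.
                new ThreadPoolExecutor.CallerRunsPolicy());
    }

    /** Hands one record to the pool; called from the listener thread. */
    void submit(T record) {
        executor.execute(() -> handler.accept(record));
    }

    /** Stops accepting work and waits for queued records to finish. */
    boolean shutdownAndWait(long timeoutMs) {
        executor.shutdown();
        try {
            return executor.awaitTermination(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }

    public static void main(String[] args) {
        AtomicInteger processed = new AtomicInteger();
        BoundedWorkerPool<String> pool =
                new BoundedWorkerPool<>(4, 16, record -> processed.incrementAndGet());
        for (int i = 0; i < 100; i++) {
            pool.submit("record-" + i);
        }
        pool.shutdownAndWait(5_000);
        System.out.println(processed.get()); // 100
    }
}
```

One trade-off to keep in mind: once records are handed to a pool, an offset may be committed before the corresponding record has actually been processed, so a crash can lose in-flight work unless the listener tracks completion itself.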
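Analysis
So why do spring-kafka's offset-commit strategies solve the repeated-consumption problem caused by kafka-client's auto-commit? The key parts of the spring-kafka source explain it.
- First, look at the logic of spring-kafka's consumer thread: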
if (isRunning() && this.definedPartitions != null) {
    initPartitionsIfNeeded();
    // we start the invoker here as there will be no rebalance calls to
    // trigger it, but only if the container is not set to autocommit
    // otherwise we will process records on a separate thread
    if (!this.autoCommit) {
        startInvoker();
    }
}
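As shown above, when `enable.auto.commit` is turned off, spring-kafka starts an invoker, whose job is to run a thread that processes the records. That thread does not fetch records directly from Kafka; it takes them from a blocking queue that spring-kafka creates itself.
- The consumer thread then enters a loop. The source shows that when `enable.auto.commit` is off, it first commits the offsets of the records already processed and only then fetches more records from Kafka.
- The fetched records are put on the blocking queue mentioned above and consumed by the invoker thread. If the queue is full and the records cannot be added, spring-kafka calls Kafka's pause method, and the consumer stops pulling further records from Kafka.
- spring-kafka also handles error cases, for example deciding whether the offset should still be committed after a failure.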
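The loop described above (commit first, then poll, then enqueue or pause) can be modeled with a small single-threaded toy. This is explicitly not spring-kafka code: the "broker" is just a counter, pause/resume is a boolean flag, and all names are hypothetical.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model of the consumer-thread loop; deterministic and single-threaded on purpose.
final class ListenerLoopModel {
    private final int queueCapacity;
    private final Deque<Long> queue = new ArrayDeque<>(); // offsets waiting for the invoker
    private long nextFetchOffset = 0; // next offset to pull from the "broker"
    private long processedUpTo = 0;   // position after the last record the invoker finished
    private long committedOffset = 0; // last committed position
    private boolean paused = false;

    ListenerLoopModel(int queueCapacity) {
        this.queueCapacity = queueCapacity;
    }

    /** One pass of the consumer thread: commit, then fetch, then enqueue or pause. */
    void consumerPass(int batchSize) {
        committedOffset = processedUpTo;       // 1. commit what has already been processed
        if (paused && queue.size() < queueCapacity) {
            paused = false;                    // room again: resume fetching
        }
        if (paused) {
            return;                            // still paused: fetch nothing this pass
        }
        for (int i = 0; i < batchSize; i++) {  // 2. fetch a batch
            if (queue.size() == queueCapacity) {
                paused = true;                 // 3. queue full: pause instead of blocking
                return;
            }
            queue.addLast(nextFetchOffset++);
        }
    }

    /** The invoker drains up to n records from the queue. */
    void invokerPass(int n) {
        for (int i = 0; i < n && !queue.isEmpty(); i++) {
            processedUpTo = queue.removeFirst() + 1;
        }
    }

    boolean isPaused() { return paused; }
    long committedOffset() { return committedOffset; }

    public static void main(String[] args) {
        ListenerLoopModel model = new ListenerLoopModel(5);
        model.consumerPass(8); // only 5 of 8 fit, so the model pauses
        System.out.println(model.isPaused() + " " + model.committedOffset()); // true 0
        model.invokerPass(3);  // invoker finishes offsets 0..2
        model.consumerPass(8); // commits 3, resumes, refills the queue, pauses again
        System.out.println(model.isPaused() + " " + model.committedOffset()); // true 3
    }
}
```

The point of the model is that the committed offset advances on every pass by whatever has actually been processed, regardless of how large the fetched batch was.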
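Conclusion
- spring-kafka is a good library for working with Kafka, and it integrates well with Spring.
- spring-kafka adds some features on top of plain Kafka usage.
- Compared with the native kafka-client API, it is simpler to use and requires less code.
- If possible, use the latest versions at the time of writing: Kafka 0.10.0 and spring-kafka 1.1.1.RELEASE.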
Reader comments
java.io.EOFException
at org.apache.kafka.common.network.NetworkReceive.readFromReadableChannel(NetworkReceive.java:83)
at org.apache.kafka.common.network.NetworkReceive.readFrom(NetworkReceive.java:71)
at org.apache.kafka.common.network.KafkaChannel.receive(KafkaChannel.java:154)
at org.apache.kafka.common.network.KafkaChannel.read(KafkaChannel.java:135)
at org.apache.kafka.common.network.Selector.pollSelectionKeys(Selector.java:343)
at org.apache.kafka.common.network.Selector.poll(Selector.java:291)
at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:260)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:232)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:209)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:148)
at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.awaitMetadataUpdate(ConsumerNetworkClient.java:136)
at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureCoordinatorReady(AbstractCoordinator.java:197)
at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:248)
at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1013)
at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:979)
at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:532)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.run(FutureTask.java:266)
at java.lang.Thread.run(Thread.java:745)
I found the following online:
```
The amount of time a consumer can be out of contact with the brokers while still considered alive, defaults to 3 seconds. If a consumer goes for more than session.timeout.ms without sending a heartbeat to the group coordinator, it is considered dead and the group coordinator will trigger a rebalance of the consumer group to allocate partitions from the dead consumer to the other consumers in the group. This property is closely related to heartbeat.interval.ms. heartbeat.interval.ms controls how frequently the KafkaConsumer poll() method will send a heartbeat to the group coordinator, while session.timeout.ms controls how long can a consumer go without sending a heartbeat. Therefore, those two properties are typically modified together - heartbeat.interval.ms must be lower than session.timeout.ms, and is usually set to a 1/3 of the timeout value. So if session.timeout.ms is 3 seconds, heartbeat.interval.ms should be 1 second. Setting session.timeout.ms lower than default will allow consumer groups to detect and recover from failure sooner, but may also cause unwanted rebalances as result of consumers taking longer to complete the poll loop or garbage collection. Setting session.timeout.ms higher will reduce the chance of accidental rebalance, but also means it will take longer to detect a real failure.
```
```
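As a small illustration of the one-third rule from the quote, the two settings could be derived together as in this sketch. It uses plain `java.util.Properties` with the standard consumer configuration keys; the helper name `ConsumerTimeouts` and the 15000 ms value (taken from the logs in the article) are only examples.

```java
import java.util.Properties;

// Hypothetical helper that keeps heartbeat.interval.ms at one third of session.timeout.ms.
final class ConsumerTimeouts {

    static Properties build(int sessionTimeoutMs) {
        Properties props = new Properties();
        props.setProperty("session.timeout.ms", Integer.toString(sessionTimeoutMs));
        props.setProperty("heartbeat.interval.ms", Integer.toString(sessionTimeoutMs / 3));
        return props;
    }

    public static void main(String[] args) {
        Properties props = build(15_000);
        System.out.println(props.getProperty("session.timeout.ms"));    // 15000
        System.out.println(props.getProperty("heartbeat.interval.ms")); // 5000
    }
}
```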
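Personally, I don't think simply increasing the heartbeat timeout solves the root problem, because you cannot know how many records a single poll will return, nor can you guarantee that the consumer will finish processing everything it pulled, and commit, within a fixed period. That is my understanding; I can't guarantee it is correct.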