根据Kafka subscribe 和 assign接口使用以及rebalancing说明:
1 当使用kafka subscribe接口时,即订阅模式时,需要指定group id。同一个消费组的consumer由coordinator来决定哪个consumer消费哪个(topic, partition)。当topic的partition或者broker发生变化时,会发生rebalance,即重新选择consumer和(topic,partition)的关系。通知给所有的consumer。
2 当使用assign接口时 consumer和(topic,partition)之间的关系由consumer来决定了。 一旦topic的partition发生了变化,需要由consumer自行处理这种情况。
根据Spark streaming kafka的官网样例:样例中的代码使用了Subscribe接口。但是文中也指出:
ConsumerStrategies.Subscribe, as shown above, allows you to subscribe to a fixed collection of topics. SubscribePattern allows you to use a regex to specify topics of interest. Note that unlike the 0.8 integration, using Subscribe or SubscribePattern should respond to adding partitions during a running stream. Finally, Assign allows you to specify a fixed collection of partitions. All three strategies have overloaded constructors that allow you to specify the starting offset for a particular partition.
即assign接口也是OK的,同时会存在partition发生变化无法得知的问题。
找到的相关issue:
1 reactive-kafka
2 KAFKA-2359
3 Question-about-upgrading-Kafka-client
即使用订阅模式时,consumer没有被唤醒。其中2为kafka官网issue,由于问题不好重现,目前未解决。
可能是kafka潜在的bug或者spark steaming kafka使用kafka subscribe api有潜在的问题。
这里调整为assign接口后,目前该问题未再重现。
注意根据spark streaming kafka 官网文档:
If your Spark batch duration is larger than the default Kafka heartbeat session timeout (30 seconds), increase heartbeat.interval.ms and session.timeout.ms appropriately. For batches larger than 5 minutes, this will require changing group.max.session.timeout.ms on the broker.
即当spark streaming的batch interval大于30s时,应当调整kafka相关参数 heartbeat.interval.ms 和 session.timeout.ms。 这里没有给出细节,猜测是可能会引发rebalance。目前运行环境中没有其他异常,暂时不做处理。
kafka相关参数详见: http://www.stratio.com/blog/optimizing-spark-streaming-applications-apache-kafka/
网友评论