Hitting a Kafka Rebalance Failure While Consuming Messages with Spark Streaming


Author: ryancao_b9b9 | Published 2020-04-01 10:02

I. Task Background

Upstream Kafka volume: 600,000 messages/s across 64 partitions
Streaming resources: 32 executors (1 GB on-heap, 2 GB off-heap each), 64 threads
Batch window: 300 s
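For concreteness, here is a minimal sketch of how such a job might be wired up with the spark-streaming-kafka-0-10 API. The broker address, group id, topic, and app name below are placeholders, not taken from the article:

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka010._

    object StreamingJob {
      def main(args: Array[String]): Unit = {
        // Executor count and memory are set at spark-submit time; the batch
        // window from the article is 300 seconds.
        val conf = new SparkConf().setAppName("kafka-streaming-demo")
        val ssc  = new StreamingContext(conf, Seconds(300))

        val kafkaParams = Map[String, Object](
          "bootstrap.servers"  -> "broker1:9092",   // placeholder
          "key.deserializer"   -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id"           -> "demo-group",     // placeholder
          "auto.offset.reset"  -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )

        val stream = KafkaUtils.createDirectStream[String, String](
          ssc,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](Seq("demo-topic"), kafkaParams)
        )

        stream.foreachRDD { rdd => /* process the batch */ }

        ssc.start()
        ssc.awaitTermination()
      }
    }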

II. Troubleshooting
1. Failure Description
The Spark Streaming job failed with the following error log:

    org.apache.kafka.clients.consumer.CommitFailedException: Commit cannot be completed since the group has already rebalanced and assigned the partitions to another member. This means that the time between subsequent calls to poll() was longer than the configured session.timeout.ms, which typically implies that the poll loop is spending too much time message processing. You can address this either by increasing the session timeout or by reducing the maximum size of batches returned in poll() with max.poll.records.
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:600)
        at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator$OffsetCommitResponseHandler.handle(ConsumerCoordinator.java:541)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:679)
        at org.apache.kafka.clients.consumer.internals.AbstractCoordinator$CoordinatorResponseHandler.onSuccess(AbstractCoordinator.java:658)
        at org.apache.kafka.clients.consumer.internals.RequestFuture$1.onSuccess(RequestFuture.java:167)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.fireSuccess(RequestFuture.java:133)
        at org.apache.kafka.clients.consumer.internals.RequestFuture.complete(RequestFuture.java:107)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient$RequestFutureCompletionHandler.onComplete(ConsumerNetworkClient.java:426)
        at org.apache.kafka.clients.NetworkClient.poll(NetworkClient.java:278)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.clientPoll(ConsumerNetworkClient.java:360)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:224)
        at org.apache.kafka.clients.consumer.internals.ConsumerNetworkClient.poll(ConsumerNetworkClient.java:201)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:999)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
    

2. Failure Analysis
The error above means that the consumer, after processing a batch of polled messages, failed while synchronously committing the offsets to the broker. A first reading of the log suggests the partitions this consumer thread was consuming had already been revoked by the broker and assigned to another member of the group.
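The article does not show the job's commit code, but the usual offset-commit pattern with this API, continuing the setup sketch above, looks like the following; processBatch is a hypothetical stand-in for the job's real work:

    import org.apache.spark.streaming.kafka010.{CanCommitOffsets, HasOffsetRanges, OffsetRange}

    // `stream` is the direct stream from the setup sketch above.
    stream.foreachRDD { rdd =>
      // The offset range each Kafka partition contributed to this batch.
      val offsetRanges: Array[OffsetRange] = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

      processBatch(rdd) // hypothetical stand-in; if this runs longer than
                        // max.poll.interval.ms, the group rebalances and the
                        // commit below fails with CommitFailedException

      // Commit offsets back to Kafka only after the batch has been processed.
      stream.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }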

Kafka consumer configuration:
max.poll.interval.ms = 600000
Streaming configuration:
spark.streaming.kafka.maxRatePerPartition=50000
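Note that the two settings live in different places: max.poll.interval.ms is a Kafka consumer property, while maxRatePerPartition is a Spark configuration. A hedged sketch of where each would be set:

    import org.apache.spark.SparkConf

    // Kafka consumer side: maximum allowed delay between poll() calls (10 minutes).
    val kafkaParams = Map[String, Object](
      "max.poll.interval.ms" -> "600000"
      // ...plus the usual bootstrap.servers / deserializer / group.id settings
    )

    // Spark side: cap on records fetched per partition per second of batch window.
    val sparkConf = new SparkConf()
      .set("spark.streaming.kafka.maxRatePerPartition", "50000")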

When the job was stopped and later restarted, a large backlog had accumulated upstream. Under the configuration above, the maximum amount of data per batch at startup is 960,000,000 records (maxRatePerPartition * number of Kafka partitions * window length = 50,000 * 64 * 300). One such restart looked like this:


[Figure: screenshot of one such restart run]

That batch took roughly 11 minutes, while max.poll.interval.ms was set to 600000 (10 minutes). This property is the maximum delay allowed between consecutive poll() calls, i.e. an upper bound on how long the consumer may sit idle before fetching more records. If poll() is not called again before the timeout expires, the consumer is considered failed and the group rebalances. Under this combination of settings, any batch whose processing time exceeds the maximum poll interval causes the Kafka cluster to rebalance the group, which matches the failure observed here.
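The failure condition can be verified with simple arithmetic over the numbers above:

    // Back-of-envelope check using the values from the article.
    val maxRatePerPartition = 50000L // records / partition / second
    val partitions          = 64L
    val batchWindowSec      = 300L

    // Per-batch ceiling once the upstream backlog is large enough to saturate it:
    val maxBatchRecords = maxRatePerPartition * partitions * batchWindowSec
    // 50,000 * 64 * 300 = 960,000,000 records

    val observedBatchSec   = 11 * 60         // ~11 minutes, from the restart screenshot
    val maxPollIntervalSec = 600000 / 1000   // max.poll.interval.ms = 600 s

    // 660 s > 600 s: the batch outlives the poll interval, triggering a rebalance.
    assert(observedBatchSec > maxPollIntervalSec)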

3. Solution
(a) Increase the maximum poll interval
(b) Reduce the maximum amount of data processed per batch
The relevant Kafka and Spark parameters were adjusted as follows:

    max.poll.interval.ms = 1200000
    spark.streaming.kafka.maxRatePerPartition=15000
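Applied to the earlier sketch, the tuned settings would look as follows; the per-batch ceiling drops to 15,000 * 64 * 300 = 288,000,000 records (30% of the old cap), while the allowed poll gap doubles to 20 minutes:

    import org.apache.spark.SparkConf

    // Double the allowed gap between poll() calls to 20 minutes.
    val kafkaParams = Map[String, Object](
      "max.poll.interval.ms" -> "1200000"
    )

    // Cut the per-partition rate cap to 30% of its previous value.
    val sparkConf = new SparkConf()
      .set("spark.streaming.kafka.maxRatePerPartition", "15000")

    // New per-batch ceiling: 15,000 * 64 * 300 = 288,000,000 records.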
The run after tuning:


[Figure: effect after the parameter tuning]

