SparkStreaming Kafka: partition leader switch

Author: pcqlegend | Published 2018-05-02 15:27

The setup: three Kafka brokers (0, 1, 2), and a topic test with two partitions (0 and 1), each with three replicas, as shown below:


[Figure: leader and ISR assignment for topic test, before broker 1 is killed]

If I now kill broker 1 directly, the cluster ends up in the following state:


[Figure: leader and ISR assignment for topic test, after broker 1 is killed]
We can see that the leader of partition 0 switched from 1 to 2, and its ISR (In-Sync Replicas) changed from 1,0 to 2.
At this point my job exited abnormally with the following error:
    org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([test,0]))
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
        at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
        at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
        at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
        at scala.collection.immutable.List.foreach(List.scala:318)
        at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
        at scala.collection.AbstractTraversable.map(Traversable.scala:105)
        at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
        at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
        at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
        at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
        at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
        at scala.Option.orElse(Option.scala:257)
        at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
        at org.apache.spark.streaming.dstream.ForEachDStream.generateJob(ForEachDStream.scala:47)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:115)
        at org.apache.spark.streaming.DStreamGraph$$anonfun$1.apply(DStreamGraph.scala:114)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.TraversableLike$$anonfun$flatMap$1.apply(TraversableLike.scala:251)
        at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59)
        at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47)
        at scala.collection.TraversableLike$class.flatMap(TraversableLike.scala:251)
        at scala.collection.AbstractTraversable.flatMap(Traversable.scala:105)
        at org.apache.spark.streaming.DStreamGraph.generateJobs(DStreamGraph.scala:114)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:248)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anonfun$3.apply(JobGenerator.scala:246)
        at scala.util.Try$.apply(Try.scala:161)
        at org.apache.spark.streaming.scheduler.JobGenerator.generateJobs(JobGenerator.scala:246)
        at org.apache.spark.streaming.scheduler.JobGenerator.org$apache$spark$streaming$scheduler$JobGenerator$$processEvent(JobGenerator.scala:181)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:87)
        at org.apache.spark.streaming.scheduler.JobGenerator$$anon$1.onReceive(JobGenerator.scala:86)
        at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:48)
    Exception in thread "main" org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException, org.apache.spark.SparkException: Couldn't find leader offsets for Set([test,0]))
        ... (same stack trace as above, rethrown in the main thread)
    

From the error message we can see that the driver could not find the leader offsets for partition 0 of topic test. So if the job is to run stably through a leader switch, this failure needs to be handled rather than allowed to kill the driver. One option is to give Spark more retries:

    spark.streaming.kafka.maxRetries=3  // the default is 1

Then raise the backoff the Kafka client waits before refreshing the leader after it loses the current one, e.g. to 5000 ms (the default is 200 ms). The relevant setting, refresh.leader.backoff.ms, is read in Kafka's consumer config:

      /** backoff time to refresh the leader of a partition after it loses the current leader */
      val refreshLeaderBackoffMs = props.getInt("refresh.leader.backoff.ms", RefreshMetadataBackoffMs)
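
A minimal sketch of where these two settings go, assuming the Spark 1.x direct stream API (spark-streaming-kafka for Kafka 0.8); the app name, broker list, batch interval, and topic name are illustrative:

    import kafka.serializer.StringDecoder
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming.{Seconds, StreamingContext}
    import org.apache.spark.streaming.kafka.KafkaUtils

    val sparkConf = new SparkConf()
      .setAppName("leader-failover-demo")            // illustrative name
      .set("spark.streaming.kafka.maxRetries", "3")  // retry leader-offset lookups (default: 1)
    val ssc = new StreamingContext(sparkConf, Seconds(10))

    val kafkaParams = Map(
      "metadata.broker.list" -> "broker0:9092,broker1:9092,broker2:9092",  // illustrative brokers
      "refresh.leader.backoff.ms" -> "5000"  // back off 5 s after losing a leader (default: 200 ms)
    )
    val stream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
      ssc, kafkaParams, Set("test"))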
    

For reference, the retry logic lives in org.apache.spark.streaming.kafka.DirectKafkaInputDStream#latestLeaderOffsets:

    
      @tailrec
      protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
        val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
        // Either.fold would confuse @tailrec, do it manually
        if (o.isLeft) {
          val err = o.left.get.toString
          if (retries <= 0) {
            // Retries exhausted: this is the SparkException seen in the stack trace above.
            throw new SparkException(err)
          } else {
            log.error(err)
            // Back off for refresh.leader.backoff.ms before asking for the leader again.
            Thread.sleep(kc.config.refreshLeaderBackoffMs)
            latestLeaderOffsets(retries - 1)
          }
        } else {
          o.right.get
        }
      }
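
With the settings above, each failed lookup logs the error, sleeps for refresh.leader.backoff.ms, and tries again, so the driver tolerates roughly maxRetries × backoff = 3 × 5 s = 15 s of a partition having no leader before giving up, which should comfortably cover a normal leader election.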
    
