美文网首页
spark streaming org.apache.spark

spark streaming org.apache.spark

作者: 邵红晓 | 来源:发表于2021-10-21 16:54 被阅读0次

    背景:

    一个古老的项目运行了好多年,突然报错了
    spark version 1.6.3
    异常如下

    21/10/21 13:28:30 ERROR ApplicationMaster: User class threw exception: org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
    org.apache.spark.SparkException: ArrayBuffer(java.nio.channels.ClosedChannelException)
            at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.latestLeaderOffsets(DirectKafkaInputDStream.scala:123)
            at org.apache.spark.streaming.kafka.DirectKafkaInputDStream.compute(DirectKafkaInputDStream.scala:145)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
            at scala.Option.orElse(Option.scala:257)
            at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
            at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
            at org.apache.spark.streaming.dstream.TransformedDStream$$anonfun$6.apply(TransformedDStream.scala:42)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
            at scala.collection.TraversableLike$$anonfun$map$1.apply(TraversableLike.scala:244)
            at scala.collection.immutable.List.foreach(List.scala:318)
            at scala.collection.TraversableLike$class.map(TraversableLike.scala:244)
            at scala.collection.AbstractTraversable.map(Traversable.scala:105)
            at org.apache.spark.streaming.dstream.TransformedDStream.compute(TransformedDStream.scala:42)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
            at org.apache.spark.streaming.dstream.TransformedDStream.createRDDWithLocalProperties(TransformedDStream.scala:65)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
            at scala.Option.orElse(Option.scala:257)
            at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
            at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
            at scala.Option.orElse(Option.scala:257)
            at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
            at org.apache.spark.streaming.dstream.MappedDStream.compute(MappedDStream.scala:35)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
            at scala.Option.orElse(Option.scala:257)
            at org.apache.spark.streaming.dstream.DStream.getOrCompute(DStream.scala:341)
            at org.apache.spark.streaming.dstream.FilteredDStream.compute(FilteredDStream.scala:35)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1$$anonfun$apply$7.apply(DStream.scala:352)
            at scala.util.DynamicVariable.withValue(DynamicVariable.scala:57)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1$$anonfun$1.apply(DStream.scala:351)
            at org.apache.spark.streaming.dstream.DStream.createRDDWithLocalProperties(DStream.scala:426)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:346)
            at org.apache.spark.streaming.dstream.DStream$$anonfun$getOrCompute$1.apply(DStream.scala:344)
    

    源码分析

      // Kafka-0.8 cluster helper built from the user-supplied kafkaParams;
      // queries leader metadata and offsets over the SimpleConsumer API.
      protected val kc = new KafkaCluster(kafkaParams)
      // Fetch the latest (log-end) offset for every currently-tracked partition.
      // On failure (e.g. a partition leader moved and the stale connection throws
      // ClosedChannelException) it retries up to `retries` times, sleeping
      // refresh.leader.backoff.ms between attempts; once retries are exhausted,
      // the accumulated errors (the ArrayBuffer seen in the log above) are
      // thrown wrapped in a SparkException.
      @tailrec
      protected final def latestLeaderOffsets(retries: Int): Map[TopicAndPartition, LeaderOffset] = {
        val o = kc.getLatestLeaderOffsets(currentOffsets.keySet)
        // Either.fold would confuse @tailrec, do it manually
        if (o.isLeft) {
          // Left carries the collection of per-partition errors
          val err = o.left.get.toString
          if (retries <= 0) {
            throw new SparkException(err)
          } else {
            logError(err)
            // Back off so a new leader can be elected before the next attempt
            Thread.sleep(kc.config.refreshLeaderBackoffMs)
            latestLeaderOffsets(retries - 1)
          }
        } else {
          o.right.get
        }
      }
    

    1、在DirectKafkaInputDStream拉取数据过程中传入TopicAndPartition,拉取可消费的offset值创建KafkaRDD过程中,拉取失败,超过重试次数,直接报错
    故将 spark.streaming.kafka.maxRetries(默认 1)调整为 6
    2、kafka-0.8 consumer参数调整刷新时间间隔
    refresh.leader.backoff.ms=200ms,调整为1000ms

    注意:新版本 kafka-0.10 已经没有上述参数,直接使用 val msgs = c.poll(0) 获取 offset 值

    相关文章

      网友评论

          本文标题:spark streaming org.apache.spark

          本文链接:https://www.haomeiwen.com/subject/dgofaltx.html