
[SPARK-19680] OffsetOutOfRangeException

Author: 竹子平江 | Published 2018-10-31 21:08

    When data in Kafka has been lost, a Spark program consuming from Kafka may fail with the following exception:

    Lost task 12.0 in stage 398.0 (TID 2311, localhost, executor driver): org.apache.kafka.clients.consumer.OffsetOutOfRangeException: Offsets out of range with no configured reset policy for partitions: {abc_2018-0=151260}
        at org.apache.kafka.clients.consumer.internals.Fetcher.parseFetchedData(Fetcher.java:588)
        at org.apache.kafka.clients.consumer.internals.Fetcher.fetchedRecords(Fetcher.java:354)
        at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1000)
        at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:938)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.poll(CachedKafkaConsumer.scala:99)
        at org.apache.spark.streaming.kafka010.CachedKafkaConsumer.get(CachedKafkaConsumer.scala:70)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:223)
        at org.apache.spark.streaming.kafka010.KafkaRDD$KafkaRDDIterator.next(KafkaRDD.scala:189)
        at scala.collection.Iterator$$anon$11.next(Iterator.scala:410)
        at scala.collection.Iterator$$anon$13.hasNext(Iterator.scala:463)
        at scala.collection.Iterator$$anon$11.hasNext(Iterator.scala:409)
        at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:125)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:96)
        at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
        at org.apache.spark.scheduler.Task.run(Task.scala:109)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:338)
        at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1142)
        at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:617)
        at java.lang.Thread.run(Thread.java:745)
    

    Cause analysis

    When Spark creates the Kafka RDD, it forcibly rewrites auto.offset.reset in kafkaParams to none. As a result, when the offset retrieved from ZooKeeper (or otherwise carried over) lies outside the valid offset range of the Kafka topic, this exception is thrown. It typically happens when the data in Kafka has been lost or has expired past the retention period. A minimal reproduction outside of Spark is sketched after the source references below.
    Relevant source code:

    DirectKafkaInputDStream.scala:218
    DirectKafkaInputDStream.scala:63
    KafkaUtils.scala:205
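
    To see the effect of auto.offset.reset=none in isolation, the following minimal sketch reproduces the same exception without Spark. The broker address is a placeholder, and the topic, partition and offset are taken from the log above purely for illustration; the sketch assumes offset 151260 has already been removed by retention.

    import java.util.{Collections, Properties}

    import org.apache.kafka.clients.consumer.{ConsumerConfig, KafkaConsumer}
    import org.apache.kafka.common.TopicPartition
    import org.apache.kafka.common.serialization.StringDeserializer

    object OffsetOutOfRangeDemo {
      def main(args: Array[String]): Unit = {
        val props = new Properties()
        props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092") // placeholder broker
        props.put(ConsumerConfig.GROUP_ID_CONFIG, "demo-group")              // placeholder group id
        props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
        props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, classOf[StringDeserializer].getName)
        // The setting the article is about: never fall back to earliest/latest
        // when the requested offset is invalid.
        props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")

        val consumer = new KafkaConsumer[String, String](props)
        try {
          val tp = new TopicPartition("abc_2018", 0)
          consumer.assign(Collections.singleton(tp))
          // Seek to an offset that retention has already deleted; with reset policy "none"
          // the next poll throws OffsetOutOfRangeException instead of silently resetting.
          consumer.seek(tp, 151260L)
          consumer.poll(1000)
        } finally {
          consumer.close()
        }
      }
    }

    If the reset policy in this sketch is changed to earliest or latest, the same poll succeeds by resetting the position; that fallback is exactly what Spark turns off when it fixes up the kafkaParams for its own consumers.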
    

    Solution

    When creating the Kafka RDD, pass offsets that have been validated against the broker, as in the code below:

    import org.apache.kafka.clients.consumer.{Consumer, ConsumerConfig, KafkaConsumer, NoOffsetForPartitionException}
    import org.apache.kafka.common.TopicPartition
    import org.slf4j.{Logger, LoggerFactory}

    // Implicit Scala <-> Java collection conversions used by the Kafka consumer calls below.
    import scala.collection.JavaConversions._
    import scala.collection.mutable

    /**
      * Kafka offset helper utilities.
      */
    object MyKafkaUtils {
      private val logger: Logger = LoggerFactory.getLogger(this.getClass)

      /**
        * Get the earliest available offsets.
        *
        * @param consumer   the consumer to query
        * @param partitions the topic partitions to look up
        * @return
        */
      def getEarliestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
        consumer.seekToBeginning(partitions)
        partitions.map(tp => tp -> consumer.position(tp)).toMap
      }

      /**
        * Get the earliest available offsets.
        * Returns the earliest (lowest) available offsets, taking new partitions into account.
        *
        * @param kafkaParams Kafka client configuration
        * @param topics      the topics whose offsets should be fetched
        */
      def getEarliestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest")
        val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        consumer.subscribe(topics)
        // assignment() is empty until the first poll, so poll once before seeking.
        consumer.poll(0)
        val parts = consumer.assignment()
        consumer.seekToBeginning(parts)
        consumer.pause(parts)
        val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
        consumer.unsubscribe()
        consumer.close()
        offsets
      }

      /**
        * Get the latest available offsets.
        *
        * @param consumer   the consumer to query
        * @param partitions the topic partitions to look up
        * @return
        */
      def getLatestOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
        consumer.seekToEnd(partitions)
        partitions.map(tp => tp -> consumer.position(tp)).toMap
      }

      /**
        * Get the latest available offsets.
        * Returns the latest (highest) available offsets, taking new partitions into account.
        *
        * @param kafkaParams Kafka client configuration
        * @param topics      the topics whose offsets should be fetched
        **/
      def getLatestOffsets(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest")
        val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        consumer.subscribe(topics)
        // assignment() is empty until the first poll, so poll once before seeking.
        consumer.poll(0)
        val parts = consumer.assignment()
        consumer.seekToEnd(parts)
        consumer.pause(parts)
        val offsets = parts.map(tp => tp -> consumer.position(tp)).toMap
        consumer.unsubscribe()
        consumer.close()
        offsets
      }

      /**
        * Get the consumer's current position for each partition.
        *
        * @param consumer   the consumer to query
        * @param partitions the topic partitions to look up
        * @return
        */
      def getCurrentOffsets(consumer: Consumer[_, _], partitions: Set[TopicPartition]): Map[TopicPartition, Long] = {
        partitions.map(tp => tp -> consumer.position(tp)).toMap
      }

      /**
        * Get validated starting offsets: the group's current offsets clamped into the
        * [earliest, latest] range that is actually available on the brokers.
        *
        * @param kafkaParams Kafka client configuration
        * @param topics      the topics whose offsets should be fetched
        * @return
        */
      def getCurrentOffset(kafkaParams: Map[String, Object], topics: Iterable[String]): Map[TopicPartition, Long] = {
        val offsetResetConfig = kafkaParams.getOrElse(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest").toString.toLowerCase
        val newKafkaParams = mutable.Map[String, Object]()
        newKafkaParams ++= kafkaParams
        newKafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
        val consumer: KafkaConsumer[String, Array[Byte]] = new KafkaConsumer[String, Array[Byte]](newKafkaParams)
        consumer.subscribe(topics)
        val notOffsetTopicPartition = mutable.Set[TopicPartition]()
        try {
          consumer.poll(0)
        } catch {
          case ex: NoOffsetForPartitionException =>
            logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
            notOffsetTopicPartition.add(ex.partition())
        }
        val parts = consumer.assignment().toSet
        consumer.pause(parts)
        val topicPartition = parts.diff(notOffsetTopicPartition)
        // Current offsets of the consumer group, for the partitions that have one.
        val currentOffset = mutable.Map[TopicPartition, Long]()
        topicPartition.foreach(x => {
          try {
            currentOffset.put(x, consumer.position(x))
          } catch {
            case ex: NoOffsetForPartitionException =>
              logger.warn(s"consumer topic partition offset not found:${ex.partition()}")
              notOffsetTopicPartition.add(ex.partition())
          }
        })
        // Clamp to the earliest offsets: anything below them has been deleted or has expired.
        val earliestOffset = getEarliestOffsets(consumer, parts)
        earliestOffset.foreach(x => {
          val value = currentOffset.get(x._1)
          if (value.isEmpty) {
            currentOffset(x._1) = x._2
          } else if (value.get < x._2) {
            logger.warn(s"kafka data is lost from partition:${x._1} offset ${value.get} to ${x._2}")
            currentOffset(x._1) = x._2
          }
        })
        // Clamp to the latest offsets: positions beyond the log end offset are invalid.
        val latestOffset = if (offsetResetConfig.equalsIgnoreCase("earliest")) {
          getLatestOffsets(consumer, topicPartition)
        } else {
          getLatestOffsets(consumer, parts)
        }
        latestOffset.foreach(x => {
          val value = currentOffset.get(x._1)
          if (value.isEmpty || value.get > x._2) {
            currentOffset(x._1) = x._2
          }
        })
        consumer.unsubscribe()
        consumer.close()
        currentOffset.toMap
      }
    }
    

    Creating the Spark Kafka RDD with the validated offsets:

    val offset = MyKafkaUtils.getCurrentOffset(kafkaParams.toMap, topics)
    val kafkaStreams: DStream[ConsumerRecord[String, ObjectNode]] = KafkaUtils.createDirectStream(
      streamingContext,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe(topics, kafkaParams, offset))
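
    The snippet above assumes kafkaParams and topics are already defined. For completeness, a placeholder configuration along the following lines works with MyKafkaUtils.getCurrentOffset; the article's stream deserializes values into ObjectNode with a deserializer that is not shown, so a plain StringDeserializer stands in here, and every value is an assumption to adapt to your own cluster.

    import org.apache.kafka.clients.consumer.ConsumerConfig
    import org.apache.kafka.common.serialization.StringDeserializer

    // Placeholder values only: topic name taken from the log above, addresses and group id invented.
    val topics = List("abc_2018")
    val kafkaParams = Map[String, Object](
      ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
      ConsumerConfig.GROUP_ID_CONFIG -> "my-spark-group",
      ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
      // Commit offsets explicitly (see the commitAsync call below) instead of auto-committing.
      ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG -> (false: java.lang.Boolean)
    )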
    

    At this point the problem is basically solved. However, when the job is recovered from a checkpoint the exception can still occur, because the checkpointed offsets may again be stale; in that case the processed offsets have to be committed back to Kafka:

    // Commit the offsets that have just been processed back to Kafka.
    kafkaStreams.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      kafkaStreams.asInstanceOf[CanCommitOffsets].commitAsync(offsetRanges)
    }
    
