美文网首页
sparkstreaming读取kafka数据源流程解析(0.1

sparkstreaming读取kafka数据源流程解析(0.1

作者: 森林和海洋 | 来源:发表于2020-05-01 15:38 被阅读0次

    针对0.10及以上版本的kafka, spark推出了更简洁的模式进行数据读取, jar包名称为spark-streaming-kafka-0-10_2.12. 这种方式可以使得读取的rdd的分区和kafka的分区保持一致, 从而实现高效地读取. 本文对这种读取方式进行解析, 不关注过多源码细节, 主要关注官方是如何利用KafkaConsumer这个类来实现这种读取模式的, 本文的源码版本为2.4.0
    先说结论 :
    (1) 在启动的时候, spark在driver端生成一个consumer, 并获得所订阅主题及分区的当前offset值
    (2) 根据设置的参数去判断每个分区需要拉取的数据量, 即每个分区的untilOffset值, 此时driver端获得了每个分区需要被消费的数据量, spark将这些值保存在OffsetRange的类里面, 该类的定义如下

    final class OffsetRange private(
        val topic: String,
        val partition: Int,
        val fromOffset: Long,
        val untilOffset: Long) extends Serializable {
    

    (3) 在executor端创建新的consumer (和driver端的consumer属于不同的消费组), 然后将上一步获得的 OffsetRange分配到各个executor, 这些executor上的consumer根据指定的分区及起始offset进行消费.
    (4) 待executor消费成功后, driver端的consumer即seek每个分区的offset到最新的位置, 然后重复第二步的过程, 同时提交当前offset(可自动/手动提交)

    总的来说, spark分别在driver端和executor端创建consumer连接, driver端的consumer负责确定当前需要消费数据offset范围, 并分配到各个executor, 然后executor端的consumer根据分配到的offset范围进行消费, 最后在driver端进行offset的提交
    下面的详细流程:

    1. 创建DirectKafkaInputDStream
      官方的实例代码如下:
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092,anotherhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "use_a_separate_group_id_for_each_stream",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )
    
    val topics = Array("topicA", "topicB")
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Subscribe[String, String](topics, kafkaParams)
    )
    

    这里创建的stream对象为DirectKafkaInputDStream, 这个类还是继承类InputDStream, 所以我们直接看它重写的startcompute方法即可
    start方法定义如下:

      override def start(): Unit = {
        // 这里的consume类型为KafkaConsumer, 通过上一步传入的Subscribe类型来设置不同的KafkaConsumer属性
        val c = consumer
        // 对offset进行矫正, 主要是为了考虑缓存中有数据, 但是还未poll的情况
        paranoidPoll(c)
        if (currentOffsets.isEmpty) {
          currentOffsets = c.assignment().asScala.map { tp =>
            tp -> c.position(tp)
          }.toMap
        }
      }
    

    start方法主要就是在driver端创建了一个KafkaConsumer, 并对当前的offset进行更新/矫正
    接下来是compute方法, 主要是生成KafkaRDD, 代码如下:

     override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
        val untilOffsets = clamp(latestOffsets())
        val offsetRanges = untilOffsets.map { case (tp, uo) =>
          val fo = currentOffsets(tp)
          OffsetRange(tp.topic, tp.partition, fo, uo)
        }
        val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
          true)
        val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
          getPreferredHosts, useConsumerCache)
    
        // Report the record number and metadata of this batch interval to InputInfoTracker.
        val description = offsetRanges.filter { offsetRange =>
          // Don't display empty ranges.
          offsetRange.fromOffset != offsetRange.untilOffset
        }.map { offsetRange =>
          s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
            s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
        }.mkString("\n")
        // Copy offsetRanges to immutable.List to prevent from being modified by the user
        val metadata = Map(
          "offsets" -> offsetRanges.toList,
          StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
        val inputInfo = StreamInputInfo(id, rdd.count, metadata)
        ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    
        currentOffsets = untilOffsets
        commitAll()
        Some(rdd)
      }
    

    KafkaRDD`的构造方法为

    val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,      getPreferredHosts, useConsumerCache)
    

    除了包含每个分区需要消费的数据范围offsetRanges参数外, 还包含一个executorKafkaParams, 此为executor端创建consumer时的参数,相关代码在

      val executorKafkaParams = {
        val ekp = new ju.HashMap[String, Object](consumerStrategy.executorKafkaParams)
        KafkaUtils.fixKafkaParams(ekp)
        ekp
      }
    

    点进去我们可以发现, executor端的kafkaParams主要针对driver端的做了4点修改, 如下:

      private[kafka010] def fixKafkaParams(kafkaParams: ju.HashMap[String, Object]): Unit = {
        logWarning(s"overriding ${ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG} to false for executor")
        kafkaParams.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false: java.lang.Boolean)
    
        logWarning(s"overriding ${ConsumerConfig.AUTO_OFFSET_RESET_CONFIG} to none for executor")
        kafkaParams.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "none")
    
        // driver and executor should be in different consumer groups
        val originalGroupId = kafkaParams.get(ConsumerConfig.GROUP_ID_CONFIG)
        if (null == originalGroupId) {
          logError(s"${ConsumerConfig.GROUP_ID_CONFIG} is null, you should probably set it")
        }
        val groupId = "spark-executor-" + originalGroupId
        logWarning(s"overriding executor ${ConsumerConfig.GROUP_ID_CONFIG} to ${groupId}")
        kafkaParams.put(ConsumerConfig.GROUP_ID_CONFIG, groupId)
    
        // possible workaround for KAFKA-3135
        val rbb = kafkaParams.get(ConsumerConfig.RECEIVE_BUFFER_CONFIG)
        if (null == rbb || rbb.asInstanceOf[java.lang.Integer] < 65536) {
          logWarning(s"overriding ${ConsumerConfig.RECEIVE_BUFFER_CONFIG} to 65536 see KAFKA-3135")
          kafkaParams.put(ConsumerConfig.RECEIVE_BUFFER_CONFIG, 65536: java.lang.Integer)
        }
      }
    

    在成功poll到数据后, 会将currentOffsets更新为untilOffsets, 然后在下一次compute之前的latestOffsets()方法中会执行
    c.seekToEnd(currentOffsets.keySet.asJava), 将driver端consumer的offset刷到最新.
    最后还会有一个commitAll()方法, 这个方法只在手动调用stream.commitAsync(offsetRanges)后才会起作用, 因为这个方法是从一个队列里取出要提交的offset值, 然后调用kafkaConsumer.commitAsync()进行提交, 而stream.commitAsync(offsetRanges)方法会将要提交的offset保存至队列.

    1. KafkaRDD的实现
      KafkaRDD是继承自RDD的类, 用于在上一步的compute方法中返回的RDD类型, 构造如下
    private[spark] class KafkaRDD[K, V](
        sc: SparkContext,
        val kafkaParams: ju.Map[String, Object],
        val offsetRanges: Array[OffsetRange],
        val preferredHosts: ju.Map[TopicPartition, String],
        useConsumerCache: Boolean
    ) extends RDD[ConsumerRecord[K, V]](sc, Nil) with Logging with HasOffsetRanges
    

    先看它重写的getPartition方法, 这是定义RDD分区的依据

      override def getPartitions: Array[Partition] = {
        offsetRanges.zipWithIndex.map { case (o, i) =>
            new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
        }.toArray
      }
    

    在这里我们可以看到, RDD的分区数是根据offsetRanges来的, 和订阅Kafka的分区数是保持一致.
    接下来是compute方法的重写, 这一步返回的是一个KafkaRDDIterator, 而在KafkaRDDIteratornext()方法如下

      override def next(): ConsumerRecord[K, V] = {
        if (!hasNext) {
          throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
        }
        val r = consumer.get(requestOffset, pollTimeout)
        requestOffset += 1
        r
      }
    

    我们可以看到, 在这一步就是调用consumer.get()方法来返回数据, 每次返回一个ConsumerRecord[K, V], 这里的consumer对象是KafkaDataConsumer, get方法就是从buffer缓冲区中返回数据, buffer数据来源在poll方法中, 定义如下:

      private def poll(timeout: Long): Unit = {
        val p = consumer.poll(timeout)
        val r = p.records(topicPartition)
        logDebug(s"Polled ${p.partitions()}  ${r.size}")
        buffer = r.listIterator
      }
    

    这里的consumer即是真正的KafkaConsumer了, 其定义在

      /** Create a KafkaConsumer to fetch records for `topicPartition` */
      private def createConsumer: KafkaConsumer[K, V] = {
        val c = new KafkaConsumer[K, V](kafkaParams)
        val topics = ju.Arrays.asList(topicPartition)
        c.assign(topics)
        c
      }
    

    在这里我们可以看到, 这里采用的其实就是assign指定分区的方式进行数据拉取.

    大致的流程就是这些了, 主要设计到了DirectKafkaInputDStream, KafkaRDD, KafkaRDDIterator这么几个类, 其中还涉及到了是否允许数据compacted、consumer缓存等概念, 因为这里主要关注KafkaConsumer消费的过程, 在这里不做进一步的讨论了, 如果不对的地方欢迎一起交流.

    相关文章

      网友评论

          本文标题:sparkstreaming读取kafka数据源流程解析(0.1

          本文链接:https://www.haomeiwen.com/subject/djrswhtx.html