SparkStreaming and Kafka

Author: 天之見證 | Published 2019-11-06 23:18

    0. Some questions

    1. How does SparkStreaming consume Kafka?
    2. How are Kafka offsets maintained?
    3. How do we obtain Kafka offsets?
    4. Does fetching offsets cause Kafka records to be consumed?
    5. How do Kafka partitions relate to Spark tasks?
    6. How do Kafka consumers relate to Spark executors/tasks?
    7. How does SparkStreaming apply backpressure?
    8. How are the offsets for the next batch determined?

    1. How SparkStreaming consumes Kafka

    1.1 Driver side

    A Kafka stream is created with KafkaUtils.createDirectStream.
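
    For orientation, a minimal driver-side setup might look like the sketch below. This is a hedged example rather than code from the Spark source: the broker address, group id, and topic name are placeholders, and ssc is assumed to be an existing StreamingContext.

    import org.apache.kafka.common.serialization.StringDeserializer
    import org.apache.spark.streaming.kafka010.KafkaUtils
    import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

    // Placeholder Kafka parameters; adjust to the actual cluster.
    val kafkaParams = Map[String, Object](
      "bootstrap.servers" -> "localhost:9092",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> classOf[StringDeserializer],
      "group.id" -> "example-group",
      "auto.offset.reset" -> "latest",
      "enable.auto.commit" -> (false: java.lang.Boolean)
    )

    // ssc is an existing StreamingContext
    val stream = KafkaUtils.createDirectStream[String, String](
      ssc,
      PreferConsistent,
      Subscribe[String, String](Seq("example-topic"), kafkaParams)
    )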

    Going one level deeper, DirectKafkaInputDStream initializes the actual stream; the entry point is its start method:

    // DirectKafkaInputDStream.scala
    override def start(): Unit = {
      val c = consumer      // driver-side consumer, used only for offsets
      paranoidPoll(c)       // establish positions without actually consuming records
      if (currentOffsets.isEmpty) {
        // record the starting offset of every assigned partition
        currentOffsets = c.assignment().asScala.map { tp =>
          tp -> c.position(tp)
        }.toMap
      }
    }
    

    A consumer is started on the driver side; it is used only to obtain Kafka offsets and does not consume any data.

    // DirectKafkaInputDStream.scala
    @transient private var kc: Consumer[K, V] = null
    def consumer(): Consumer[K, V] = this.synchronized {
      if (null == kc) {
        kc = consumerStrategy.onStart(currentOffsets.mapValues(l => new java.lang.Long(l)).asJava)
      }
      kc
    }
    
    

    One implementation of onStart looks like this (note that offsets can be specified here):

    // ConsumerStrategy.scala
    private case class Subscribe[K, V](
        topics: ju.Collection[jl.String],
        kafkaParams: ju.Map[String, Object],
        offsets: ju.Map[TopicPartition, jl.Long]
      ) extends ConsumerStrategy[K, V] with Logging {
    
      def executorKafkaParams: ju.Map[String, Object] = kafkaParams
    
      def onStart(currentOffsets: ju.Map[TopicPartition, jl.Long]): Consumer[K, V] = {
        val consumer = new KafkaConsumer[K, V](kafkaParams)
        consumer.subscribe(topics)
        val toSeek = if (currentOffsets.isEmpty) {
          offsets
        } else {
          currentOffsets
        }
        if (!toSeek.isEmpty) {
          // work around KAFKA-3370 when reset is none
          // poll will throw if no position, i.e. auto offset reset none and no explicit position
          // but cant seek to a position before poll, because poll is what gets subscription partitions
          // So, poll, suppress the first exception, then seek
          val aor = kafkaParams.get(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG)
          val shouldSuppress =
            aor != null && aor.asInstanceOf[String].toUpperCase(Locale.ROOT) == "NONE"
          try {
            consumer.poll(0)
          } catch {
            case x: NoOffsetForPartitionException if shouldSuppress =>
              logWarning("Catching NoOffsetForPartitionException since " +
                ConsumerConfig.AUTO_OFFSET_RESET_CONFIG + " is none.  See KAFKA-3370")
          }
          toSeek.asScala.foreach { case (topicPartition, offset) =>
              consumer.seek(topicPartition, offset)
          }
          // we've called poll, we must pause or next poll may consume messages and set position
          consumer.pause(consumer.assignment())
        }
    
        consumer
      }
    }
    

    To make sure this driver-side consumer only establishes offsets and never consumes data, its position is handled manually:

    // DirectKafkaInputDStream.scala
    /**
     * The concern here is that poll might consume messages despite being paused,
     * which would throw off consumer position.  Fix position if this happens.
     */
    private def paranoidPoll(c: Consumer[K, V]): Unit = {
      // don't actually want to consume any messages, so pause all partitions
      c.pause(c.assignment())
      val msgs = c.poll(0)
      if (!msgs.isEmpty) {
        // position should be minimum offset per topicpartition
        msgs.asScala.foldLeft(Map[TopicPartition, Long]()) { (acc, m) =>
          val tp = new TopicPartition(m.topic, m.partition)
          val off = acc.get(tp).map(o => Math.min(o, m.offset)).getOrElse(m.offset)
          acc + (tp -> off)
        }.foreach { case (tp, off) =>
            logInfo(s"poll(0) returned messages, seeking $tp to $off to compensate")
            c.seek(tp, off)
        }
      }
    }
    

    Only at this point is currentOffsets initialized.

    When the RDD is generated, offsetRanges is initialized from currentOffsets, and each batch interval is then handled as a batch job.

    // DirectKafkaInputDStream.scala
    override def compute(validTime: Time): Option[KafkaRDD[K, V]] = {
      val untilOffsets = clamp(latestOffsets())
      val offsetRanges = untilOffsets.map { case (tp, uo) =>
        val fo = currentOffsets(tp)
        OffsetRange(tp.topic, tp.partition, fo, uo)
      }
      val useConsumerCache = context.conf.getBoolean("spark.streaming.kafka.consumer.cache.enabled",
        true)
      val rdd = new KafkaRDD[K, V](context.sparkContext, executorKafkaParams, offsetRanges.toArray,
        getPreferredHosts, useConsumerCache)
    
      // Report the record number and metadata of this batch interval to InputInfoTracker.
      val description = offsetRanges.filter { offsetRange =>
        // Don't display empty ranges.
        offsetRange.fromOffset != offsetRange.untilOffset
      }.map { offsetRange =>
        s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +
          s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"
      }.mkString("\n")
      // Copy offsetRanges to immutable.List to prevent from being modified by the user
      val metadata = Map(
        "offsets" -> offsetRanges.toList,
        StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)
      val inputInfo = StreamInputInfo(id, rdd.count, metadata)
      ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)
    
      currentOffsets = untilOffsets
      commitAll()
      Some(rdd)
    }
    

    1.2 Executor side

    // KafkaRDD.scala
    override def compute(thePart: Partition, context: TaskContext): Iterator[ConsumerRecord[K, V]] = {
      val part = thePart.asInstanceOf[KafkaRDDPartition]
      require(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
      if (part.fromOffset == part.untilOffset) {
        logInfo(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
          s"skipping ${part.topic} ${part.partition}")
        Iterator.empty
      } else {
        logInfo(s"Computing topic ${part.topic}, partition ${part.partition} " +
          s"offsets ${part.fromOffset} -> ${part.untilOffset}")
        if (compacted) {
          new CompactedKafkaRDDIterator[K, V](
            part,
            context,
            kafkaParams,
            useConsumerCache,
            pollTimeout,
            cacheInitialCapacity,
            cacheMaxCapacity,
            cacheLoadFactor
          )
        } else {
          new KafkaRDDIterator[K, V](
            part,
            context,
            kafkaParams,
            useConsumerCache,
            pollTimeout,
            cacheInitialCapacity,
            cacheMaxCapacity,
            cacheLoadFactor
          )
        }
      }
    }
    
    private class KafkaRDDIterator[K, V](
      part: KafkaRDDPartition,
      context: TaskContext,
      kafkaParams: ju.Map[String, Object],
      useConsumerCache: Boolean,
      pollTimeout: Long,
      cacheInitialCapacity: Int,
      cacheMaxCapacity: Int,
      cacheLoadFactor: Float
    ) extends Iterator[ConsumerRecord[K, V]] {
    
      context.addTaskCompletionListener[Unit](_ => closeIfNeeded())
    
      val consumer = {
        KafkaDataConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
        KafkaDataConsumer.acquire[K, V](part.topicPartition(), kafkaParams, context, useConsumerCache)
      }
    
      var requestOffset = part.fromOffset
    
      def closeIfNeeded(): Unit = {
        if (consumer != null) {
          consumer.release()
        }
      }
    
      override def hasNext(): Boolean = requestOffset < part.untilOffset
    
      override def next(): ConsumerRecord[K, V] = {
        if (!hasNext) {
          throw new ju.NoSuchElementException("Can't call getNext() once untilOffset has been reached")
        }
        val r = consumer.get(requestOffset, pollTimeout)
        requestOffset += 1
        r
      }
    }
    

    As shown above, the RDD returns an Iterator; each element is fetched by incrementing requestOffset by 1 and letting the consumer pull the corresponding record.
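
    Worth noting for question 5 above: each OffsetRange computed on the driver becomes exactly one RDD partition, so one Spark task reads one Kafka partition's slice of a batch. The snippet below is a paraphrase of KafkaRDD.getPartitions rather than a verbatim quote, included to show that one-to-one mapping.

    // KafkaRDD.scala (paraphrased)
    override def getPartitions: Array[Partition] = {
      offsetRanges.zipWithIndex.map { case (o, i) =>
        // one KafkaRDDPartition per OffsetRange, i.e. per Kafka partition
        new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset)
      }.toArray
    }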

    2. Configuration and optimizations

    2.1 Reusing the Kafka consumer across batches

    Consumer initialization on the executor side:

    val consumer = {
      KafkaDataConsumer.init(cacheInitialCapacity, cacheMaxCapacity, cacheLoadFactor)
      KafkaDataConsumer.acquire[K, V](part.topicPartition(), kafkaParams, context, useConsumerCache)
    }
    

    KafkaDataConsumer maintains a cache and decides, based on the state of the task, whether to take a consumer from that cache instead of creating a new one.
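
    A hedged configuration sketch: spark.streaming.kafka.consumer.cache.enabled appears in the compute() snippet earlier, while the capacity and load-factor keys below are assumptions matching the constructor parameters shown above, so verify the exact names against your Spark version.

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // toggle consumer caching (seen in DirectKafkaInputDStream.compute above)
      .set("spark.streaming.kafka.consumer.cache.enabled", "true")
      // assumed names for the cache sizing knobs
      .set("spark.streaming.kafka.consumer.cache.initialCapacity", "16")
      .set("spark.streaming.kafka.consumer.cache.maxCapacity", "64")
      .set("spark.streaming.kafka.consumer.cache.loadFactor", "0.75")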

    2.2 LocationStrategies

    These are somewhat analogous to data locality.

    Strategy and meaning:
    PreferConsistent: distribute Kafka partitions evenly across the available executors
    PreferBrokers: when Kafka brokers and Spark executors run on the same machines, schedule each partition on the machine hosting that partition's leader
    PreferFixed: use a user-specified mapping from partitions to hosts
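
    For illustration, a strategy is picked when constructing the stream; a hedged sketch in which the topic and host names are placeholders:

    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.LocationStrategies

    // The usual default: spread partitions evenly over the executors.
    val consistent = LocationStrategies.PreferConsistent

    // Pin specific partitions to specific hosts (placeholder names).
    val fixed = LocationStrategies.PreferFixed(Map(
      new TopicPartition("example-topic", 0) -> "host-a",
      new TopicPartition("example-topic", 1) -> "host-b"
    ))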

    2.2.1 How the Executor-to-Host mapping is obtained

    // KafkaRDD.scala
    private def executors(): Array[ExecutorCacheTaskLocation] = {
      val bm = sparkContext.env.blockManager
      bm.master.getPeers(bm.blockManagerId).toArray
        .map(x => ExecutorCacheTaskLocation(x.host, x.executorId))
        .sortWith(compareExecutors)
    }
    

    2.3 ConsumerStrategies

    Strategy and meaning:
    Subscribe: consume a fixed list of topics
    SubscribePattern: consume every topic that matches a given regular expression
    Assign: consume a fixed list of partitions
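
    For illustration, the three strategies can be constructed as follows; this is a hedged sketch in which the topic names are placeholders and kafkaParams is a Kafka parameter map like the one sketched in section 1.1:

    import java.util.regex.Pattern
    import org.apache.kafka.common.TopicPartition
    import org.apache.spark.streaming.kafka010.ConsumerStrategies.{Assign, Subscribe, SubscribePattern}

    // A fixed list of topics.
    val byTopics = Subscribe[String, String](Seq("example-topic"), kafkaParams)

    // Every topic matching a regular expression.
    val byPattern = SubscribePattern[String, String](Pattern.compile("example-.*"), kafkaParams)

    // A fixed list of partitions.
    val byPartitions = Assign[String, String](
      Seq(new TopicPartition("example-topic", 0), new TopicPartition("example-topic", 1)),
      kafkaParams
    )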

    3. Programming examples

    3.1 Obtaining offsets

    Note that what you actually get here is an OffsetRange.

    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
      rdd.foreachPartition { iter =>
        val o: OffsetRange = offsetRanges(TaskContext.get.partitionId)
        println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
      }
    }
    

    3.2 Managing offsets yourself

    1. Connect to Kafka using Assign
    2. Obtain the current batch's OffsetRange inside foreachRDD
    3. Commit the offsets transactionally at the end of foreachRDD
    // The details depend on your data store, but the general idea looks like this
    
    // begin from the offsets committed to the database
    val fromOffsets = selectOffsetsFromYourDatabase.map { resultSet =>
      new TopicPartition(resultSet.string("topic"), resultSet.int("partition")) -> resultSet.long("offset")
    }.toMap
    
    val stream = KafkaUtils.createDirectStream[String, String](
      streamingContext,
      PreferConsistent,
      Assign[String, String](fromOffsets.keys.toList, kafkaParams, fromOffsets)
    )
    
    stream.foreachRDD { rdd =>
      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    
      val results = yourCalculation(rdd)
    
      // begin your transaction
    
      // update results
      // update offsets where the end of existing offsets matches the beginning of this batch of offsets
      // assert that offsets were updated correctly
      // end your transaction
    }
    

    4. Backpressure on Kafka consumption

    As shown above, when the driver generates the OffsetRange:

    val untilOffsets = clamp(latestOffsets())
    val offsetRanges = untilOffsets.map { case (tp, uo) =>
      val fo = currentOffsets(tp)
      OffsetRange(tp.topic, tp.partition, fo, uo)
    }
    

    clamp computes the consumption boundary from the offsets:

    protected def clamp(offsets: Map[TopicPartition, Long]): Map[TopicPartition, Long] = {
      maxMessagesPerPartition(offsets).map { mmp =>
        mmp.map { case (tp, messages) =>
            val uo = offsets(tp)
            tp -> Math.min(currentOffsets(tp) + messages, uo)
        }
      }.getOrElse(offsets)
    }
    

    The method maxMessagesPerPartition determines the upper limit on the number of messages to consume.

    For a detailed analysis, see the companion article 背压kafka的消费 (backpressure on Kafka consumption).
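
    In practice the inputs to maxMessagesPerPartition come from a handful of settings; a minimal sketch, assuming the default rate estimator (the numeric values are placeholders):

    import org.apache.spark.SparkConf

    val conf = new SparkConf()
      // let the rate controller adapt the per-batch intake to processing speed
      .set("spark.streaming.backpressure.enabled", "true")
      // hard cap on records per Kafka partition per second
      .set("spark.streaming.kafka.maxRatePerPartition", "1000")
      // optional rate to use before the first batches have been measured
      .set("spark.streaming.backpressure.initialRate", "500")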

    ref: https://spark.apache.org/docs/latest/streaming-kafka-0-10-integration.html
