
Reading the Spark Source: KafkaRDD

By pcqlegend | Published 2018-05-03 17:18

    Translation of the official Scaladoc: a batch-oriented interface for consuming messages from Kafka.

    class KafkaRDD[
      K: ClassTag,
      V: ClassTag,
      U <: Decoder[_]: ClassTag,
      T <: Decoder[_]: ClassTag,
      R: ClassTag] private[spark] (
        sc: SparkContext,
        kafkaParams: Map[String, String],
        val offsetRanges: Array[OffsetRange],
        leaders: Map[TopicAndPartition, (String, Int)],
        messageHandler: MessageAndMetadata[K, V] => R
      ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {
      override def getPartitions: Array[Partition] = {
        offsetRanges.zipWithIndex.map { case (o, i) =>
            val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))
            new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)
        }.toArray
      }
    

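    KafkaRDD is declared private[spark], so applications normally obtain one through KafkaUtils.createRDD, which looks up the partition leaders and supplies a default messageHandler. A minimal sketch of that entry point, assuming a broker at broker1:9092 and a hypothetical topic (all values made up):

    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming.kafka.{KafkaUtils, OffsetRange}

    val kafkaParams = Map("metadata.broker.list" -> "broker1:9092")
    // each OffsetRange becomes exactly one partition of the resulting RDD
    val offsetRanges = Array(
      OffsetRange("my-topic", 0, 0L, 100L),   // partition 0, offsets [0, 100)
      OffsetRange("my-topic", 1, 0L, 200L))   // partition 1, offsets [0, 200)

    val rdd = KafkaUtils.createRDD[String, String, StringDecoder, StringDecoder](
      sc, kafkaParams, offsetRanges)
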
    The constructor takes a SparkContext, kafkaParams, offsetRanges, leaders, and a messageHandler. getPartitions simply maps each OffsetRange to a KafkaRDDPartition, one Spark partition per Kafka topic-partition range. The next question is how a partition's data is actually read, i.e. the compute method:

     override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {
        val part = thePart.asInstanceOf[KafkaRDDPartition]
        assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))
        if (part.fromOffset == part.untilOffset) { // if the start offset equals the end offset there is nothing to read, so skip this partition
          log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +
            s"skipping ${part.topic} ${part.partition}")
          Iterator.empty
        } else {
          new KafkaRDDIterator(part, context)
        }
      }
    
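    compute hands each partition a KafkaRDDIterator. Because getPartitions used zipWithIndex, a partition's index is also its index into offsetRanges, which is what the usual offset-tracking pattern relies on. A sketch, assuming rdd was obtained as in the createRDD example above:

    import org.apache.spark.TaskContext
    import org.apache.spark.streaming.kafka.HasOffsetRanges

    // on the driver: recover the ranges this RDD was built from
    val ranges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
    rdd.foreachPartition { _ =>
      // on the executor: the partition index lines up with the offsetRanges index
      val o = ranges(TaskContext.get.partitionId)
      println(s"${o.topic} ${o.partition} ${o.fromOffset} -> ${o.untilOffset}")
    }
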

    Now let's look at KafkaRDDIterator, starting with its getNext method.

    override def getNext(): R = {
          if (iter == null || !iter.hasNext) { // if the underlying fetch iterator is missing or exhausted, fetch the next batch
            iter = fetchBatch
          }
          if (!iter.hasNext) {
            assert(requestOffset == part.untilOffset, errRanOutBeforeEnd(part))
            finished = true
            null.asInstanceOf[R]
          } else {
            val item = iter.next()
            if (item.offset >= part.untilOffset) { // reaching untilOffset means this partition is done; going past it fails the assertion below
              assert(item.offset == part.untilOffset, errOvershotEnd(item.offset, part))
              finished = true
              null.asInstanceOf[R]
            } else {
              requestOffset = item.nextOffset
              messageHandler(new MessageAndMetadata(
                part.topic, part.partition, item.message, item.offset, keyDecoder, valueDecoder))
            }
          }
        }
    
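    getNext follows the contract of Spark's internal org.apache.spark.util.NextIterator: when nothing is left, the subclass sets finished = true and returns a placeholder, which explains the null.asInstanceOf[R] above. A simplified sketch of that contract (the real class also takes care of closing resources):

    abstract class NextIterator[U] extends Iterator[U] {
      private var gotNext = false
      private var nextValue: U = _
      protected var finished = false

      // subclasses either produce the next value, or set finished = true
      // and return anything (the value is ignored once finished)
      protected def getNext(): U

      override def hasNext: Boolean = {
        if (!finished && !gotNext) {
          nextValue = getNext()
          gotNext = true
        }
        !finished
      }

      override def next(): U = {
        if (!hasNext) throw new NoSuchElementException("End of stream")
        gotNext = false
        nextValue
      }
    }
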

    Moving on to fetchBatch:

     private def fetchBatch: Iterator[MessageAndOffset] = {
          val req = new FetchRequestBuilder()
            .addFetch(part.topic, part.partition, requestOffset, kc.config.fetchMessageMaxBytes)
            .build()
          val resp = consumer.fetch(req)
          handleFetchErr(resp)
          // kafka may return a batch that starts before the requested offset
          resp.messageSet(part.topic, part.partition)
            .iterator
            .dropWhile(_.offset < requestOffset)
        }
    
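    The trailing dropWhile is needed because, with compressed message sets, the broker returns the whole compressed block, which may start before the requested offset. The same idea on plain numbers (values made up):

    // the broker may hand back messages from before the offset we asked for
    val returnedOffsets = Seq(95L, 96L, 97L, 98L, 99L, 100L)
    val requestOffset = 98L
    returnedOffsets.dropWhile(_ < requestOffset)   // Seq(98, 99, 100)
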

    fetchBatch builds a FetchRequest and sends it through the SimpleConsumer. That consumer is created when the KafkaRDDIterator for the partition is constructed:

    private class KafkaRDDIterator(
          part: KafkaRDDPartition,
          context: TaskContext) extends NextIterator[R] {
    
        context.addTaskCompletionListener{ context => closeIfNeeded() }
    
        log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +
          s"offsets ${part.fromOffset} -> ${part.untilOffset}")
    
        val kc = new KafkaCluster(kafkaParams)
        val keyDecoder = classTag[U].runtimeClass.getConstructor(classOf[VerifiableProperties])
          .newInstance(kc.config.props)
          .asInstanceOf[Decoder[K]]
        val valueDecoder = classTag[T].runtimeClass.getConstructor(classOf[VerifiableProperties])
          .newInstance(kc.config.props)
          .asInstanceOf[Decoder[V]]
        val consumer = connectLeader
        var requestOffset = part.fromOffset
        var iter: Iterator[MessageAndOffset] = null
    
        // The idea is to use the provided preferred host, except on task retry attempts,
    // to minimize number of kafka metadata requests
        private def connectLeader: SimpleConsumer = {
          if (context.attemptNumber > 0) {
            kc.connectLeader(part.topic, part.partition).fold(
              errs => throw new SparkException(
                s"Couldn't connect to leader for topic ${part.topic} ${part.partition}: " +
                  errs.mkString("\n")),
              consumer => consumer
            )
      } else { // on the first attempt go straight to the preferred host instead of looking up the leader, which cuts down Kafka metadata requests
            kc.connect(part.host, part.port)
          }
        }
    

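    One more detail worth noting: keyDecoder and valueDecoder are instantiated reflectively through a constructor that takes a kafka.utils.VerifiableProperties, which is the convention the Kafka 0.8 decoders follow. For reference, kafka.serializer.StringDecoder looks roughly like this (paraphrased, not the verbatim Kafka source):

    import kafka.serializer.Decoder
    import kafka.utils.VerifiableProperties

    class StringDecoder(props: VerifiableProperties = null) extends Decoder[String] {
      val encoding =
        if (props == null) "UTF8"
        else props.getString("serializer.encoding", "UTF8")

      def fromBytes(bytes: Array[Byte]): String = new String(bytes, encoding)
    }

    Any custom decoder used as the U or T type parameter therefore has to expose a single-argument VerifiableProperties constructor, otherwise the reflective getConstructor call fails at runtime.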