美文网首页Spark源码解析spark大数据
Spark Streaming源码解读之No Receivers

Spark Streaming源码解读之No Receivers

作者: 阳光男孩spark | 来源:发表于2016-05-31 16:04 被阅读268次

          前面的课程中主要是使用ReceiverInputDStream,是针对Receiver方式开展的剖析。

           企业级Spark Streaming应用程序开发中在越来越多的采用No Receivers的方式。NoReceiver方式有自己的优势,比如更大的控制的自由度、语义一致性等等。所以对NoReceivers方式和Receiver方式都需要进一步研究、思考。

          其实NoReceivers方式更符合操作、处理数据的思路的。作为计算框架的Spark,底层会有数据来源,不使用Receiver,直接操作数据源,是更自然的方式。操作数据来源的封装器一定是RDD类型的。

    Streaming中为了封装推出了KafkaRDD,只不过针对不同来源的数据,定制了相应的RDD。

    KafkaRDD:

    /**

    * A batch-oriented interface for consuming from Kafka.

    * Starting and ending offsets are specified in advance,

    * so that you can control exactly-once semantics.

    * @param kafkaParams Kafka http://kafka.apache.org/documentation.html#configuration">

    * configuration parameters. Requires "metadata.broker.list" or "bootstrap.servers" to be set

    * with Kafka broker(s) specified in host1:port1,host2:port2 form.

    * @param offsetRanges offset ranges that define the Kafka data belonging to this RDD

    * @param messageHandler function for translating each message into the desired type

    */

    private[kafka]

    class KafkaRDD[

    K: ClassTag,

    V: ClassTag,

    U <: Decoder[_]: ClassTag,

    T <: Decoder[_]: ClassTag,

    R: ClassTag] private[spark] (

    sc: SparkContext,

    kafkaParams: Map[String, String],

    val offsetRanges: Array[OffsetRange],

    leaders: Map[TopicAndPartition, (String, Int)],

    messageHandler: MessageAndMetadata[K, V] => R

    ) extends RDD[R](sc, Nil) with Logging with HasOffsetRanges {

    ...

    注释中说明这是基于batch的kafka消费接口,特别强调了语义一致性。

    OffsetRange:

    /**

    * Represents any object that has a collection of [[OffsetRange]]s. This can be used to access the

    * offset ranges in RDDs generated by the direct Kafka DStream (see

    * [[KafkaUtils.createDirectStream()]]).

    * {{{

    *   KafkaUtils.createDirectStream(...).foreachRDD { rdd =>

    *      val offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges

    *      ...

    *   }

    * }}}

    */

    trait HasOffsetRanges {

    def offsetRanges: Array[OffsetRange]

    }

    /**

    * Represents a range of offsets from a single Kafka TopicAndPartition. Instances of this class

    * can be created with `OffsetRange.create()`.

    * @param topic Kafka topic name

    * @param partition Kafka partition id

    * @param fromOffset Inclusive starting offset

    * @param untilOffset Exclusive ending offset

    */

    final class OffsetRange private(

    val topic: String,

    val partition: Int,

    // 起始偏移量(包括)

    val fromOffset: Long,

    // 终止偏移量(不包括)

    val untilOffset: Long) extends Serializable {

    import OffsetRange.OffsetRangeTuple

    ...

    注释中指出OffsetRange

    代表了一个Kafka的Topic和Partition的偏移量范围。实例可以被OffsetRange.create()所创建。

    这里说的偏移量的单位是消息数量。

    OffsetRang伴生对象:

    /**

    * Companion object the provides methods to create instances of [[OffsetRange]].

    */

    object OffsetRange {

    def create(topic: String, partition: Int, fromOffset: Long, untilOffset: Long): OffsetRange =

    new OffsetRange(topic, partition, fromOffset, untilOffset)

    ...

    KafkaRDD.getPartitions:

    override def getPartitions: Array[Partition] = {

    offsetRanges.zipWithIndex.map { case (o, i) =>

    val (host, port) = leaders(TopicAndPartition(o.topic, o.partition))

    new KafkaRDDPartition(i, o.topic, o.partition, o.fromOffset, o.untilOffset, host, port)

    }.toArray

    }

    KafkaRDDPartition:

    /** @param topic kafka topic name

    * @param partition kafka partition id

    * @param fromOffset inclusive starting offset

    * @param untilOffset exclusive ending offset

    * @param host preferred kafka host, i.e. the leader at the time the rdd was created

    * @param port preferred kafka host's port

    */

    private[kafka]

    class KafkaRDDPartition(

    val index: Int,

    val topic: String,

    val partition: Int,

    val fromOffset: Long,

    val untilOffset: Long,

    val host: String,

    val port: Int

    ) extends Partition {

    /** Number of messages this partition refers to */

    def count(): Long = untilOffset - fromOffset

    }

    比较简单。方法只定义了消费数的统计。

    KafkaRDD.compute:

    override def compute(thePart: Partition, context: TaskContext): Iterator[R] = {

    val part = thePart.asInstanceOf[KafkaRDDPartition]

    assert(part.fromOffset <= part.untilOffset, errBeginAfterEnd(part))

    if (part.fromOffset == part.untilOffset) {

    log.info(s"Beginning offset ${part.fromOffset} is the same as ending offset " +

    s"skipping ${part.topic} ${part.partition}")

    Iterator.empty

    } else {

    new KafkaRDDIterator(part, context)

    }

    }

    KafkaRDDIterator:

    private class KafkaRDDIterator(

    part: KafkaRDDPartition,

    context: TaskContext) extends NextIterator[R] {

    context.addTaskCompletionListener{ context => closeIfNeeded() }

    log.info(s"Computing topic ${part.topic}, partition ${part.partition} " +

    s"offsets ${part.fromOffset} -> ${part.untilOffset}")

    val kc = newKafkaCluster(kafkaParams)

    KafkaCluster:

    class KafkaCluster(val kafkaParams: Map[String, String]) extends Serializable {

    注意使用的Kafka的版本要求。

    KafkaCluster.connect:

    def connect(host: String, port: Int): SimpleConsumer =

    new SimpleConsumer(host, port, config.socketTimeoutMs,

    config.socketReceiveBufferBytes, config.clientId)

    SimpleConsumer:

    class SimpleConsumer(val host : scala.Predef.String, val port : scala.Int, val soTimeout : scala.Int, val bufferSize : scala.Int, val clientId : scala.Predef.String) extends scala.AnyRef with kafka.utils.Logging {

    def close() : scala.Unit = { /* compiled code */ }

    def send(request : kafka.api.TopicMetadataRequest) : kafka.api.TopicMetadataResponse = { /* compiled code */ }

    def send(request : kafka.api.ConsumerMetadataRequest) : kafka.api.ConsumerMetadataResponse = { /* compiled code */ }

    def fetch(request : kafka.api.FetchRequest) : kafka.api.FetchResponse = { /* compiled code */ }

    def getOffsetsBefore(request : kafka.api.OffsetRequest) : kafka.api.OffsetResponse = { /* compiled code */ }

    def commitOffsets(request : kafka.api.OffsetCommitRequest) : kafka.api.OffsetCommitResponse = { /* compiled code */ }

    def fetchOffsets(request : kafka.api.OffsetFetchRequest) : kafka.api.OffsetFetchResponse = { /* compiled code */ }

    def earliestOrLatestOffset(topicAndPartition : kafka.common.TopicAndPartition, earliestOrLatest : scala.Long, consumerId : scala.Int) : scala.Long = { /* compiled code */ }

    }

    KafkaUtils.createDirectStream:

    def createDirectStream[

    K: ClassTag,

    V: ClassTag,

    KD <: Decoder[K]: ClassTag,

    VD <: Decoder[V]: ClassTag] (

    ssc: StreamingContext,

    kafkaParams: Map[String, String],

    topics: Set[String]

    ): InputDStream[(K, V)] = {

    val messageHandler = (mmd: MessageAndMetadata[K, V]) => (mmd.key, mmd.message)

    val kc = new KafkaCluster(kafkaParams)

    val fromOffsets =getFromOffsets(kc, kafkaParams, topics)

    new DirectKafkaInputDStream[K, V, KD, VD, (K, V)](

    ssc, kafkaParams, fromOffsets, messageHandler)

    }

    KafkaUtils.getFromOffsets:

    private[kafka] def getFromOffsets(

    kc: KafkaCluster,

    kafkaParams: Map[String, String],

    topics: Set[String]

    ): Map[TopicAndPartition, Long] = {

    val reset = kafkaParams.get("auto.offset.reset").map(_.toLowerCase)

    val result = for {

    topicPartitions <- kc.getPartitions(topics).right

    leaderOffsets <- (if (reset == Some("smallest")) {

    kc.getEarliestLeaderOffsets(topicPartitions)

    } else {

    kc.getLatestLeaderOffsets(topicPartitions)

    }).right

    } yield {

    leaderOffsets.map { case (tp, lo) =>

    (tp, lo.offset)

    }

    }

    KafkaCluster.checkErrors(result)

    }

    获得偏移量。

    KafkaUtils重载了多个createDirectStream。

    KafkaUtils.createDirectStream:

    def createDirectStream[

    K: ClassTag,

    V: ClassTag,

    KD <: Decoder[K]: ClassTag,

    VD <: Decoder[V]: ClassTag,

    R: ClassTag] (

    ssc: StreamingContext,

    kafkaParams: Map[String, String],

    fromOffsets: Map[TopicAndPartition, Long],

    messageHandler: MessageAndMetadata[K, V] => R

    ):InputDStream[R] = {

    val cleanedHandler = ssc.sc.clean(messageHandler)

    newDirectKafkaInputDStream[K, V, KD, VD, R](

    ssc, kafkaParams, fromOffsets, cleanedHandler)

    }

    本身是一个InputDStream。实现时是生成DirectKafkaInputDStream对象。

    classDirectKafkaInputDStream[

    K: ClassTag,

    V: ClassTag,

    U <: Decoder[K]: ClassTag,

    T <: Decoder[V]: ClassTag,

    R: ClassTag](

    ssc_ : StreamingContext,

    val kafkaParams: Map[String, String],

    val fromOffsets: Map[TopicAndPartition, Long],

    messageHandler: MessageAndMetadata[K, V] => R

    ) extends InputDStream[R](ssc_) with Logging {

    // kafka缺省最大重试次数为一次,为确保语义一致性。

    val maxRetries = context.sparkContext.getConf.getInt(

    "spark.streaming.kafka.maxRetries", 1)

    ...

    每个Kafka的topic/partition对应一个RDD

    partition。

    DirectKafkaInputDStream.compute:

    override defcompute(validTime: Time): Option[KafkaRDD[K, V, U, T, R]] = {

    val untilOffsets = clamp(latestLeaderOffsets(maxRetries))

    val rdd =KafkaRDD[K, V, U, T, R](

    context.sparkContext, kafkaParams, currentOffsets, untilOffsets, messageHandler)

    // Report the record number and metadata of this batch interval to InputInfoTracker.

    val offsetRanges = currentOffsets.map { case (tp, fo) =>

    val uo = untilOffsets(tp)

    OffsetRange(tp.topic, tp.partition, fo, uo.offset)

    }

    val description = offsetRanges.filter { offsetRange =>

    // Don't display empty ranges.

    offsetRange.fromOffset != offsetRange.untilOffset

    }.map { offsetRange =>

    s"topic: ${offsetRange.topic}\tpartition: ${offsetRange.partition}\t" +

    s"offsets: ${offsetRange.fromOffset} to ${offsetRange.untilOffset}"

    }.mkString("\n")

    // Copy offsetRanges to immutable.List to prevent from being modified by the user

    val metadata = Map(

    "offsets" -> offsetRanges.toList,

    StreamInputInfo.METADATA_KEY_DESCRIPTION -> description)

    val inputInfo = StreamInputInfo(id, rdd.count, metadata)

    ssc.scheduler.inputInfoTracker.reportInfo(validTime, inputInfo)

    currentOffsets = untilOffsets.map(kv => kv._1 -> kv._2.offset)

    Some(rdd)

    }

    KafkaRDD的实例和DirectKafkaInputDStream,关系是一一对应的。每次compute就是产生一个KafkaRDD。KafkaRDD本身包含多个Partition,其实就是对应了多个Kafka的Partition。一个Partition只能属于一个Topic。

    KafkaRDD.kafkaRDDIterator:

    private class KafkaRDDIterator(

    part: KafkaRDDPartition,

    context: TaskContext) extends NextIterator[R] {

         Direct方式的好处:

         没缓存,就没内存溢出。

         Receiver方式会和Worker的Executor绑定,不方便做分布式(当然已有技巧做到分布式了)。RDD的Direct方式可以容易地做到分布式。

          Receiver方式在数据来不及及时处理而持续延时下去的话,Spark Streaming就有可能崩溃。Direct方式则不会出现这种情况,因为延迟了,就不会做后面的处理。

          完全的语义一致性,确保数据一定会消费,而且不会重复消费。

          Direct方式比Receiver方式性能高。

          根据自己的InputDStream进行配置,可以设置很多DStream。

          backpressure参数很先进。可以试探流进来的速度和当前的处理能力是否一致。如果不一致可以动态调整资源。

    备注:

    资料来源于:DT_大数据梦工厂(Spark发行版本定制)

    更多私密内容,请关注微信公众号:DT_Spark

    如果您对大数据Spark感兴趣,可以免费听由王家林老师每天晚上20:00开设的Spark永久免费公开课,地址YY房间号:68917580

    相关文章

      网友评论

        本文标题:Spark Streaming源码解读之No Receivers

        本文链接:https://www.haomeiwen.com/subject/sencdttx.html