Spark Streaming Receiver Startup Process Analysis

Author: 荒湖 | Published 2017-01-13 10:10

    —————☼—————☼—————☼—————☼—————☼—————
    Spark Streaming Overview
    Spark Streaming Initialization Process
    Spark Streaming Receiver Startup Process Analysis
    Spark Streaming Data Preparation Stage (Receiver Mode)
    Spark Streaming Data Computation Stage
    Spark Streaming Backpressure Analysis
    Spark Streaming Executor DynamicAllocation Analysis

    —————☼—————☼—————☼—————☼—————☼—————

    The Receiver is a core component of the data preparation stage. It is responsible for ingesting data from external sources, and its lifecycle is managed by the ReceiverTracker.
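    To make the Receiver abstraction concrete, here is a minimal sketch of a user-defined receiver (the class name DummySource and its contents are hypothetical and not part of the article's source code): a receiver only needs to implement onStart()/onStop() and push records to Spark with store().

      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.receiver.Receiver

      // Hypothetical custom receiver: onStart() must return quickly, so the
      // receiving loop runs in a daemon thread and hands records to Spark
      // through store().
      class DummySource extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

        override def onStart(): Unit = {
          new Thread("Dummy Receiver") {
            setDaemon(true)
            override def run(): Unit = {
              var i = 0L
              while (!isStopped()) {
                store(s"record-$i")   // handed to the ReceiverSupervisor
                i += 1
                Thread.sleep(100)
              }
            }
          }.start()
        }

        // Nothing to clean up; the thread exits once isStopped() returns true.
        override def onStop(): Unit = {}
      }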

    Starting the Receiver

    1. Receiver extraction and Executor preparation

    As mentioned in "Spark Streaming Initialization Process", the JobScheduler creates and starts the ReceiverTracker when it starts up.
    When the ReceiverTracker is created, it extracts the ReceiverInputDStreams from the DStreamGraph so that, when launching receivers, it can pull a Receiver out of each input stream and start them one by one.

      private val receiverInputStreams = ssc.graph.getReceiverInputStreams()
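    These receiver input streams are the ones the application registered on the StreamingContext. As an illustration (host and port are placeholders), the socket-based WordCount used later in this article registers a SocketInputDStream like this:

      import org.apache.spark.SparkConf
      import org.apache.spark.storage.StorageLevel
      import org.apache.spark.streaming.{Seconds, StreamingContext}

      // Each receiver-based input DStream created here is registered in the
      // DStreamGraph and is what getReceiverInputStreams() later returns.
      val conf = new SparkConf().setAppName("SocketWordCount")
      val ssc = new StreamingContext(conf, Seconds(2))
      val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER_2)
      lines.flatMap(_.split(" ")).map(word => (word, 1)).reduceByKey(_ + _).print()
      ssc.start()
      ssc.awaitTermination()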
    

    When the ReceiverTracker starts, it mainly does two things:

    • Create the ReceiverTrackerEndpoint, which receives messages from the receivers
    • Launch the receivers

    The start() method of ReceiverTracker is shown below:

      /** Start the endpoint and receiver execution thread. */
      def start(): Unit = synchronized {
        if (isTrackerStarted) {
          throw new SparkException("ReceiverTracker already started")
        }
    
        if (!receiverInputStreams.isEmpty) {
          endpoint = ssc.env.rpcEnv.setupEndpoint(
            "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
          if (!skipReceiverLaunch) launchReceivers()
          logInfo("ReceiverTracker started")
          trackerState = Started
        }
      }
    

    The launchReceivers() method is responsible for launching the receivers; its code is as follows:

     /**
       * Get the receivers from the ReceiverInputDStreams, distributes them to the
       * worker nodes as a parallel collection, and runs them.
       */
      private def launchReceivers(): Unit = {
        val receivers = receiverInputStreams.map { nis =>
          val rcvr = nis.getReceiver()
          rcvr.setReceiverId(nis.id)
          rcvr
        }
    
        runDummySparkJob()
    
        logInfo("Starting " + receivers.length + " receivers")
    // Send the start command (StartAllReceivers) to the tracker endpoint
        endpoint.send(StartAllReceivers(receivers)) 
      }
    

    This method mainly does the following:

    • Extract the Receiver from each ReceiverInputDStream, using the stream id as the receiver id.
    • Run runDummySparkJob, a trivial Spark job whose purpose is to ensure that the application's minimum share of executors has been acquired. The minimum share is determined jointly by "spark.cores.max" and "spark.scheduler.minRegisteredResourcesRatio", and defaults to all of the requested executors. While the number of acquired executors is below this minimum, the job blocks and waits for executors to register. (A configuration sketch for these parameters follows this list.)
      The code of runDummySparkJob is as follows:
      /**
       * Run the dummy Spark job to ensure that all slaves have registered. This avoids all the
       * receivers to be scheduled on the same node.
       *
       * TODO Should poll the executor number and wait for executors according to
       * "spark.scheduler.minRegisteredResourcesRatio" and
       * "spark.scheduler.maxRegisteredResourcesWaitingTime" rather than running a dummy job.
       */
      private def runDummySparkJob(): Unit = {
        if (!ssc.sparkContext.isLocal) {
          ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
        }
        assert(getExecutors.nonEmpty)
      }
    
    

    The logic is deliberately simple: without consuming many resources, it ensures that by the time receivers are scheduled, enough executors have already registered, so the receivers can be spread as evenly as possible across different executors.

    • Send the StartAllReceivers message to the ReceiverTrackerEndpoint to launch all receivers.
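    As promised above, here is an illustrative configuration sketch for the resource-related parameters the dummy job depends on (the values are placeholders, not recommendations):

      import org.apache.spark.SparkConf

      // Illustrative values only: how many cores the app may acquire, what
      // fraction of them must be registered before task scheduling starts,
      // and how long the scheduler is willing to wait for registration.
      val conf = new SparkConf()
        .setAppName("ReceiverStartupDemo")
        .set("spark.cores.max", "8")
        .set("spark.scheduler.minRegisteredResourcesRatio", "1.0")
        .set("spark.scheduler.maxRegisteredResourcesWaitingTime", "60s")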

    When the ReceiverTrackerEndpoint receives the StartAllReceivers message, it will:

    • Schedule the receivers: compute the target location(s) for each receiver
    • Start the receivers

    The handling logic is as follows:

    case StartAllReceivers(receivers) =>
            val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
            for (receiver <- receivers) {
              val executors = scheduledLocations(receiver.streamId)
              updateReceiverScheduledExecutors(receiver.streamId, executors)
              receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
              startReceiver(receiver, executors)
            }
    

    2. Receiver scheduling

    Receiver scheduling is performed by ReceiverSchedulingPolicy and falls into two phases:

    • Global scheduling phase
      This happens when the receivers are scheduled for the first time. It tries to spread the receivers as evenly as possible across the executors, assigning each receiver the location(s) where it should be launched.
    • Local scheduling phase
      This happens when a receiver is restarted; only the failed receiver needs to be rescheduled.

    The global scheduling phase always occurs, so it is used here to explain receiver scheduling in detail. The scheduling proceeds as follows:

    • Obtain the host information of all executors and group the executors by host.
    • Create numReceiversOnExecutor to record how many receivers have been assigned to each executor.
    • Create scheduledLocations to record the candidate locations scheduled for each receiver.
    • Schedule the receivers that specify a preferredLocation: for each such receiver, among the executors on the preferred host, pick the one that currently hosts the fewest receivers as the launch location, then update scheduledLocations and numReceiversOnExecutor.
    • Schedule the receivers without a preferredLocation: assign each of them the executor with the fewest receivers so far.
    • If idle executors remain, add each of them to the candidate list of the receiver that currently has the fewest candidate executors.

    At this point, the association between receivers and executors has been established.
    The scheduling implementation is shown below:

     /**
       * Try our best to schedule receivers with evenly distributed. However, if the
       * `preferredLocation`s of receivers are not even, we may not be able to schedule them evenly
       * because we have to respect them.
       *
       * Here is the approach to schedule executors:
       * <ol>
       *   <li>First, schedule all the receivers with preferred locations (hosts), evenly among the
       *       executors running on those host.</li>
       *   <li>Then, schedule all other receivers evenly among all the executors such that overall
       *       distribution over all the receivers is even.</li>
       * </ol>
       *
       * This method is called when we start to launch receivers at the first time.
       *
       * @return a map for receivers and their scheduled locations
       */
      def scheduleReceivers(
          receivers: Seq[Receiver[_]],
          executors: Seq[ExecutorCacheTaskLocation]): Map[Int, Seq[TaskLocation]] = {
        if (receivers.isEmpty) {
          return Map.empty
        }
    
        if (executors.isEmpty) {
          return receivers.map(_.streamId -> Seq.empty).toMap
        }
    
        val hostToExecutors = executors.groupBy(_.host)
        val scheduledLocations = Array.fill(receivers.length)(new mutable.ArrayBuffer[TaskLocation])
        val numReceiversOnExecutor = mutable.HashMap[ExecutorCacheTaskLocation, Int]()
        // Set the initial value to 0
        executors.foreach(e => numReceiversOnExecutor(e) = 0)
    
        // Firstly, we need to respect "preferredLocation". So if a receiver has "preferredLocation",
        // we need to make sure the "preferredLocation" is in the candidate scheduled executor list.
        for (i <- 0 until receivers.length) {
          // Note: preferredLocation is host but executors are host_executorId
          receivers(i).preferredLocation.foreach { host =>
            hostToExecutors.get(host) match {
              case Some(executorsOnHost) =>
                // preferredLocation is a known host. Select an executor that has the least receivers in
                // this host
                val leastScheduledExecutor =
                  executorsOnHost.minBy(executor => numReceiversOnExecutor(executor))
                scheduledLocations(i) += leastScheduledExecutor
                numReceiversOnExecutor(leastScheduledExecutor) =
                  numReceiversOnExecutor(leastScheduledExecutor) + 1
              case None =>
                // preferredLocation is an unknown host.
                // Note: There are two cases:
                // 1. This executor is not up. But it may be up later.
                // 2. This executor is dead, or it's not a host in the cluster.
                // Currently, simply add host to the scheduled executors.
    
                // Note: host could be `HDFSCacheTaskLocation`, so use `TaskLocation.apply` to handle
                // this case
                scheduledLocations(i) += TaskLocation(host)
            }
          }
        }
    
        // For those receivers that don't have preferredLocation, make sure we assign at least one
        // executor to them.
        for (scheduledLocationsForOneReceiver <- scheduledLocations.filter(_.isEmpty)) {
          // Select the executor that has the least receivers
          val (leastScheduledExecutor, numReceivers) = numReceiversOnExecutor.minBy(_._2)
          scheduledLocationsForOneReceiver += leastScheduledExecutor
          numReceiversOnExecutor(leastScheduledExecutor) = numReceivers + 1
        }
    
        // Assign idle executors to receivers that have less executors
        val idleExecutors = numReceiversOnExecutor.filter(_._2 == 0).map(_._1)
        for (executor <- idleExecutors) {
          // Assign an idle executor to the receiver that has least candidate executors.
          val leastScheduledExecutors = scheduledLocations.minBy(_.size)
          leastScheduledExecutors += executor
        }
    
        receivers.map(_.streamId).zip(scheduledLocations).toMap
      }
    

    This implementation has a caveat: even when a receiver specifies a preferredLocation and that host does run an executor of this application, the receiver is not guaranteed to end up on that executor. For example, with one receiver preferring host1 and one executor each on host1 and host2, the final idle-executor pass adds the host2 executor as a second candidate, and the receiver task may be launched there instead.

    3. Receiver startup

    Once a receiver has been assigned its launch location(s), startReceiver is called to start it. The startup proceeds as follows:

    • Wrap the Receiver into an RDD, using the scheduled locations as the RDD's preferred locations
     // Create the RDD using the scheduledLocations to run the receiver in a Spark job
          val receiverRDD: RDD[Receiver[_]] =
            if (scheduledLocations.isEmpty) {
              ssc.sc.makeRDD(Seq(receiver), 1)
            } else {
              val preferredLocations = scheduledLocations.map(_.toString).distinct
              ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
            }
          receiverRDD.setName(s"Receiver $receiverId")
    
    • Submit the RDD as a Spark job; the Receiver runs inside a task on an executor thread, and the receiver job keeps being restarted until the ReceiverTracker is stopped
     val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
            receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
          // We will keep restarting the receiver job until ReceiverTracker is stopped
          future.onComplete {
            case Success(_) =>
              if (!shouldStartReceiver) {
                onReceiverJobFinish(receiverId)
              } else {
                logInfo(s"Restarting Receiver $receiverId")
                self.send(RestartReceiver(receiver))
              }
            case Failure(e) =>
              if (!shouldStartReceiver) {
                onReceiverJobFinish(receiverId)
              } else {
                logError("Receiver has been stopped. Try to restart it.", e)
                logInfo(s"Restarting Receiver $receiverId")
                self.send(RestartReceiver(receiver))
              }
          }(ThreadUtils.sameThread)
    
    • The task executes startReceiverFunc, which creates and starts a ReceiverSupervisorImpl (the job and task scheduling path is the same as for batch processing and is not detailed here)
        // Function to start the receiver on the worker node
          val startReceiverFunc: Iterator[Receiver[_]] => Unit =
            (iterator: Iterator[Receiver[_]]) => {
              if (!iterator.hasNext) {
                throw new SparkException(
                  "Could not start receiver as object not found.")
              }
              if (TaskContext.get().attemptNumber() == 0) {
                val receiver = iterator.next()
                assert(iterator.hasNext == false)
                val supervisor = new ReceiverSupervisorImpl(
                  receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
                supervisor.start()
                supervisor.awaitTermination()
              } else {
                // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
              }
            }
    

    ReceiverSupervisorImpl provides all the methods needed to handle the data received by a Receiver. It also creates a BlockGenerator, which slices the received data stream into blocks.
    The start() method of ReceiverSupervisorImpl is implemented as follows:

    /** Start the supervisor */
     def start() {
       onStart()
       startReceiver()
     }
    

    onStart() creates and starts the BlockGenerator.
    startReceiver() first registers the receiver with the ReceiverTracker, which checks whether the receiver is allowed to start. If so, the receiver's onStart() method is called to begin receiving data. Its logic is as follows:

     /** Start receiver */
      def startReceiver(): Unit = synchronized {
        try {
          if (onReceiverStart()) {
            logInfo(s"Starting receiver $streamId")
            receiverState = Started
            receiver.onStart()
            logInfo(s"Called receiver $streamId onStart")
          } else {
            // The driver refused us
            stop("Registered unsuccessfully because Driver refused to start receiver " + streamId, None)
          }
        } catch {
          case NonFatal(t) =>
            stop("Error starting receiver " + streamId, Some(t))
        }
      }
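    As noted above, onStart() starts a BlockGenerator that slices the incoming records into blocks on a timer. The following is a simplified, hypothetical sketch of that block-interval idea (NaiveBlockGenerator is not Spark's implementation; the real BlockGenerator is driven by "spark.streaming.blockInterval", which defaults to 200 ms):

      import java.util.concurrent.{Executors, TimeUnit}
      import scala.collection.mutable.ArrayBuffer

      // Hypothetical, simplified illustration of block-interval slicing:
      // records pushed by the receiver are buffered, and a timer cuts the
      // buffer into one block per interval and hands it downstream.
      class NaiveBlockGenerator(blockIntervalMs: Long, pushBlock: Seq[String] => Unit) {
        private val buffer = new ArrayBuffer[String]()
        private val timer = Executors.newSingleThreadScheduledExecutor()

        def addData(record: String): Unit = synchronized { buffer += record }

        private def drainAndPush(): Unit = {
          val block = synchronized { val b = buffer.toList; buffer.clear(); b }
          if (block.nonEmpty) pushBlock(block)  // in Spark this becomes a stored block
        }

        def start(): Unit = {
          timer.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = drainAndPush()
          }, blockIntervalMs, blockIntervalMs, TimeUnit.MILLISECONDS)
        }

        def stop(): Unit = timer.shutdown()
      }

    In Spark itself, the generated blocks are then stored (and possibly replicated) by the supervisor before being reported to the ReceiverTracker.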
    

    The following uses the SocketReceiver of SocketInputDStream from the WordCount example as an illustration; its onStart() method is implemented as follows:

    def onStart() {
    
        logInfo(s"Connecting to $host:$port")
        try {
          socket = new Socket(host, port)
        } catch {
          case e: ConnectException =>
            restart(s"Error connecting to $host:$port", e)
            return
        }
        logInfo(s"Connected to $host:$port")
    
        // Start the thread that receives data over a connection
        new Thread("Socket Receiver") {
          setDaemon(true)
          override def run() { receive() }
        }.start()
      }
    
     /** Create a socket connection and receive data until receiver is stopped */
      def receive() {
        try {
          val iterator = bytesToObjects(socket.getInputStream())
          while(!isStopped && iterator.hasNext) {
            store(iterator.next())
          }
          if (!isStopped()) {
            restart("Socket data stream had no more data")
          } else {
            logInfo("Stopped receiving")
          }
        } catch {
          case NonFatal(e) =>
            logWarning("Error receiving data", e)
            restart("Error receiving data", e)
        } finally {
          onStop()
        }
      }
    }
    

    As the implementation shows, data is received over a socket connection.
    This concludes the Receiver startup flow. Once started, a receiver continuously ingests the incoming data stream, slices it into blocks, and replicates and distributes those blocks in preparation for the computation stage. The next article analyzes the data preparation phase.
