Spark Streaming Source Code Walkthrough: The Full Lifecycle of Receiver Creation

Author: 阳光男孩spark | Published 2016-05-24 12:03

Part 1: Design considerations for launching Receivers

1. Spark Streaming uses Receivers to pull data continuously from external sources and reports the received data to the Driver, so that each batch duration can generate jobs from it, launched across different machines. Each receiver effectively corresponds to a single partition, and because Spark Core sees nothing special about it, the receiver is scheduled like any ordinary task. As a result, multiple Receivers may be launched inside the same Executor, causing load imbalance; and if the Executor itself fails, or the task fails to launch, the whole launching job fails, which means the receiver fails to start.

Launching Receivers

1. From Spark Core's point of view there is nothing special about starting a Receiver: the Receiver is launched through a job and runs as a task on an Executor.

2. In the common case there is only one Receiver, but InputDStreams can be created for several different data sources (a sketch follows after this list).

3. When a Receiver is launched, it is in effect one partition, started by one job. That job contains RDD transformations and an action; as the timer fires, data keeps arriving, and the data received in each interval is in effect one partition.

4. This design leads to the following problems:

(1) With multiple InputDStreams, multiple Receivers must be started, each being the equivalent of a partition. Ideally the Receivers would start on different machines, but from Spark Core's point of view this is just an application; it cannot tell that Receivers are special, so it handles them like any normal job launch, and may well start several Receivers on one Executor, leading to load imbalance.

(2) A Receiver may fail to start, yet as long as the cluster is alive, Receiver startup should never fail outright.

(3) At runtime, if one Receiver is one partition launched by one task, then when that task fails the corresponding Receiver fails with it. The consequences of a failed Receiver are therefore severe. So how does Spark Streaming guard against all this? Reading the Spark Streaming source, the following guarantees are built in:

First, Spark launches each Receiver with a dedicated job, which maximizes load balancing.

Second, Spark Streaming designates which Executors each Receiver will run on before the Receiver ever runs.

Third, if a Receiver fails to start, the job is not treated as failed; internally the Receiver is simply relaunched.
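As context for point (1), here is a minimal driver sketch (host names and ports are illustrative) in which two ReceiverInputDStreams are declared, so two Receivers, two receiver jobs, and two occupied executor cores result:

import org.apache.spark.SparkConf
import org.apache.spark.streaming.{Seconds, StreamingContext}

val conf = new SparkConf().setMaster("local[4]").setAppName("TwoReceivers")
val ssc = new StreamingContext(conf, Seconds(2))

// Each socketTextStream call creates one ReceiverInputDStream, hence one Receiver.
val lines1 = ssc.socketTextStream("host1", 9999)
val lines2 = ssc.socketTextStream("host2", 9999)
lines1.union(lines2).count().print()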

When StreamingContext's start method is called, JobScheduler's start method is invoked:

def start(): Unit = synchronized {
  state match {
    case INITIALIZED =>
      startSite.set(DStream.getCreationSite())
      StreamingContext.ACTIVATION_LOCK.synchronized {
        StreamingContext.assertNoOtherContextIsActive()
        try {
          validate()

          // Start the streaming scheduler in a new thread, so that thread local properties
          // like call sites and job groups can be reset without affecting those of the
          // current thread.
          ThreadUtils.runInNewThread("streaming-start") {
            sparkContext.setCallSite(startSite.get)
            sparkContext.clearJobGroup()
            sparkContext.setLocalProperty(SparkContext.SPARK_JOB_INTERRUPT_ON_CANCEL, "false")
            // Start the scheduler in a child thread: it performs local initialization
            // and avoids blocking the main thread.
            scheduler.start()
          }
          state = StreamingContextState.ACTIVE
        } catch {
          case NonFatal(e) =>
            logError("Error starting the context, marking it as stopped", e)
            scheduler.stop(false)
            state = StreamingContextState.STOPPED
            throw e
        }
        StreamingContext.setActiveContext(this)
      }
      shutdownHookRef = ShutdownHookManager.addShutdownHook(
        StreamingContext.SHUTDOWN_HOOK_PRIORITY)(stopOnShutdown)
      // Registering Streaming Metrics at the start of the StreamingContext
      assert(env.metricsSystem != null)
      env.metricsSystem.registerSource(streamingSource)
      uiTab.foreach(_.attach())
      logInfo("StreamingContext started")
    case ACTIVE =>
      logWarning("StreamingContext has already been started")
    case STOPPED =>
      throw new IllegalStateException("StreamingContext has already been stopped")
  }
}
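A short sketch of how this state machine behaves from user code, assuming a local StreamingContext named ssc with at least one output operation registered:

ssc.start()  // INITIALIZED -> ACTIVE; scheduler.start() runs in the "streaming-start" thread
ssc.start()  // ACTIVE branch: only logs "StreamingContext has already been started"

ssc.stop()
// ssc.start()  // STOPPED branch: would throw IllegalStateException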

2. Inside JobScheduler's start method, ReceiverTracker's start method is called, and that is what launches the Receivers.

def start(): Unit = synchronized {
  if (eventLoop != null) return // scheduler has already been started

  logDebug("Starting JobScheduler")
  eventLoop = new EventLoop[JobSchedulerEvent]("JobScheduler") {
    override protected def onReceive(event: JobSchedulerEvent): Unit = processEvent(event)

    override protected def onError(e: Throwable): Unit = reportError("Error in job scheduler", e)
  }
  eventLoop.start()

  // attach rate controllers of input streams to receive batch completion updates
  for {
    inputDStream <- ssc.graph.getInputStreams
    rateController <- inputDStream.rateController
  } ssc.addStreamingListener(rateController)

  listenerBus.start(ssc.sparkContext)
  receiverTracker = new ReceiverTracker(ssc)
  inputInfoTracker = new InputInfoTracker(ssc)
  // Start the ReceiverTracker.
  receiverTracker.start()
  jobGenerator.start()
  logInfo("Started JobScheduler")
}

3. ReceiverTracker's start method sets up an RPC endpoint. Why? Because the ReceiverTracker monitors every Receiver in the cluster, and each Receiver in turn reports its own state back to the ReceiverTrackerEndpoint: the data it has received, its lifecycle events, and so on.

def start(): Unit = synchronized {
  if (isTrackerStarted) {
    throw new SparkException("ReceiverTracker already started")
  }

  // Receivers are launched only if there are receiver input streams.
  if (!receiverInputStreams.isEmpty) {
    endpoint = ssc.env.rpcEnv.setupEndpoint(
      "ReceiverTracker", new ReceiverTrackerEndpoint(ssc.env.rpcEnv))
    if (!skipReceiverLaunch) launchReceivers()
    logInfo("ReceiverTracker started")
    trackerState = Started
  }
}

4. The concrete Receiver instances are obtained from the ReceiverInputDStreams (on the Driver side) and then distributed to the Worker nodes. One ReceiverInputDStream produces exactly one Receiver.

private def launchReceivers(): Unit = {
  val receivers = receiverInputStreams.map(nis => {
    // One input source (ReceiverInputDStream) produces exactly one Receiver.
    val rcvr = nis.getReceiver()
    rcvr.setReceiverId(nis.id)
    rcvr
  })

  runDummySparkJob()

  logInfo("Starting " + receivers.length + " receivers")
  // endpoint here is the ReceiverTrackerEndpoint constructed in ReceiverTracker's
  // start method above.
  endpoint.send(StartAllReceivers(receivers))
}

5. runDummySparkJob() makes sure all the nodes are alive and keeps the receivers from being concentrated on a single node: its 50 tasks over 50 partitions touch the executors across the cluster, so the executor list used for scheduling is populated.

private def runDummySparkJob(): Unit = {
  if (!ssc.sparkContext.isLocal) {
    ssc.sparkContext.makeRDD(1 to 50, 50).map(x => (x, 1)).reduceByKey(_ + _, 20).collect()
  }
  assert(getExecutors.nonEmpty)
}

ReceiverInputDStream's getReceiver() method obtains the receiver object; the object is then shipped to a worker node, instantiated there, and begins receiving data.

def getReceiver(): Receiver[T] // returns the Receiver object
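For reference, here is a minimal custom receiver, sketched after the Spark custom-receiver programming guide, showing what getReceiver() has to hand back: onStart() spawns its own thread and feeds records to Spark via store(). The class name and the constant payload are illustrative only.

import org.apache.spark.storage.StorageLevel
import org.apache.spark.streaming.receiver.Receiver

class ConstantReceiver(word: String)
  extends Receiver[String](StorageLevel.MEMORY_AND_DISK_2) {

  def onStart(): Unit = {
    // Receive on a dedicated daemon thread; onStart() itself must not block.
    new Thread("Constant Receiver") {
      setDaemon(true)
      override def run(): Unit = {
        while (!isStopped()) {
          store(word)        // hand one record to the ReceiverSupervisor
          Thread.sleep(100)
        }
      }
    }.start()
  }

  def onStop(): Unit = { }   // the thread above exits once isStopped() becomes true
}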

6. Following the inheritance hierarchy, here is the getReceiver method of SocketInputDStream:

def getReceiver(): Receiver[T] = {
  new SocketReceiver(host, port, bytesToObjects, storageLevel)
}

onStart() launches a background daemon thread that calls the receive method:

private[streaming]
class SocketReceiver[T: ClassTag](
    host: String,
    port: Int,
    bytesToObjects: InputStream => Iterator[T],
    storageLevel: StorageLevel
  ) extends Receiver[T](storageLevel) with Logging {

  def onStart() {
    // Start the thread that receives data over a connection
    new Thread("Socket Receiver") {
      setDaemon(true)
      override def run() { receive() }
    }.start()
  }

receive() opens a socket and starts receiving data:

/** Create a socket connection and receive data until receiver is stopped */
def receive() {
  var socket: Socket = null
  try {
    logInfo("Connecting to " + host + ":" + port)
    socket = new Socket(host, port)
    logInfo("Connected to " + host + ":" + port)
    val iterator = bytesToObjects(socket.getInputStream())
    while (!isStopped && iterator.hasNext) {
      store(iterator.next)
    }
    if (!isStopped()) {
      restart("Socket data stream had no more data")
    } else {
      logInfo("Stopped receiving")
    }
  } catch {
    case e: java.net.ConnectException =>
      restart("Error connecting to " + host + ":" + port, e)
    case NonFatal(e) =>
      logWarning("Error receiving data", e)
      restart("Error receiving data", e)
  } finally {
    if (socket != null) {
      socket.close()
      logInfo("Closed socket to " + host + ":" + port)
    }
  }
}
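To exercise SocketReceiver end to end, one can feed it with netcat (nc -lk 9999) and consume it through socketTextStream, which is the public entry point that builds the SocketInputDStream shown above (the host, port, and the StreamingContext ssc here are assumptions):

import org.apache.spark.storage.StorageLevel

val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)
lines.flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _).print()
ssc.start()
ssc.awaitTermination()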

7. The ReceiverTrackerEndpoint source is as follows:

/** RpcEndpoint to receive messages from the receivers. */
private class ReceiverTrackerEndpoint(override val rpcEnv: RpcEnv)
  extends ThreadSafeRpcEndpoint {

  // TODO Remove this thread pool after
  // https://github.com/apache/spark/issues/7385 is merged
  private val submitJobThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("submit-job-thread-pool"))

  private val walBatchingThreadPool = ExecutionContext.fromExecutorService(
    ThreadUtils.newDaemonCachedThreadPool("wal-batching-thread-pool"))

  @volatile private var active: Boolean = true

  override def receive: PartialFunction[Any, Unit] = {
    // Local messages
    case StartAllReceivers(receivers) =>
      // schedulingPolicy is the scheduling policy; receivers are the receivers to
      // launch; getExecutors returns the list of executors in the cluster.
      // scheduleReceivers determines which executors each receiver may run on.
      val scheduledLocations = schedulingPolicy.scheduleReceivers(receivers, getExecutors)
      for (receiver <- receivers) {
        // scheduledLocations, keyed by receiver id, gives the executors this
        // receiver may currently run on.
        val executors = scheduledLocations(receiver.streamId)
        updateReceiverScheduledExecutors(receiver.streamId, executors)
        receiverPreferredLocations(receiver.streamId) = receiver.preferredLocation
        // At this point both the receivers to launch and the executors they will
        // run on are fixed. Loop over the receivers, passing one at a time.
        startReceiver(receiver, executors)
      }
    // Handles the RestartReceiver message: relaunch a Receiver.
    case RestartReceiver(receiver) =>
      // Old scheduled executors minus the ones that are not active any more.
      // If the Receiver failed, dead executors are dropped from the candidate list.
      val oldScheduledExecutors = getStoredScheduledExecutors(receiver.streamId)
      // Obtain executors again.
      val scheduledLocations = if (oldScheduledExecutors.nonEmpty) {
          // Try global scheduling again: the candidate executor list assigned
          // when the Receiver was first scheduled is still usable.
          oldScheduledExecutors
        } else {
          // The candidate executors are exhausted, so rescheduleReceiver is run
          // to obtain executors afresh.
          val oldReceiverInfo = receiverTrackingInfos(receiver.streamId)
          // Clear "scheduledLocations" to indicate we are going to do local scheduling
          val newReceiverInfo = oldReceiverInfo.copy(
            state = ReceiverState.INACTIVE, scheduledLocations = None)
          receiverTrackingInfos(receiver.streamId) = newReceiverInfo
          schedulingPolicy.rescheduleReceiver(
            receiver.streamId,
            receiver.preferredLocation,
            receiverTrackingInfos,
            getExecutors)
        }
      // Assume there is one receiver restarting at one time, so we don't need to
      // update receiverTrackingInfos
      // Call startReceiver again.
      startReceiver(receiver, scheduledLocations)
    case c: CleanupOldBlocks =>
      receiverTrackingInfos.values.flatMap(_.endpoint).foreach(_.send(c))
    case UpdateReceiverRateLimit(streamUID, newRate) =>
      for (info <- receiverTrackingInfos.get(streamUID); eP <- info.endpoint) {
        eP.send(UpdateRateLimit(newRate))
      }
    // Remote messages
    case ReportError(streamId, message, error) =>
      reportError(streamId, message, error)
  }

8. As the comments show, Spark Streaming itself designates the Executors a receiver runs on; the choice is not left to Spark Core's task placement.

Spark launches Receivers via submitJob. An application will often declare many Receivers; does a single job start one Receiver, or all of them at once? In ReceiverTrackerEndpoint's receive method, the first parameter of startReceiver is a single receiver, and the implementation shows a for loop pulling receivers out one by one and calling startReceiver for each. So one job launches exactly one Receiver.

If a Receiver fails to start, this is not treated as an application job failure; a message is sent back to the ReceiverTrackerEndpoint to relaunch the Receiver. This guarantees the Receivers do get started and, unlike launching Receivers from plain Tasks, is not capped by the task retry count.

private def startReceiver(
    receiver: Receiver[_],
    // scheduledLocations designates the concrete physical machines to run on.
    scheduledLocations: Seq[TaskLocation]): Unit = {
  // Check whether the tracker state still permits starting the Receiver.
  def shouldStartReceiver: Boolean = {
    // It's okay to start when trackerState is Initialized or Started
    !(isTrackerStopping || isTrackerStopped)
  }

  val receiverId = receiver.streamId
  // If the Receiver should not be started, call onReceiverJobFinish().
  if (!shouldStartReceiver) {
    onReceiverJobFinish(receiverId)
    return
  }

  val checkpointDirOption = Option(ssc.checkpointDir)
  val serializableHadoopConf =
    new SerializableConfiguration(ssc.sparkContext.hadoopConfiguration)

  // startReceiverFunc encapsulates the action of starting the receiver on a worker.
  // Function to start the receiver on the worker node
  val startReceiverFunc: Iterator[Receiver[_]] => Unit =
    (iterator: Iterator[Receiver[_]]) => {
      if (!iterator.hasNext) {
        throw new SparkException(
          "Could not start receiver as object not found.")
      }
      if (TaskContext.get().attemptNumber() == 0) {
        val receiver = iterator.next()
        assert(iterator.hasNext == false)
        // ReceiverSupervisorImpl supervises the Receiver and is also responsible
        // for writing out the data it receives.
        val supervisor = new ReceiverSupervisorImpl(
          receiver, SparkEnv.get, serializableHadoopConf.value, checkpointDirOption)
        supervisor.start()
        supervisor.awaitTermination()
      } else {
        // Restarting a receiver must go back through the scheduling above,
        // not through task retry.
        // It's restarted by TaskScheduler, but we want to reschedule it again. So exit it.
      }
    }

  // Create the RDD using the scheduledLocations to run the receiver in a Spark job
  val receiverRDD: RDD[Receiver[_]] =
    if (scheduledLocations.isEmpty) {
      ssc.sc.makeRDD(Seq(receiver), 1)
    } else {
      val preferredLocations = scheduledLocations.map(_.toString).distinct
      ssc.sc.makeRDD(Seq(receiver -> preferredLocations))
    }
  // The single receiverId shows that each such RDD carries exactly one receiver.
  receiverRDD.setName(s"Receiver $receiverId")
  ssc.sparkContext.setJobDescription(s"Streaming job running receiver $receiverId")
  ssc.sparkContext.setCallSite(Option(ssc.getStartSite()).getOrElse(Utils.getCallSite()))

  // Starting each Receiver triggers a job of its own; it is not one job whose
  // tasks start all the Receivers. An application usually has many Receivers.
  // SparkContext.submitJob is called, launching a Spark job just to start this Receiver.
  val future = ssc.sparkContext.submitJob[Receiver[_], Unit, Unit](
    receiverRDD, startReceiverFunc, Seq(0), (_, _) => Unit, ())
  // We will keep restarting the receiver job until ReceiverTracker is stopped
  future.onComplete {
    case Success(_) =>
      // shouldStartReceiver is true in the normal case
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logInfo(s"Restarting Receiver $receiverId")
        self.send(RestartReceiver(receiver))
      }
    case Failure(e) =>
      if (!shouldStartReceiver) {
        onReceiverJobFinish(receiverId)
      } else {
        logError("Receiver has been stopped. Try to restart it.", e)
        logInfo(s"Restarting Receiver $receiverId")
        // Send a RestartReceiver message.
        self.send(RestartReceiver(receiver))
      }
  // Jobs are submitted on a thread pool, so Receivers can be launched concurrently.
  }(submitJobThreadPool)
  logInfo(s"Receiver ${receiver.streamId} started")
}
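To make the submitJob call above concrete, here is a standalone sketch of the same API against an ordinary SparkContext named sc (the payload and handlers are illustrative): the job runs one function over one chosen partition and hands back a future, which is exactly the shape used for launching a receiver.

import scala.concurrent.ExecutionContext
import scala.util.{Failure, Success}

val rdd = sc.makeRDD(Seq("payload"), 1)            // one partition, like the one-receiver RDD
val future = sc.submitJob[String, Unit, Unit](
  rdd,
  (it: Iterator[String]) => it.foreach(println),   // runs on the executor, like startReceiverFunc
  Seq(0),                                          // only partition 0
  (_, _) => (),                                    // per-partition result handler
  ()                                               // overall result
)
future.onComplete {
  case Success(_) => println("job finished")
  case Failure(e) => println("job failed: " + e)
}(ExecutionContext.global)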

9. When a Receiver fails to start, the ReceiverTrackerEndpoint is asked to launch a new Spark job to restart that Receiver.

/**
 * This message will trigger ReceiverTrackerEndpoint to restart a Spark job
 * for the receiver.
 */
private[streaming] case class RestartReceiver(receiver: Receiver[_])
  extends ReceiverTrackerLocalMessage

10. When a Receiver is shut down, no new Spark job needs to be started:

/**
 * Call when a receiver is terminated. It means we won't restart its Spark job.
 */
private def onReceiverJobFinish(receiverId: Int): Unit = {
  receiverJobExitLatch.countDown()
  // remove returns the entry, if any; foreach then drops the receiver from
  // receiverTrackingInfos.
  receiverTrackingInfos.remove(receiverId).foreach { receiverTrackingInfo =>
    if (receiverTrackingInfo.state == ReceiverState.ACTIVE) {
      logWarning(s"Receiver $receiverId exited but didn't deregister")
    }
  }
}

11. supervisor.start(): the subclass ReceiverSupervisorImpl has no start method of its own, so the call resolves to the parent class ReceiverSupervisor's start method.

/** Start the supervisor */
def start() {
  onStart() // the concrete behavior is supplied by the subclass
  startReceiver()
}

The onStart method's source is as follows:

/**
 * Called when supervisor is started.
 * Note that this must be called before the receiver.onStart() is called to ensure
 * things like [[BlockGenerator]]s are started before the receiver starts sending data.
 */
protected def onStart() { }

Its concrete implementation is the onStart method of the subclass ReceiverSupervisorImpl:

override protected def onStart() {
  registeredBlockGenerators.foreach { _.start() }
}

The start method invoked here is BlockGenerator's start method:

/** Start block generating and pushing threads. */
def start(): Unit = synchronized {
  if (state == Initialized) {
    state = Active
    blockIntervalTimer.start()
    blockPushingThread.start()
    logInfo("Started BlockGenerator")
  } else {
    throw new SparkException(
      s"Cannot start BlockGenerator as its not in the Initialized state [state = $state]")
  }
}
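The blockIntervalTimer started above slices the received data into blocks every spark.streaming.blockInterval (200 ms by default), and each block becomes one partition, and thus one task, of the batch's RDD. A sketch of tuning it (the value is illustrative):

import org.apache.spark.SparkConf

val conf = new SparkConf()
  .setAppName("BlockIntervalDemo")
  // batchInterval / blockInterval ≈ partitions produced per receiver per batch
  .set("spark.streaming.blockInterval", "100ms")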

Note: this material comes from DT_大数据梦工厂 (DT Big Data Dream Factory, Spark release customization).

