Spark Study Notes (2): Communication Analysis


Author: 灯火gg | 2019-01-29 17:52

    What is RPC communication?

    RPC stands for Remote Procedure Call. Given two servers A and B, an application deployed on A wants to call a function or method exposed by an application on B. Because the two processes do not share an address space, the call cannot be made directly; the call semantics and the call data have to be carried over the network.

    Spark RPC communication

    Spark 1.6+ introduced a new communication architecture built around RpcEnv, RpcEndpoint and RpcEndpointRef. There are two concrete implementations, Akka and Netty. Akka is a Scala Actor-based distributed messaging system; Netty is a Java open-source framework from JBoss that provides an asynchronous, event-driven network application framework and tools for quickly building high-performance, highly reliable network servers and clients.
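    To make the roles of the three abstractions concrete, here is a minimal, self-contained toy sketch of the pattern. ToyRpcEndpoint, ToyRpcEndpointRef and ToyRpcEnv are invented names for illustration only; Spark's real classes live in org.apache.spark.rpc, are private[spark], and route messages over the network rather than in-process:

    import scala.collection.mutable
    import scala.concurrent.{Await, Future, Promise}
    import scala.concurrent.duration._

    // An endpoint receives one-way messages (receive) and request/reply messages (receiveAndReply).
    trait ToyRpcEndpoint {
      def receive: PartialFunction[Any, Unit]
      def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit]
    }

    // A reference is the handle callers use to talk to an endpoint: send is fire-and-forget, ask expects a reply.
    class ToyRpcEndpointRef(endpoint: ToyRpcEndpoint) {
      def send(msg: Any): Unit = endpoint.receive(msg)
      def ask[T](msg: Any): Future[T] = {
        val p = Promise[T]()
        endpoint.receiveAndReply(r => p.success(r.asInstanceOf[T]))(msg)
        p.future
      }
      def askSync[T](msg: Any): T = Await.result(ask[T](msg), 10.seconds)
    }

    // The "env" registers endpoints by name and hands out references to them.
    class ToyRpcEnv {
      private val endpoints = mutable.Map.empty[String, ToyRpcEndpointRef]
      def setupEndpoint(name: String, e: ToyRpcEndpoint): ToyRpcEndpointRef = {
        val ref = new ToyRpcEndpointRef(e)
        endpoints(name) = ref
        ref
      }
      def endpointRef(name: String): ToyRpcEndpointRef = endpoints(name)
    }

    // Usage: a "master"-like endpoint answering a port request, mirroring the askSync call shown later.
    case object BoundPortsRequest
    case class BoundPortsResponse(port: Int)

    object ToyRpcDemo extends App {
      val env = new ToyRpcEnv
      val masterRef = env.setupEndpoint("Master", new ToyRpcEndpoint {
        def receive: PartialFunction[Any, Unit] = { case msg => println(s"one-way message: $msg") }
        def receiveAndReply(reply: Any => Unit): PartialFunction[Any, Unit] = {
          case BoundPortsRequest => reply(BoundPortsResponse(7077))
        }
      })
      println(masterRef.askSync[BoundPortsResponse](BoundPortsRequest)) // prints BoundPortsResponse(7077)
    }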

    [Figure: RPC]

    The basic flow of Spark startup message communication

    Starting Spark begins with sbin/start-all.sh:

    # Start all spark daemons.
    # Starts the master on this node.
    # Starts a worker on each node specified in conf/slaves
    
    sbin="`dirname "$0"`"
    sbin="`cd "$sbin"; pwd`"
    
    TACHYON_STR=""
    
    while (( "$#" )); do
    case $1 in
        --with-tachyon)
          TACHYON_STR="--with-tachyon"
          ;;
      esac
    shift
    done
    
    # Load the Spark configuration
    . "$sbin/spark-config.sh"
    
    # Start Master
    "$sbin"/start-master.sh $TACHYON_STR
    
    # Start Workers
    "$sbin"/start-slaves.sh $TACHYON_STR
    
    

    From start-all.sh we can see that the Master is started first and the Workers after it. Looking into start-master.sh, the Master is launched with:

    "$sbin"/spark-daemon.sh start org.apache.spark.deploy.master.Master 1 \
      --ip $SPARK_MASTER_IP --port $SPARK_MASTER_PORT --webui-port $SPARK_MASTER_WEBUI_PORT \
      $ORIGINAL_ARGS
    

    which ultimately runs spark-daemon.sh start org.apache.spark.deploy.master.Master.

    Next, let's look at the Master's main method:

    private[deploy] object Master extends Logging {
      val SYSTEM_NAME = "sparkMaster"
      val ENDPOINT_NAME = "Master"
    
      def main(argStrings: Array[String]) {
        Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
          exitOnUncaughtException = false))
        Utils.initDaemon(log)
        val conf = new SparkConf
        val args = new MasterArguments(argStrings, conf)
        // Start the Master with the host, port and web UI port passed in from the launch script
        val (rpcEnv, _, _) = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, conf)
        rpcEnv.awaitTermination()
      }
    
    /**
       * Start the Master and return a three tuple of:
       *   (1) The Master RpcEnv
       *   (2) The web UI bound port
       *   (3) The REST server bound port, if any
       */
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          conf: SparkConf): (RpcEnv, Int, Option[Int]) = {
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(SYSTEM_NAME, host, port, conf, securityMgr)
        val masterEndpoint = rpcEnv.setupEndpoint(ENDPOINT_NAME,
          new Master(rpcEnv, rpcEnv.address, webUiPort, securityMgr, conf))
        // Ask the Master endpoint synchronously; the corresponding RpcEndpoint here is the Master itself
        val portsResponse = masterEndpoint.askSync[BoundPortsResponse](BoundPortsRequest)
        (rpcEnv, portsResponse.webUIPort, portsResponse.restPort)
      }
    

    This creates the RpcEnv used by the messaging framework and the Master endpoint (masterEndpoint).
    The Master then answers such requests in its receiveAndReply method:

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
        case RequestSubmitDriver(description) =>
          if (state != RecoveryState.ALIVE) {
            val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
              "Can only accept driver submissions in ALIVE state."
            context.reply(SubmitDriverResponse(self, false, None, msg))
          } else {
            logInfo("Driver submitted " + description.command.mainClass)
            val driver = createDriver(description)
            persistenceEngine.addDriver(driver)
            waitingDrivers += driver
            drivers.add(driver)
            schedule()
    
            // TODO: It might be good to instead have the submission client poll the master to determine
            //       the current status of the driver. For now it's simply "fire and forget".
    
            context.reply(SubmitDriverResponse(self, true, Some(driver.id),
              s"Driver successfully submitted as ${driver.id}"))
          }
    
        case RequestKillDriver(driverId) =>
          if (state != RecoveryState.ALIVE) {
            val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
              s"Can only kill drivers in ALIVE state."
            context.reply(KillDriverResponse(self, driverId, success = false, msg))
          } else {
            logInfo("Asked to kill driver " + driverId)
            val driver = drivers.find(_.id == driverId)
            driver match {
              case Some(d) =>
                if (waitingDrivers.contains(d)) {
                  waitingDrivers -= d
                  self.send(DriverStateChanged(driverId, DriverState.KILLED, None))
                } else {
                  // We just notify the worker to kill the driver here. The final bookkeeping occurs
                  // on the return path when the worker submits a state change back to the master
                  // to notify it that the driver was successfully killed.
                  d.worker.foreach { w =>
                    w.endpoint.send(KillDriver(driverId))
                  }
                }
                // TODO: It would be nice for this to be a synchronous response
                val msg = s"Kill request for $driverId submitted"
                logInfo(msg)
                context.reply(KillDriverResponse(self, driverId, success = true, msg))
              case None =>
                val msg = s"Driver $driverId has already finished or does not exist"
                logWarning(msg)
                context.reply(KillDriverResponse(self, driverId, success = false, msg))
            }
          }
    
        case RequestDriverStatus(driverId) =>
          if (state != RecoveryState.ALIVE) {
            val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
              "Can only request driver status in ALIVE state."
            context.reply(
              DriverStatusResponse(found = false, None, None, None, Some(new Exception(msg))))
          } else {
            (drivers ++ completedDrivers).find(_.id == driverId) match {
              case Some(driver) =>
                context.reply(DriverStatusResponse(found = true, Some(driver.state),
                  driver.worker.map(_.id), driver.worker.map(_.hostPort), driver.exception))
              case None =>
                context.reply(DriverStatusResponse(found = false, None, None, None, None))
            }
          }
    
        case RequestMasterState =>
          context.reply(MasterStateResponse(
            address.host, address.port, restServerBoundPort,
            workers.toArray, apps.toArray, completedApps.toArray,
            drivers.toArray, completedDrivers.toArray, state))
    
        case BoundPortsRequest =>
          context.reply(BoundPortsResponse(address.port, webUi.boundPort, restServerBoundPort))
    
        case RequestExecutors(appId, requestedTotal) =>
          context.reply(handleRequestExecutors(appId, requestedTotal))
    
        case KillExecutors(appId, executorIds) =>
          val formattedExecutorIds = formatExecutorIds(executorIds)
          context.reply(handleKillExecutors(appId, formattedExecutorIds))
      }
    

    Worker startup is similar to the Master's:

     def main(argStrings: Array[String]) {
        Thread.setDefaultUncaughtExceptionHandler(new SparkUncaughtExceptionHandler(
          exitOnUncaughtException = false))
        Utils.initDaemon(log)
        val conf = new SparkConf
        val args = new WorkerArguments(argStrings, conf)
        val rpcEnv = startRpcEnvAndEndpoint(args.host, args.port, args.webUiPort, args.cores,
          args.memory, args.masters, args.workDir, conf = conf)
        // With external shuffle service enabled, if we request to launch multiple workers on one host,
        // we can only successfully launch the first worker and the rest fails, because with the port
        // bound, we may launch no more than one external shuffle service on each host.
        // When this happens, we should give explicit reason of failure instead of fail silently. For
        // more detail see SPARK-20989.
        val externalShuffleServiceEnabled = conf.getBoolean("spark.shuffle.service.enabled", false)
        val sparkWorkerInstances = scala.sys.env.getOrElse("SPARK_WORKER_INSTANCES", "1").toInt
        require(externalShuffleServiceEnabled == false || sparkWorkerInstances <= 1,
          "Starting multiple workers on one host is failed because we may launch no more than one " +
            "external shuffle service on each host, please set spark.shuffle.service.enabled to " +
            "false or set SPARK_WORKER_INSTANCES to 1 to resolve the conflict.")
        rpcEnv.awaitTermination()
      }
    
      def startRpcEnvAndEndpoint(
          host: String,
          port: Int,
          webUiPort: Int,
          cores: Int,
          memory: Int,
          masterUrls: Array[String],
          workDir: String,
          workerNumber: Option[Int] = None,
          conf: SparkConf = new SparkConf): RpcEnv = {
    
        // The LocalSparkCluster runs multiple local sparkWorkerX RPC Environments
        val systemName = SYSTEM_NAME + workerNumber.map(_.toString).getOrElse("")
        val securityMgr = new SecurityManager(conf)
        val rpcEnv = RpcEnv.create(systemName, host, port, conf, securityMgr)
        val masterAddresses = masterUrls.map(RpcAddress.fromSparkURL(_))
        rpcEnv.setupEndpoint(ENDPOINT_NAME, new Worker(rpcEnv, webUiPort, cores, memory,
          masterAddresses, ENDPOINT_NAME, workDir, conf, securityMgr))
        rpcEnv
      }
    

    1. The Worker node sends a registration message to the Master.
    2. The Master replies with a success or failure message.
    3. After a successful registration the Worker sends heartbeats to the Master periodically.
    As shown in the figure:


    [Figure: Spark startup message communication]

    a) Once the Master has started, the Workers are started in turn. On startup each Worker creates its communication environment (RpcEnv) and endpoint, and sends a RegisterWorker message to the Master.

    Because a Worker may need to register with several Masters (HA), the Worker's tryRegisterAllMasters method creates a registration thread pool, submits the registration requests to it, and lets the pool's threads carry out the registrations.
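    A minimal, self-contained sketch of such a daemon registration pool, using plain java.util.concurrent (Spark builds an equivalent pool through its internal ThreadUtils helper; the master address list and the println body here are placeholders, not the real registration code):

    import java.util.concurrent.{Executors, ThreadFactory}

    object RegisterPoolSketch extends App {
      // Hypothetical HA master list; in Spark these come from the Worker's --master argument.
      val masterAddresses = Seq("spark://master1:7077", "spark://master2:7077")

      // One daemon thread per Master, so all registrations can run in parallel
      // without preventing the JVM from shutting down.
      val registerMasterThreadPool = Executors.newFixedThreadPool(
        masterAddresses.length,
        new ThreadFactory {
          override def newThread(r: Runnable): Thread = {
            val t = new Thread(r, "worker-register-master-threadpool")
            t.setDaemon(true)
            t
          }
        })

      masterAddresses.foreach { addr =>
        registerMasterThreadPool.submit(new Runnable {
          // The real Runnable resolves the Master endpoint ref and sends RegisterWorker.
          override def run(): Unit = println(s"registering worker with $addr")
        })
      }

      Thread.sleep(500)                   // give the daemon threads a moment to run in this demo
      registerMasterThreadPool.shutdown()
    }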

    b) The registration process:

    The Worker obtains a reference to the Master endpoint and calls registerWithMaster:

    // onStart is invoked once the Worker endpoint has been registered with the RpcEnv (right after construction)
    override def onStart() {
        assert(!registered)
        logInfo("Starting Spark worker %s:%d with %d cores, %s RAM".format(
          host, port, cores, Utils.megabytesToString(memory)))
        logInfo(s"Running Spark version ${org.apache.spark.SPARK_VERSION}")
        logInfo("Spark home: " + sparkHome)
        createWorkDir()
        shuffleService.startIfEnabled()
        webUi = new WorkerWebUI(this, workDir, webUiPort)
        webUi.bind()
    
        workerWebUiUrl = s"http://$publicAddress:${webUi.boundPort}"
        registerWithMaster()  // register this Worker with the Master
    
        metricsSystem.registerSource(workerSource)
        metricsSystem.start()
        // Attach the worker metrics servlet handler to the web ui after the metrics system is started.
        metricsSystem.getServletHandlers.foreach(webUi.attachHandler)
      }
    
    // Spark uses ZooKeeper for Master HA, so a Worker may need to register with several Masters.
    // A registerMasterThreadPool is created first and one registration task per Master is submitted to it.
    private def tryRegisterAllMasters(): Array[JFuture[_]] = {
          masterRpcAddresses.map { masterAddress =>
            registerMasterThreadPool.submit(new Runnable {
              override def run(): Unit = {
                try {
                  logInfo("Connecting to master " + masterAddress + "...")
                  val masterEndpoint = rpcEnv.setupEndpointRef(masterAddress, Master.ENDPOINT_NAME)
                  // send the RegisterWorker registration message
                  sendRegisterMessageToMaster(masterEndpoint)
                } catch {
                  case ie: InterruptedException => // Cancelled
                  case NonFatal(e) => logWarning(s"Failed to connect to master $masterAddress", e)
                }
              }
            })
          }
      }
    // The ask method used here is defined as:
    /**
       * Send a message to the corresponding [[RpcEndpoint.receiveAndReply)]] and return a [[Future]] to
       * receive the reply within a default timeout.
       *
       * This method only sends the message once and never retries.
       */
      def ask[T: ClassTag](message: Any): Future[T] = ask(message, defaultAskTimeout)
    private def registerWithMaster(masterEndpoint: RpcEndpointRef): Unit = {
        masterEndpoint.ask[RegisterWorkerResponse](RegisterWorker(
          workerId, host, port, self, cores, memory, workerWebUiUrl))
          .onComplete {
            // This is a very fast action so we can use "ThreadUtils.sameThread"
            case Success(msg) =>
              Utils.tryLogNonFatalError {
                handleRegisterResponse(msg)
              }
            case Failure(e) =>
              logError(s"Cannot register with master: ${masterEndpoint.address}", e)
              System.exit(1)
          }(ThreadUtils.sameThread)
      }
    

    c) When the Master receives the message it validates and records the information the Worker sent. If the registration succeeds, it sends a RegisteredWorker message back to the Worker to confirm the registration, and the Worker then moves on to step 3, sending periodic heartbeats to the Master. If the registration fails, the Master sends a RegisterWorkerFailed message; the Worker logs the error and aborts its startup.
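    How the Worker reacts to those replies can be sketched as follows. This is a simplified, self-contained stand-in for Worker.handleRegisterResponse: the message types are reduced to toy case classes (Spark's real RegisterWorkerResponse subtypes carry more fields), and the bodies only print what the real code does:

    object RegisterResponseSketch {
      sealed trait RegisterWorkerResponse
      case class RegisteredWorker(masterWebUiUrl: String) extends RegisterWorkerResponse
      case class RegisterWorkerFailed(message: String) extends RegisterWorkerResponse
      case object MasterInStandby extends RegisterWorkerResponse

      def handleRegisterResponse(msg: RegisterWorkerResponse): Unit = msg match {
        case RegisteredWorker(url) =>
          // registered = true, remember the active Master, start sending heartbeats
          println(s"Successfully registered with master at $url")
        case RegisterWorkerFailed(message) =>
          // log the error and, if the Worker was never registered, stop the startup
          println(s"Worker registration failed: $message")
        case MasterInStandby =>
          // a standby Master cannot accept registrations, so the reply is simply ignored
          ()
      }

      def main(args: Array[String]): Unit = {
        handleRegisterResponse(RegisteredWorker("http://master:8080"))
        handleRegisterResponse(MasterInStandby)
      }
    }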

    d) On the Master side, after receiving the Worker's registration information, it first checks its own state: if the Master is in STANDBY the message is ignored; if the Worker's id is already in the registration list, a registration-failure message is sent back. Otherwise the registerWorker method adds the Worker to the list, so it can be scheduled when the cluster processes tasks. The Worker-registration code in Master.receiveAndReply:

    Here the Master handles the worker id, host and port, the Worker's endpoint reference (workerRef), its cores and memory, and its workerWebUiUrl.
    The logic is roughly: first check whether the Master is in STANDBY and, if so, reply MasterInStandby; then check idToWorker to see whether this Worker is already registered, since duplicate registration is not allowed. If neither case applies, a WorkerInfo is created to wrap the Worker's details and registerWorker(worker) is called:
    
     case RegisterWorker(
          id, workerHost, workerPort, workerRef, cores, memory, workerWebUiUrl, masterAddress) =>
          logInfo("Registering worker %s:%d with %d cores, %s RAM".format(
            workerHost, workerPort, cores, Utils.megabytesToString(memory)))
          if (state == RecoveryState.STANDBY) {
            workerRef.send(MasterInStandby)
          } else if (idToWorker.contains(id)) {
            workerRef.send(RegisterWorkerFailed("Duplicate worker ID")) // this Worker is already registered
          } else {
            val worker = new WorkerInfo(id, workerHost, workerPort, cores, memory,
              workerRef, workerWebUiUrl)
            if (registerWorker(worker)) {
              persistenceEngine.addWorker(worker) // persist the newly registered Worker
              workerRef.send(RegisteredWorker(self, masterWebUiUrl, masterAddress))
              schedule()
            } else {
              val workerAddress = worker.endpoint.address
              logWarning("Worker registration failed. Attempted to re-register worker at same " +
                "address: " + workerAddress)
              workerRef.send(RegisterWorkerFailed("Attempted to re-register worker at same address: "
                + workerAddress))
            }
          }
    

    The registerWorker method first removes any DEAD workers previously registered at the same host and port, then records the new Worker in the tracking structures:

    private def registerWorker(worker: WorkerInfo): Boolean = {
        // There may be one or more refs to dead workers on this same node (w/ different ID's),
        // remove them.
        workers.filter { w =>
          (w.host == worker.host && w.port == worker.port) && (w.state == WorkerState.DEAD)
        }.foreach { w =>
          workers -= w
        }
    
        val workerAddress = worker.endpoint.address
        if (addressToWorker.contains(workerAddress)) {
          val oldWorker = addressToWorker(workerAddress)
          if (oldWorker.state == WorkerState.UNKNOWN) {
            // A worker registering from UNKNOWN implies that the worker was restarted during recovery.
            // The old worker must thus be dead, so we will remove it and accept the new worker.
            removeWorker(oldWorker)
          } else {
            logInfo("Attempted to re-register worker at same address: " + workerAddress)
            return false
          }
        }
    
        workers += worker
        idToWorker(worker.id) = worker
        addressToWorker(workerAddress) = worker
        true
      }
    
    Back on the Worker side, the onComplete callback of the registration ask (shown earlier) passes the Master's reply to handleRegisterResponse.

    e) Once the Worker has received the registration-success message, it sends heartbeats to the Master at a fixed (configurable) interval so the Master knows the Worker's live status.

    On success the Worker first sets registered = true, then updates its Master information to record which Master it is now registered with, and starts the scheduled task that sends heartbeats. The Worker also reports the latest state of its Executors to the Master: for each Executor the appId it serves, the executor id, the cores it uses and its state, plus the Driver information.
    val execs = executors.values.map { e =>
      new ExecutorDescription(e.appId, e.execId, e.cores, e.state)
    }
    masterRef.send(WorkerLatestState(workerId, execs.toList, drivers.keys.toSeq))

    private val HEARTBEAT_MILLIS = conf.getLong("spark.worker.timeout", 60) * 1000 / 4
    

    After the Worker receives the registration-success message, it logs it, updates its Master information, and then starts the scheduled task that sends heartbeats at the HEARTBEAT_MILLIS interval defined above.
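    A minimal, self-contained sketch of that periodic scheduling, using a plain Java ScheduledExecutorService rather than Spark's internal scheduler field (heartbeatMillis and sendHeartbeatToMaster are illustrative names, not the Worker's real members):

    import java.util.concurrent.{Executors, TimeUnit}

    object HeartbeatSketch extends App {
      // spark.worker.timeout defaults to 60s; heartbeats are sent at a quarter of that.
      val heartbeatMillis = 60L * 1000 / 4

      // Placeholder for "send Heartbeat(workerId, self) to the Master endpoint".
      def sendHeartbeatToMaster(): Unit = println("heartbeat -> master")

      val scheduler = Executors.newSingleThreadScheduledExecutor()
      scheduler.scheduleAtFixedRate(new Runnable {
        override def run(): Unit = sendHeartbeatToMaster()
      }, 0, heartbeatMillis, TimeUnit.MILLISECONDS)

      Thread.sleep(2 * heartbeatMillis)   // let a couple of heartbeats fire in this demo
      scheduler.shutdown()
    }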
    After the Master receives a Worker's registration request and replies with success, it performs one more step:
    schedule()

    /**
       * Schedule the currently available resources among waiting apps. This method will be called
       * every time a new app joins or resource availability changes.
       */
      private def schedule(): Unit = {
        if (state != RecoveryState.ALIVE) {
          return
        }
        // Drivers take strict precedence over executors
        val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
        val numWorkersAlive = shuffledAliveWorkers.size
        var curPos = 0
        for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
          // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
          // start from the last worker that was assigned a driver, and continue onwards until we have
          // explored all alive workers.
          var launched = false
          var numWorkersVisited = 0
          while (numWorkersVisited < numWorkersAlive && !launched) {
            val worker = shuffledAliveWorkers(curPos)
            numWorkersVisited += 1
            if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
              launchDriver(worker, driver)
              waitingDrivers -= driver
              launched = true
            }
            curPos = (curPos + 1) % numWorkersAlive
          }
        }
        startExecutorsOnWorkers()
      }
    

    In other words, when a new Worker node joins (or resource availability otherwise changes), the Master takes all ALIVE Workers and checks whether any waiting application is still short of resources. If so, it iterates over waitingDrivers (the drivers of the waiting apps) in round-robin fashion and launches each driver on a Worker whose free memory and cores satisfy if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores). Finally startExecutorsOnWorkers() is called to start Executors on the Workers.
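    A toy, self-contained illustration of that round-robin placement with made-up numbers (Worker and Driver here are simplified stand-ins, not Spark's classes):

    object RoundRobinSketch extends App {
      case class Worker(name: String, var coresFree: Int, var memoryFree: Int)
      case class Driver(name: String, cores: Int, mem: Int)

      val workers = Vector(Worker("w1", 2, 1024), Worker("w2", 8, 8192), Worker("w3", 4, 4096))
      val waitingDrivers = List(Driver("d1", 4, 2048), Driver("d2", 1, 512))

      var curPos = 0
      for (driver <- waitingDrivers) {
        var launched = false
        var visited = 0
        // Start after the last assigned position and walk all alive workers at most once.
        while (visited < workers.size && !launched) {
          val w = workers(curPos)
          visited += 1
          if (w.memoryFree >= driver.mem && w.coresFree >= driver.cores) {
            println(s"launch ${driver.name} on ${w.name}")
            w.coresFree -= driver.cores
            w.memoryFree -= driver.mem
            launched = true
          }
          curPos = (curPos + 1) % workers.size
        }
      }
      // prints: launch d1 on w2, then launch d2 on w3
    }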

    Spark runtime communication

    When the driver submits an application, the SparkContext sends an application-registration message to the Master. The Master has Executors launched for the application, and each Executor reports back once registered. When an RDD action triggers computation, the DAGScheduler splits the job into Stages and turns each Stage into a TaskSet; the TaskScheduler then sends launch messages to the registered Executors, which start running on receipt. Finally, when all tasks have finished, the Driver processes the results and reclaims the resources. As shown in the figure:


    [Figure: runtime communication]

    1) When the application's main method runs, SparkContext is started. During initialization it instantiates a StandaloneSchedulerBackend (which extends CoarseGrainedSchedulerBackend, so the parent class is constructed first) together with a TaskSchedulerImpl. TaskSchedulerImpl then calls backend.start(), and the start process creates two endpoints: the DriverEndpoint inside CoarseGrainedSchedulerBackend and the ClientEndpoint of the StandaloneAppClient.

    The start method of CoarseGrainedSchedulerBackend:

    override def start() {
        val properties = new ArrayBuffer[(String, String)]
        for ((key, value) <- scheduler.sc.conf.getAll) {
          if (key.startsWith("spark.")) {
            properties += ((key, value))
          }
        }
    
        // TODO (prashant) send conf instead of properties
        driverEndpoint = createDriverEndpointRef(properties)
      }
    
      protected def createDriverEndpointRef(
          properties: ArrayBuffer[(String, String)]): RpcEndpointRef = {
        rpcEnv.setupEndpoint(ENDPOINT_NAME, createDriverEndpoint(properties))
      }
    
      protected def createDriverEndpoint(properties: Seq[(String, String)]): DriverEndpoint = {
        new DriverEndpoint(rpcEnv, properties)
      }
    

    This also creates the StandaloneAppClient.
    (2) The StandaloneAppClient registers the application with the Master via tryRegisterAllMasters.
    (3) When the Master receives the registration request it processes it, sends a registration-success message back to the StandaloneAppClient, and then calls startExecutorsOnWorkers to run the application.
    (4) Executor registration:
    a) startExecutorsOnWorkers allocates resources to run the application and calls allocateWorkerResourceToExecutors to start Executors on the Workers. Inside it, the launchExecutor method sends a LaunchExecutor message to the Worker. On receipt the Worker instantiates an ExecutorRunner, which creates a process builder (ProcessBuilder); the builder then launches a CoarseGrainedExecutorBackend process, the container the Executor runs in, according to the command in the application description. Finally the Worker sends ExecutorStateChanged to the Master to report that the Executor container has been created.
    b) When the process builder starts the CoarseGrainedExecutorBackend process, its companion object registers the Executor endpoint; this triggers onStart, which sends the RegisterExecutor message to the DriverEndpoint. If the registration succeeds, the DriverEndpoint replies with RegisteredExecutor to the executor endpoint (which is in fact the CoarseGrainedExecutorBackend itself); on receiving it, the backend creates the Executor object.
    c) The DriverEndpoint also starts a daemon thread that periodically checks whether there are TaskSets waiting to be dispatched:

    private val reviveThread =
          ThreadUtils.newDaemonSingleThreadScheduledExecutor("driver-revive-thread")
    
        override def onStart() {
          // Periodically revive offers to allow delay scheduling to work
          val reviveIntervalMs = conf.getTimeAsMs("spark.scheduler.revive.interval", "1s")
    
          reviveThread.scheduleAtFixedRate(new Runnable {
            override def run(): Unit = Utils.tryLogNonFatalError {
              Option(self).foreach(_.send(ReviveOffers))
            }
          }, 0, reviveIntervalMs, TimeUnit.MILLISECONDS)
        }
    

    The ReviveOffers message triggers makeOffers:

    // Make fake resource offers on all executors
        private def makeOffers() {
          // Filter out executors under killing
          val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
          val workOffers = activeExecutors.map { case (id, executorData) =>
            new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
          }.toSeq
          launchTasks(scheduler.resourceOffers(workOffers))
        }
    
    // Inside launchTasks, each serialized task is sent to the chosen executor's endpoint:
    executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
    
    // On the executor side, CoarseGrainedExecutorBackend receives LaunchTask and calls
    // Executor.launchTask, which wraps the task in a TaskRunner and runs it on the executor's thread pool:
    def launchTask(
          context: ExecutorBackend,
          taskId: Long,
          attemptNumber: Int,
          taskName: String,
          serializedTask: ByteBuffer): Unit = {
        val tr = new TaskRunner(context, taskId = taskId, attemptNumber = attemptNumber, taskName,
          serializedTask)
        runningTasks.put(taskId, tr)
        threadPool.execute(tr)
      }
    

    The body of TaskRunner.run executes the Task; when it finishes, the Executor reports back to the Driver that this task has completed on the Executor.

    execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)
    
    // CoarseGrainedExecutorBackend.statusUpdate builds a StatusUpdate message and sends it to the driver endpoint:
    override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
        val msg = StatusUpdate(executorId, taskId, state, data)
        driver match {
          case Some(driverRef) => driverRef.send(msg)
          case None => logWarning(s"Drop $msg because has not yet connected to driver")
        }
      }
    
