Spark Client Source Code Analysis (Part 2)

Author: lehi | Published 2016-03-23 00:37

    This article continues from the previous one:

    Spark Client Source Code Analysis (Part 1): http://www.jianshu.com/p/339fde3aff5d

    The driver-side Client code is fairly simple: it has only a main function. Like AppClient, it also has a ClientEndpoint, but the two serve different purposes.

    1.Client

    Client's only method, main, is shown below:
    <code>
    def main(args: Array[String]) {
      if (!sys.props.contains("SPARK_SUBMIT")) {
        println("WARNING: This client is deprecated and will be removed in a future version of Spark")
        println("Use ./bin/spark-submit with \"--master spark://host:port\"")
      }
      val conf = new SparkConf()
      val driverArgs = new ClientArguments(args)
      if (!driverArgs.logLevel.isGreaterOrEqual(Level.WARN)) {
        conf.set("spark.akka.logLifecycleEvents", "true")
      }
      conf.set("spark.rpc.askTimeout", "10")
      conf.set("akka.loglevel", driverArgs.logLevel.toString.replace("WARN", "WARNING"))
      Logger.getRootLogger.setLevel(driverArgs.logLevel)
      // Create an RPC environment for the driver client and obtain remote
      // references to the Master endpoints
      val rpcEnv =
        RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
      val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
        map(rpcEnv.setupEndpointRef(Master.SYSTEM_NAME, _, Master.ENDPOINT_NAME))
      // Register the ClientEndpoint with the RPC environment
      rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
      // Block until the RPC environment terminates
      rpcEnv.awaitTermination()
    }
    </code>
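
    Note that driverArgs.masters may hold several URLs when the standalone Master runs in HA mode, and each URL is resolved to its own endpoint ref. As a rough illustration of the URL-to-address step, here is a minimal, standalone sketch (not Spark's actual implementation) of how a spark://host:port URL breaks down into a host/port pair, mirroring what RpcAddress.fromSparkURL does above:
    <code>
    object SparkUrlSketch {
      // Parse a "spark://host:port" master URL into (host, port); this mimics
      // RpcAddress.fromSparkURL for illustration purposes only
      def fromSparkURL(sparkUrl: String): (String, Int) = {
        val uri = new java.net.URI(sparkUrl)
        require(uri.getScheme == "spark", s"Invalid master URL: $sparkUrl")
        (uri.getHost, uri.getPort)
      }

      def main(args: Array[String]): Unit = {
        // In HA mode driverArgs.masters may hold several URLs (hypothetical values)
        Seq("spark://master1:7077", "spark://master2:7077")
          .map(fromSparkURL)
          .foreach { case (host, port) => println(s"$host:$port") }
      }
    }
    </code>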

    2.ClientEndpoint

    ClientEndpoint can be seen as a proxy that relays messages on the Driver's behalf.
    Its fields are straightforward, so we skip them.
    (1) Its constructor is ClientEndpoint's primary constructor.
    (2) The onStart method is shown below.
    <code>
    override def onStart(): Unit = {
      driverArgs.cmd match {
        case "launch" =>
          // A wrapper around the driver class, so that the driver runs inside
          // the same RPC environment as its Worker and the two live and die together
          val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
          // Driver class path
          val classPathConf = "spark.driver.extraClassPath"
          val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }
          // Driver library path
          val libraryPathConf = "spark.driver.extraLibraryPath"
          val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }
          // Driver JVM options
          val extraJavaOptsConf = "spark.driver.extraJavaOptions"
          val extraJavaOpts = sys.props.get(extraJavaOptsConf)
            .map(Utils.splitCommandString).getOrElse(Seq.empty)
          // Turn every property set in SparkConf into a java option
          val sparkJavaOpts = Utils.sparkJavaOpts(conf)
          // All java options combined
          val javaOpts = sparkJavaOpts ++ extraJavaOpts
          val command = new Command(mainClass,
            Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
            sys.env, classPathEntries, libraryPathEntries, javaOpts)
          // Wrap all of the above in a DriverDescription
          val driverDescription = new DriverDescription(
            driverArgs.jarUrl,
            driverArgs.memory,
            driverArgs.cores,
            driverArgs.supervise,
            command)
          // Asynchronously send the driver description to the master
          ayncSendToMasterAndForwardReply[SubmitDriverResponse](
            RequestSubmitDriver(driverDescription))

        case "kill" =>
          val driverId = driverArgs.driverId
          ayncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
      }
    }
    </code>
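    (The missing "s" in ayncSendToMasterAndForwardReply is present in Spark's own source; the helper sends the request to every known master and forwards the reply back to this endpoint.) The arguments {{WORKER_URL}} and {{USER_JAR}} are placeholders: the Worker side substitutes the real values just before launching the driver process. A minimal, self-contained sketch of that substitution step (the values are hypothetical; Spark's actual code lives in the worker's DriverRunner):
    <code>
    // Arguments exactly as onStart packed them into the Command
    val rawArgs = Seq("{{WORKER_URL}}", "{{USER_JAR}}", "com.example.MyApp", "--opt", "1")

    // Substitution as the worker side would perform it (hypothetical values)
    val substituted = rawArgs.map {
      case "{{WORKER_URL}}" => "spark://Worker@worker1:34567"
      case "{{USER_JAR}}"   => "/work/driver-20160323-0001/my-app.jar"
      case other            => other
    }
    println(substituted.mkString(" "))
    </code>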
    (3) The onStop method is trivial, so we skip it.
    (4) The receive method is shown below.
    <code>
    override def receive: PartialFunction[Any, Unit] = {
      // The master replies with the driver's info: the master is the steward,
      // the Client is the boss
      case SubmitDriverResponse(master, success, driverId, message) =>
        logInfo(message)
        if (success) {
          // Record the master that replied as the active master endpoint
          activeMasterEndpoint = master
          // Poll for the driver's status, then exit the JVM; see ① below
          pollAndReportStatus(driverId.get)
        } else if (!Utils.responseFromBackup(message)) {
          System.exit(-1)
        }

      case KillDriverResponse(master, driverId, success, message) =>
        logInfo(message)
        if (success) {
          activeMasterEndpoint = master
          pollAndReportStatus(driverId)
        } else if (!Utils.responseFromBackup(message)) {
          System.exit(-1)
        }
    }
    </code>
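    Both branches tolerate a failure reply that merely comes from a standby master in an HA setup, which is what the Utils.responseFromBackup guard checks. In this era of Spark it is little more than a prefix check on the reply message; a minimal sketch (the exact prefix string is my recollection of the 1.6 code base, so treat it as an assumption):
    <code>
    // Sketch of the standby-master check; the prefix string is assumed from
    // the Spark 1.6 code base, not verified here
    val BACKUP_STANDALONE_MASTER_PREFIX = "Current state is not alive"

    def responseFromBackup(msg: String): Boolean =
      msg.startsWith(BACKUP_STANDALONE_MASTER_PREFIX)
    </code>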
    ① The pollAndReportStatus method, shown below, polls the master for the driver's status and then exits the JVM.
    <code>
    def pollAndReportStatus(driverId: String) {
      logInfo("... waiting before polling master for driver state")
      Thread.sleep(5000)
      logInfo("... polling master for driver state")
      // Ask the master for the driver's status
      val statusResponse =
        activeMasterEndpoint.askWithRetry[DriverStatusResponse](RequestDriverStatus(driverId))
      statusResponse.found match {
        case false =>
          logError(s"ERROR: Cluster master did not recognize $driverId")
          System.exit(-1)
        case true =>
          logInfo(s"State of $driverId is ${statusResponse.state.get}")
          // What comes back is really the worker's info
          (statusResponse.workerId, statusResponse.workerHostPort, statusResponse.state) match {
            case (Some(id), Some(hostPort), Some(DriverState.RUNNING)) =>
              logInfo(s"Driver running on $hostPort ($id)")
            case _ =>
          }
          statusResponse.exception.map { e =>
            logError(s"Exception from cluster was: $e")
            e.printStackTrace()
            System.exit(-1)
          }
          System.exit(0)
      }
    }
    </code>
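
    For reference, the reply being pattern-matched above has roughly the following shape (field names follow Spark 1.6's DeployMessages.DriverStatusResponse; the types here are simplified, so treat this as an illustrative model rather than the real definition):
    <code>
    // Simplified, self-contained model of the status reply; in the real code,
    // state is Option[DriverState] rather than Option[String]
    case class DriverStatusResponse(
        found: Boolean,
        state: Option[String],
        workerId: Option[String],
        workerHostPort: Option[String],
        exception: Option[Exception])

    // Hypothetical reply for a running driver
    val resp = DriverStatusResponse(
      found = true,
      state = Some("RUNNING"),
      workerId = Some("worker-20160323000000-worker1-34567"),
      workerHostPort = Some("worker1:34567"),
      exception = None)
    </code>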
