美文网首页
Spark源码分析十一-RPC启动流程

Spark源码分析十一-RPC启动流程

作者: 无色的叶 | 来源:发表于2020-07-28 11:23 被阅读0次

    RpcEndpoint启动流程

    HelloworldServer{
      ......
      def main(args: Array[String]): Unit = {
        //val host = args(0)
        val host = "localhost"
        val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
        val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
        val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
        rpcEnv.setupEndpoint("hello-service", helloEndpoint)
        rpcEnv.awaitTermination()
      }
      ......
    }
    
    image.png

    Spark RPC 服务端 NettyRpcEnvFactory.create(config)

    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    
    object NettyRpcEnvFactory extends RpcEnvFactory {
        ......
        def create(config: RpcEnvConfig): RpcEnv = {
            val conf = config.conf
        
            // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
            // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
            val javaSerializerInstance =
            new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance]
            //根据配置以及地址,new 一个 NettyRpcEnv ,
            val nettyEnv =
            new NettyRpcEnv(conf, javaSerializerInstance, config.bindAddress)
            //如果是服务端创建的,那么会启动服务。服务端和客户端都会通过这个方法创建一个 NettyRpcEnv ,但区别就在这里了。
            if (!config.clientMode) {
            val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
                //启动服务的方法,下一步就是调用这个方法了
                nettyEnv.startServer(config.bindAddress, actualPort)
                (nettyEnv, nettyEnv.address.port)
            }
            try {
                Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1
            } catch {
                case NonFatal(e) =>
                nettyEnv.shutdown()
                throw e
            }
            }
            nettyEnv
        }
        ......
    }
    

    主要的功能是创建 RPCEnv ,即 NettyRpcEnv(客户端在后面说) 。以及通过下面这行代码

    nettyEnv.startServer(config.bindAddress, actualPort)
    

    去调用相应的方法启动服务端的服务。下面进入到这个方法中去看看

    class NettyRpcEnv(
                       val conf: RpcConf,
                       javaSerializerInstance: JavaSerializerInstance,
                       host: String) extends RpcEnv(conf) {
      ......
      def startServer(bindAddress: String, port: Int): Unit = {
        // here disable security
        val bootstraps: java.util.List[TransportServerBootstrap] = java.util.Collections.emptyList()
        //TransportContext 属于 spark.network 中的部分,负责 RPC 消息在网络中的传输
        server = transportContext.createServer(bindAddress, port, bootstraps)
        //在每个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的作用是客户端调用的时候先用它来询问 Endpoint 是否存在。
        dispatcher.registerRpcEndpoint(
          RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
      }
      ......
    }
    

    执行完毕之后这个 create 方法就结束。这个流程主要就是开启一些服务,然后返回一个新的 NettyRpcEnv。

    Spark RPC 服务端 rpcEnv.setupEndpoint("hello-service", helloEndpoint)

    这条代码会去调用 NettyRpcEnv 中相应的方法

    class NettyRpcEnv(
                       val conf: RpcConf,
                       javaSerializerInstance: JavaSerializerInstance,
                       host: String) extends RpcEnv(conf) {
      ......
      override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.registerRpcEndpoint(name, endpoint)
      }
      ......
    }
    

    我们看到,这个方法主要是调用 dispatcher 进行注册的,Dispatcher 是消息的分发器,负责将消息分发给适合的 endpoint

    参考:
    https://www.jianshu.com/p/5d37f4f370fb
    https://www.cnblogs.com/johnny666888/p/11128634.html

    相关文章

      网友评论

          本文标题:Spark源码分析十一-RPC启动流程

          本文链接:https://www.haomeiwen.com/subject/ybxfrktx.html