美文网首页
Spark源码分析十一-RPC启动流程

Spark源码分析十一-RPC启动流程

作者: 无色的叶 | 来源:发表于2020-07-28 11:23 被阅读0次

RpcEndpoint启动流程

HelloworldServer{
  ......
  def main(args: Array[String]): Unit = {
    //val host = args(0)
    val host = "localhost"
    val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
    val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
    val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
    rpcEnv.setupEndpoint("hello-service", helloEndpoint)
    rpcEnv.awaitTermination()
  }
  ......
}
image.png

Spark RPC 服务端 NettyRpcEnvFactory.create(config)

val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
object NettyRpcEnvFactory extends RpcEnvFactory {
    ......
    def create(config: RpcEnvConfig): RpcEnv = {
        val conf = config.conf
    
        // Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
        // KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
        val javaSerializerInstance =
        new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance]
        //根据配置以及地址,new 一个 NettyRpcEnv ,
        val nettyEnv =
        new NettyRpcEnv(conf, javaSerializerInstance, config.bindAddress)
        //如果是服务端创建的,那么会启动服务。服务端和客户端都会通过这个方法创建一个 NettyRpcEnv ,但区别就在这里了。
        if (!config.clientMode) {
        val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
            //启动服务的方法,下一步就是调用这个方法了
            nettyEnv.startServer(config.bindAddress, actualPort)
            (nettyEnv, nettyEnv.address.port)
        }
        try {
            Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1
        } catch {
            case NonFatal(e) =>
            nettyEnv.shutdown()
            throw e
        }
        }
        nettyEnv
    }
    ......
}

主要的功能是创建 RPCEnv ,即 NettyRpcEnv(客户端在后面说) 。以及通过下面这行代码

nettyEnv.startServer(config.bindAddress, actualPort)

去调用相应的方法启动服务端的服务。下面进入到这个方法中去看看

class NettyRpcEnv(
                   val conf: RpcConf,
                   javaSerializerInstance: JavaSerializerInstance,
                   host: String) extends RpcEnv(conf) {
  ......
  def startServer(bindAddress: String, port: Int): Unit = {
    // here disable security
    val bootstraps: java.util.List[TransportServerBootstrap] = java.util.Collections.emptyList()
    //TransportContext 属于 spark.network 中的部分,负责 RPC 消息在网络中的传输
    server = transportContext.createServer(bindAddress, port, bootstraps)
    //在每个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的作用是客户端调用的时候先用它来询问 Endpoint 是否存在。
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }
  ......
}

执行完毕之后这个 create 方法就结束。这个流程主要就是开启一些服务,然后返回一个新的 NettyRpcEnv。

Spark RPC 服务端 rpcEnv.setupEndpoint("hello-service", helloEndpoint)

这条代码会去调用 NettyRpcEnv 中相应的方法

class NettyRpcEnv(
                   val conf: RpcConf,
                   javaSerializerInstance: JavaSerializerInstance,
                   host: String) extends RpcEnv(conf) {
  ......
  override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
    dispatcher.registerRpcEndpoint(name, endpoint)
  }
  ......
}

我们看到,这个方法主要是调用 dispatcher 进行注册的,Dispatcher 是消息的分发器,负责将消息分发给适合的 endpoint

参考:
https://www.jianshu.com/p/5d37f4f370fb
https://www.cnblogs.com/johnny666888/p/11128634.html

相关文章

网友评论

      本文标题:Spark源码分析十一-RPC启动流程

      本文链接:https://www.haomeiwen.com/subject/ybxfrktx.html