RpcEndpoint启动流程
HelloworldServer{
......
def main(args: Array[String]): Unit = {
//val host = args(0)
val host = "localhost"
val config = RpcEnvServerConfig(new RpcConf(), "hello-server", host, 52345)
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
val helloEndpoint: RpcEndpoint = new HelloEndpoint(rpcEnv)
rpcEnv.setupEndpoint("hello-service", helloEndpoint)
rpcEnv.awaitTermination()
}
......
}
image.png
Spark RPC 服务端 NettyRpcEnvFactory.create(config)
val rpcEnv: RpcEnv = NettyRpcEnvFactory.create(config)
object NettyRpcEnvFactory extends RpcEnvFactory {
......
def create(config: RpcEnvConfig): RpcEnv = {
val conf = config.conf
// Use JavaSerializerInstance in multiple threads is safe. However, if we plan to support
// KryoSerializer in future, we have to use ThreadLocal to store SerializerInstance
val javaSerializerInstance =
new JavaSerializer(conf).newInstance().asInstanceOf[JavaSerializerInstance]
//根据配置以及地址,new 一个 NettyRpcEnv ,
val nettyEnv =
new NettyRpcEnv(conf, javaSerializerInstance, config.bindAddress)
//如果是服务端创建的,那么会启动服务。服务端和客户端都会通过这个方法创建一个 NettyRpcEnv ,但区别就在这里了。
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
//启动服务的方法,下一步就是调用这个方法了
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
Utils.startServiceOnPort(config.port, startNettyRpcEnv, conf, config.name)._1
} catch {
case NonFatal(e) =>
nettyEnv.shutdown()
throw e
}
}
nettyEnv
}
......
}
主要的功能是创建 RPCEnv ,即 NettyRpcEnv(客户端在后面说) 。以及通过下面这行代码
nettyEnv.startServer(config.bindAddress, actualPort)
去调用相应的方法启动服务端的服务。下面进入到这个方法中去看看
class NettyRpcEnv(
val conf: RpcConf,
javaSerializerInstance: JavaSerializerInstance,
host: String) extends RpcEnv(conf) {
......
def startServer(bindAddress: String, port: Int): Unit = {
// here disable security
val bootstraps: java.util.List[TransportServerBootstrap] = java.util.Collections.emptyList()
//TransportContext 属于 spark.network 中的部分,负责 RPC 消息在网络中的传输
server = transportContext.createServer(bindAddress, port, bootstraps)
//在每个 RpcEndpoint 注册的时候都会注册一个默认的 RpcEndpointVerifier,它的作用是客户端调用的时候先用它来询问 Endpoint 是否存在。
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
......
}
执行完毕之后这个 create 方法就结束。这个流程主要就是开启一些服务,然后返回一个新的 NettyRpcEnv。
Spark RPC 服务端 rpcEnv.setupEndpoint("hello-service", helloEndpoint)
这条代码会去调用 NettyRpcEnv 中相应的方法
class NettyRpcEnv(
val conf: RpcConf,
javaSerializerInstance: JavaSerializerInstance,
host: String) extends RpcEnv(conf) {
......
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}
......
}
我们看到,这个方法主要是调用 dispatcher 进行注册的,Dispatcher 是消息的分发器,负责将消息分发给适合的 endpoint
参考:
https://www.jianshu.com/p/5d37f4f370fb
https://www.cnblogs.com/johnny666888/p/11128634.html
网友评论