driver的网络通信
-SparkContext.SparkEnv
private[spark] def env: SparkEnv = _env
_env = createSparkEnv(_conf, isLocal, listenerBus)
val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
new NettyRpcEnvFactory().create(config)
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
private[rpc] class NettyRpcEnvFactory extends RpcEnvFactory with Logging {
def create(config: RpcEnvConfig): RpcEnv = {
... ...
val nettyEnv =
new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
config.securityManager, config.numUsableCores)
if (!config.clientMode) {
val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
nettyEnv.startServer(config.bindAddress, actualPort)
(nettyEnv, nettyEnv.address.port)
}
try {
//使用端口启动服务
Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
} catch {
... ...
}
}
nettyEnv
}
}
-nettyEnv.startServer(config.bindAddress, actualPort)
def startServer(bindAddress: String, port: Int): Unit = {
... ...
server = transportContext.createServer(bindAddress, port, bootstraps)
//注册通信终端
dispatcher.registerRpcEndpoint(
RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
RpcEndpoint 是用作做接受数据的receive*
def receive: PartialFunction[Any, Unit] = {
case _ => throw new SparkException(self + " does not implement 'receive'")
}
def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
case _ => context.sendFailure(new SparkException(self + " won't reply anything"))
}
RpcEndpointRef 是用来做发送的send or ask
final def self: RpcEndpointRef = {
require(rpcEnv != null, "rpcEnv has not been initialized")
rpcEnv.endpointRef(this)
}
注意的是RpcEndpoint 还有一个收件箱,
inbox message list概念
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
val addr = RpcEndpointAddress(nettyEnv.address, name)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.containsKey(name)) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
// This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
// active when registering, and endpointRef must be put into endpointRefs before onStart is
// called.
endpointRefs.put(endpoint, endpointRef)
//收件箱
var messageLoop: MessageLoop = null
try {
messageLoop = endpoint match {
case e: IsolatedRpcEndpoint =>
new DedicatedMessageLoop(name, e, this)
case _ =>
sharedLoop.register(name, endpoint)
sharedLoop
}
endpoints.put(name, messageLoop)
} catch {
case NonFatal(e) =>
endpointRefs.remove(endpoint)
throw e
}
}
endpointRef
}
private val dispatcher: Dispatcher = new Dispatcher(this, numUsableCores)
//发件箱 根据RpcAddress会有多个
private val outboxes = new ConcurrentHashMapRpcAddress, Outbox
-transportContext.createServer(bindAddress, port, bootstraps)
-new TransportServer(this, host, port, rpcHandler, bootstraps)
//使用ipport启动服务
-TransportServer.init
bootstrap = new ServerBootstrap()
.group(bossGroup, workerGroup)
.channel(NettyUtils.getServerChannelClass(ioMode))
.option(ChannelOption.ALLOCATOR, pooledAllocator)
.option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
.childOption(ChannelOption.ALLOCATOR, pooledAllocator);
//NettyUtils
/** Returns the correct ServerSocketChannel class based on IOMode. */
public static Class<? extends ServerChannel> getServerChannelClass(IOMode mode) {
switch (mode) {
case NIO:
return NioServerSocketChannel.class;
case EPOLL:
//epoll方式模拟aio 异步io
return EpollServerSocketChannel.class;
default:
throw new IllegalArgumentException("Unknown io mode: " + mode);
}
}
excutor的 网络通信
CoarseGrainedExecutorBackend[object]
-val env = SparkEnv.createExecutorEnv
-SparkEnv.create
-val rpcEnv = RpcEnv.create
env.rpcEnv.setupEndpoint("Executor",
backendCreateFn(env.rpcEnv, arguments, env, cfg.resourceProfile))
操作流程同driver ...
网友评论