
0. Creating and starting the server
The first half covered how the executor fetches the SparkAppConfig through the rpcEnv: how the client is created and connected, how messages are sent through the rpcEnv, and how the return value is finally handed back to the caller.
Now let's look at the server side: after receiving a request, how is the message delivered to the correct RpcEndpoint, and how is the response written back?
The previous part introduced TransportContext, which encapsulates the handler wiring for both the Netty server and client: on the client side responses are handled by TransportResponseHandler, while on the server side requests are handled by TransportRequestHandler together with NettyRpcHandler.
As part of SparkContext initialization, NettyRpcEnv starts a TransportServer.
The server is the entry point for RPC requests: when a request arrives, it hands the message to the Dispatcher, which routes it into the message queue of the correct RpcEndpoint.
NettyRpcEnv initialization
NettyRpcEnv#create
def create(config: RpcEnvConfig): RpcEnv = {
  ...
  // create the NettyRpcEnv
  val nettyEnv =
    new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
      config.securityManager, config.numUsableCores)
  // in server mode, start the server and bind it to a port
  if (!config.clientMode) {
    val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
      nettyEnv.startServer(config.bindAddress, actualPort)
      (nettyEnv, nettyEnv.address.port)
    }
    ...
    Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
  }
  nettyEnv
}
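Utils.startServiceOnPort is what turns the startNettyRpcEnv closure above into an actually bound server: it tries the configured port and, on bind failures, retries on subsequent ports. Below is a simplified sketch of that idea, not the real utility (the real code also reads spark.port.maxRetries and special-cases port collisions); the name startServiceOnPortSketch is made up for illustration.
// A simplified sketch of the retry idea behind Utils.startServiceOnPort (not the real code):
// call startService with a candidate port; on a bind failure move to the next port,
// giving up after maxRetries attempts. Port 0 means "let the OS pick", so it is not offset.
def startServiceOnPortSketch[T](
    startPort: Int,
    startService: Int => (T, Int),
    maxRetries: Int): (T, Int) = {
  for (offset <- 0 to maxRetries) {
    val tryPort = if (startPort == 0) 0 else startPort + offset
    try {
      return startService(tryPort)          // returns (service, actually bound port)
    } catch {
      case _: java.net.BindException if offset < maxRetries => // port taken, try the next one
    }
  }
  throw new java.net.BindException(s"Could not bind a port starting from $startPort")
}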
NettyRpcEnv#startServer
def startServer(bindAddress: String, port: Int): Unit = {
  ...
  server = transportContext.createServer(bindAddress, port, bootstraps)
  dispatcher.registerRpcEndpoint(
    RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
}
transportContext is a field of NettyRpcEnv and was already created when the env was instantiated. Right after the server starts, an RpcEndpoint called RpcEndpointVerifier is registered; its role is sketched just below, and the registration mechanism itself is covered after we finish tracing the startup path.
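The RpcEndpointVerifier is the endpoint a remote NettyRpcEnv asks when it looks up an endpoint ref by name: it simply checks with the Dispatcher whether an endpoint with that name exists. Roughly, it looks like this (an abridged sketch based on the Spark 3.0 sources; treat it as a sketch rather than a verbatim copy):
// Abridged sketch of org.apache.spark.rpc.netty.RpcEndpointVerifier.
// Remote RpcEnvs send CheckExistence(name) to this well-known endpoint to verify
// that a named endpoint is registered before handing out a ref for it.
private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  extends RpcEndpoint {

  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  }
}

private[netty] object RpcEndpointVerifier {
  val NAME = "endpoint-verifier"

  // message used to ask the remote verifier whether an endpoint with this name exists
  case class CheckExistence(name: String)
}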
[common/network-common] org.apache.spark.network.TransportContext#createServer
/** Create a server which will attempt to bind to a specific host and port. */
public TransportServer createServer(
    String host, int port, List<TransportServerBootstrap> bootstraps) {
  return new TransportServer(this, host, port, rpcHandler, bootstraps);
}
Let's follow the creation of TransportServer one step further.
TransportServer
[common/network-common] org.apache.spark.network.server.TransportServer
public class TransportServer implements Closeable {
  // the only constructor
  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    ...
    init(hostToBind, portToBind);
    ...
  }

  private void init(String hostToBind, int portToBind) {
    ...
    // a standard Netty server bootstrap
    EventLoopGroup workerGroup = NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
        conf.getModuleName() + "-server");
    bootstrap = new ServerBootstrap()
        .group(bossGroup, workerGroup)
        .channel(NettyUtils.getServerChannelClass(ioMode))
        .option(ChannelOption.ALLOCATOR, pooledAllocator)
        .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
        .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
    ...
    // the childHandler deserves a longer discussion of its own; for now we only care
    // about bringing the server up
    bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      ...
    });
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind) : new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
    ...
  }
}
After starting the server, NettyRpcEnv immediately registers an RpcEndpoint, which brings in two new concepts: Dispatcher and RpcEndpoint.
Dispatcher
[spark-core] org.apache.spark.rpc.netty.Dispatcher
// A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
  private val endpoints: ConcurrentMap[String, MessageLoop]            // all registered MessageLoops, keyed by endpoint name
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] // mapping from endpoint to its endpointRef
  private lazy val sharedLoop: SharedMessageLoop                       // a MessageLoop shared by multiple endpoints
  ...
  def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    // the address uniquely identifies an RpcEndpoint; an RpcEndpointRef uses it to address the endpoint
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      ...
      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
      // active when registering, and endpointRef must be put into endpointRefs before onStart is called.
      endpointRefs.put(endpoint, endpointRef)
      var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
            // shared MessageLoop: endpoints share one thread pool, each with its own inbox
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {...}
    }
    endpointRef
  }
}
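From the caller's perspective, registration goes through RpcEnv.setupEndpoint, which in NettyRpcEnv just forwards to dispatcher.registerRpcEndpoint. Here is a minimal usage sketch; the EchoEndpoint class and the name "echo" are made up for illustration, and rpcEnv is assumed to be an already created NettyRpcEnv running in server mode.
import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

// Hypothetical endpoint used only to illustrate the registration path.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    // every matched message is answered through the RpcCallContext
    case msg: String => context.reply(s"echo: $msg")
  }
}

// NettyRpcEnv.setupEndpoint -> Dispatcher.registerRpcEndpoint: the returned ref is
// addressed by (rpcEnv.address, "echo"), which is exactly what RequestMessage.serialize
// writes into the request body later in this post.
val echoRef: RpcEndpointRef = rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))

// From anywhere that holds the ref (local or remote), an ask goes through the same
// Dispatcher/Inbox machinery described below, e.g.:
// val answer: Future[String] = echoRef.ask[String]("hi")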
Now let's look at how TransportServer sets up its handlers.
[common/network-common] org.apache.spark.network.server.TransportServer#init
private void init(String hostToBind, int portToBind) {
  ...
  // this mirrors the handler setup on the client side
  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
    @Override
    protected void initChannel(SocketChannel ch) {
      logger.debug("New connection accepted for remote address {}.", ch.remoteAddress());
      RpcHandler rpcHandler = appRpcHandler;
      for (TransportServerBootstrap bootstrap : bootstraps) {
        rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
      }
      // both server and client delegate handler wiring to TransportContext
      context.initializePipeline(ch, rpcHandler);
    }
  });
  ...
}
Server Netty Handler
Back to the TransportServer startup path: a channel handler is registered to handle incoming connections. TransportContext creates a TransportChannelHandler and registers it as ServerBootstrap.childHandler, so TransportChannelHandler is responsible for handling external requests.
[common/network-common] org.apache.spark.network.TransportContext#createChannelHandler
/**
 * Creates the server- and client-side handler which is used to handle both RequestMessages and
 * ResponseMessages. The channel is expected to have been successfully created, though certain
 * properties (such as the remoteAddress()) may not be available yet.
 */
private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
  // the client side handles responses
  TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
  TransportClient client = new TransportClient(channel, responseHandler);
  boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
  ChunkFetchRequestHandler chunkFetchRequestHandler = null;
  if (!separateChunkFetchRequest) {
    chunkFetchRequestHandler = new ChunkFetchRequestHandler(
      client, rpcHandler.getStreamManager(),
      conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
  }
  // the server side handles requests
  TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
    rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
  return new TransportChannelHandler(client, responseHandler, requestHandler,
    conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
}
1. Receiving a request
When a request reaches the server, TransportChannelHandler delegates it to TransportRequestHandler. Many kinds of requests are handled there; here we only follow RpcRequest, the other request types come up in other modules.
TransportRequestHandler
[common/network-common] org.apache.spark.network.server.TransportRequestHandler
/**
 * A handler that processes requests from clients and writes chunk data back. Each handler is
 * attached to a single Netty channel, and keeps track of which streams have been fetched via this
 * channel, in order to clean them up if the channel is terminated (see #channelUnregistered).
 *
 * The messages should have been processed by the pipeline setup by {@link TransportServer}.
 */
public class TransportRequestHandler extends MessageHandler<RequestMessage> {
  /** The Netty channel that this handler is associated with. */
  private final Channel channel;
  /** Client on the same channel allowing us to talk back to the requester. */
  private final TransportClient reverseClient;
  /** Handles all RPC messages. */
  private final RpcHandler rpcHandler;
  ...
  @Override
  public void handle(RequestMessage request) throws Exception {
    if (request instanceof ChunkFetchRequest) {
      chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
    // here we only care about remote RPC requests
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else if (request instanceof UploadStream) {
      processStreamUpload((UploadStream) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }

  // handle an RpcRequest
  private void processRpcRequest(final RpcRequest req) {
    try {
      // delegate to the RpcHandler
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        // callback that writes the final response back
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }
        ...
      });
    } catch (Exception e) {...}
  }
}
The rpcHandler was created by NettyRpcEnv at the same time as the TransportContext and passed to it as a constructor argument; TransportContext in turn passes it to TransportRequestHandler's constructor. Here the rpcHandler is a NettyRpcHandler, a class defined in the same file as NettyRpcEnv.
[spark-core] org.apache.spark.rpc.netty.NettyRpcHandler#receive
override def receive(
    client: TransportClient,
    message: ByteBuffer,
    callback: RpcResponseCallback): Unit = {
  val messageToDispatch = internalReceive(client, message)
  // hand over to the Dispatcher, which routes the message to the correct endpoint
  dispatcher.postRemoteMessage(messageToDispatch, callback)
}
Identifying the RpcEndpoint
How does the server find the right RpcEndpoint? From the previous post we know that a request to an endpoint is made through its RpcEndpointRef, and every RpcEndpoint is bound to an endpoint name. The request body therefore has to carry that identifying information so the server can locate the corresponding RpcEndpoint.
[spark-core] org.apache.spark.rpc.netty.RequestMessage#serialize
/** Manually serialize [[RequestMessage]] to minimize the size. */
def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = {
  val bos = new ByteBufferOutputStream()
  val out = new DataOutputStream(bos)
  try {
    // write the sender (local) address
    writeRpcAddress(out, senderAddress)
    // write the receiver (server) address
    writeRpcAddress(out, receiver.address)
    // write the endpoint name
    out.writeUTF(receiver.name)
    val s = nettyEnv.serializeStream(out)
    try {
      // the payload is a case class; in our example it is RetrieveSparkAppConfig
      s.writeObject(content)
    } finally {
      s.close()
    }
  } finally {
    out.close()
  }
  bos.toByteBuffer
}
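On the receiving side, NettyRpcHandler.internalReceive rebuilds the RequestMessage from the raw bytes by reading these fields back in the same order. Roughly, the companion object looks like this (an abridged sketch based on the Spark 3.0 sources; readRpcAddress is its private helper that mirrors writeRpcAddress):
// Abridged sketch of RequestMessage.apply: the mirror image of serialize() above.
def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = {
  val in = new DataInputStream(new ByteBufferInputStream(bytes))
  try {
    val senderAddress = readRpcAddress(in)                                     // sender address
    val endpointAddress = RpcEndpointAddress(readRpcAddress(in), in.readUTF()) // server address + endpoint name
    val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv)
    ref.client = client
    // the remaining bytes are the payload, e.g. RetrieveSparkAppConfig
    new RequestMessage(senderAddress, ref, nettyEnv.deserialize(client, bytes))
  } finally {
    in.close()
  }
}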
2. Message delivery: the Dispatcher
Back in the Dispatcher, let's trace how the message is delivered to the endpoint.
[spark-core] org.apache.spark.rpc.netty.Dispatcher
/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
  // endpointName -> MessageLoop
  private val endpoints: ConcurrentMap[String, MessageLoop] =
    new ConcurrentHashMap[String, MessageLoop]
  ...

  /** Posts a message sent by a remote endpoint. */
  def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
    val rpcCallContext =
      new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
  }

  /**
   * Posts a message to a specific endpoint.
   *
   * @param endpointName name of the endpoint.
   * @param message the message to post
   * @param callbackIfStopped callback function if the endpoint is stopped.
   */
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    ...
    // use the endpoint name carried in the request body to find the message loop bound to the endpoint
    val loop = endpoints.get(endpointName)
    ...
    // deliver the message
    loop.post(endpointName, message)
    ...
  }
}
At this point the message has been delivered into the correct endpoint's message queue. Next we follow how the message is processed and how the response is produced.
3. The endpoint consumes the message
Recall the server-side startup described above: DriverEndpoint is registered with the Dispatcher, and during registration the Dispatcher also creates a MessageLoop for it. The design of MessageLoop is analyzed in more detail later; for now let's keep following the request.
DriverEndpoint uses a DedicatedMessageLoop, a MessageLoop subclass dedicated to a single endpoint. A MessageLoop can also be shared: several endpoints use one loop, but each endpoint still has its own Inbox. The Inbox is literally the endpoint's mailbox, and the Dispatcher's routing job boils down to finding the right Inbox and appending the message to its queue (a sketch of the shared variant follows the DedicatedMessageLoop code below).
[spark-core] org.apache.spark.rpc.netty.DedicatedMessageLoop
/**
 * A message loop that is dedicated to a single RPC endpoint.
 */
private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher) extends MessageLoop(dispatcher) {

  private val inbox = new Inbox(name, endpoint)   // the inbox bound to this endpoint

  // worker threads that drain the inbox
  override protected val threadpool = if (endpoint.threadCount() > 1) {
    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
  } else {
    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
  }

  // the worker threads start as soon as the loop is instantiated
  (1 to endpoint.threadCount()).foreach { _ =>
    threadpool.submit(receiveLoopRunnable)
  }

  // likewise, mark the inbox active at construction time;
  // worker threads take active inboxes from the `active` queue and process their messages
  // Mark active to handle the OnStart message.
  setActive(inbox)

  // deliver a message into the inbox's queue
  override def post(endpointName: String, message: InboxMessage): Unit = {
    require(endpointName == name)
    inbox.post(message)
    setActive(inbox)
  }
}
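For contrast with the dedicated loop above, the shared variant keeps one Inbox per endpoint but drains all of them with a single fixed-size thread pool. Roughly, it looks like this (an abridged sketch of SharedMessageLoop based on the Spark 3.0 sources, not a verbatim copy):
// Abridged sketch of org.apache.spark.rpc.netty.SharedMessageLoop:
// many endpoints, one thread pool, one Inbox per endpoint.
private class SharedMessageLoop(
    conf: SparkConf,
    dispatcher: Dispatcher,
    numUsableCores: Int) extends MessageLoop(dispatcher) {

  // endpointName -> its Inbox
  private val endpoints = new ConcurrentHashMap[String, Inbox]()

  // a shared pool of dispatcher threads, all running receiveLoopRunnable
  override protected val threadpool: ThreadPoolExecutor = {
    val numThreads = getNumOfThreads(conf)
    val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
    for (i <- 0 until numThreads) {
      pool.execute(receiveLoopRunnable)
    }
    pool
  }

  def register(name: String, endpoint: RpcEndpoint): Unit = {
    val inbox = new Inbox(name, endpoint)
    endpoints.put(name, inbox)
    // Mark active to handle the OnStart message.
    setActive(inbox)
  }

  override def post(endpointName: String, message: InboxMessage): Unit = {
    val inbox = endpoints.get(endpointName)
    inbox.post(message)
    setActive(inbox)
  }
}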
Once a message has been put into an Inbox and that Inbox has been marked active, one of the loop's worker threads picks it up and processes it. This is the job of receiveLoopRunnable, a field of MessageLoop: it is a Runnable whose run method executes MessageLoop.receiveLoop.
[spark-core] org.apache.spark.rpc.netty.MessageLoop#receiveLoop
private def receiveLoop(): Unit = {
  try {
    while (true) {
      try {
        // `active` is a queue of the inboxes that currently have messages
        val inbox = active.take()
        // PoisonPill is the shutdown signal
        if (inbox == MessageLoop.PoisonPill) {
          // Put PoisonPill back so that other threads can see it.
          setActive(MessageLoop.PoisonPill)
          return
        }
        // process the messages in this inbox
        inbox.process(dispatcher)
      } catch {
        case NonFatal(e) => logError(e.getMessage, e)
      }
    }
  } catch {...}
}
Inbox chooses which endpoint method to call based on the message type (a short sketch of that dispatch follows).
For our request type, the call lands in DriverEndpoint's receiveAndReply method.
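The dispatch on message type happens in Inbox.process, which a worker thread invokes from receiveLoop. Below is a heavily abridged sketch of its core pattern match; processOne is a made-up helper name for the sketch, and the real method also drains the queue in a loop, guards against concurrent processing, and handles remote-process lifecycle messages.
// Abridged sketch of the message dispatch inside Inbox.process (error handling omitted).
private def processOne(message: InboxMessage, endpoint: RpcEndpoint): Unit = message match {
  case RpcMessage(_sender, content, context) =>
    // ask-style messages: the endpoint replies (or fails) through the RpcCallContext
    endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { _ =>
      throw new SparkException(s"Unsupported message $message from ${_sender}")
    })

  case OneWayMessage(_sender, content) =>
    // send-style messages: no reply expected
    endpoint.receive.applyOrElse[Any, Unit](content, { _ =>
      throw new SparkException(s"Unsupported message $message from ${_sender}")
    })

  case OnStart =>
    endpoint.onStart()   // lifecycle hook, delivered when the endpoint is registered

  case OnStop =>
    endpoint.onStop()    // lifecycle hook, delivered when the endpoint is unregistered
}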
[spark-core] org.apache.spark.scheduler.cluster.DriverEndpoint#receiveAndReply
override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  ...
  // retrieve the SparkAppConfig
  case RetrieveSparkAppConfig(resourceProfileId) =>
    // note this will be updated in later prs to get the ResourceProfile from a
    // ResourceProfileManager that matches the resource profile id
    // for now just use default profile
    val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
    val reply = SparkAppConfig(
      sparkProperties,
      SparkEnv.get.securityManager.getIOEncryptionKey(),
      Option(delegationTokens.get()),
      rp)
    context.reply(reply)
}
The context variable here wraps the RpcResponseCallback that TransportRequestHandler created when processing the request; concretely it is a RemoteNettyRpcCallContext.
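context.reply therefore just funnels the response back to that callback: NettyRpcCallContext.reply calls send, and the remote variant serializes the reply and hands it to the RpcResponseCallback. Roughly (an abridged sketch based on org.apache.spark.rpc.netty.NettyRpcCallContext in Spark 3.0):
// Abridged sketch: how context.reply(...) ends up in the Netty callback.
private[netty] abstract class NettyRpcCallContext(override val senderAddress: RpcAddress)
  extends RpcCallContext with Logging {

  protected def send(message: Any): Unit

  override def reply(response: Any): Unit = send(response)

  override def sendFailure(e: Throwable): Unit = send(RpcFailure(e))
}

// Used for replies to messages that arrived over the network: serialize the reply and
// pass it to the RpcResponseCallback created in TransportRequestHandler#processRpcRequest.
private[netty] class RemoteNettyRpcCallContext(
    nettyEnv: NettyRpcEnv,
    callback: RpcResponseCallback,
    senderAddress: RpcAddress) extends NettyRpcCallContext(senderAddress) {

  override protected def send(message: Any): Unit = {
    val reply = nettyEnv.serialize(message)
    callback.onSuccess(reply)
  }
}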
4. Returning the response
There is a lot of layered wrapping here: throughout the communication path, objects wrap other objects that seem to express the same thing. The point of this layering is to hide from the upper layers whether a call is local or remote; the caller sees a uniform interface while the implementation wraps things at several levels.
So by now we know that writing the response back ultimately triggers the callback defined in TransportRequestHandler.
[common/network-common] org.apache.spark.network.server.TransportRequestHandler#processRpcRequest
rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
  @Override
  public void onSuccess(ByteBuffer response) {
    // the endpoint has finished processing; write the response back
    respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
  }
  ...
});
[common/network-common] org.apache.spark.network.server.TransportRequestHandler#respond
/**
 * Responds to a single message with some Encodable object. If a failure occurs while sending,
 * it will be logged and the channel closed.
 */
private ChannelFuture respond(Encodable result) {
  SocketAddress remoteAddress = channel.remoteAddress();
  // write the result back to the client through the Netty channel
  return channel.writeAndFlush(result).addListener(future -> {
    if (future.isSuccess()) {
      logger.trace("Sent result {} to client {}", result, remoteAddress);
    } else {
      logger.error(String.format("Error sending result %s to %s; closing connection",
        result, remoteAddress), future.cause());
      channel.close();
    }
  });
}
Note: based on Apache Spark 3.0
Author: pokerwu
This work is licensed under the Creative Commons Attribution-NonCommercial 4.0 International License.