
Apache Spark RPC (Part 2)

Author: pokerWu | Published 2020-07-17 16:56
[Figure: SparkNetty.png]

0. Creating and Starting the Server

Part 1 covered how the executor retrieves the SparkAppConfig through the RpcEnv: how the client is created and connects, how messages are sent via the RpcEnv, and finally how the return value is handed back to the caller.

Next, let's look at how the server, upon receiving a request, delivers the message to the correct RpcEndpoint and writes the response back.

Part 1 introduced TransportContext, which encapsulates the handler setup for both the Netty server and client. On the client side, responses are handled by TransportResponseHandler; on the server side, requests are handled by TransportRequestHandler together with NettyRpcHandler.

As part of SparkContext initialization, NettyRpcEnv starts a TransportServer.

The server is the entry point for RPC requests. When a request arrives, the server hands the message to the Dispatcher, which routes it into the message queue of the correct RpcEndpoint.
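As a recap of the client side from Part 1, the concrete request we will follow through the server is the executor asking the driver for its configuration. A minimal sketch, abridged from CoarseGrainedExecutorBackend#run; the driver ref and resourceProfileId are resolved earlier in that class:

import org.apache.spark.rpc.RpcEndpointRef
import org.apache.spark.scheduler.cluster.CoarseGrainedClusterMessages.{RetrieveSparkAppConfig, SparkAppConfig}

// Executor side: block until the driver's DriverEndpoint replies with the app config.
def fetchAppConfig(driver: RpcEndpointRef, resourceProfileId: Int): SparkAppConfig =
  driver.askSync[SparkAppConfig](RetrieveSparkAppConfig(resourceProfileId))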

NettyRpcEnv Initialization

NettyRpcEnv#create

def create(config: RpcEnvConfig): RpcEnv = {
    ...
  // create the NettyRpcEnv
    val nettyEnv =
      new NettyRpcEnv(sparkConf, javaSerializerInstance, config.advertiseAddress,
        config.securityManager, config.numUsableCores)
  //as server
    if (!config.clientMode) {
      val startNettyRpcEnv: Int => (NettyRpcEnv, Int) = { actualPort =>
        nettyEnv.startServer(config.bindAddress, actualPort)
        (nettyEnv, nettyEnv.address.port)
      }
      ...
      Utils.startServiceOnPort(config.port, startNettyRpcEnv, sparkConf, config.name)._1
     }
    nettyEnv
  }

NettyRpcEnv#startServer

def startServer(bindAddress: String, port: Int): Unit = {
    ...
    server = transportContext.createServer(bindAddress, port, bootstraps)
    dispatcher.registerRpcEndpoint(
      RpcEndpointVerifier.NAME, new RpcEndpointVerifier(this, dispatcher))
  }

transportContext is a field of NettyRpcEnv and was already created when the env was instantiated. As soon as the server is started, one RpcEndpoint, the RpcEndpointVerifier, is registered with the Dispatcher; a quick look at it below before we continue tracing startup.
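Slightly abridged from org.apache.spark.rpc.netty.RpcEndpointVerifier: this endpoint lets a remote RpcEnv ask, by name, whether an endpoint exists before obtaining a ref to it.

[spark-core] org.apache.spark.rpc.netty.RpcEndpointVerifier

private[netty] class RpcEndpointVerifier(override val rpcEnv: RpcEnv, dispatcher: Dispatcher)
  extends RpcEndpoint {
  // reply true or false depending on whether an endpoint with that name is registered locally
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case RpcEndpointVerifier.CheckExistence(name) => context.reply(dispatcher.verify(name))
  }
}

private[netty] object RpcEndpointVerifier {
  val NAME = "endpoint-verifier"
  // a message used to ask the remote RpcEndpointVerifier whether an RpcEndpoint exists
  case class CheckExistence(name: String)
}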

[common/network-common] org.apache.spark.network.TransportContext#createServer

 /** Create a server which will attempt to bind to a specific host and port. */
  public TransportServer createServer(
      String host, int port, List<TransportServerBootstrap> bootstraps) {
    return new TransportServer(this, host, port, rpcHandler, bootstraps);
  }

Let's step further into the creation of TransportServer.

TransportServer

[common/network-common] org.apache.spark.network.server.TransportServer

public class TransportServer implements Closeable {
  // the only constructor
  public TransportServer(
      TransportContext context,
      String hostToBind,
      int portToBind,
      RpcHandler appRpcHandler,
      List<TransportServerBootstrap> bootstraps) {
    ...
    init(hostToBind, portToBind);
    ...
  }
  
 private void init(String hostToBind, int portToBind) {
   ...
   // standard Netty server setup
   EventLoopGroup workerGroup =  NettyUtils.createEventLoop(ioMode, conf.serverThreads(),
      conf.getModuleName() + "-server");
   bootstrap = new ServerBootstrap()
      .group(bossGroup, workerGroup)
      .channel(NettyUtils.getServerChannelClass(ioMode))
      .option(ChannelOption.ALLOCATOR, pooledAllocator)
      .option(ChannelOption.SO_REUSEADDR, !SystemUtils.IS_OS_WINDOWS)
      .childOption(ChannelOption.ALLOCATOR, pooledAllocator);
   ...
   // this deserves a lengthy write-up of its own; set it aside for now and focus on bringing the server up
   bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      ...
    });
    InetSocketAddress address = hostToBind == null ?
        new InetSocketAddress(portToBind): new InetSocketAddress(hostToBind, portToBind);
    channelFuture = bootstrap.bind(address);
   ...
 }
}

After NettyRpcEnv starts the server, it immediately registers an RpcEndpoint. This introduces two new concepts: Dispatcher and RpcEndpoint.
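Before diving into the Dispatcher, here is the RpcEndpoint contract, condensed from org.apache.spark.rpc.RpcEndpoint (the default method bodies and the other lifecycle callbacks, such as onError, onConnected, and onDisconnected, are elided):

[spark-core] org.apache.spark.rpc.RpcEndpoint

private[spark] trait RpcEndpoint {
  val rpcEnv: RpcEnv
  // handles fire-and-forget messages sent via RpcEndpointRef#send (OneWayMessage on the wire)
  def receive: PartialFunction[Any, Unit]
  // handles ask-style messages sent via RpcEndpointRef#ask; the reply goes through the RpcCallContext
  def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
  // invoked before the endpoint starts handling messages, and after it stops
  def onStart(): Unit = {}
  def onStop(): Unit = {}
}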

Dispatcher

[spark-core] org.apache.spark.rpc.netty.Dispatcher

//A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {

  private val endpoints: ConcurrentMap[String, MessageLoop] // all registered MessageLoops, keyed by endpoint name
  private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef] // mapping from endpoint to its endpointRef
  private lazy val sharedLoop: SharedMessageLoop // a MessageLoop shared by multiple endpoints
  ...
  
 def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
    // this address uniquely identifies an RpcEndpoint; an RpcEndpointRef uses it to locate the RpcEndpoint
    val addr = RpcEndpointAddress(nettyEnv.address, name)
    val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
    synchronized {
      ...
      // This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
      // active when registering, and endpointRef must be put into endpointRefs before onStart is called.
      endpointRefs.put(endpoint, endpointRef)
      var messageLoop: MessageLoop = null
      try {
        messageLoop = endpoint match {
          case e: IsolatedRpcEndpoint =>
            new DedicatedMessageLoop(name, e, this)
          case _ =>
          // shared MessageLoop: endpoints share its thread pool; internally it allows more operations to run mutually exclusively or in parallel
            sharedLoop.register(name, endpoint)
            sharedLoop
        }
        endpoints.put(name, messageLoop)
      } catch {...}
    }
    endpointRef
  }
}
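registerRpcEndpoint is not called directly by user code; it is reached through RpcEnv#setupEndpoint. A minimal sketch of the usage (EchoEndpoint is made up for illustration, and because RpcEndpoint is private[spark] the code would have to live under the org.apache.spark package):

package org.apache.spark.demo // hypothetical package; RpcEndpoint is private[spark]

import org.apache.spark.rpc.{RpcCallContext, RpcEndpoint, RpcEndpointRef, RpcEnv}

// A made-up endpoint, for illustration only; not part of Spark.
class EchoEndpoint(override val rpcEnv: RpcEnv) extends RpcEndpoint {
  override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    case msg: String => context.reply(s"echo: $msg")
  }
}

object EchoEndpoint {
  // RpcEnv#setupEndpoint delegates straight to dispatcher.registerRpcEndpoint(name, endpoint)
  // and returns the NettyRpcEndpointRef created there.
  def register(rpcEnv: RpcEnv): RpcEndpointRef =
    rpcEnv.setupEndpoint("echo", new EchoEndpoint(rpcEnv))
}

Since EchoEndpoint is not an IsolatedRpcEndpoint, the match in registerRpcEndpoint above would place it on the shared loop.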

Now look again at how TransportServer sets up its channel handler.

[common/network-common] org.apache.spark.network.server.TransportServer#init

private void init(String hostToBind, int portToBind) {
  ...
  // this closely mirrors the client-side handler setup
  bootstrap.childHandler(new ChannelInitializer<SocketChannel>() {
      @Override
      protected void initChannel(SocketChannel ch) {
        logger.debug("New connection accepted for remote address {}.", ch.remoteAddress());

        RpcHandler rpcHandler = appRpcHandler;
        for (TransportServerBootstrap bootstrap : bootstraps) {
          rpcHandler = bootstrap.doBootstrap(ch, rpcHandler);
        }
        // both server and client delegate handler wiring to TransportContext
        context.initializePipeline(ch, rpcHandler);
      }
    });
  ...
}

Server Netty Handler

Continuing with the TransportServer startup: a channel handler was registered to handle socket connections. TransportContext creates a TransportChannelHandler and registers it as ServerBootstrap.childHandler, so TransportChannelHandler is responsible for handling external requests.

[common/network-common] org.apache.spark.network.TransportContext#createChannelHandler

/**
   * Creates the server- and client-side handler which is used to handle both RequestMessages and
   * ResponseMessages. The channel is expected to have been successfully created, though certain
   * properties (such as the remoteAddress()) may not be available yet.
   */
  private TransportChannelHandler createChannelHandler(Channel channel, RpcHandler rpcHandler) {
    // client side: handles responses
    TransportResponseHandler responseHandler = new TransportResponseHandler(channel);
    TransportClient client = new TransportClient(channel, responseHandler);
    boolean separateChunkFetchRequest = conf.separateChunkFetchRequest();
    ChunkFetchRequestHandler chunkFetchRequestHandler = null;
    if (!separateChunkFetchRequest) {
      chunkFetchRequestHandler = new ChunkFetchRequestHandler(
        client, rpcHandler.getStreamManager(),
        conf.maxChunksBeingTransferred(), false /* syncModeEnabled */);
    }
    // server side: handles requests
    TransportRequestHandler requestHandler = new TransportRequestHandler(channel, client,
      rpcHandler, conf.maxChunksBeingTransferred(), chunkFetchRequestHandler);
    return new TransportChannelHandler(client, responseHandler, requestHandler,
      conf.connectionTimeoutMs(), separateChunkFetchRequest, closeIdleConnections, this);
  }

1. Receiving a Request

When a request arrives at the server, TransportChannelHandler delegates it to TransportRequestHandler. Many kinds of requests are handled here; we only follow RpcRequest, the other kinds come up when covering other modules.

TransportRequestHandler

[common/network-common] org.apache.spark.network.server.TransportRequestHandler

/**
 * A handler that processes requests from clients and writes chunk data back. Each handler is
 * attached to a single Netty channel, and keeps track of which streams have been fetched via this
 * channel, in order to clean them up if the channel is terminated (see #channelUnregistered).
 *
 * The messages should have been processed by the pipeline setup by {@link TransportServer}.
 */
public class TransportRequestHandler extends MessageHandler<RequestMessage> {
  /** The Netty channel that this handler is associated with. */
  private final Channel channel;
  /** Client on the same channel allowing us to talk back to the requester. */
  private final TransportClient reverseClient;
  /** Handles all RPC messages. */
  private final RpcHandler rpcHandler;
  ...
    
  @Override
  public void handle(RequestMessage request) throws Exception {
    if (request instanceof ChunkFetchRequest) {
      chunkFetchRequestHandler.processFetchRequest(channel, (ChunkFetchRequest) request);
      // next branch: the remote RPC requests we focus on
    } else if (request instanceof RpcRequest) {
      processRpcRequest((RpcRequest) request);
    } else if (request instanceof OneWayMessage) {
      processOneWayMessage((OneWayMessage) request);
    } else if (request instanceof StreamRequest) {
      processStreamRequest((StreamRequest) request);
    } else if (request instanceof UploadStream) {
      processStreamUpload((UploadStream) request);
    } else {
      throw new IllegalArgumentException("Unknown request type: " + request);
    }
  }
  // handle an RpcRequest
  private void processRpcRequest(final RpcRequest req) {
    try {
      // delegate to the rpcHandler
      rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        // callback that handles the final response
        @Override
        public void onSuccess(ByteBuffer response) {
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }
        ...
      });
    } catch (Exception e) {...}
  }
}

The rpcHandler was created by NettyRpcEnv together with the TransportContext and passed in as a constructor argument; TransportContext in turn passes it as a constructor argument when creating TransportRequestHandler. Here the rpcHandler is a NettyRpcHandler, a class defined in the same file as NettyRpcEnv.

[spark-core] org.apache.spark.rpc.netty.NettyRpcHandler#receive

 override def receive(
      client: TransportClient,
      message: ByteBuffer,
      callback: RpcResponseCallback): Unit = {
    val messageToDispatch = internalReceive(client, message)
   // hand off to the Dispatcher to route to the correct endpoint
    dispatcher.postRemoteMessage(messageToDispatch, callback)
  }

Identifying the RpcEndpoint

How does the server find the right RpcEndpoint? From Part 1 we know that sending a request to an endpoint requires its RpcEndpointRef, and every RpcEndpoint is bound to an endpoint name. The request body must therefore carry the endpoint's identifying information for the server to locate the corresponding RpcEndpoint.

[spark-core] org.apache.spark.rpc.netty.RequestMessage#serialize

/** Manually serialize [[RequestMessage]] to minimize the size. */
  def serialize(nettyEnv: NettyRpcEnv): ByteBuffer = {
    val bos = new ByteBufferOutputStream()
    val out = new DataOutputStream(bos)
    try {
      // write the local (sender) address
      writeRpcAddress(out, senderAddress)
      // write the server-side (receiver) address
      writeRpcAddress(out, receiver.address)
      // write the endpoint name
      out.writeUTF(receiver.name)
      val s = nettyEnv.serializeStream(out)
      try {
        // a case class; in our example, RetrieveSparkAppConfig
        s.writeObject(content)
      } finally {
        s.close()
      }
    } finally {
      out.close()
    }
    bos.toByteBuffer
  }
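On the server side, the companion object reverses this layout. A sketch, simplified from RequestMessage#apply in the same file (readRpcAddress is the private helper that mirrors writeRpcAddress); the field order must match serialize(): sender address, receiver address, endpoint name, then the payload:

def apply(nettyEnv: NettyRpcEnv, client: TransportClient, bytes: ByteBuffer): RequestMessage = {
  val in = new DataInputStream(new ByteBufferInputStream(bytes))
  try {
    val senderAddress = readRpcAddress(in)   // sender address
    val endpointAddress = RpcEndpointAddress(
      readRpcAddress(in),                    // receiver address
      in.readUTF())                          // endpoint name
    val ref = new NettyRpcEndpointRef(nettyEnv.conf, endpointAddress, nettyEnv)
    ref.client = client
    // the remaining bytes are the serialized content, e.g. RetrieveSparkAppConfig
    new RequestMessage(senderAddress, ref, nettyEnv.deserialize(client, bytes))
  } finally {
    in.close()
  }
}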

2. Message Delivery: Dispatcher

Back in the Dispatcher, let's trace how the message is delivered to the endpoint.

[spark-core] org.apache.spark.rpc.netty.Dispatcher

/**
 * A message dispatcher, responsible for routing RPC messages to the appropriate endpoint(s).
 */
private[netty] class Dispatcher(nettyEnv: NettyRpcEnv, numUsableCores: Int) extends Logging {
  // endpointName -> MessageLoop
  private val endpoints: ConcurrentMap[String, MessageLoop] =
    new ConcurrentHashMap[String, MessageLoop]
  ...
  
  /** Posts a message sent by a remote endpoint. */
  def postRemoteMessage(message: RequestMessage, callback: RpcResponseCallback): Unit = {
    val rpcCallContext =
      new RemoteNettyRpcCallContext(nettyEnv, callback, message.senderAddress)
    val rpcMessage = RpcMessage(message.senderAddress, message.content, rpcCallContext)
    postMessage(message.receiver.name, rpcMessage, (e) => callback.onFailure(e))
  }
  /**
   * Posts a message to a specific endpoint.
   *
   * @param endpointName name of the endpoint.
   * @param message the message to post
   * @param callbackIfStopped callback function if the endpoint is stopped.
   */
  private def postMessage(
      endpointName: String,
      message: InboxMessage,
      callbackIfStopped: (Exception) => Unit): Unit = {
    ... 
    // use the endpointName from the request body to find the message queue bound to the endpoint
    val loop = endpoints.get(endpointName)
    ...
    // deliver the message
    loop.post(endpointName, message)
    ... 
  }
}

At this point the message has been delivered into the message queue of the correct endpoint. Next we trace how the message is processed and how the response comes back.

3. The Endpoint Consumes the Message

Recall the server startup above: the DriverEndpoint was registered with the Dispatcher, and during registration the Dispatcher also created a MessageLoop for it. The design of MessageLoop deserves its own detailed analysis later; for now, keep following the request.

DriverEndpoint uses a DedicatedMessageLoop, a subclass of MessageLoop that serves a single endpoint exclusively. A MessageLoop can also be shared: multiple endpoints use one MessageLoop, but each endpoint still has its own Inbox. The Inbox is literally a mailbox: the Dispatcher's routing boils down to finding the correct Inbox and appending the message to its queue, as the sketch below shows.
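The Inbox side of that hand-off is tiny. A sketch of org.apache.spark.rpc.netty.Inbox#post (logging elided): delivery is just an append to the inbox's internal queue, under the inbox's own lock.

def post(message: InboxMessage): Unit = inbox.synchronized {
  if (stopped) {
    onDrop(message)        // the endpoint has already stopped: drop the message
  } else {
    messages.add(message)  // messages is a java.util.LinkedList[InboxMessage]
  }
}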

[spark-core] org.apache.spark.rpc.netty.DedicatedMessageLoop

/**
 * A message loop that is dedicated to a single RPC endpoint.
 */
private class DedicatedMessageLoop(
    name: String,
    endpoint: IsolatedRpcEndpoint,
    dispatcher: Dispatcher) extends MessageLoop(dispatcher) { 
  private val inbox = new Inbox(name, endpoint) // the inbox bound to this endpoint
  // worker threads that drain the inbox
  override protected val threadpool = if (endpoint.threadCount() > 1) {
    ThreadUtils.newDaemonCachedThreadPool(s"dispatcher-$name", endpoint.threadCount())
  } else {
    ThreadUtils.newDaemonSingleThreadExecutor(s"dispatcher-$name")
  }
  // the thread pool starts working as soon as the instance is created
  (1 to endpoint.threadCount()).foreach { _ =>
    threadpool.submit(receiveLoopRunnable)
  }
  // likewise, the inbox is marked active at construction time;
  // worker threads take active inboxes from the `active` queue and process their messages
  // Mark active to handle the OnStart message.
  setActive(inbox)
  // deliver a message into the inbox's queue
  override def post(endpointName: String, message: InboxMessage): Unit = {
    require(endpointName == name)
    inbox.post(message)
    setActive(inbox)
  }
} 

At this point the message has been placed into the Inbox and the Inbox is marked active, so the Inbox's worker threads will process it. That work is done by receiveLoopRunnable, a field of the MessageLoop class: a Runnable whose run method executes MessageLoop's receiveLoop.

[spark-core] org.apache.spark.rpc.netty.MessageLoop#receiveLoop

  private def receiveLoop(): Unit = {
    try {
      while (true) {
        try {
          // `active` is a queue holding the inboxes that have pending messages
          val inbox = active.take()
          // PoisonPill is the shutdown signal
          if (inbox == MessageLoop.PoisonPill) {
            // Put PoisonPill back so that other threads can see it.
            setActive(MessageLoop.PoisonPill)
            return
          }
          // process the messages
          inbox.process(dispatcher)
        } catch {
          case NonFatal(e) => logError(e.getMessage, e)
        }
      }
    } catch {...}
  }

Based on the message type, the Inbox chooses which endpoint method to invoke, as sketched below.
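The core of Inbox#process is a match on the message type. Simplified (OnStart/OnStop bookkeeping and connection events such as RemoteProcessConnected are elided):

message match {
  case RpcMessage(_sender, content, context) =>
    // ask-style request: the endpoint replies through the RpcCallContext
    endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content,
      _ => throw new SparkException(s"Unsupported message $message from ${_sender}"))
  case OneWayMessage(_sender, content) =>
    // fire-and-forget: there is no reply channel
    endpoint.receive.applyOrElse[Any, Unit](content,
      _ => throw new SparkException(s"Unsupported message $message from ${_sender}"))
  case OnStart =>
    endpoint.onStart()
  case OnStop =>
    endpoint.onStop()
}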

For our request type, execution enters DriverEndpoint's receiveAndReply method.

[spark-core] org.apache.spark.scheduler.cluster.DriverEndpoint#receiveAndReply

override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
  ...
  // retrieve the SparkAppConfig
  case RetrieveSparkAppConfig(resourceProfileId) =>
    // note this will be updated in later prs to get the ResourceProfile from a
    // ResourceProfileManager that matches the resource profile id
    // for now just use default profile
    val rp = ResourceProfile.getOrCreateDefaultProfile(conf)
    val reply = SparkAppConfig(
          sparkProperties,
          SparkEnv.get.securityManager.getIOEncryptionKey(),
          Option(delegationTokens.get()),
          rp)
    context.reply(reply)
}

The context variable here wraps the RpcResponseCallback that TransportRequestHandler created while processing the request; concretely, it is a RemoteNettyRpcCallContext.
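Its reply path is short. Slightly simplified from org.apache.spark.rpc.netty.NettyRpcCallContext:

[spark-core] org.apache.spark.rpc.netty.NettyRpcCallContext

private[netty] abstract class NettyRpcCallContext(override val senderAddress: RpcAddress)
  extends RpcCallContext {
  protected def send(message: Any): Unit
  // context.reply(...) in the endpoint ends up here
  override def reply(response: Any): Unit = send(response)
  override def sendFailure(e: Throwable): Unit = send(RpcFailure(e))
}

// For remote callers: serialize the response and hand it to the RpcResponseCallback
// created in TransportRequestHandler#processRpcRequest.
private[netty] class RemoteNettyRpcCallContext(
    nettyEnv: NettyRpcEnv,
    callback: RpcResponseCallback,
    senderAddress: RpcAddress)
  extends NettyRpcCallContext(senderAddress) {
  override protected def send(message: Any): Unit = {
    val reply = nettyEnv.serialize(message)
    callback.onSuccess(reply)
  }
}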

4. Returning the Response

Note the layers of wrapping: across the whole communication path there are many objects wrapping other objects that seem to express the same thing. The point of this design is to hide from the upper layers whether a call is local or remote: usage stays transparent to the caller, while the implementation wraps along several dimensions.

So at this point we can see that the final response is produced by triggering the callback defined in TransportRequestHandler.

[common/network-common] org.apache.spark.network.server.TransportRequestHandler#processRpcRequest

   rpcHandler.receive(reverseClient, req.body().nioByteBuffer(), new RpcResponseCallback() {
        @Override
        public void onSuccess(ByteBuffer response) {
          // the endpoint finished processing; write the response out
          respond(new RpcResponse(req.requestId, new NioManagedBuffer(response)));
        }
      ...
      });

[common/network-common] org.apache.spark.network.server.TransportRequestHandler#respond

/**
   * Responds to a single message with some Encodable object. If a failure occurs while sending,
   * it will be logged and the channel closed.
   */
  private ChannelFuture respond(Encodable result) {
    SocketAddress remoteAddress = channel.remoteAddress();
    // write the result back to the client over the Netty channel
    return channel.writeAndFlush(result).addListener(future -> {
      if (future.isSuccess()) {
        logger.trace("Sent result {} to client {}", result, remoteAddress);
      } else {
        logger.error(String.format("Error sending result %s to %s; closing connection",
          result, remoteAddress), future.cause());
        channel.close();
      }
    });
  }

Note: based on Apache Spark 3.0.

Author: pokerwu
This work is licensed under the Creative Commons Attribution-NonCommercial 4.0 International License.
