美文网首页Spark
Spark RPC 通信机制

Spark RPC 通信机制

作者: wangdy12 | 来源:发表于2018-07-21 20:50 被阅读0次

    相关概念

    主要涉及RpcEnv,RpcEndpoint,RpcEndpointRef,其中RpcEnv是通信的基础,每个通信节点上都需要实现该类,其内部实现了消息的传输处理机制,RpcEndpoint表示一个可以接收RPC消息的对象,远程节点通过RpcEndpointRef向相应的RpcEndpoint发送消息

    RpcEnv

    RpcEnv 抽象类表示一个 RPC Environment,管理着整个RpcEndpoint的生命周期,目前唯一的实现类是NettyRpcEnv,具体功能是

    • 注册RpcEndpoint
    • 将来自 RpcEndpointRef的消息发送给相应的RpcEndpoint

    RpcEnv会在所有的通信节点上创建,例如master,worker,driver,executor都会创建一个RpcEnv

    Driver端上的RpcEnv在SparkEnv初始化时创建:

        val systemName = if (isDriver) driverSystemName else executorSystemName
        val rpcEnv = RpcEnv.create(systemName, bindAddress, advertiseAddress, port.getOrElse(-1), conf,
          securityManager, numUsableCores, !isDriver)
    

    RpcEnv.create内部通过工厂方法创建RpcEnv,具体是通过实现RpcEnvFactory接口的NettyRpcEnvFactory工厂类,创建RpcEnv的具体实现类NettyRpcEnv

      //指明该RpcEnv的名称,监听地址和端口
      def create(
          name: String,
          bindAddress: String,
          advertiseAddress: String,
          port: Int,
          conf: SparkConf,
          securityManager: SecurityManager,
          numUsableCores: Int,
          clientMode: Boolean): RpcEnv = {
        val config = RpcEnvConfig(conf, name, bindAddress, advertiseAddress, port, securityManager,
          numUsableCores, clientMode)
        new NettyRpcEnvFactory().create(config)
      }
    

    RpcEndpoint

    RpcEndPoint 代表具体的通信节点,例如Master、Worker、CoarseGrainedSchedulerBackend中的DriverEndpoint、CoarseGrainedExecutorBackend等,都实现了该接口,在具体的函数中定义了消息传递来时的处理逻辑,整个生命周期是constructor -> onStart -> receive* -> onStop,即调用构造函数,然后向RpcEnv注册,内部调用onStart,之后如果收到消息,RpcEnv会调用receive*方法,结束时调用onStop方法

    private[spark] trait RpcEndpoint {
      // 当前RpcEndpoint注册的RpcEnv
      val rpcEnv: RpcEnv
      // 获取该RpcEndpoint对应的RpcEndpointRef
      final def self: RpcEndpointRef
    
      // 处理RpcEndpointRef.send发送的消息
      def receive: PartialFunction[Any, Unit]
      // 处理RpcEndpointRef.ask发送的消息,通过RpcCallContext返回消息或异常
      def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit]
    
      //一系列的回调函数
      def onError(cause: Throwable): Unit
      def onConnected(remoteAddress: RpcAddress): Unit
      def onDisconnected(remoteAddress: RpcAddress): Unit
      def onNetworkError(cause: Throwable, remoteAddress: RpcAddress): Unit
      def onStart(): Unit
      def onStop(): Unit
    
      // 停止RpcEndpoint
      final def stop(): Unit 
    }
    

    它的子类是ThreadSafeRpcEndpoint,Spark中实现的Endpoint大多是继承这个类,应该线程安全的处理消息,即RpcEnv中的Dispatcher在处理该Endpoint对应的Inbox内的消息时,只能单线程处理消息,不能进行多线程同时处理多个消息

    RpcEndpointRef

    RpcEndPointRef 是对远程RpcEndpoint的一个引用,内部记录了RpcEndpoint的位置信息

    private[spark] abstract class RpcEndpointRef(conf: SparkConf)
      extends Serializable with Logging {
      // 最大重连次数(3),重新尝试的等待事件(3s),默认的超时事件(120s)
      private[this] val maxRetries = RpcUtils.numRetries(conf)
      private[this] val retryWaitMs = RpcUtils.retryWaitMs(conf)
      private[this] val defaultAskTimeout = RpcUtils.askRpcTimeout(conf)
    
      // 对应RpcEndpoint的地址,名称
      def address: RpcAddress
      def name: String
    
      // 发送一个消息
      def send(message: Any): Unit
      // 发送消息到相应的`RpcEndpoint.receiveAndReply`,异步
      def ask[T: ClassTag](message: Any, timeout: RpcTimeout): Future[T]
      // 发送消息到相应的`RpcEndpoint.receiveAndReply`,阻塞等待回复的结果
      def askSync[T: ClassTag](message: Any): T
      ....
    }
    

    地址表示

    • RpcAddress(host: String, port: Int):Rpc environment的地址
    • RpcEndpointAddress(rpcAddress: RpcAddress, name: String):RPC endpoint的地址,其中的rpcAddress指的是endpoint所在的RpcEnv地址,name指的是endpoint的名称

    NettyRpcEnv实现

    NettyRpcEnvFactory

    • 通过RpcEnvConfig创建NettyRpcEnv
    • 如果非clientMode,在特定的地址和端口上启动服务端startServer(bindAddress: String, port: Int)

    NettyRpcEnv

    内部涉及的部分字段和函数如下:

    private[netty] class NettyRpcEnv(
        val conf: SparkConf,
        //JavaSerializerInstance可以在多线性情况下运行
        javaSerializerInstance: JavaSerializerInstance,
        host: String,
        securityManager: SecurityManager,
        numUsableCores: Int) extends RpcEnv(conf) with Logging {
    
      private[netty] val transportConf : TransportConf 传输上下文的配置信息,其中默认的netty线程数目为8
      private val dispatcher: Dispatcher 消息分发器,负责将RPC消息发送到对应的endpoint
      private val streamManager : NettyStreamManager 用于文件传输
      private val transportContext : TransportContext 传输的核心,用来创建TransportServer和TransportClientFactory
      private val clientFactory : TransportClientFactory 用来创建TransportClient
      @volatile private var fileDownloadFactory: TransportClientFactory 用来创建用于file下载的TransportClient,使得与主RPC传输分离
      val timeoutScheduler : ScheduledThreadPoolExecutor 线程池,超时控制相关
      private[netty] val clientConnectionExecutor:ThreadPoolExecutor 客户端连接线程池,线程池默认最大线程数目64
      @volatile private var server: TransportServer
      // 向远程RpcAddress发送消息时,将消息放到相应的Outbox中即可
      private val outboxes = new ConcurrentHashMap[RpcAddress, Outbox]()
    
      // 在特定地址端口上启动服务 即创建一个TransportServer
      def startServer(bindAddress: String, port: Int): Unit
      //该RpcEnv监听的地址 地址+端口
      def address: RpcAddress
      //注册RpcEndpoint,返回RpcEndpointRef
      def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef
      //检索出对应的RpcEndpointRef
      def setupEndpointRef(address: RpcAddress, endpointName: String): RpcEndpointRef
      // 获取RpcEndpoint对应的RpcEndpointRef
      private[rpc] def endpointRef(endpoint: RpcEndpoint): RpcEndpointRef
      // 等待RpcEnv退出
      def awaitTermination(): Unit
    }
    

    Dispatcher

    进行消息的异步处理,内部有一个线程池,每个线程执行MessageLoop任务,不停将放置在阻塞队列中receivers中的EndpointData消息取出分发到相应的endpoint,如果为PoisonPill消息,关闭线程池

    其内部记录了该节点上所有的RpcEndpoint

      private val endpoints: ConcurrentMap[String, EndpointData]
      private val endpointRefs: ConcurrentMap[RpcEndpoint, RpcEndpointRef]
      // 存储EndpointData数据的阻塞队列
      private val receivers = new LinkedBlockingQueue[EndpointData]
      // 创建一个线程名前缀名称为dispatcher-event-loop的线程池,默认线程数目是JVM可获取的核数与2的最大值,用来处理消息InboxMessage
      private val threadpool: ThreadPoolExecutor
    
      // 注册RpcEndpoint 将相关信息添加到endpoints和endpointRefs,receivers集合中
      def registerRpcEndpoint(name: String, endpoint: RpcEndpoint)
      // 是否存在该endpoint
      def verify(name: String): Boolean
    
      // 内部的线程池,如果没有指定线程数目,使用核数作为线程数目
      private val threadpool: ThreadPoolExecutor = {
        val availableCores =
          if (numUsableCores > 0) numUsableCores else Runtime.getRuntime.availableProcessors()
        val numThreads = nettyEnv.conf.getInt("spark.rpc.netty.dispatcher.numThreads",
          math.max(2, availableCores))
        val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
        for (i <- 0 until numThreads) {
          pool.execute(new MessageLoop)
        }
        pool
      }
    

    EndpointData

    每个endpoint都有一个对应的EndpointDataEndpointData内部包含了RpcEndpointNettyRpcEndpointRef信息,与一个Inbox,收信箱Inbox内部有一个InboxMessage链表,发送到该endpoint的消息,就是添加到该链表,同时将整个EndpointData添加Dispatcher到阻塞队列receivers中,由Dispatcher线程异步处理

    InboxMessageInbox内的消息,所有的RPC消息都继承自InboxMessage

    • OneWayMessage:不需要Endpoint回复的消息
    • RpcMessage:需要Endpoint回复的消息
    • OnStart:Inbox实例化后自动添加一个OnStart,用于通知对应的RpcEndpoint启动
    • OnStop:用于关闭对应的RpcEndpoint
    • RemoteProcessConnected、RemoteProcessDisconnected、RemoteProcessConnectionError:告诉所有的endpoints,远程连接状态相关的信息

    注册RpcEndpoint

    NettyRpcEnv内注册RpcEndpoint

      override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
        dispatcher.registerRpcEndpoint(name, endpoint)
      }
    

    Dispatcher.registerRpcEndpoint调用:

    • 创建RpcEndpointAddress,记录endpoint的地址,端口,名称
    • 创建一个对应的RpcEndpointRef
    • 创建一个EndpointData(内部的Inbox内在构造时会放入OnStart消息),放入endpoints缓存
    • 记录RpcEndpointRpcEndpointRef的映射关系到endpointRefs缓存
    • EndpointData放入阻塞队列receives,分发器异步调用对应的OnStart函数
    • 返回RpcEndpointRef

    RpcCallContext

    当发送的消息类型是RpcMessage时,需要回复消息,需要在其中封装NettyRpcCallContext,用来向客户端发送消息

    private[spark] trait RpcCallContext {
      // 回复消息给发送方
      def reply(response: Any): Unit
      // 回复失败给发送方
      def sendFailure(e: Throwable): Unit
      // 获取发送方地址
      def senderAddress: RpcAddress
    }
    

    NettyRpcCallContext为实现RpcCallContext接口的抽象类,有两个具体的实现类

    • LocalNettyRpcCallContext : 接收方和发送方在地址相同,即同一进程,直接通过Promise进行回调
    • RemoteNettyRpcCallContext : 不在一起的时候,使用远程连接式的回调

    TransportContext

    传输上下文TransportContext,内部包含传输配置信息TransportConf,以及对收到的RPC消息进行处理的RpcHandler,用来创建TransportServer和TransportClientFactory,底层依赖Netty实现

    Netty中的相关概念:
    每个Channel都有一个 ChannelPipeline,在Channel创建时会被自动创建

    • ChannelPipeline:内部有一个由ChannelHandlerContext组成的双向链表,每个ChannelHandlerContext对应一个ChannelHandler
    • ChannelHandler:处理I/O事件或拦截I/O操作,并将其转发到ChannelPipeline中的下一个ChannelHandler,子接口ChannelOutboundHandlerChannelInboundHandler分别用于处理发送和接收的I/O

    TransportConf

    传输上下文的配置信息,使用SparkTransportConf.fromSparkConf方法来构造

    内部实际使用的是一份克隆的SparkConf存储配置属性,默认分配给网络传输的IO线程数是系统可用处理器的数量,但线程数目最多为8,最终确定的线程数将被用于设置客户端传输线程数(spark.$module.io.clientThreads)和服务端传输线程数(spark.$module.io.serverThreads),此外还将spark.rpc.io.numConnectionsPerPeer属性设置为1

    内部包含大量io相关的配置属性,及其默认值,例如IO模式,缓存大小,线程数目等,属性名称为"spark." + 模块名称 + "." + 后缀,其中模块名称为rpc,在NettyRpcEnv中创建TransportConf时指定

    TransportClientFactory

    TransportContext.createClientFactory方法创建,是用来创建TransportClient的工厂类,内部包含一个连接池ConcurrentHashMap<SocketAddress, ClientPool> connectionPool,进行缓存,方便重复使用

    连接池中每个SocketAddress对应一个客户端池ClientPool,其内有一个TransportClient数组,数组大大小由spark.rpc.io.numConnectionsPerPeer指定,即本节点和远程节点建立的连接数目,默认为1

    TransportClientFactory构造函数中包含内部Netty客户端相关的配置,具体类型取决于ioMode:NIO或者EPOLL(Java NIO 在Linux下默认使用的就是epoll)

    • 事件处理:NioEventLoopGroup/EpollEventLoopGroup
    • 通道:NioSocketChannel/EpollSocketChannel
    • 缓存分配器:PooledByteBufAllocator

    每个TransportClient和一个远程地址通信,由TransportClientFactory创建,流程如下

    • 通过host和port构造未解析的远程连接地址InetSocketAddress
    • 从连接池connectionPool中获取该地址对应的ClientPool,为空则初始化一个客户端池
    • ClientPool中随机选取一个TransportClient
    • 如果TransportClient不为空并且处于活跃状态,更新该客户端的最后一次请求事件,直接返回该TransportClient
    • 否则创建一个新的TransportClient
    image

    Channel中处理 I/O 事件的ChanelHandler核心是TransportChannelHandler,此外还有编解码相关和空闲状态检查相关的handler

    TransportClientBootstrap

    TransportClientFactory创建Client成功连接到远程服务端以后,先执行引导程序,主要用来进行初始信息交互,例如SaslClientBootstrap进行SASL认证,完成后才会返回该新建的TransportClient

    TransportClient

    TransportClient内部包含一个通道Channel,以及一个TransportResponseHandler,此类用于向服务器发出请求,而TransportResponseHandler负责处理来自服务器的响应,是线程安全的,可以从多个线程调用

    client用来发送五种RequestMessage:ChunkFetchRequest、OneWayMessage、RpcRequest、StreamRequest、UploadStream

    • 发送RPC请求sendRpc:每个RpcRequest对应一个使用UUID生成的requestId,将requestId与对应的RpcResponseCallback映射关系记录到TransportResponseHandleroutstandingRpcs字段中,当收到返回消息时调用相应的回调,主要功能是通过Channel.writeAndFlush将RPC请求发送出去
    • 请求块数据fetchChunk:首先使用流的标记和块的索引创建StreamChunkId,向TransportResponseHandleroutstandingFetches添加索引与ChunkReceivedCallback回调的映射关系,使用channel.writeAndFlush发送ChunkFetchRequest请求,接收到服务器端的相应时,执行相应的回调
    • 请求流数据stream:在TransportResponseHandlerstreamCallbacks添加streamIdStreamCallback回调的映射关系,发送StreamRequest请求,收到响应时执行对应的回调
    • 发送不需要回复的数据send,直接通过Channel.writeAndFlush发送OneWayMessage
    • 发送流数据uploadStream:发送UploadStream数据流到远端

    TransportServerBootstrap

    当客户端和服务器建立连接后,在服务端对应的管道上运行的引导程序

    TransportServer

    RPC框架的服务端,只要RpcEnvConfig.clientMode不为ture,都会启动服务

    调用TransportServer.startServer启动服务,通过TransportContext.createServer创建服务端,然后内部会向NettyRpcEnvdispatcher中注册本身的RpcEndpoint,名称为endpoint-verifier,类型为RpcEndpointVerifier

    它的作用是,当远程节点需要创建该RpcEnv上的Endpoint的一个引用时(setupEndpointRef方法),因为每个RpcEnv上都有RpcEndpointVerifier,所以远端可以直接创建一个RpcEndpointVerifier对应的ref,通过它发送CheckExistence(name: String)消息,查询该dispatcher内部的endpoints缓存中是否存在的名称为name的endpoint,从而确定是否可以创建该RpcEndpointRef

    TransportServer内部包含的变量如下:

    • context:传输上下文TransportContext,用来配置channelHandler
    • conf:传输配置TransportContext
    • appRpcHandler:RPC请求处理器RpcHandler
    • bootstraps:TransportServerBootstrap类型,当客户端连接到服务器端时,在通道创建时执行的引导程序

    其创建过程就是标准的netty服务端创建方式,对于已经链接进来的client Chanel,ChanelHandler的配置和客户端配置类似

    RpcHandler

    TransportClient.sendRpc发送的RPC消息进行处理,内部通过Dispatcher将收到的RPC分发到对应的Endpoint

    // RpcHandler部分定义
    public abstract class RpcHandler {
      // 接收一个RPC消息,具体逻辑执行由子类实现,处理完成后通过RpcResponseCallback回调,如果不需要回调返回消息的,传入参数为OneWayRpcCallback,只打印日志
      public abstract void receive(
          TransportClient client,
          ByteBuffer message,
          RpcResponseCallback callback);
    
      // 获取StreamManager
      public abstract StreamManager getStreamManager();
      // Channel处于活跃状态时调用
      public void channelActive(TransportClient client) { }
      // 非活跃状态时调用
      public void channelInactive(TransportClient client) { }
      // 产生异常时调用
      public void exceptionCaught(Throwable cause, TransportClient client) { }
      ...
    }
    

    NettyStreamManager用于提供NettyRpcEnv的文件流服务,可以将文件,目录和jar包注册到其中,然后根据请求,将相应文件的信息封装为FileSegmentManagedBuffer,可以用来处理StreamRequest类型的消息

    管道初始化

    管道初始化过程中都使用了TransportContext.initializePipeline创建的TransportChannelHandler

    TransportChannelHandler是单个传输层的通道handler,用于将请求委派给TransportRequestHandler并响应TransportResponseHandler。在传输层中创建的所有通道都是双向的。当客户端使用RequestMessage发送给Netty通道(由服务器的RequestHandler处理)时,服务器将生成ResponseMessage(由客户端的ResponseHandler处理)。但是,服务器也会在同一个Channel上获取句柄,因此它可能会向客户端发送RequestMessages。这意味着客户端还需要一个RequestHandler,而Server需要一个ResponseHandler,用于客户端对服务器请求的响应进行响应。

    此类还处理来自io.netty.handler.timeout.IdleStateHandler的超时信息。如果存在未完成的提取fetch或RPC请求但是在“requestTimeoutMs”时间内通道上没有的流量,我们认为连接超时。注意这是双工通道;如果客户端不断发送但是没有响应,为简单起见不认为是超时

    TransportChannelHandler内部使用MessageHandler处理Message,其中MessageHandler有两种类型,分别用来处理客户端请求/处理服务端的响应,Message共有10种类型

    TransportRequestHandler

    处理客户端的五种请求信息RequestMessage,内部包含RpcHandler处理RPC信息,TransportClient用来和请求方通信

    • RpcRequest:通过RpcHandler接收请求,实际类型为NettyRpcHandler,这里将ByteBuffer类型的RPC请求转换为RequestMessage,加入到对应endpoint的inbox内,由Dispatcher负责处理,通过RpcResponseCallback回复RpcResponse/RpcFailure
    • OneWayMessage:类似前一种情况,最后不对客户端进行回复
    • ChunkFetchRequest:通过StreamManager.getChunk处理要请求的数据块,同时返回结果描述ChunkFetchSuccess/ChunkFetchFailure
    • StreamRequest:通过StreamManager.openStream获取请求的流数据,最后返回结果StreamResponse/StreamFailure给客户端
    • UploadStream:处理客户端上传的流

    SaslServerBootstrap类型的引导首先会对服务器端的RPCHandler进行代理,与客户端进行认证交互,认证成功后,将加解密的Handler添加到通道的Pipeline中,后续的消息交给被代理的RPCHandler进行代理

    TransportResponseHandler

    处理服务端对请求的响应,内部会记录需要回复的请求的ID,以及对应的callback函数,一共由六种ResponseMessage,根据消息类型和ID,执行响应的回调

    • ChunkFetchFailure
    • ChunkFetchSuccess
    • RpcFailure
    • RpcResponse
    • StreamFailure
    • StreamResponse

    其他ChannelHandler

    • MessageEncoder:对Message进行编码,frame格式如下
    length(long类型,消息长度,8字节)|message type(消息类型,1个字节)|message meta(消息元数据,例如RpcRequest消息的元数据长度为12,包括requestId和消息体长度bodysize)|message body(消息体,具体的信息,例如ChunkFetchSuccess中的块数据)
    
    • MessageDecoder:解码,从中获取消息类型,和消息内容,创建对应的消息
    • TransportFrameDecoder:处理TCP中的粘包拆包,得到一个完整的消息,内部主要依靠的就是每个frame前8个字节,表示整个frame的长度
    • IdleStateHandler:实现心跳功能,触发IdleStateEvent事件,当通道内没有执行读取,写入操作时,底层通过向线程任务队列中添加定时任务,如果空闲超时,则会触发TransportChannelHandler.userEventTriggered方法。在这个方法如果确认超时,会关闭通道

    Outbox

    可以理解为发出消息的盒子,每个地址对应个盒子

    NettyRpcEnv中outboxes : ConcurrentHashMap[RpcAddress, Outbox]字段,每个远程RpcAddress对应一个Outbox,OutBox其内部包含一个OutboxMessage的链表,所有向远端发送的消息都要封装为OutboxMessage

    调用Outbox.send方法发送消息时,将消息添加到OutboxMessage链表中,如果远程连接还未建立,会先通过NettyRpcEnv中的clientConnectionExecutor线程池执行建立连接的任务,即创建特定RpcAddress上的TransportClient,然后发送消息

    OutboxMessage有两个子类,OneWayOutboxMessageRpcOutboxMessage,表明不会有回复和存在回复两种消息类型,分别对应调用RpcEndpoint的receivereceiveAndReply方法,当TransportClient发送消息时,如果Message是RpcOutboxMessage,先会创建一个UUID,底层TransportResponseHandler维护一个发送消息ID与其Callback的HashMap,当Netty收到完整的远程RpcResponse时候,做反序列化,回调相应的Callback,进而执行Spark中的业务逻辑,即Promise/Future的响应

    Rpc底层通信框架

    实际流程分析

    当通过RpcEndpointRef发送需要回复的消息时:

    • RpcEndpointRef.ask
    • NettyRpcEndpointRef.ask 构建RequestMessage(senderAddress,receiver,message content)

    如果远程地址与当前NettyRpcEnv相同:

    • NettyRpcEnv.ask,创建一个Promise对象,设定Future完成后的回调
    • Dispatcher.postLocalMessage,构建RpcMessage和回调上下文LocalNettyRpcCallContext
    • Dispatcher.postMessage,将RequestMessage消息重构成RpcMessage,放置到对应endpoint的inbox内,dispatcher内部的线程池会取出消息,然后根据消息类型,执行不同的操作,调用endpoint的receiveAndReply,内部回调RpcCallContext.reply返回结果

    如果需要连接远程地址时:

    • NettyRpcEnv.ask,将RequestMessage序列化,构造一个RpcOutboxMessage,设定消息成功和失败时对应的的回调
    • NettyRpcEnv.postToOutbox,如果尚未创建对应的TransportClient,将消息放入目的地址相应的Outbox,必要时新建Outbox以及对应的TransportClient
    • 通过message.sendWith(client)调用TransportClient.sendRpc发送RpcRequest消息,记录该requestId对应的回调RpcResponseCallback,用来收到回复后调用

    上述步骤都是异步执行的,当将消息放置到相应位置后,就会返回,然后:

    • 使用timeoutScheduler设定一个计时器,用于超时处理,超时抛出TimeoutException异常

    在Spark 1.6之前,底层使用的Akka进行PRC,它是基于Actor的RPC通信系统,但是无法适用大的package/stream的数据传输,所以还有Netty通信框架,所以将两套通信框架合并统一使用netty,并且akka使用时版本必须保证一致,否则会出现很多问题。但是RpcEnv参照了Akka的思路,内部原理基本一致,都是按照MailBox的设计思路来实现的

    image

    参考

    其他Spark源码分析,记录在GitBook

    相关文章

      网友评论

        本文标题:Spark RPC 通信机制

        本文链接:https://www.haomeiwen.com/subject/rnlkmftx.html