美文网首页
Spark学习笔记三_下 Spark 核心原理

Spark学习笔记三_下 Spark 核心原理

作者: BitGuo | 来源:发表于2019-10-29 17:24 被阅读0次

    架构简要介绍在前几片博客提过了

    Spark的消息通信原理

    通信模块类图

    首先看一下Spark的消息通信的类图


    Spark通信类图

    最核心的是左上角的虚线框的四个类
    首先定义了RpcEnvFactoryRpcEnv两个抽象类,在RpcEnv中定义的是RPC通信框架的启动停止和关闭等抽象方法,在RpcEnvFactoty中定义的是创建的抽象方法。然后下面两个类分别使用Netty对继承的方法进行了实现,分别是NettyRpcEnvFactoryNettyRpcEnv
    NettyRpcEnv中启动终端点的方法为setupEndpoint,在此方法中会将RpcEndPointRpcEndpointRef相互以键值对的形式存放在线程安全的ConcurrentHashMap中。
    RpcEnv的object提供反射的方式来创建该类的一个静态实例。
    在各模块进行通信的时候,需要调用这些类。比如MasterWorker等。会先使用RpcEnv的静态方法创建RpcEnv实例,然后实例化Master,其中Master是ThreadSafeRpcEndPoint的一个子类,所以创建的Master是一个线程安全的实例。接着通过先前创建的RpcEnv的实例来调用启动终端点方法,把Master终端点和其对于的Ref注册到RpcEnv中去,这一步是由方法setupEndpoint来实现的。
    在通信的时候,其他对象获得了Master的终端店的Ref便可以发送消息给Master节点。

    Spark启动时通信

    意思就是启动过程中的通信,主要是MasterWorker节点之间的通信。

    详细过程如下:

    1. Master启动后,随后启动各个Worker节点,Worker启动时和Master启动时流程类似,首先创建通信环境RpcEnv的静态对象,创建一个Worker EndPoint,随后就于Master通信,向Master发送一个注册Worker的消息RegisterWorker
      一个Worker可能回注册到多个Master中去,这时候需要在WorkertryRegisterAllMasters中创建注册线程池registerMasterThreadPool。然后启动注册线程去注册,每个注册过程:先获取Master的一个终端点EndPoint的引用然后调用registerWithMaster方法。

    2. Master收到注册消息,开始对Worker的消息进行验证和记录。注册成功则返回给Worker一个RegisteredWorker消息,否则发送RegisterWorkerFailed消息,worker收到消息后打印出错日志并结束Worker启动。
      验证过程:Master维持一张注册列表。接收到WorkerRegisterWorker消息后,Master首先对自己的状态进行验证,如果自己处于StandBy状态则忽略Worker的消息。否则在注册表中查询该Worker的节点,若找到了,则返回RegisterWorkerFailedWorker,否则调用registerWorker方法把该Worker加入注册表中。

    3. Worker收到成功注册的消息,先记录日志并更新Master消息,则之后会提供定时调度进程发送心跳信息HeartbeatMaster

    Spark运行时消息通信


    运行时存在几个对象:
    客户端端点 client EndPoint,客户端的驱动程序 Driver,SparkContext,以及Worker中的Executor和worker endpoint,Master的端点Master endpoint。

    这些对象之间的协作如下:
    1.首先客户端执行应用程序会先创建一个Spark ContextSpark Context在启动过程中先实例化一个ScahdulerBackend对象,在standalone时是创建的SparkDeploySchedulerBackend对象,该对象是Driver EndPoint的子类,在该对象启动过程中又会创建Client Endpoint对象。
    此时客户端的对象都创建结束。
    之后Client EndPoint对象会有一个tryRegisterAllMasters方法,就和启动时通信中Worker持有的一样,Client EndPoint 向 Master发送注册应用的请求。

    1. Master收到client请求,在registerApplication方法中记录应用信息并把该应用加入到等待运行的应用列表中去。同时调用startExecutorOnWorker方法运行应用,在方法首先获取满足条件的worker节点(内存满足启动Executor所需,核数大于1),最后向该worker发送 LaunchExecutor消息。

    2. Worker收到Master请求后。第一步通过SPARK_EXECUTOR_DIRS环境变量创建Executor的执行目录当程序执行结束之后由Worker剔除。第二步实例化一个Executor对象,在该对象启动中会创建一个进程生成器ProcessBuilder,然后该生成器使用command创建CoarseGrainedExecutorBackend对象,该对象是Executor运行的容器。生成完成后,第三步,worker向Master返回ExecutorStateChanged消息告知Master,Executor容器创建完成。

    3. 3中创建的CoarseGrainedExecutorBackend对象的启动方法onStart会向DriverEndPoint发送Executor的注册消息。

    4. Driver EndPoint收到了注册消息RegisterExecutor之后会先判断8Executor*是否已经被注册过了,若是则返回注册失败消息,否则返回注册成功消息,并Driver终端记录该Executor并在makeOffers()中给该Executor分配运行任务资源,发后发生LaunchTask消息。

    5. CoarseGrainedExecutorBackend对象收到注册成功消息后,在CoarseGrainedExecutorBackend容器中实例化Executor对象,启动完毕后定时向Driver发送心跳信息,等待从DriverEndPoint终端点发送执行任务的消息。

    6. CoarseGrainedExecutorBackend对象接受LaunchTask消息后,调用Executor的launchTask,在执行时创建TaskRunner进程。处理完毕后发送StatusUpdate消息返回给CoarseGrainedExecutorBackendCoarseGrainedExecutorBackend对象将StatusUpdate返回给DriverEndPointDriverEndPoint收到消息调用taskSchedulerImplstatusUpdate*。

    相关文章

      网友评论

          本文标题:Spark学习笔记三_下 Spark 核心原理

          本文链接:https://www.haomeiwen.com/subject/raegvctx.html