美文网首页
Spark学习笔记三_下 Spark 核心原理

Spark学习笔记三_下 Spark 核心原理

作者: BitGuo | 来源:发表于2019-10-29 17:24 被阅读0次

架构简要介绍在前几片博客提过了

Spark的消息通信原理

通信模块类图

首先看一下Spark的消息通信的类图


Spark通信类图

最核心的是左上角的虚线框的四个类
首先定义了RpcEnvFactoryRpcEnv两个抽象类,在RpcEnv中定义的是RPC通信框架的启动停止和关闭等抽象方法,在RpcEnvFactoty中定义的是创建的抽象方法。然后下面两个类分别使用Netty对继承的方法进行了实现,分别是NettyRpcEnvFactoryNettyRpcEnv
NettyRpcEnv中启动终端点的方法为setupEndpoint,在此方法中会将RpcEndPointRpcEndpointRef相互以键值对的形式存放在线程安全的ConcurrentHashMap中。
RpcEnv的object提供反射的方式来创建该类的一个静态实例。
在各模块进行通信的时候,需要调用这些类。比如MasterWorker等。会先使用RpcEnv的静态方法创建RpcEnv实例,然后实例化Master,其中Master是ThreadSafeRpcEndPoint的一个子类,所以创建的Master是一个线程安全的实例。接着通过先前创建的RpcEnv的实例来调用启动终端点方法,把Master终端点和其对于的Ref注册到RpcEnv中去,这一步是由方法setupEndpoint来实现的。
在通信的时候,其他对象获得了Master的终端店的Ref便可以发送消息给Master节点。

Spark启动时通信

意思就是启动过程中的通信,主要是MasterWorker节点之间的通信。

详细过程如下:

  1. Master启动后,随后启动各个Worker节点,Worker启动时和Master启动时流程类似,首先创建通信环境RpcEnv的静态对象,创建一个Worker EndPoint,随后就于Master通信,向Master发送一个注册Worker的消息RegisterWorker
    一个Worker可能回注册到多个Master中去,这时候需要在WorkertryRegisterAllMasters中创建注册线程池registerMasterThreadPool。然后启动注册线程去注册,每个注册过程:先获取Master的一个终端点EndPoint的引用然后调用registerWithMaster方法。

  2. Master收到注册消息,开始对Worker的消息进行验证和记录。注册成功则返回给Worker一个RegisteredWorker消息,否则发送RegisterWorkerFailed消息,worker收到消息后打印出错日志并结束Worker启动。
    验证过程:Master维持一张注册列表。接收到WorkerRegisterWorker消息后,Master首先对自己的状态进行验证,如果自己处于StandBy状态则忽略Worker的消息。否则在注册表中查询该Worker的节点,若找到了,则返回RegisterWorkerFailedWorker,否则调用registerWorker方法把该Worker加入注册表中。

  3. Worker收到成功注册的消息,先记录日志并更新Master消息,则之后会提供定时调度进程发送心跳信息HeartbeatMaster

Spark运行时消息通信


运行时存在几个对象:
客户端端点 client EndPoint,客户端的驱动程序 Driver,SparkContext,以及Worker中的Executor和worker endpoint,Master的端点Master endpoint。

这些对象之间的协作如下:
1.首先客户端执行应用程序会先创建一个Spark ContextSpark Context在启动过程中先实例化一个ScahdulerBackend对象,在standalone时是创建的SparkDeploySchedulerBackend对象,该对象是Driver EndPoint的子类,在该对象启动过程中又会创建Client Endpoint对象。
此时客户端的对象都创建结束。
之后Client EndPoint对象会有一个tryRegisterAllMasters方法,就和启动时通信中Worker持有的一样,Client EndPoint 向 Master发送注册应用的请求。

  1. Master收到client请求,在registerApplication方法中记录应用信息并把该应用加入到等待运行的应用列表中去。同时调用startExecutorOnWorker方法运行应用,在方法首先获取满足条件的worker节点(内存满足启动Executor所需,核数大于1),最后向该worker发送 LaunchExecutor消息。

  2. Worker收到Master请求后。第一步通过SPARK_EXECUTOR_DIRS环境变量创建Executor的执行目录当程序执行结束之后由Worker剔除。第二步实例化一个Executor对象,在该对象启动中会创建一个进程生成器ProcessBuilder,然后该生成器使用command创建CoarseGrainedExecutorBackend对象,该对象是Executor运行的容器。生成完成后,第三步,worker向Master返回ExecutorStateChanged消息告知Master,Executor容器创建完成。

  3. 3中创建的CoarseGrainedExecutorBackend对象的启动方法onStart会向DriverEndPoint发送Executor的注册消息。

  4. Driver EndPoint收到了注册消息RegisterExecutor之后会先判断8Executor*是否已经被注册过了,若是则返回注册失败消息,否则返回注册成功消息,并Driver终端记录该Executor并在makeOffers()中给该Executor分配运行任务资源,发后发生LaunchTask消息。

  5. CoarseGrainedExecutorBackend对象收到注册成功消息后,在CoarseGrainedExecutorBackend容器中实例化Executor对象,启动完毕后定时向Driver发送心跳信息,等待从DriverEndPoint终端点发送执行任务的消息。

  6. CoarseGrainedExecutorBackend对象接受LaunchTask消息后,调用Executor的launchTask,在执行时创建TaskRunner进程。处理完毕后发送StatusUpdate消息返回给CoarseGrainedExecutorBackendCoarseGrainedExecutorBackend对象将StatusUpdate返回给DriverEndPointDriverEndPoint收到消息调用taskSchedulerImplstatusUpdate*。

相关文章

网友评论

      本文标题:Spark学习笔记三_下 Spark 核心原理

      本文链接:https://www.haomeiwen.com/subject/raegvctx.html