Flink Source Code - Cluster Startup: Starting the JobManager

Author: 飞_侠 | Published 2022-02-13 20:48

    Overview

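    Before reading on, you should know the basics of Flink's architecture and what the client, JobManager, and TaskManager are each for. This article follows the startup flow of each component through the source code, to build a deeper understanding of what the client, JobManager, and TaskManager do and how they interact.
    We start with the cluster startup scripts, because they lead us to the entry point from which to read the Flink source.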

    The cluster startup script: start-cluster

    Starting the cluster runs the start-cluster script:

    $ ./bin/start-cluster.sh
    

    The core line of start-cluster is:

    "${FLINK_BIN_DIR}"/jobmanager.sh start "${master}" "${webuiport}"
    

    As shown above, this invokes the jobmanager.sh script:

         ENTRYPOINT=standalonesession
    
       "${FLINK_BIN_DIR}"/flink-daemon.sh $STARTSTOP $ENTRYPOINT "${args[@]}"
    

    jobmanager.sh in turn calls the flink-daemon.sh script, which maps the entry point name to the class to run:

        (standalonesession)
            CLASS_TO_RUN=org.apache.flink.runtime.entrypoint.StandaloneSessionClusterEntrypoint 
    

    The JobManager entry-point class

    Through flink-daemon.sh, jobmanager.sh ends up launching the entry-point class StandaloneSessionClusterEntrypoint, which is analyzed below.

    What the JobManager does

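    Overview: JobManager startup first initializes a set of services (initializeServices(configuration, pluginManager)), including haServices, blobServer, heartbeatServices, metricRegistry, and archivedExecutionGraphStore. It then starts three major components: WebMonitorEndpoint, ResourceManager, and DispatcherRunner. WebMonitorEndpoint receives requests from clients and forwards each one to the matching handler; a job submission (submitJob), for example, is handled by JobSubmitHandler. ResourceManager allocates and manages resources. Dispatcher accepts submitted jobs, persists them, launches a JobMaster to run each job, and recovers the jobs if the master fails.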
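
To make the Dispatcher's "persist, launch, recover" role concrete, here is a minimal sketch. It is a toy model, not Flink's implementation: `JobStore`, `JobMaster`, and `Dispatcher` below are hypothetical stand-ins written for this example, and jobs are reduced to plain strings.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the Dispatcher responsibilities described above; not Flink's real classes. */
final class DispatcherSketch {

    /** Hypothetical stand-in for a persistent job store that outlives a single Dispatcher. */
    static final class JobStore {
        private final Map<String, String> persisted = new LinkedHashMap<>();

        void put(String jobId, String jobGraph) { persisted.put(jobId, jobGraph); }

        Map<String, String> all() { return persisted; }
    }

    /** Hypothetical stand-in for the per-job JobMaster. */
    static final class JobMaster {
        final String jobId;

        JobMaster(String jobId) { this.jobId = jobId; }
    }

    static final class Dispatcher {
        private final JobStore store;
        private final Map<String, JobMaster> running = new LinkedHashMap<>();

        Dispatcher(JobStore store) { this.store = store; }

        /** Persist first, then launch a JobMaster, so a crash in between loses no job. */
        void submitJob(String jobId, String jobGraph) {
            store.put(jobId, jobGraph);
            running.put(jobId, new JobMaster(jobId));
        }

        /** After a failover, a new Dispatcher relaunches every persisted job. */
        void recoverJobs() {
            for (String jobId : store.all().keySet()) {
                running.computeIfAbsent(jobId, JobMaster::new);
            }
        }

        List<String> runningJobIds() { return new ArrayList<>(running.keySet()); }
    }

    public static void main(String[] args) {
        JobStore store = new JobStore();            // survives the simulated failover
        Dispatcher first = new Dispatcher(store);
        first.submitJob("job-1", "graph-1");

        Dispatcher second = new Dispatcher(store);  // new leader after the first one is gone
        second.recoverJobs();
        System.out.println(second.runningJobIds()); // prints [job-1]
    }
}
```

The point of the ordering in `submitJob` is that the store, not the in-memory map, is the source of truth for recovery.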

    Core classes and methods involved in startup

    StandaloneSessionClusterEntrypoint.main() -> ClusterEntrypoint.runCluster()
    In detail:

    public static void main(String[] args) {
            // startup checks and logging
            EnvironmentInformation.logEnvironmentInfo(
                    LOG, StandaloneSessionClusterEntrypoint.class.getSimpleName(), args);
            SignalHandler.register(LOG);
            JvmShutdownSafeguard.installAsShutdownHook(LOG);
            // load the configuration
            final EntrypointClusterConfiguration entrypointClusterConfiguration =
                    ClusterEntrypointUtils.parseParametersOrExit(
                            args,
                            new EntrypointClusterConfigurationParserFactory(),
                            StandaloneSessionClusterEntrypoint.class);
            Configuration configuration = loadConfiguration(entrypointClusterConfiguration);
    
            StandaloneSessionClusterEntrypoint entrypoint =
                    new StandaloneSessionClusterEntrypoint(configuration);
            // start the cluster
            ClusterEntrypoint.runClusterEntrypoint(entrypoint);
        }
    

    The main method starts the cluster's JobManager through ClusterEntrypoint.runClusterEntrypoint(entrypoint).

    That call eventually reaches ClusterEntrypoint's runCluster() method:

      private void runCluster(Configuration configuration, PluginManager pluginManager)
                throws Exception {
            synchronized (lock) {
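                /**
                 * Initialize the services, including:
                 * commonRpcService: an Akka-based ActorSystem that starts an actor internally; once the master node is up it sends itself a hello message, which this service ultimately handles.
                 * ioExecutor: a service dedicated to I/O work.
                 * haServices: the HA implementation is chosen from flink-conf.yaml (high-availability: zookeeper); usually a ZooKeeper-based HA service is created, which mainly uses the Curator framework internally.
                 * blobServer: transfers large files, such as user job jars and log files uploaded by TaskManagers.
                 * heartbeatServices
                 */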
                initializeServices(configuration, pluginManager);
    
                // write host information into configuration
                configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.getAddress());
                configuration.setInteger(JobManagerOptions.PORT, commonRpcService.getPort());
    
                final DispatcherResourceManagerComponentFactory
                        dispatcherResourceManagerComponentFactory =
                                createDispatcherResourceManagerComponentFactory(configuration);
                // create the components
                clusterComponent =
                        dispatcherResourceManagerComponentFactory.create(
                                configuration,
                                ioExecutor,
                                commonRpcService,
                                haServices,
                                blobServer,
                                heartbeatServices,
                                metricRegistry,
                                archivedExecutionGraphStore,
                                new RpcMetricQueryServiceRetriever(
                                        metricRegistry.getMetricQueryServiceRpcService()),
                                this);
    
            }
        }
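
The ordering inside runCluster() matters: services come first, then the address that the RPC service actually bound to is written back into the configuration, and only then are the components built from that configuration. The sketch below illustrates just this ordering. `ToyRpcService` and the step names are hypothetical, and a plain `Map` stands in for Flink's `Configuration`; the two keys are the ones behind `JobManagerOptions.ADDRESS` and `JobManagerOptions.PORT`.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/** Toy model of the ordering inside runCluster(); every type here is a stand-in. */
final class RunClusterSketch {

    /** Hypothetical RPC service; a real one would report the socket it bound to. */
    static final class ToyRpcService {
        String getAddress() { return "localhost"; }

        int getPort() { return 6123; }
    }

    private final Object lock = new Object();
    private final List<String> steps = new ArrayList<>();
    private ToyRpcService rpcService;

    List<String> runCluster(Map<String, String> configuration) {
        synchronized (lock) {
            // 1. Services first: everything created later depends on them.
            rpcService = new ToyRpcService();
            steps.add("initializeServices");

            // 2. Publish the address the RPC service reports.
            configuration.put("jobmanager.rpc.address", rpcService.getAddress());
            configuration.put("jobmanager.rpc.port", String.valueOf(rpcService.getPort()));
            steps.add("writeAddress");

            // 3. Only now create the components, which read the updated configuration.
            steps.add("createComponents");
        }
        return steps;
    }

    public static void main(String[] args) {
        Map<String, String> configuration = new HashMap<>();
        List<String> steps = new RunClusterSketch().runCluster(configuration);
        System.out.println(steps);
        System.out.println(configuration.get("jobmanager.rpc.address")
                + ":" + configuration.get("jobmanager.rpc.port"));
    }
}
```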
    

    DefaultDispatcherResourceManagerComponentFactory.create() is then called to create the WebMonitorEndpoint, ResourceManager, and Dispatcher. The code (abridged) is:

     public DispatcherResourceManagerComponent create(
                Configuration configuration,
                Executor ioExecutor,
                RpcService rpcService,
                HighAvailabilityServices highAvailabilityServices,
                BlobServer blobServer,
                HeartbeatServices heartbeatServices,
                MetricRegistry metricRegistry,
                ArchivedExecutionGraphStore archivedExecutionGraphStore,
                MetricQueryServiceRetriever metricQueryServiceRetriever,
                FatalErrorHandler fatalErrorHandler)
                throws Exception {
    
            LeaderRetrievalService dispatcherLeaderRetrievalService = null;
            LeaderRetrievalService resourceManagerRetrievalService = null;
            /**
             * Create the three major components
             */
            WebMonitorEndpoint<?> webMonitorEndpoint = null;
            ResourceManager<?> resourceManager = null;
            DispatcherRunner dispatcherRunner = null;
            ......
            // returns an object of type DispatcherRestEndpoint
            webMonitorEndpoint = restEndpointFactory.createRestEndpoint(
                                configuration,
                                dispatcherGatewayRetriever,
                                resourceManagerGatewayRetriever,
                                blobServer,
                                executor,
                                metricFetcher,
                                highAvailabilityServices.getClusterRestEndpointLeaderElectionService(),
                                fatalErrorHandler);
    
                log.debug("Starting Dispatcher REST endpoint.");
                // the key startup method, analyzed in detail below
                webMonitorEndpoint.start();
    
            resourceManager = resourceManagerFactory.createResourceManager(
                                configuration,
                                ResourceID.generate(),
                                rpcService,
                                highAvailabilityServices,
                                heartbeatServices,
                                fatalErrorHandler,
                                new ClusterInformation(hostname, blobServer.getPort()),
                                webMonitorEndpoint.getRestBaseUrl(),
                                metricRegistry,
                                hostname,
                                ioExecutor);
    
            dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
                                highAvailabilityServices.getDispatcherLeaderElectionService(),
                                fatalErrorHandler,
                                new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                                ioExecutor,
                                rpcService,
                                partialDispatcherServices);
    
                log.debug("Starting ResourceManager.");
                resourceManager.start();
    
    
    

    To summarize: the services are initialized first, and then the three major components (WebMonitorEndpoint, ResourceManager, DispatcherRunner) are created.

    How the WebMonitorEndpoint, ResourceManager, and DispatcherRunner components start

    WebMonitorEndpoint startup

    [Figure: WebMonitorEndpoint class diagram]

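    Startup calls the start() method inherited from the parent class RestServerEndpoint. start() does three main things: (1) it initializes the handlers, (2) it starts the Netty server, and (3) it runs leader election.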

    /**
         * Starts this REST server endpoint.
         *
         * @throws Exception if we cannot start the RestServerEndpoint
         */
        public final void start() throws Exception {
          .....
          // 1. Initialize the handlers by calling initializeHandlers() on the DispatcherRestEndpoint object
          handlers = initializeHandlers(restAddressFuture);
          // 2. Set up the Netty bootstrap (ServerBootstrap)
          bootstrap = new ServerBootstrap();
          // 3. Start the WebMonitorEndpoint, including leader election
          startInternal();
          .....
    
        }
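
The shape of start() is a template method: the final method in the base class fixes the order of the steps, and subclasses fill in initializeHandlers() and startInternal(). The sketch below shows that pattern in isolation. All class names are hypothetical stand-ins for RestServerEndpoint, WebMonitorEndpoint, and DispatcherRestEndpoint, the Netty bootstrap is reduced to a log entry, and the paths are only illustrative.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the start() template method; not Flink's real classes. */
final class RestEndpointSketch {

    /** Plays the role of RestServerEndpoint: start() fixes the order of the steps. */
    abstract static class BaseEndpoint {
        protected final List<String> log = new ArrayList<>();

        public final void start() {
            List<String> handlers = initializeHandlers();                   // 1. subclass supplies handlers
            log.add("bind server with " + handlers.size() + " handler(s)"); // 2. stand-in for the Netty bootstrap
            startInternal();                                                // 3. subclass hook
        }

        protected abstract List<String> initializeHandlers();

        protected abstract void startInternal();
    }

    /** Plays the role of WebMonitorEndpoint. */
    static class MonitorEndpoint extends BaseEndpoint {
        @Override
        protected List<String> initializeHandlers() {
            List<String> handlers = new ArrayList<>();
            handlers.add("GET /jobs/overview");
            return handlers;
        }

        @Override
        protected void startInternal() {
            log.add("start leader election");
        }
    }

    /** Plays the role of DispatcherRestEndpoint: adds the job submission handler. */
    static final class DispatcherEndpoint extends MonitorEndpoint {
        @Override
        protected List<String> initializeHandlers() {
            List<String> handlers = super.initializeHandlers();
            handlers.add("POST /jobs");
            return handlers;
        }
    }

    public static void main(String[] args) {
        DispatcherEndpoint endpoint = new DispatcherEndpoint();
        endpoint.start();
        endpoint.log.forEach(System.out::println);
    }
}
```

Because start() is final, a subclass can add handlers or change what happens in startInternal(), but it cannot reorder the three steps.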
    

    The handlers are initialized by the initializeHandlers() method of DispatcherRestEndpoint:

    protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(
                final CompletableFuture<String> localAddressFuture) {
    
    
            /**
             * Netty automatically invokes each handler's channelRead0() method.
             * Whichever handler is initialized here, it has a handleRequest() method,
             * and channelRead0() ultimately calls handler.handleRequest().
             * When a job is submitted, WebMonitorEndpoint receives the request and routes it to
             * JobSubmitHandler, whose handleRequest() method ultimately processes it.
             */
            List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =
                  // call the parent class WebMonitorEndpoint's method
                    super.initializeHandlers(localAddressFuture);
    
            // Add the Dispatcher specific handlers
    
            final Time timeout = restConfiguration.getTimeout();
            // the handler for job submission requests from users
            JobSubmitHandler jobSubmitHandler =
                    new JobSubmitHandler(
                            leaderRetriever, timeout, responseHeaders, executor, clusterConfiguration);
    
            handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
    
            return handlers;
        }
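
The request path described in the comment above (server callback, then channelRead0(), then handleRequest()) can be sketched without Netty. This is only an illustration under simplifying assumptions: `Router`, `AbstractHandler`, and `ToyJobSubmitHandler` are hypothetical, requests are plain strings, and the route key is just "METHOD path".

```java
import java.util.HashMap;
import java.util.Map;

/** Toy model of routing a request to a handler; no Netty involved. */
final class HandlerRoutingSketch {

    abstract static class AbstractHandler {
        /** Entry point the server calls; it only delegates, as channelRead0() does. */
        final String channelRead0(String body) {
            return handleRequest(body);
        }

        /** The handler-specific logic. */
        protected abstract String handleRequest(String body);
    }

    /** Hypothetical stand-in for the job submission handler. */
    static final class ToyJobSubmitHandler extends AbstractHandler {
        @Override
        protected String handleRequest(String body) {
            return "submitted " + body;
        }
    }

    /** Maps "METHOD path" to the handler registered for it. */
    static final class Router {
        private final Map<String, AbstractHandler> routes = new HashMap<>();

        void register(String method, String path, AbstractHandler handler) {
            routes.put(method + " " + path, handler);
        }

        String dispatch(String method, String path, String body) {
            AbstractHandler handler = routes.get(method + " " + path);
            return handler == null ? "404 Not Found" : handler.channelRead0(body);
        }
    }

    public static void main(String[] args) {
        Router router = new Router();
        router.register("POST", "/jobs", new ToyJobSubmitHandler());
        System.out.println(router.dispatch("POST", "/jobs", "wordcount")); // prints submitted wordcount
        System.out.println(router.dispatch("GET", "/unknown", ""));        // prints 404 Not Found
    }
}
```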
    

    Next, startInternal() of WebMonitorEndpoint is called:

    
    /**
     * Starts leader election.
     */
     public void startInternal() throws Exception {
    
          // run leader election
            leaderElectionService.start(this);
            startExecutionGraphCacheCleanupTask();
    
            if (hasWebUI) {
                log.info("Web frontend listening at {}.", getRestBaseUrl());
            }
        }
    
    

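    leaderElectionService.start() appears throughout the Flink code base; it is generally called wherever leader election is needed. With the ZooKeeper-based HA services it relies mainly on the Curator framework. Once the election succeeds, the grantLeadership() method of the current class is called back.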
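
The callback relationship can be sketched as follows. The two interfaces are simplified stand-ins modeled on the names used in the text, not Flink's full interfaces, and `ImmediateElectionService` is a hypothetical single-node service that grants leadership at once; a ZooKeeper-backed service would invoke the same callback only after winning the election.

```java
import java.util.UUID;

/** Toy model of the leader election callback; simplified stand-ins only. */
final class LeaderElectionSketch {

    /** Whoever wants to become leader implements this callback interface. */
    interface LeaderContender {
        void grantLeadership(UUID leaderSessionId);

        void revokeLeadership();
    }

    interface LeaderElectionService {
        void start(LeaderContender contender);
    }

    /** Hypothetical single-node service: the only contender wins immediately. */
    static final class ImmediateElectionService implements LeaderElectionService {
        @Override
        public void start(LeaderContender contender) {
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    /** Plays the role of the endpoint that passes itself to start(this). */
    static final class Endpoint implements LeaderContender {
        private final LeaderElectionService leaderElectionService;
        private UUID leaderSessionId;

        Endpoint(LeaderElectionService leaderElectionService) {
            this.leaderElectionService = leaderElectionService;
        }

        void startInternal() {
            leaderElectionService.start(this); // register for the callback
        }

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            this.leaderSessionId = leaderSessionId;
        }

        @Override
        public void revokeLeadership() {
            this.leaderSessionId = null;
        }

        boolean isLeader() {
            return leaderSessionId != null;
        }
    }

    public static void main(String[] args) {
        Endpoint endpoint = new Endpoint(new ImmediateElectionService());
        endpoint.startInternal();
        System.out.println(endpoint.isLeader()); // prints true
    }
}
```
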
    To be continued: the next post analyzes how the Dispatcher is created and what it does.
