美文网首页
Flink源码-集群启动之JobManager启动续

Flink源码-集群启动之JobManager启动续

作者: 飞_侠 | 来源:发表于2022-02-19 22:25 被阅读0次

Dispatcher启动

Dispatcher启动流程嵌套较多,主要会启动Dispatcher组件,同时拉起jobmaster;附上详细的嵌套图供参考:


image.png

首先接上篇,

DefaultDispatcherResourceManagerComponentFactory中创建dispatcherRunner

 dispatcherRunner =
                    dispatcherRunnerFactory.createDispatcherRunner(
                            highAvailabilityServices.getDispatcherLeaderElectionService(),
                            fatalErrorHandler,
                            new HaServicesJobGraphStoreFactory(highAvailabilityServices),
                            ioExecutor,
                            rpcService,
                            partialDispatcherServices);

最终经过各种套娃,创建DispatcherRunnerLeaderElectionLifecycleManager对象:

  private DispatcherRunnerLeaderElectionLifecycleManager(
            T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
        this.dispatcherRunner = dispatcherRunner;
        this.leaderElectionService = leaderElectionService;
          /**
        * han_pf
        *执行leader选举,选举成功调用dispatcherRunner的leaderContender.grantLeadership方法
         *jm启动的三大组件都需要进行leader选举,
         *
        */
        leaderElectionService.start(dispatcherRunner);
    }
image.png

此处选举成功后会调用DefaultDispatcherRunner.grantLeadership(UUID leaderSessionID)方法,执行选举后的逻辑。

DefaultDispatcherRunner:

    @Override
    public void grantLeadership(UUID leaderSessionID) {
        runActionIfRunning(
                () -> {
                    LOG.info(
                            "{} was granted leadership with leader id {}. Creating new {}.",
                            getClass().getSimpleName(),
                            leaderSessionID,
                            DispatcherLeaderProcess.class.getSimpleName());
                    startNewDispatcherLeaderProcess(leaderSessionID);
                });
    }

DefaultDispatcherRunner会创建出一个DispatcherLeaderProcess然后启动

  private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
        /**
        * han_pf
        *停止已有的,异步操作
        */
        stopDispatcherLeaderProcess();
        /**
        * han_pf
        *创建新的,只是一个对象的创建,继续往下看start方法
        */
        dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);

        final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
        FutureUtils.assertNoException(
                previousDispatcherLeaderProcessTerminationFuture.thenRun(
                        newDispatcherLeaderProcess::start));
    }

如上会调用

AbstractDispatcherLeaderProcess的实现类的onStart方法,

    @Override
    public final void start() {
        runIfStateIs(State.CREATED, this::startInternal);
    }

    private void startInternal() {
        log.info("Start {}.", getClass().getSimpleName());
        state = State.RUNNING;
       /**
        * han_pf
        *执行实现类的onStart方法,实现类有两个SessionDispatcherLeaderProcess和JobDispatcherLeaderProcess,
         * 次数是session模式,所以看SessionDispatcherLeaderProcess
        */
        onStart();
    }

SessionDispatcherLeaderProcess:

    @Override
    protected void onStart() {

        /**
        * han_pf
        *jobGraphStore服务的启动,其实启动一个监听,在主节点迁移时恢复jobGraph执行
        */
        startServices();

        onGoingRecoveryOperation =
                recoverJobsAsync()
                        /**
                        * han_pf
                        *createDispatcherIfRunning 核心方法
                        */
                        .thenAccept(this::createDispatcherIfRunning)
                        .handle(this::onErrorIfRunning);
    }

   /**
    * han_pf
    *一路在创建
    */
    private void createDispatcher(Collection<JobGraph> jobGraphs) {

        final DispatcherGatewayService dispatcherService =
                dispatcherGatewayServiceFactory.create(
                        DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore);

        completeDispatcherSetup(dispatcherService);
    }

最终调用DefaultDispatcherGatewayServiceFactory.create方法创建并启动dispatcher

RpcEndpoint

 public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
            DispatcherId fencingToken,
            Collection<JobGraph> recoveredJobs,
            JobGraphWriter jobGraphWriter) {

        final Dispatcher dispatcher;
        try {
            dispatcher =
                    dispatcherFactory.createDispatcher(
                            rpcService,
                            fencingToken,
                            recoveredJobs,
                            (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                    new NoOpDispatcherBootstrap(),
                            PartialDispatcherServicesWithJobGraphStore.from(
                                    partialDispatcherServices, jobGraphWriter));
        } catch (Exception e) {
            throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
        }
        /**
        * han_pf
        * 真正的启动
        */
        dispatcher.start();

        return DefaultDispatcherGatewayService.from(dispatcher);
    }

其中Dispatcher是一个RpcEndpoint,构造方法执行完后将回调onStart()方法


image.png

Dispatcher:

@Override
    public void onStart() throws Exception {
        try {
        /**
        * han_pf
        *注册服务-metrics
        */
            startDispatcherServices();
        } catch (Throwable t) {
            final DispatcherException exception =
                    new DispatcherException(
                            String.format("Could not start the Dispatcher %s", getAddress()), t);
            onFatalError(exception);
            throw exception;
        }
        /**
        * han_pf
        *   恢复原有任务,
         * 启动dispatcher,
         * 拉起jobmaster
        */
        startRecoveredJobs();

        this.dispatcherBootstrap =
                this.dispatcherBootstrapFactory.create(
                        getSelfGateway(DispatcherGateway.class),
                        this.getRpcService().getScheduledExecutor(),
                        this::onFatalError);
    }

onstart()方法会首先注册metrics服务,之后执行startRecoveredJobs()进行已有任务的恢复。同时Dispatcher也会接收客户端新提交的任务,根据任务信息拉起一个新的jobMaster负责任务的执行。

 private void runRecoveredJob(final JobGraph recoveredJob) {
     checkNotNull(recoveredJob);
     try {
         runJob(recoveredJob, ExecutionType.RECOVERY);
     } catch (Throwable throwable) {
         onFatalError(
                 new DispatcherException(
                         String.format(
                                 "Could not start recovered job %s.", recoveredJob.getJobID()),
                         throwable));
     }
 }



/**
 * han_pf
 * 客户端新提交任务是也会通过webMonitorEndpoint的handler处理后转发到Dispatcher,最终也会执行到此方法。
 */
 private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception {
     Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
     long initializationTimestamp = System.currentTimeMillis();
     JobManagerRunner jobManagerRunner =
             createJobManagerRunner(jobGraph, initializationTimestamp);


     //TODO 每个任务,对应一个jobManagerRunner
     runningJobs.put(jobGraph.getJobID(), jobManagerRunner);



JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp)
         throws Exception {
     final RpcService rpcService = getRpcService();
/**
     * han_pf
     * 只是对象的创建和需要的工厂类的赋值,jobMasterServiceFactory,SlotPoolServiceSchedulerFactory,jobManagerLeaderElectionService等
      * 重要看start()方法
     *jobManagerRunner实现类JobMasterServiceLeadershipRunner
     */
     JobManagerRunner runner =
             jobManagerRunnerFactory.createJobManagerRunner(
                     jobGraph,
                     configuration,
                     rpcService,
                     highAvailabilityServices,
                     heartbeatServices,
                     jobManagerSharedServices,
                     new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
                     fatalErrorHandler,
                     initializationTimestamp);
 //
     runner.start();
     return runner;
 }      




 }

JobMasterServiceLeadershipRunner:

 @Override
    public void start() throws Exception {
        LOG.debug("Start leadership runner for job {}.", getJobID());
        /**
        * han_pf
        * 又看到熟悉的leader选举方法
        */
        leaderElectionService.start(this);
    }

   @Override
    public void grantLeadership(UUID leaderSessionID) {
        runIfStateRunning(
                () -> startJobMasterServiceProcessAsync(leaderSessionID),
                "starting a new JobMasterServiceProcess");
    }

    @GuardedBy("lock")
    private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
        sequentialOperation =
                sequentialOperation.thenRun(
                        () ->
                                runIfValidLeader(
                                        leaderSessionId,
                                        ThrowingRunnable.unchecked(
                                                () ->
                                                    /**
                                                        * han_pf
                                                        * 关键创建jobmaster并启动
                                                        */
                                                        verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
                                                                leaderSessionId)),
                                        "verify job scheduling status and create JobMasterServiceProcess"));

        handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
    }

    @GuardedBy("lock")
    private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
            throws FlinkException {
        final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
                getJobSchedulingStatus();

        if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
            jobAlreadyDone();
        } else {
            createNewJobMasterServiceProcess(leaderSessionId);
        }
    }


 @GuardedBy("lock")
    private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
        Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());
          ......
        ......
        /**
            * han_pf
            * 又开始一路创建,目前还没看到jobMaster启动
             * 到DefaultJobMasterServiceProcessFactory进行创建工作
            */
        jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);
      
     ........
    }

flink中的leader选举使用curatorframeworker,leader选举成功后最终是执行leaderContender的grandLeaderShip()方法. Jobmanager中有以下主要的leaderContender都会执行leader选举。


image.png

DefaultJobMasterServiceProcessFactory:

 @Override
    public JobMasterServiceProcess create(UUID leaderSessionId) {
        return new DefaultJobMasterServiceProcess(
                jobId,
                leaderSessionId,
                jobMasterServiceFactory,
                cause -> createArchivedExecutionGraph(JobStatus.FAILED, cause));
    }

DefaultJobMasterServiceFactory:

public DefaultJobMasterServiceProcess(
            JobID jobId,
            UUID leaderSessionId,
            JobMasterServiceFactory jobMasterServiceFactory,
            Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
        this.jobId = jobId;
        this.leaderSessionId = leaderSessionId;
        this.jobMasterServiceFuture =
                jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);

      。。。
    //异步操作

}

    @Override
    public CompletableFuture<JobMasterService> createJobMasterService(
            UUID leaderSessionId, OnCompletionActions onCompletionActions) {

        return CompletableFuture.supplyAsync(
                FunctionUtils.uncheckedSupplier(
                        () -> internalCreateJobMasterService(leaderSessionId, onCompletionActions)),
                executor);
    }

private JobMasterService internalCreateJobMasterService(
            UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {
        
        /**
         * han_pf
         * 终于启动起来了,jobMaster也是一个RpcEndpoint,构造方法执行完后将执行生命周期方法,onStart()
         */
        final JobMaster jobMaster =
                new JobMaster(
                        rpcService,
                        JobMasterId.fromUuidOrNull(leaderSessionId),
                        jobMasterConfiguration,
                        ResourceID.generate(),
                        jobGraph,
                        haServices,
                        slotPoolServiceSchedulerFactory,
                        jobManagerSharedServices,
                        heartbeatServices,
                        jobManagerJobMetricGroupFactory,
                        onCompletionActions,
                        fatalErrorHandler,
                        userCodeClassloader,
                        shuffleMaster,
                        lookup ->
                                new JobMasterPartitionTrackerImpl(
                                        jobGraph.getJobID(), shuffleMaster, lookup),
                        new DefaultExecutionDeploymentTracker(),
                        DefaultExecutionDeploymentReconciler::new,
                        initializationTimestamp);

        jobMaster.start();

        return jobMaster;
    }

以上终于启动了jobMaster,jobMaster也是一个RpcEndpoint,构造方法执行完后将执行生命周期方法,onStart();主要的执行逻辑在onStart()方法中生成,

JobMaster:

   @Override
    protected void onStart() throws JobMasterException {
        try {
            startJobExecution();
        } catch (Exception e) {
            final JobMasterException jobMasterException =
                    new JobMasterException("Could not start the JobMaster.", e);
            handleJobMasterError(jobMasterException);
            throw jobMasterException;
        }
    }

/**
    * han_pf
    *核心执行两个方法,
     * 1. 启动关键的两个服务taskManagerHeartbeatManager,resourceManagerHeartbeatManager
     * 2.任务的调度
     *
    */
    private void startJobExecution() throws Exception {
        validateRunsInMainThread();

        JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this);
        shuffleMaster.registerJob(context);
        /**
        * han_pf
        * 真正的启动jobmaster服务
        */
        startJobMasterServices();

        log.info(
                "Starting execution of job '{}' ({}) under job master id {}.",
                jobGraph.getName(),
                jobGraph.getJobID(),
                getFencingToken());
        /**
        * han_pf
        * 启动调度器
        */

        startScheduling();
    }

jobMaster主要的方法startJobExecutio(),做两件大事,1. 第一个作为任务执行的master,创建与TaskManager的心跳管理器,启动slotPoolService 建立起与ResourceManager leader的联系。

  1. 启动任务调度器。
    JobMaster主要的两个成员变量slotPoolService 资源分配流程,schedulerNG 任务调度执行流程;
    在 JM 启动后,SlotPool 连接 ResourceManager,SchedulerNG 分配作业并向 scheduler 申请 slot,scheduler 根据不同策略向 SlotPool 申请资源,申请成功后 ExecutionVertex 进行 Deploy。
    SchedulerNG默认实现类DefaultScheduler创建过程中会生成executionGraph,

JobMaster构造方法


            RpcService rpcService,
            JobMasterId jobMasterId,
            JobMasterConfiguration jobMasterConfiguration,
            ResourceID resourceId,
            JobGraph jobGraph,
            HighAvailabilityServices highAvailabilityService,
            SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
            JobManagerSharedServices jobManagerSharedServices,
            HeartbeatServices heartbeatServices,
            JobManagerJobMetricGroupFactory jobMetricGroupFactory,
            OnCompletionActions jobCompletionActions,
            FatalErrorHandler fatalErrorHandler,
            ClassLoader userCodeLoader,
            ShuffleMaster<?> shuffleMaster,
            PartitionTrackerFactory partitionTrackerFactory,
            ExecutionDeploymentTracker executionDeploymentTracker,
            ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
            long initializationTimestamp)
            throws Exception {
resourceManagerLeaderRetriever =
                highAvailabilityServices.getResourceManagerLeaderRetriever();
this.slotPoolService =
                checkNotNull(slotPoolServiceSchedulerFactory).createSlotPoolService(jid);

this.shuffleMaster = checkNotNull(shuffleMaster);
this.schedulerNG =
                createScheduler(
                        slotPoolServiceSchedulerFactory,
                        executionDeploymentTracker,
                        jobManagerJobMetricGroup,
                        jobStatusListener);

」

DefaultSchedulerFactory:

 @Override
    public SchedulerNG createInstance(
    。。。。。
。。。
 return new DefaultScheduler(
                log,
                jobGraph,
                ioExecutor,
                jobMasterConfiguration,
                schedulerComponents.getStartUpAction(),
                new ScheduledExecutorServiceAdapter(futureExecutor),
                userCodeLoader,
                new CheckpointsCleaner(),
                checkpointRecoveryFactory,
                jobManagerJobMetricGroup,
                schedulerComponents.getSchedulingStrategyFactory(),
                FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
                restartBackoffTimeStrategy,
                new DefaultExecutionVertexOperations(),
                new ExecutionVertexVersioner(),
                schedulerComponents.getAllocatorFactory(),
                initializationTimestamp,
                mainThreadExecutor,
                jobStatusListener,
                executionGraphFactory,
                shuffleMaster,
                rpcTimeout);

」

最终是在SchedulerBase构造方法中生成executionGraph.,具体executionGraph生成过程中的细节后续专门分析四种Graph生成细节进行详细分析。

SchedulerBase:

 /**
        * han_pf
        * executionGraph的构建
        */
        this.executionGraph =
                createAndRestoreExecutionGraph(
                        completedCheckpointStore,
                        checkpointsCleaner,
                        checkpointIdCounter,
                        initializationTimestamp,
                        mainThreadExecutor,
                        jobStatusListener);

schedulerNG.startScheduling()分析

DefaultScheduler

   @Override
    protected void startSchedulingInternal() {
        log.info(
                "Starting scheduling with scheduling strategy [{}]",
                schedulingStrategy.getClass().getName());
        transitionToRunning();
        schedulingStrategy.startScheduling();
    }

PipelinedRegionSchedulingStrategy

    private void maybeScheduleRegion(
            final SchedulingPipelinedRegion region,
            final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
        if (!areRegionInputsAllConsumable(region, consumableStatusCache)) {
            return;
        }

        checkState(
                areRegionVerticesAllInCreatedState(region),
                "BUG: trying to schedule a region which is not in CREATED state");

        final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
                SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
                        regionVerticesSorted.get(region), id -> deploymentOption);
          /**
        * han_pf
        * 分配slot并deploy
        */
        schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
    }

以上,通过Dispatcher的创建,过程中恢复已有作业,同时会接收客户端提交的新的作业,然后拉起Jobmaster真正的管理作业,JobMaster会建立和ResourceManager、TaskManager心跳,用于申请任务所需要的slot和部署任务到对应slot中,关于最后slot的分配及任务的部署后续专门详细分析,通过此篇希望帮助各位梳理Dispacher的启动过程及主要用途,具体核心细节后续可专项分析。

相关文章

网友评论

      本文标题:Flink源码-集群启动之JobManager启动续

      本文链接:https://www.haomeiwen.com/subject/nwhulrtx.html