Flink Source Code: ExecutionGraph

Author: AlienPaul | Published 2021-03-26 11:26

    Flink Source Code Analysis Series

    See: Flink 源码分析系列文档目录

    From JobGraph to ExecutionGraph

    A JobGraph is submitted through Dispatcher.submitJob, which is the entry point of everything that follows. That method calls Dispatcher.internalSubmitJob, which in turn calls Dispatcher.persistAndRunJob.

    Dispatcher.persistAndRunJob persists the job and then runs it:

    private void persistAndRunJob(JobGraph jobGraph) throws Exception {
        jobGraphWriter.putJobGraph(jobGraph);
        runJob(jobGraph, ExecutionType.SUBMISSION);
    }
    

    Dispatcher.runJob takes a JobGraph and an execution type. There are two execution types: submitting a new job (SUBMISSION) and recovering an existing one (RECOVERY).

    private void runJob(JobGraph jobGraph, ExecutionType executionType) {
        // Make sure no job with this JobID is currently running, to avoid duplicate submission
        Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
        // Record the initialization timestamp
        long initializationTimestamp = System.currentTimeMillis();
        // Create the JobManagerRunner here;
        // the JobManagerRunner will construct the JobManager
        CompletableFuture<JobManagerRunner> jobManagerRunnerFuture =
            createJobManagerRunner(jobGraph, initializationTimestamp);

        // Wrap the JobGraph-related information for use by the Dispatcher
        DispatcherJob dispatcherJob =
            DispatcherJob.createFor(
            jobManagerRunnerFuture,
            jobGraph.getJobID(),
            jobGraph.getName(),
            initializationTimestamp);

        // Add the job's ID to the runningJobs collection,
        // marking the job as running
        runningJobs.put(jobGraph.getJobID(), dispatcherJob);

        final JobID jobId = jobGraph.getJobID();

        // Handle the result of dispatching the job
        final CompletableFuture<CleanupJobState> cleanupJobStateFuture =
            dispatcherJob
            .getResultFuture()
            .handleAsync(
            (dispatcherJobResult, throwable) -> {
                Preconditions.checkState(
                    runningJobs.get(jobId) == dispatcherJob,
                    "The job entry in runningJobs must be bound to the lifetime of the DispatcherJob.");

                if (dispatcherJobResult != null) {
                    return handleDispatcherJobResult(
                        jobId, dispatcherJobResult, executionType);
                } else {
                    return dispatcherJobFailed(jobId, throwable);
                }
            },
            getMainThreadExecutor());

        // When the job terminates, remove its JobID from runningJobs
        final CompletableFuture<Void> jobTerminationFuture =
            cleanupJobStateFuture
            .thenApply(cleanupJobState -> removeJob(jobId, cleanupJobState))
            .thenCompose(Function.identity());

        // Register the job ID and its termination future in the dispatcherJobTerminationFutures collection
        FutureUtils.assertNoException(jobTerminationFuture);
        registerDispatcherJobTerminationFuture(jobId, jobTerminationFuture);
    }
    
    

    Next comes the Dispatcher.createJobManagerRunner method.

    The JobManagerRunner is created in the Dispatcher and then started; it will in turn construct the JobManager. The creation logic is in createJobManagerRunner:

    CompletableFuture<JobManagerRunner> createJobManagerRunner(
        JobGraph jobGraph, long initializationTimestamp) {
        final RpcService rpcService = getRpcService();
        return CompletableFuture.supplyAsync(
            () -> {
                try {
                    // Create the JobManagerRunner via the factory,
                    // passing in the JobGraph and the high-availability services
                    JobManagerRunner runner =
                        jobManagerRunnerFactory.createJobManagerRunner(
                        jobGraph,
                        configuration,
                        rpcService,
                        highAvailabilityServices,
                        heartbeatServices,
                        jobManagerSharedServices,
                        new DefaultJobManagerJobMetricGroupFactory(
                            jobManagerMetricGroup),
                        fatalErrorHandler,
                        initializationTimestamp);
                    // Start the JobManagerRunner;
                    // this actually starts the leader election service that elects the JM leader
                    runner.start();
                    return runner;
                } catch (Exception e) {
                    throw new CompletionException(
                        new JobInitializationException(
                            jobGraph.getJobID(),
                            "Could not instantiate JobManager.",
                            e));
                }
            },
            ioExecutor); // do not use main thread executor. Otherwise, Dispatcher is blocked on
        // JobManager creation
    }
    

    At this point the JobManager begins leader election. To rule out a single point of failure, Flink provides JobManager high availability: several JobManager instances can run at the same time. In standalone deployments the election is carried out through ZooKeeper; in Yarn cluster mode, high availability is achieved by Yarn automatically restarting a failed ApplicationMaster. For leader election details see Flink 源码之leader选举(Zookeeper方式).
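
    The callback pattern behind this can be illustrated with a minimal, self-contained sketch. Contender, ToyLeaderElectionService and LeaderElectionDemo below are simplified stand-ins invented for illustration; they are not Flink's actual LeaderContender/LeaderElectionService API, and the toy service elects inside a single process instead of going through ZooKeeper:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.UUID;

    // Simplified stand-in for a leader contender: the election winner is
    // notified through grantLeadership, as JobManagerRunner is in Flink.
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
        void revokeLeadership();
    }

    // Toy single-process election service: the first registered contender wins.
    // Real HA setups delegate this decision to ZooKeeper (or Kubernetes).
    class ToyLeaderElectionService {
        private final List<Contender> contenders = new ArrayList<>();
        private Contender leader;

        synchronized void register(Contender contender) {
            contenders.add(contender);
            if (leader == null) {
                promote(contender);
            }
        }

        // Simulate the current leader failing, e.g. a JobManager crash.
        synchronized void leaderFailed() {
            if (leader == null) {
                return;
            }
            contenders.remove(leader);
            leader.revokeLeadership();
            leader = null;
            if (!contenders.isEmpty()) {
                promote(contenders.get(0));
            }
        }

        private void promote(Contender contender) {
            leader = contender;
            // A fresh session id identifies this leadership epoch, mirroring the
            // leaderSessionID parameter of the grantLeadership method shown below.
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    public class LeaderElectionDemo {
        public static void main(String[] args) {
            ToyLeaderElectionService service = new ToyLeaderElectionService();
            service.register(new Contender() {
                public void grantLeadership(UUID leaderSessionId) {
                    // This is the point at which JobManagerRunner verifies the
                    // job's scheduling status and starts the JobMaster.
                    System.out.println("Granted leadership, session " + leaderSessionId);
                }

                public void revokeLeadership() {
                    System.out.println("Leadership revoked");
                }
            });
        }
    }

    If a second contender were registered, a call to leaderFailed() would promote it and hand it a fresh session id; that moment corresponds to Flink invoking grantLeadership on the newly elected JobManagerRunner.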

    Once a leader JM has been elected, the election service calls grantLeadership on it:

    @Override
    public void grantLeadership(final UUID leaderSessionID) {
        synchronized (lock) {
            if (shutdown) {
                log.debug(
                    "JobManagerRunner cannot be granted leadership because it is already shut down.");
                return;
            }
    
            leadershipOperation =
                leadershipOperation.thenRun(
                ThrowingRunnable.unchecked(
                    () -> {
                        synchronized (lock) {
                            // The main logic:
                            // verify the job's scheduling status and start the JobManager
                            verifyJobSchedulingStatusAndStartJobManager(
                                leaderSessionID);
                        }
                    }));
    
            handleException(leadershipOperation, "Could not start the job manager.");
        }
    }
    

    Next we follow verifyJobSchedulingStatusAndStartJobManager.

    @GuardedBy("lock")
    private void verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId)
        throws FlinkException {
        // If the runner has already been shut down, return immediately
        if (shutdown) {
            log.debug("Ignoring starting JobMaster because JobManagerRunner is already shut down.");
            return;
        }
    
        // Fetch the job's scheduling status from the running jobs registry
        final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
            getJobSchedulingStatus();
    
        // If the job has already finished, run the job-already-done logic
        // (which actually handles the case that the job was not completed by this JobManager)
        if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
            jobAlreadyDone();
        } else {
            // Start the JobMaster
            startJobMaster(leaderSessionId);
        }
    }
    

    The flow now reaches JobManagerRunnerImpl.startJobMaster.

    This method starts the JobMaster: it registers the JobGraph, starts the JobMaster services and confirms this JobMaster as leader.

    @GuardedBy("lock")
    private void startJobMaster(UUID leaderSessionId) throws FlinkException {
        log.info(
            "JobManager runner for job {} ({}) was granted leadership with session id {}.",
            jobGraph.getName(),
            jobGraph.getJobID(),
            leaderSessionId);
    
        try {
            // Register the JobGraph as running
            // Depending on the deployment mode (standalone, ZooKeeper or K8s), the JobID is stored differently
            runningJobsRegistry.setJobRunning(jobGraph.getJobID());
        } catch (IOException e) {
            throw new FlinkException(
                String.format(
                    "Failed to set the job %s to running in the running jobs registry.",
                    jobGraph.getJobID()),
                e);
        }
    
        // Start the JobMaster services
        startJobMasterServiceSafely(leaderSessionId);
    
        // Confirm that this JobMaster is the leader
        if (jobMasterService != null) {
            confirmLeaderSessionIdIfStillLeader(jobMasterService, leaderSessionId);
        }
    }
    

    JobManagerRunnerImpl.startJobMasterServiceSafely then creates the JobMaster through DefaultJobMasterServiceFactory.createJobMasterService and starts its RPC service.

    Next, the JobMaster constructor contains the logic that builds Flink's job scheduler. JobMaster.createScheduler calls DefaultSlotPoolServiceSchedulerFactory.createScheduler to create the scheduler, which in turn calls the scheduler factory's instance-creation method, DefaultSchedulerFactory.createInstance.

    The flow then arrives at DefaultScheduler, the default implementation of Flink's job scheduler. It extends SchedulerBase, which in turn implements the SchedulerNG interface.

    The SchedulerBase constructor calls createAndRestoreExecutionGraph.

    SchedulerBase.createAndRestoreExecutionGraph looks like this:

    private ExecutionGraph createAndRestoreExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp,
        ComponentMainThreadExecutor mainThreadExecutor,
        JobStatusListener jobStatusListener)
        throws Exception {
    
        // Create the ExecutionGraph
        ExecutionGraph newExecutionGraph =
            createExecutionGraph(
            currentJobManagerJobMetricGroup,
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            shuffleMaster,
            partitionTracker,
            executionDeploymentTracker,
            initializationTimestamp);
    
        // Obtain the CheckpointCoordinator created inside the ExecutionGraph
        // (how it is created is covered in a later section)
        final CheckpointCoordinator checkpointCoordinator =
            newExecutionGraph.getCheckpointCoordinator();
    
        if (checkpointCoordinator != null) {
            // check whether we find a valid checkpoint
            // Check whether a recent checkpoint exists and restore from it if so
            if (!checkpointCoordinator.restoreInitialCheckpointIfPresent(
                new HashSet<>(newExecutionGraph.getAllVertices().values()))) {
    
                // check whether we can restore from a savepoint
                // If no checkpoint was restored, try to restore from a savepoint instead
                tryRestoreExecutionGraphFromSavepoint(
                    newExecutionGraph, jobGraph.getSavepointRestoreSettings());
            }
        }
    
        // Register the internal task failure listener
        newExecutionGraph.setInternalTaskFailuresListener(
            new UpdateSchedulerNgOnInternalFailuresListener(this));
        // Register the job status listener
        newExecutionGraph.registerJobStatusListener(jobStatusListener);
        // Start the ExecutionGraph with the JobMaster's main thread executor
        newExecutionGraph.start(mainThreadExecutor);
    
        return newExecutionGraph;
    }
    

    SchedulerBase.createExecutionGraph calls DefaultExecutionGraphBuilder to create the ExecutionGraph:

    private ExecutionGraph createExecutionGraph(
        JobManagerJobMetricGroup currentJobManagerJobMetricGroup,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        ShuffleMaster<?> shuffleMaster,
        final JobMasterPartitionTracker partitionTracker,
        ExecutionDeploymentTracker executionDeploymentTracker,
        long initializationTimestamp)
        throws JobExecutionException, JobException {
    
        // Create the Execution deployment listener
        ExecutionDeploymentListener executionDeploymentListener =
            new ExecutionDeploymentTrackerDeploymentListenerAdapter(executionDeploymentTracker);
        // Create the Execution state-update listener
        ExecutionStateUpdateListener executionStateUpdateListener =
            (execution, newState) -> {
            if (newState.isTerminal()) {
                executionDeploymentTracker.stopTrackingDeploymentOf(execution);
            }
        };
    
        // Build the ExecutionGraph
        return DefaultExecutionGraphBuilder.buildGraph(
            jobGraph,
            jobMasterConfiguration,
            futureExecutor,
            ioExecutor,
            userCodeLoader,
            completedCheckpointStore,
            checkpointsCleaner,
            checkpointIdCounter,
            rpcTimeout,
            currentJobManagerJobMetricGroup,
            blobWriter,
            log,
            shuffleMaster,
            partitionTracker,
            TaskDeploymentDescriptorFactory.PartitionLocationConstraint.fromJobType(
                jobGraph.getJobType()),
            executionDeploymentListener,
            executionStateUpdateListener,
            initializationTimestamp,
            new DefaultVertexAttemptNumberStore());
    }
    

    ExecutionGraph concepts

    The ExecutionGraph is the physical execution plan of a Flink job. It coordinates the distributed execution of the data flow.

    Unlike the StreamGraph and the JobGraph, the ExecutionGraph is generated on the JobManager.

    The ExecutionGraph also has the notion of a vertex: its vertices are ExecutionJobVertex instances, each corresponding to a JobVertex in the JobGraph. The step from JobGraph to ExecutionGraph introduces parallelism. An ExecutionJobVertex contains all parallel tasks of its JobVertex, and each of those parallel tasks is represented by an ExecutionVertex; an ExecutionJobVertex with parallelism n therefore contains n ExecutionVertex instances.

    An ExecutionVertex may be executed once or several times (because of task recovery, recomputation or reconfiguration). Each execution of an ExecutionVertex produces an Execution object, which tracks the state changes and resource usage of that task execution.

    IntermediateResult corresponds to a JobVertex's IntermediateDataSet in the JobGraph and represents the staging point for data exchanged between two adjacent ExecutionJobVertex instances. IntermediateResults are built when the ExecutionJobVertex is created, one per IntermediateDataSet the vertex produces, and each of them holds one IntermediateResultPartition per parallel subtask.
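
    These multiplicities can be made concrete with a small model. The classes below are purely illustrative (they are not Flink's ExecutionJobVertex) and assume each vertex produces exactly one intermediate result:

    import java.util.ArrayList;
    import java.util.List;

    // Toy model of the JobVertex -> ExecutionJobVertex -> ExecutionVertex expansion.
    class ToyExecutionJobVertex {
        final String name;
        final int parallelism;
        final List<String> executionVertices = new ArrayList<>();
        final int resultPartitions; // partitions of the produced intermediate result

        ToyExecutionJobVertex(String name, int parallelism) {
            this.name = name;
            this.parallelism = parallelism;
            // One ExecutionVertex per parallel subtask.
            for (int i = 0; i < parallelism; i++) {
                executionVertices.add(name + "[" + i + "/" + parallelism + "]");
            }
            // The produced intermediate result has one partition per subtask.
            this.resultPartitions = parallelism;
        }
    }

    public class GraphExpansionDemo {
        public static void main(String[] args) {
            // A job "source -> map -> sink" with parallelism 2, 4 and 1.
            ToyExecutionJobVertex[] graph = {
                new ToyExecutionJobVertex("source", 2),
                new ToyExecutionJobVertex("map", 4),
                new ToyExecutionJobVertex("sink", 1),
            };
            int numVerticesTotal = 0;
            for (ToyExecutionJobVertex ejv : graph) {
                numVerticesTotal += ejv.parallelism;
            }
            System.out.println("Total ExecutionVertices: " + numVerticesTotal);
        }
    }

    Running this prints "Total ExecutionVertices: 7": the 2 + 4 + 1 parallel subtasks become seven physical tasks. The accumulation mirrors the numVerticesTotal += ejv.getParallelism() line in attachJobGraph, shown later.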

    The DefaultExecutionGraphBuilder.buildGraph method

    public static ExecutionGraph buildGraph(
        JobGraph jobGraph,
        Configuration jobManagerConfig,
        ScheduledExecutorService futureExecutor,
        Executor ioExecutor,
        ClassLoader classLoader,
        CompletedCheckpointStore completedCheckpointStore,
        CheckpointsCleaner checkpointsCleaner,
        CheckpointIDCounter checkpointIdCounter,
        Time rpcTimeout,
        MetricGroup metrics,
        BlobWriter blobWriter,
        Logger log,
        ShuffleMaster<?> shuffleMaster,
        JobMasterPartitionTracker partitionTracker,
        TaskDeploymentDescriptorFactory.PartitionLocationConstraint partitionLocationConstraint,
        ExecutionDeploymentListener executionDeploymentListener,
        ExecutionStateUpdateListener executionStateUpdateListener,
        long initializationTimestamp,
        VertexAttemptNumberStore vertexAttemptNumberStore)
        throws JobExecutionException, JobException {
    
        checkNotNull(jobGraph, "job graph cannot be null");
    
        // Obtain the job name and job ID
        final String jobName = jobGraph.getName();
        final JobID jobId = jobGraph.getJobID();
    
        // Create the JobInformation,
        // a wrapper around the job-related configuration used by the ExecutionGraph
        final JobInformation jobInformation =
            new JobInformation(
            jobId,
            jobName,
            jobGraph.getSerializedExecutionConfig(),
            jobGraph.getJobConfiguration(),
            jobGraph.getUserJarBlobKeys(),
            jobGraph.getClasspaths());
    
        // Maximum number of prior execution attempts to keep in the history
        final int maxPriorAttemptsHistoryLength =
            jobManagerConfig.getInteger(JobManagerOptions.MAX_ATTEMPTS_HISTORY_SIZE);
    
        // Load the factory for the IntermediateResultPartition release strategy
        final PartitionReleaseStrategy.Factory partitionReleaseStrategyFactory =
            PartitionReleaseStrategyFactoryLoader.loadPartitionReleaseStrategyFactory(
            jobManagerConfig);
    
        // create a new execution graph, if none exists so far
        // Create the ExecutionGraph (analyzed in a later section)
        final DefaultExecutionGraph executionGraph;
        try {
                executionGraph =
                        new DefaultExecutionGraph(
                                jobInformation,
                                futureExecutor,
                                ioExecutor,
                                rpcTimeout,
                                maxPriorAttemptsHistoryLength,
                                classLoader,
                                blobWriter,
                                partitionReleaseStrategyFactory,
                                shuffleMaster,
                                partitionTracker,
                                partitionLocationConstraint,
                                executionDeploymentListener,
                                executionStateUpdateListener,
                                initializationTimestamp,
                                vertexAttemptNumberStore);
        } catch (IOException e) {
            throw new JobException("Could not create the ExecutionGraph.", e);
        }
    
        // set the basic properties
    
        try {
            // Set the JSON-format execution plan
            executionGraph.setJsonPlan(JsonPlanGenerator.generatePlan(jobGraph));
        } catch (Throwable t) {
            log.warn("Cannot create JSON plan for job", t);
            // give the graph an empty plan
            // If generating the JSON plan from the JobGraph fails, fall back to an empty plan
            executionGraph.setJsonPlan("{}");
        }
    
        // initialize the vertices that have a master initialization hook
        // file output formats create directories here, input formats create splits
    
        final long initMasterStart = System.nanoTime();
        log.info("Running initialization on master for job {} ({}).", jobName, jobId);
    
        for (JobVertex vertex : jobGraph.getVertices()) {
            // Get the invokable class name of the vertex, i.e. the vertex's task
            String executableClass = vertex.getInvokableClassName();
            // Every vertex must have an invokable class
            if (executableClass == null || executableClass.isEmpty()) {
                throw new JobSubmissionException(
                    jobId,
                    "The vertex "
                    + vertex.getID()
                    + " ("
                    + vertex.getName()
                    + ") has no invokable class.");
            }
    
            try {
                // Run the vertex's master-side initialization logic, which varies by vertex type
                vertex.initializeOnMaster(classLoader);
            } catch (Throwable t) {
                throw new JobExecutionException(
                    jobId,
                    "Cannot initialize task '" + vertex.getName() + "': " + t.getMessage(),
                    t);
            }
        }
    
        log.info(
            "Successfully ran initialization on master in {} ms.",
            (System.nanoTime() - initMasterStart) / 1_000_000);
    
        // topologically sort the job vertices and attach the graph to the existing one
        // Get all job vertices sorted topologically (in data-flow order)
        List<JobVertex> sortedTopology = jobGraph.getVerticesSortedTopologicallyFromSources();
        if (log.isDebugEnabled()) {
            log.debug(
                "Adding {} vertices from job graph {} ({}).",
                sortedTopology.size(),
                jobName,
                jobId);
        }
        // Attach all job vertices to the executionGraph
        executionGraph.attachJobGraph(sortedTopology);
    
        if (log.isDebugEnabled()) {
            log.debug(
                "Successfully created execution graph from job graph {} ({}).", jobName, jobId);
        }
    
        // configure the state checkpointing
        // If checkpointing is enabled for the job
        if (isCheckpointingEnabled(jobGraph)) {
            // Read the checkpoint settings from the JobGraph
            // (snapshotSettings is assembled in StreamingJobGraphGenerator)
            JobCheckpointingSettings snapshotSettings = jobGraph.getCheckpointingSettings();
            
            // Vertices that trigger checkpoints, i.e. all data-source vertices
            List<ExecutionJobVertex> triggerVertices =
                idToVertex(snapshotSettings.getVerticesToTrigger(), executionGraph);
    
            // Vertices that must acknowledge checkpoints, i.e. all vertices
            List<ExecutionJobVertex> ackVertices =
                idToVertex(snapshotSettings.getVerticesToAcknowledge(), executionGraph);
    
            // Vertices notified of committed checkpoints, i.e. all vertices
            List<ExecutionJobVertex> confirmVertices =
                idToVertex(snapshotSettings.getVerticesToConfirm(), executionGraph);
    
            // Maximum number of remembered checkpoints in the history
            int historySize = jobManagerConfig.getInteger(WebOptions.CHECKPOINTS_HISTORY_SIZE);
    
            // Create the checkpoint stats tracker, which cooperates with the CheckpointCoordinator
            CheckpointStatsTracker checkpointStatsTracker =
                new CheckpointStatsTracker(
                historySize,
                ackVertices,
                snapshotSettings.getCheckpointCoordinatorConfiguration(),
                metrics);
    
            // load the state backend from the application settings
            final StateBackend applicationConfiguredBackend;
            final SerializedValue<StateBackend> serializedAppConfigured =
                snapshotSettings.getDefaultStateBackend();
    
            if (serializedAppConfigured == null) {
                applicationConfiguredBackend = null;
            } else {
                try {
                    // Deserialize the state backend configured by the application
                    applicationConfiguredBackend =
                        serializedAppConfigured.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                        jobId, "Could not deserialize application-defined state backend.", e);
                }
            }
    
            final StateBackend rootBackend;
            try {
                // Resolve the state backend:
                // use the application-configured backend if set, otherwise the one from
                // the configuration file, falling back to the default if neither is set
                rootBackend =
                    StateBackendLoader.fromApplicationOrConfigOrDefault(
                    applicationConfiguredBackend, jobManagerConfig, classLoader, log);
            } catch (IllegalConfigurationException | IOException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(
                    jobId, "Could not instantiate configured state backend", e);
            }
            
            // load the checkpoint storage from the application settings
            final CheckpointStorage applicationConfiguredStorage;
            final SerializedValue<CheckpointStorage> serializedAppConfiguredStorage =
                snapshotSettings.getDefaultCheckpointStorage();
    
            if (serializedAppConfiguredStorage == null) {
                applicationConfiguredStorage = null;
            } else {
                try {
                    applicationConfiguredStorage =
                        serializedAppConfiguredStorage.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                        jobId,
                        "Could not deserialize application-defined checkpoint storage.",
                        e);
                }
            }
            
            // As with the state backend, resolve the checkpoint storage from the application settings and the Flink configuration
            final CheckpointStorage rootStorage;
            try {
                rootStorage =
                    CheckpointStorageLoader.load(
                    applicationConfiguredStorage,
                    null,
                    rootBackend,
                    jobManagerConfig,
                    classLoader,
                    log);
            } catch (IllegalConfigurationException | DynamicCodeLoadingException e) {
                throw new JobExecutionException(
                    jobId, "Could not instantiate configured checkpoint storage", e);
            }
    
            // instantiate the user-defined checkpoint hooks
    
            // These hooks run when a checkpoint is triggered or when restoring from one
            final SerializedValue<MasterTriggerRestoreHook.Factory[]> serializedHooks =
                snapshotSettings.getMasterHooks();
            final List<MasterTriggerRestoreHook<?>> hooks;
    
            if (serializedHooks == null) {
                hooks = Collections.emptyList();
            } else {
                final MasterTriggerRestoreHook.Factory[] hookFactories;
                try {
                    hookFactories = serializedHooks.deserializeValue(classLoader);
                } catch (IOException | ClassNotFoundException e) {
                    throw new JobExecutionException(
                        jobId, "Could not instantiate user-defined checkpoint hooks", e);
                }
    
                final Thread thread = Thread.currentThread();
                final ClassLoader originalClassLoader = thread.getContextClassLoader();
                thread.setContextClassLoader(classLoader);
    
                try {
                    hooks = new ArrayList<>(hookFactories.length);
                    for (MasterTriggerRestoreHook.Factory factory : hookFactories) {
                        hooks.add(MasterHooks.wrapHook(factory.create(), classLoader));
                    }
                } finally {
                    thread.setContextClassLoader(originalClassLoader);
                }
            }
    
            // Get the checkpoint coordinator configuration
            final CheckpointCoordinatorConfiguration chkConfig =
                snapshotSettings.getCheckpointCoordinatorConfiguration();
    
            // Apply the checkpoint settings to the executionGraph
            executionGraph.enableCheckpointing(
                chkConfig,
                triggerVertices,
                ackVertices,
                confirmVertices,
                hooks,
                checkpointIdCounter,
                completedCheckpointStore,
                rootBackend,
                checkpointStatsTracker,
                checkpointsCleaner);
        }
    
        // create all the metrics for the Execution Graph
        // Gauges for the job's restart time, downtime and uptime
        metrics.gauge(RestartTimeGauge.METRIC_NAME, new RestartTimeGauge(executionGraph));
        metrics.gauge(DownTimeGauge.METRIC_NAME, new DownTimeGauge(executionGraph));
        metrics.gauge(UpTimeGauge.METRIC_NAME, new UpTimeGauge(executionGraph));
    
        return executionGraph;
    }
    

    The main steps of building the ExecutionGraph are roughly:

    • Gather the job information
    • Create the ExecutionGraph itself
    • Attach the JobGraph vertices
    • Configure checkpointing
    • Configure the state backend
    • Configure the checkpoint storage
    • Register the checkpoint hooks
    • Register the job metrics

    The ExecutionGraph constructor

    • jobInformation: a wrapper around the job information, including the job ID, name, configuration, user code and classpath.
    • futureExecutor: thread pool for asynchronous operations.
    • ioExecutor: thread pool for I/O operations.
    • rpcTimeout: timeout for RPC calls.
    • maxPriorAttemptsHistoryLength: maximum number of prior execution attempts kept in the history.
    • classLoader: the user-code class loader.
    • blobWriter: writes data to the blob server.
    • partitionReleaseStrategyFactory: factory for the IntermediateResultPartition release strategy.
    • shuffleMaster: used to register IntermediateResultPartitions (intermediate result partitions), i.e. to register each data partition and its producer with the JobMaster.
    • partitionTracker: tracks and releases partitions.
    • partitionLocationConstraint: whether a partition's location may still be unknown at deployment time. In batch mode the location may be unknown; in streaming mode it must be known.
    • executionDeploymentListener: listener for execution deployments.
    • executionStateUpdateListener: listener for execution state updates.
    • initializationTimestamp: the initialization timestamp.
    • vertexAttemptNumberStore: stores the attempt count of each job vertex.

    The attachJobGraph method

    This method attaches the JobGraph's vertices to the ExecutionGraph.

    @Override
    public void attachJobGraph(List<JobVertex> topologiallySorted) throws JobException {
    
        // Make sure we are running in the JobMaster main thread
        assertRunningInJobMasterMainThread();
    
        LOG.debug(
            "Attaching {} topologically sorted vertices to existing job graph with {} "
            + "vertices and {} intermediate results.",
            topologiallySorted.size(),
            tasks.size(),
            intermediateResults.size());
    
        // Collection that will hold the new ExecutionJobVertices
        final ArrayList<ExecutionJobVertex> newExecJobVertices =
            new ArrayList<>(topologiallySorted.size());
        final long createTimestamp = System.currentTimeMillis();
    
        for (JobVertex jobVertex : topologiallySorted) {
    
            // If any vertex is an input (source) vertex that is not stoppable,
            // mark the ExecutionGraph's source tasks as not stoppable
            if (jobVertex.isInputVertex() && !jobVertex.isStoppable()) {
                this.isStoppable = false;
            }
    
            // create the execution job vertex and attach it to the graph
            // Create the ExecutionJobVertex
            ExecutionJobVertex ejv =
                new ExecutionJobVertex(
                this,
                jobVertex,
                maxPriorAttemptsHistoryLength,
                rpcTimeout,
                createTimestamp,
                this.initialAttemptCounts.getAttemptCounts(jobVertex.getID()));
    
            // Hand the IntermediateResults of the predecessor vertices to this ejv,
            // connecting it to its predecessors, which is what the method name says
            ejv.connectToPredecessors(this.intermediateResults);
    
            // Put the job vertex ID and the ejv into the ExecutionGraph as a key-value pair
            ExecutionJobVertex previousTask = this.tasks.putIfAbsent(jobVertex.getID(), ejv);
            
            // A non-null previousTask means two JobGraph vertices share the same ID, which is an error
            if (previousTask != null) {
                throw new JobException(
                    String.format(
                        "Encountered two job vertices with ID %s : previous=[%s] / new=[%s]",
                        jobVertex.getID(), ejv, previousTask));
            }
    
            // Iterate over the IntermediateResults created by this ejv.
            // They are created in the ExecutionJobVertex constructor,
            // one per IntermediateDataSet of the corresponding JobVertex
            for (IntermediateResult res : ejv.getProducedDataSets()) {
                IntermediateResult previousDataSet =
                    this.intermediateResults.putIfAbsent(res.getId(), res);
                // Likewise, check that no two results share the same ID
                if (previousDataSet != null) {
                    throw new JobException(
                        String.format(
                            "Encountered two intermediate data set with ID %s : previous=[%s] / new=[%s]",
                            res.getId(), res, previousDataSet));
                }
            }
    
            // This collection keeps the ejvs in creation order; store the ejv
            this.verticesInCreationOrder.add(ejv);
            // Count the total number of vertices: at execution time one vertex (task) is
            // deployed per parallel subtask, so the parallelism of each ejv is accumulated
            this.numVerticesTotal += ejv.getParallelism();
            newExecJobVertices.add(ejv);
        }
    
        // Register all ExecutionVertices and the IntermediateResultPartitions they produce.
        // ExecutionVertex is the physical execution node: an ExecutionJobVertex contains one per degree of parallelism
        registerExecutionVerticesAndResultPartitions(this.verticesInCreationOrder);
    
        // the topology assigning should happen before notifying new vertices to failoverStrategy
        // Build the execution topology,
        // which contains all ExecutionVertices, ResultPartitions and PipelinedRegions
        executionTopology = DefaultExecutionTopology.fromExecutionGraph(this);
    
        // Create the partition release strategy
        partitionReleaseStrategy =
            partitionReleaseStrategyFactory.createInstance(getSchedulingTopology());
    }
    

    Pipelined Region

    Building the executionTopology brings us to the pipelined region. Before explaining this concept we need the difference between a pipelined result and a blocking result.

    A pipelined result is data flowing steadily out of a pipe: downstream can consume it continuously, starting as soon as upstream produces it. A pipelined result may never stop producing data. This corresponds to streaming jobs.

    A blocking result can only be consumed once upstream has finished producing it. A blocking result is always finite. The typical scenario is a batch job.

    A pipelined region is a part of the ExecutionGraph: a group of consecutive tasks connected by pipelined data exchanges (producing pipelined results). An ExecutionGraph can therefore be split into several pipelined regions, connected to each other by blocking data exchanges.

    The point of a pipelined region is that all consumers inside the region must continuously consume what the upstream producers emit, to avoid blocking upstream or causing backpressure. Consequently, all tasks of a pipelined region must be scheduled together at startup and restarted together on failure.

    For the official explanation of pipelined region scheduling see: https://flink.apache.org/2020/12/15/pipelined-region-sheduling.html
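
    To make the region boundary concrete, the sketch below groups the vertices of a small graph into pipelined regions with a union-find over the pipelined edges only. This is an illustrative algorithm under simplified assumptions; Flink's own region computation in DefaultExecutionTopology additionally has to handle co-location constraints and other details:

    import java.util.ArrayList;
    import java.util.List;
    import java.util.Map;
    import java.util.TreeMap;

    // Illustrative computation of pipelined regions: vertices connected by
    // PIPELINED edges share a region; BLOCKING edges separate regions.
    public class PipelinedRegionDemo {
        enum Exchange { PIPELINED, BLOCKING }

        // Union-find "find" with path halving.
        static int find(int[] parent, int x) {
            while (parent[x] != x) {
                parent[x] = parent[parent[x]];
                x = parent[x];
            }
            return x;
        }

        public static void main(String[] args) {
            // Graph: 0 -> 1 (pipelined), 1 -> 2 (blocking), 2 -> 3 (pipelined)
            int numVertices = 4;
            int[][] edges = {{0, 1}, {1, 2}, {2, 3}};
            Exchange[] types = {Exchange.PIPELINED, Exchange.BLOCKING, Exchange.PIPELINED};

            int[] parent = new int[numVertices];
            for (int i = 0; i < numVertices; i++) {
                parent[i] = i;
            }

            // Union the endpoints of pipelined edges only.
            for (int i = 0; i < edges.length; i++) {
                if (types[i] == Exchange.PIPELINED) {
                    parent[find(parent, edges[i][0])] = find(parent, edges[i][1]);
                }
            }

            // Group vertices by their region representative.
            Map<Integer, List<Integer>> regions = new TreeMap<>();
            for (int v = 0; v < numVertices; v++) {
                regions.computeIfAbsent(find(parent, v), k -> new ArrayList<>()).add(v);
            }
            System.out.println("Pipelined regions: " + regions.values());
        }
    }

    This prints "Pipelined regions: [[0, 1], [2, 3]]": tasks 0 and 1 form one region that is scheduled and restarted as a unit, while the blocking edge between 1 and 2 allows the second region to be deferred until the first has produced its finite result.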

    The enableCheckpointing method

    This method initializes the checkpoint-related settings of the ExecutionGraph. Its main job is to create and configure the CheckpointCoordinator:

    @Override
    public void enableCheckpointing(
        CheckpointCoordinatorConfiguration chkConfig,
        List<MasterTriggerRestoreHook<?>> masterHooks,
        CheckpointIDCounter checkpointIDCounter,
        CompletedCheckpointStore checkpointStore,
        StateBackend checkpointStateBackend,
        CheckpointStorage checkpointStorage,
        CheckpointStatsTracker statsTracker,
        CheckpointsCleaner checkpointsCleaner) {
    
        // The job must be in CREATED state
        checkState(state == JobStatus.CREATED, "Job must be in CREATED state");
        // The CheckpointCoordinator must not exist yet, to avoid enabling checkpointing twice
        checkState(checkpointCoordinator == null, "checkpointing already enabled");
    
        // Collect the OperatorCoordinators of all ExecutionJobVertices.
        // An OperatorCoordinator runs on the JobManager and interacts with the operator of a job vertex it is associated with
        final Collection<OperatorCoordinatorCheckpointContext> operatorCoordinators =
            buildOpCoordinatorCheckpointContexts();
    
        checkpointStatsTracker = checkNotNull(statsTracker, "CheckpointStatsTracker");
    
        // Create the checkpoint failure manager, which runs the failure-handling logic when a checkpoint fails
        CheckpointFailureManager failureManager =
            new CheckpointFailureManager(
            chkConfig.getTolerableCheckpointFailureNumber(),
            new CheckpointFailureManager.FailJobCallback() {
                @Override
                public void failJob(Throwable cause) {
                    getJobMasterMainThreadExecutor().execute(() -> failGlobal(cause));
                }
    
                @Override
                public void failJobDueToTaskFailure(
                    Throwable cause, ExecutionAttemptID failingTask) {
                    getJobMasterMainThreadExecutor()
                        .execute(
                        () ->
                        failGlobalIfExecutionIsStillRunning(
                            cause, failingTask));
                }
            });
    
        // Create the timer the CheckpointCoordinator uses to trigger periodic checkpoints
        checkState(checkpointCoordinatorTimer == null);
    
        checkpointCoordinatorTimer =
            Executors.newSingleThreadScheduledExecutor(
            new DispatcherThreadFactory(
                Thread.currentThread().getThreadGroup(), "Checkpoint Timer"));
    
        // Create the CheckpointCoordinator, which coordinates checkpointing across all
        // operators in the cluster: it triggers and commits checkpoints and holds the state
        checkpointCoordinator =
            new CheckpointCoordinator(
            jobInformation.getJobId(),
            chkConfig,
            operatorCoordinators,
            checkpointIDCounter,
            checkpointStore,
            checkpointStorage,
            ioExecutor,
            checkpointsCleaner,
            new ScheduledExecutorServiceAdapter(checkpointCoordinatorTimer),
            SharedStateRegistry.DEFAULT_FACTORY,
            failureManager,
            createCheckpointPlanCalculator(),
            new ExecutionAttemptMappingProvider(getAllExecutionVertices()));
    
        // register the master hooks on the checkpoint coordinator
        // They are called back when a checkpoint is created or when restoring from one
        for (MasterTriggerRestoreHook<?> hook : masterHooks) {
            if (!checkpointCoordinator.addMasterHook(hook)) {
                LOG.warn(
                    "Trying to register multiple checkpoint hooks with the name: {}",
                    hook.getIdentifier());
            }
        }
    
        // Attach the checkpoint stats tracker
        checkpointCoordinator.setCheckpointStatsTracker(checkpointStatsTracker);
    
        // interval of max long value indicates disable periodic checkpoint,
        // the CheckpointActivatorDeactivator should be created only if the interval is not max
        // value
        // If periodic checkpointing is not disabled, register a job status listener
        // that the CheckpointCoordinator uses to react to job status changes
        if (chkConfig.getCheckpointInterval() != Long.MAX_VALUE) {
            // the periodic checkpoint scheduler is activated and deactivated as a result of
            // job status changes (running -> on, all other states -> off)
            registerJobStatusListener(checkpointCoordinator.createActivatorDeactivator());
        }
    
        // Record the names of the state backend and the checkpoint storage
        this.stateBackendName = checkpointStateBackend.getClass().getSimpleName();
        this.checkpointStorageName = checkpointStorage.getClass().getSimpleName();
    }
    

    From ExecutionGraph to deployed Tasks

    Every execution of an ExecutionVertex creates an Execution object.

    Once the JobManager has started, it instructs the Scheduler to start scheduling (by calling SchedulerBase's startScheduling method). After several layers of intermediate calls (fairly involved, omitted here), the flow finally reaches Execution.deploy.

    Execution.deploy contains the actual deployment logic. It builds a task deployment descriptor from the task's assigned resources and the ExecutionVertex. The descriptor carries the task's execution configuration; it is sent to the TaskManager via RPC, and the TaskManager creates a matching task from it.

    public void deploy() throws JobException {
        // Make sure we are running in the JobMaster main thread
        assertRunningInJobMasterMainThread();
    
        // The resource (slot) assigned to this execution
        final LogicalSlot slot = assignedResource;
    
        checkNotNull(
            slot,
            "In order to deploy the execution we first have to assign a resource via tryAssignResource.");
    
        // Check if the TaskManager died in the meantime
        // This only speeds up the response to TaskManagers failing concurrently to deployments.
        // The more general check is the rpcTimeout of the deployment call
        if (!slot.isAlive()) {
            throw new JobException("Target slot (TaskManager) for deployment is no longer alive.");
        }
    
        // make sure exactly one deployment call happens from the correct state
        // note: the transition from CREATED to DEPLOYING is for testing purposes only
        // Read the previous state and switch to DEPLOYING
        ExecutionState previous = this.state;
        if (previous == SCHEDULED || previous == CREATED) {
            if (!transitionState(previous, DEPLOYING)) {
                // race condition, someone else beat us to the deploying call.
                // this should actually not happen and indicates a race somewhere else
                throw new IllegalStateException(
                    "Cannot deploy task: Concurrent deployment call race.");
            }
        } else {
            // vertex may have been cancelled, or it was already scheduled
            throw new IllegalStateException(
                "The vertex must be in CREATED or SCHEDULED state to be deployed. Found state "
                + previous);
        }
    
        // Check that this slot was in fact assigned to the current execution
        if (this != slot.getPayload()) {
            throw new IllegalStateException(
                String.format(
                    "The execution %s has not been assigned to the assigned slot.", this));
        }
    
        try {
    
            // race double check, did we fail/cancel and do we need to release the slot?
            // Double-check that the execution state is still DEPLOYING
            if (this.state != DEPLOYING) {
                slot.releaseSlot(
                    new FlinkException(
                        "Actual state of execution "
                        + this
                        + " ("
                        + state
                        + ") does not match expected state DEPLOYING."));
                return;
            }
    
            LOG.info(
                "Deploying {} (attempt #{}) with attempt id {} to {} with allocation id {}",
                vertex.getTaskNameWithSubtaskIndex(),
                attemptNumber,
                vertex.getCurrentExecutionAttempt().getAttemptId(),
                getAssignedResourceLocation(),
                slot.getAllocationId());
    
            // Create a task deployment descriptor that carries
            // the ExecutionVertex and the resources assigned to it
            final TaskDeploymentDescriptor deployment =
                TaskDeploymentDescriptorFactory.fromExecutionVertex(vertex, attemptNumber)
                .createDeploymentDescriptor(
                slot.getAllocationId(),
                taskRestore,
                producedPartitions.values());
    
            // null taskRestore to let it be GC'ed
            taskRestore = null;
    
            final TaskManagerGateway taskManagerGateway = slot.getTaskManagerGateway();
    
            final ComponentMainThreadExecutor jobMasterMainThreadExecutor =
                vertex.getExecutionGraphAccessor().getJobMasterMainThreadExecutor();
    
            getVertex().notifyPendingDeployment(this);
            // We run the submission in the future executor so that the serialization of large TDDs
            // does not block
            // the main thread and sync back to the main thread once submission is completed.
            // Asynchronously call the taskManagerGateway to notify the TaskManager via RPC:
            // the task deployment descriptor is sent over, and the TaskManager
            // starts deploying the Task once it receives the descriptor
            CompletableFuture.supplyAsync(
                () -> taskManagerGateway.submitTask(deployment, rpcTimeout), executor)
                .thenCompose(Function.identity())
                .whenCompleteAsync(
                (ack, failure) -> {
                    if (failure == null) {
                        vertex.notifyCompletedDeployment(this);
                    } else {
                        if (failure instanceof TimeoutException) {
                            String taskname =
                                vertex.getTaskNameWithSubtaskIndex()
                                + " ("
                                + attemptId
                                + ')';
    
                            markFailed(
                                new Exception(
                                    "Cannot deploy task "
                                    + taskname
                                    + " - TaskManager ("
                                    + getAssignedResourceLocation()
                                    + ") not responding after a rpcTimeout of "
                                    + rpcTimeout,
                                    failure));
                        } else {
                            markFailed(failure);
                        }
                    }
                },
                jobMasterMainThreadExecutor);
    
        } catch (Throwable t) {
            markFailed(t);
        }
    }
    

    The method above calls TaskExecutor's submitTask method through the TaskManagerGateway.

    @Override
    public CompletableFuture<Acknowledge> submitTask(
        TaskDeploymentDescriptor tdd, JobMasterId jobMasterId, Time timeout) {
        // ...
        // Create the Task
        Task task =
                        new Task(
                                jobInformation,
                                taskInformation,
                                tdd.getExecutionAttemptId(),
                                tdd.getAllocationId(),
                                tdd.getSubtaskIndex(),
                                tdd.getAttemptNumber(),
                                tdd.getProducedPartitions(),
                                tdd.getInputGates(),
                                memoryManager,
                                taskExecutorServices.getIOManager(),
                                taskExecutorServices.getShuffleEnvironment(),
                                taskExecutorServices.getKvStateService(),
                                taskExecutorServices.getBroadcastVariableManager(),
                                taskExecutorServices.getTaskEventDispatcher(),
                                externalResourceInfoProvider,
                                taskStateManager,
                                taskManagerActions,
                                inputSplitProvider,
                                checkpointResponder,
                                taskOperatorEventGateway,
                                aggregateManager,
                                classLoaderHandle,
                                fileCache,
                                taskManagerConfiguration,
                                taskMetricGroup,
                                resultPartitionConsumableNotifier,
                                partitionStateChecker,
                                getRpcService().getExecutor());
    // ...
    }
    

    At this point the concrete Task object has been created inside the TaskManager. This completes the analysis of how a JobGraph becomes an ExecutionGraph and is finally deployed as Tasks.

    This post is the author's original work. Discussion and corrections are welcome. Please credit the source when reposting.
