Flink Job Submission (Part 2) --- Source Code Analysis: How JobMaster Runs


Author: sj_91d7 | Published 2021-02-02 12:57

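    Flink, part 2
    The earlier article on Flink's job submission flow covered how a job reaches the Dispatcher. So how does the JobMaster actually get running? This article answers that question by walking through the source code. Everything below is based on Flink 1.9.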

    1. After a job is submitted to the cluster, JobSubmitHandler is the first component to handle the request:
        @Override
        protected CompletableFuture<JobSubmitResponseBody> handleRequest(@Nonnull HandlerRequest<JobSubmitRequestBody, EmptyMessageParameters> request, @Nonnull DispatcherGateway gateway) throws RestHandlerException {
            final Collection<File> uploadedFiles = request.getUploadedFiles();
            final Map<String, Path> nameToFile = uploadedFiles.stream().collect(Collectors.toMap(
                File::getName,
                Path::fromLocalFile
            ));
    
            if (uploadedFiles.size() != nameToFile.size()) {
                throw new RestHandlerException(
                    String.format("The number of uploaded files was %s than the expected count. Expected: %s Actual %s",
                        uploadedFiles.size() < nameToFile.size() ? "lower" : "higher",
                        nameToFile.size(),
                        uploadedFiles.size()),
                    HttpResponseStatus.BAD_REQUEST
                );
            }
    
            final JobSubmitRequestBody requestBody = request.getRequestBody();
    
            if (requestBody.jobGraphFileName == null) {
                throw new RestHandlerException(
                    String.format("The %s field must not be omitted or be null.",
                        JobSubmitRequestBody.FIELD_NAME_JOB_GRAPH),
                    HttpResponseStatus.BAD_REQUEST);
            }
    
            // Load the JobGraph
            CompletableFuture<JobGraph> jobGraphFuture = loadJobGraph(requestBody, nameToFile);
    
            Collection<Path> jarFiles = getJarFilesToUpload(requestBody.jarFileNames, nameToFile);
    
            Collection<Tuple2<String, Path>> artifacts = getArtifactFilesToUpload(requestBody.artifactFileNames, nameToFile);
    
            // Upload the JobGraph and the user's jars and other artifacts
            CompletableFuture<JobGraph> finalizedJobGraphFuture = uploadJobGraphFiles(gateway, jobGraphFuture, jarFiles, artifacts, configuration);
    
            // Submit the job
            CompletableFuture<Acknowledge> jobSubmissionFuture = finalizedJobGraphFuture.thenCompose(jobGraph -> gateway.submitJob(jobGraph, timeout));
    
            return jobSubmissionFuture.thenCombine(jobGraphFuture,
                (ack, jobGraph) -> new JobSubmitResponseBody("/jobs/" + jobGraph.getJobID()));
        }
    

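    JobSubmitHandler.handleRequest does three main things:

    • loadJobGraph: loads the JobGraph from the uploaded file
    • uploadJobGraphFiles: uploads the JobGraph together with the user's jars and other artifacts to the BlobServer
    • gateway.submitJob: submits the job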
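    The three steps are chained as CompletableFuture stages. What follows is a minimal plain-Java sketch of that chaining. It is not Flink code: `SubmitPipelineSketch`, its methods, and the use of a job-ID string in place of a real JobGraph are all hypothetical. The sketch records the order in which the stages run. It builds the response path only after the submission has been acknowledged, which is what the thenCombine at the end of handleRequest does.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical model of JobSubmitHandler.handleRequest: a "job graph" is just its job ID here.
final class SubmitPipelineSketch {

    static CompletableFuture<String> loadJobGraph(String jobId, List<String> log) {
        log.add("load");
        return CompletableFuture.completedFuture(jobId);
    }

    // Stand-in for uploadJobGraphFiles: would upload jars/artifacts, then pass the graph on.
    static CompletableFuture<String> uploadJobGraphFiles(CompletableFuture<String> graph, List<String> log) {
        return graph.thenApply(g -> {
            log.add("upload");
            return g;
        });
    }

    // Stand-in for gateway.submitJob: resolves to an acknowledgement.
    static CompletableFuture<String> submitJob(String graph, List<String> log) {
        log.add("submit");
        return CompletableFuture.completedFuture("ACK");
    }

    static CompletableFuture<String> handleRequest(String jobId, List<String> log) {
        CompletableFuture<String> jobGraphFuture = loadJobGraph(jobId, log);
        CompletableFuture<String> finalized = uploadJobGraphFiles(jobGraphFuture, log);
        CompletableFuture<String> submission = finalized.thenCompose(g -> submitJob(g, log));
        // Combine the acknowledgement with the original graph to build the response path.
        return submission.thenCombine(jobGraphFuture, (ack, graph) -> "/jobs/" + graph);
    }

    public static void main(String[] args) {
        List<String> log = new ArrayList<>();
        System.out.println(handleRequest("job-42", log).join()); // /jobs/job-42
        System.out.println(log); // [load, upload, submit]
    }
}
```

    If any stage fails, every later stage is skipped and the returned future completes exceptionally. This is how a failed upload or submission reaches the REST response.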
    2. gateway.submitJob ends up in the Dispatcher's submitJob method. The next several calls all live in the Dispatcher class:
        @Override
        public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
            log.info("Received JobGraph submission {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    
            try {
                if (isDuplicateJob(jobGraph.getJobID())) {
                    return FutureUtils.completedExceptionally(
                        new JobSubmissionException(jobGraph.getJobID(), "Job has already been submitted."));
                } else if (isPartialResourceConfigured(jobGraph)) {
                    return FutureUtils.completedExceptionally(
                        new JobSubmissionException(jobGraph.getJobID(), "Currently jobs is not supported if parts of the vertices have " +
                                "resources configured. The limitation will be removed in future versions."));
                } else {
                    return internalSubmitJob(jobGraph);
                }
            } catch (FlinkException e) {
                return FutureUtils.completedExceptionally(e);
            }
        }
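    submitJob reports a duplicate by returning an exceptionally completed future rather than by throwing. Below is a minimal sketch of that guard. It assumes a hypothetical in-memory set of known job IDs (`DuplicateGuardSketch`, `knownJobs`). The real isDuplicateJob is not shown in the excerpt, and IllegalStateException stands in for JobSubmissionException.

```java
import java.util.HashSet;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the duplicate-submission guard in Dispatcher.submitJob.
final class DuplicateGuardSketch {
    private final Set<String> knownJobs = new HashSet<>();

    CompletableFuture<String> submitJob(String jobId) {
        // Set.add returns false if the ID was already present.
        if (!knownJobs.add(jobId)) {
            // Report the error through the future instead of throwing to the caller.
            CompletableFuture<String> failed = new CompletableFuture<>();
            failed.completeExceptionally(new IllegalStateException("Job has already been submitted."));
            return failed;
        }
        return CompletableFuture.completedFuture("ACK");
    }
}
```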
    

    It then calls internalSubmitJob:

        private CompletableFuture<Acknowledge> internalSubmitJob(JobGraph jobGraph) {
            log.info("Submitting job {} ({}).", jobGraph.getJobID(), jobGraph.getName());
    
            final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobGraph.getJobID(), jobGraph, this::persistAndRunJob)
                .thenApply(ignored -> Acknowledge.get());
    
            return persistAndRunFuture.handleAsync((acknowledge, throwable) -> {
                if (throwable != null) {
                    cleanUpJobData(jobGraph.getJobID(), true);
    
                    final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                    log.error("Failed to submit job {}.", jobGraph.getJobID(), strippedThrowable);
                    throw new CompletionException(
                        new JobSubmissionException(jobGraph.getJobID(), "Failed to submit job.", strippedThrowable));
                } else {
                    return acknowledge;
                }
            }, getRpcService().getExecutor());
        }
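    If the persist-and-run future fails, internalSubmitJob cleans up the job's data. It then rethrows the cause wrapped in a JobSubmissionException. The sketch below reproduces that pattern using JDK types only. `SubmitErrorSketch`, the `cleanedUp` list, and the RuntimeException wrapper are hypothetical stand-ins for cleanUpJobData and JobSubmissionException. The `strip` helper approximates ExceptionUtils.stripCompletionException. The sketch uses `handle` instead of `handleAsync`, so it needs no executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

// Hypothetical sketch of internalSubmitJob's failure handling.
final class SubmitErrorSketch {
    final List<String> cleanedUp = new ArrayList<>();

    // Peel off CompletionException wrappers to reach the real cause.
    static Throwable strip(Throwable t) {
        while (t instanceof CompletionException && t.getCause() != null) {
            t = t.getCause();
        }
        return t;
    }

    CompletableFuture<String> internalSubmitJob(String jobId, CompletableFuture<Void> persistAndRun) {
        return persistAndRun
            .thenApply(ignored -> "ACK")
            .handle((ack, throwable) -> {
                if (throwable != null) {
                    cleanedUp.add(jobId); // stand-in for cleanUpJobData(jobId, true)
                    throw new CompletionException(
                        new RuntimeException("Failed to submit job " + jobId, strip(throwable)));
                }
                return ack;
            });
    }
}
```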
    

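    internalSubmitJob passes persistAndRunJob to waitForTerminatingJobManager. That method runs it once any previous JobManager for the same job ID has terminated. Here is persistAndRunJob: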

        private CompletableFuture<Void> persistAndRunJob(JobGraph jobGraph) throws Exception {
            submittedJobGraphStore.putJobGraph(new SubmittedJobGraph(jobGraph));
    
            final CompletableFuture<Void> runJobFuture = runJob(jobGraph);
    
            return runJobFuture.whenComplete(BiConsumerWithException.unchecked((Object ignored, Throwable throwable) -> {
                if (throwable != null) {
                    submittedJobGraphStore.removeJobGraph(jobGraph.getJobID());
                }
            }));
        }
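    persistAndRunJob stores the JobGraph in submittedJobGraphStore before running it. If runJob fails, it removes the graph again, so a job that could not be started is not left in the store. Below is a minimal sketch of this persist-then-roll-back pattern. A hypothetical in-memory map stands in for the store, and `PersistAndRunSketch` is not a Flink class.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical sketch of persistAndRunJob's persist-then-roll-back behavior.
final class PersistAndRunSketch {
    final Map<String, String> store = new HashMap<>(); // stand-in for submittedJobGraphStore

    CompletableFuture<Void> persistAndRunJob(String jobId, Function<String, CompletableFuture<Void>> runJob) {
        store.put(jobId, "graph of " + jobId); // persist first
        return runJob.apply(jobId).whenComplete((ignored, throwable) -> {
            if (throwable != null) {
                store.remove(jobId); // roll back if the job could not be run
            }
        });
    }
}
```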
    

    Then runJob is called:

        private CompletableFuture<Void> runJob(JobGraph jobGraph) {
            Preconditions.checkState(!jobManagerRunnerFutures.containsKey(jobGraph.getJobID()));
    
            final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = createJobManagerRunner(jobGraph);
    
            jobManagerRunnerFutures.put(jobGraph.getJobID(), jobManagerRunnerFuture);
    
            return jobManagerRunnerFuture
                .thenApply(FunctionUtils.uncheckedFunction(this::startJobManagerRunner))
                .thenApply(FunctionUtils.nullFn())
                .whenCompleteAsync(
                    (ignored, throwable) -> {
                        if (throwable != null) {
                            jobManagerRunnerFutures.remove(jobGraph.getJobID());
                        }
                    },
                    getMainThreadExecutor());
        }
    

    This method does two main things:

    • creates the JobManagerRunner (createJobManagerRunner)
    • startJobManagerRunner, which starts the JobManager

    Now look at startJobManagerRunner:

        private JobManagerRunner startJobManagerRunner(JobManagerRunner jobManagerRunner) throws Exception {
            final JobID jobId = jobManagerRunner.getJobGraph().getJobID();
    
            FutureUtils.assertNoException(
                jobManagerRunner.getResultFuture().handleAsync(
                    (ArchivedExecutionGraph archivedExecutionGraph, Throwable throwable) -> {
                        // check if we are still the active JobManagerRunner by checking the identity
                        final CompletableFuture<JobManagerRunner> jobManagerRunnerFuture = jobManagerRunnerFutures.get(jobId);
                        final JobManagerRunner currentJobManagerRunner = jobManagerRunnerFuture != null ? jobManagerRunnerFuture.getNow(null) : null;
                        //noinspection ObjectEquality
                        if (jobManagerRunner == currentJobManagerRunner) {
                            if (archivedExecutionGraph != null) {
                                jobReachedGloballyTerminalState(archivedExecutionGraph);
                            } else {
                                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
    
                                if (strippedThrowable instanceof JobNotFinishedException) {
                                    jobNotFinished(jobId);
                                } else {
                                    jobMasterFailed(jobId, strippedThrowable);
                                }
                            }
                        } else {
                            log.debug("There is a newer JobManagerRunner for the job {}.", jobId);
                        }
    
                        return null;
                    }, getMainThreadExecutor()));
    
            jobManagerRunner.start();
    
            return jobManagerRunner;
        }
    

    This method first registers a handler on the runner's result future. It then calls jobManagerRunner.start() to start the JobManager.
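    The handler registered in startJobManagerRunner acts on a runner's result only if that runner is still the one stored in jobManagerRunnerFutures. The `==` comparison is what filters out results from an older runner of the same job. Below is a simplified sketch of that identity check. The names (`RunnerIdentitySketch`, `Runner`) are hypothetical, and a plain map of runners replaces the map of futures.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of the "are we still the active runner?" check.
final class RunnerIdentitySketch {
    static final class Runner {
        final String name;
        Runner(String name) { this.name = name; }
    }

    final Map<String, Runner> currentRunners = new HashMap<>();

    // Called when `runner` produces a result for `jobId`.
    String onResult(String jobId, Runner runner) {
        Runner current = currentRunners.get(jobId);
        // Reference equality on purpose: a different instance means another runner took over.
        if (runner == current) {
            return "handled result of " + runner.name;
        }
        return "ignored: there is a newer runner for " + jobId;
    }
}
```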

    3. Next, JobManagerRunner's start method:
        //----------------------------------------------------------------------------------------------
        // Lifecycle management
        //----------------------------------------------------------------------------------------------
    
        public void start() throws Exception {
            try {
                leaderElectionService.start(this);
            } catch (Exception e) {
                log.error("Could not start the JobManager because the leader election service did not start.", e);
                throw new Exception("Could not start the leader election service.", e);
            }
        }
    

    start() calls leaderElectionService.start(this). In a ZooKeeper high-availability setup, leader election is done through ZooKeeper, so look at ZooKeeperLeaderElectionService.start:

        @Override
        public void start(LeaderContender contender) throws Exception {
            Preconditions.checkNotNull(contender, "Contender must not be null.");
            Preconditions.checkState(leaderContender == null, "Contender was already set.");
    
            LOG.info("Starting ZooKeeperLeaderElectionService {}.", this);
    
            synchronized (lock) {
    
                client.getUnhandledErrorListenable().addListener(this);
    
                leaderContender = contender;
    
                leaderLatch.addListener(this);
                leaderLatch.start();
    
                cache.getListenable().addListener(this);
                cache.start();
    
                client.getConnectionStateListenable().addListener(listener);
    
                running = true;
            }
        }
    

    Next comes leaderLatch.start(). LeaderLatch is Apache Curator's leader-election recipe:

        /**
         * Add this instance to the leadership election and attempt to acquire leadership.
         *
         * @throws Exception errors
         */
        public void start() throws Exception
        {
            Preconditions.checkState(state.compareAndSet(State.LATENT, State.STARTED), "Cannot be started more than once");
    
            startTask.set(AfterConnectionEstablished.execute(client, new Runnable()
                    {
                        @Override
                        public void run()
                        {
                            try
                            {
                                internalStart();
                            }
                            finally
                            {
                                startTask.set(null);
                            }
                        }
                    }));
        }
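    LeaderLatch.start() uses an atomic compareAndSet from LATENT to STARTED, so a latch can be started only once. It also defers internalStart() until the ZooKeeper connection is established. Below is a minimal sketch of the compare-and-set guard alone, with the connection handling left out. `StartOnceSketch` is a hypothetical name, and its enum mirrors the LATENT, STARTED and CLOSED states visible in the Curator code.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical sketch of LeaderLatch's "start only once" guard.
final class StartOnceSketch {
    enum State { LATENT, STARTED, CLOSED }

    private final AtomicReference<State> state = new AtomicReference<>(State.LATENT);

    void start() {
        // Only the first caller can move LATENT -> STARTED; any later call fails fast.
        if (!state.compareAndSet(State.LATENT, State.STARTED)) {
            throw new IllegalStateException("Cannot be started more than once");
        }
    }

    State state() { return state.get(); }
}
```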
    

    This calls internalStart:

        private synchronized void internalStart()
        {
            if ( state.get() == State.STARTED )
            {
                client.getConnectionStateListenable().addListener(listener);
                try
                {
                    reset();
                }
                catch ( Exception e )
                {
                    ThreadUtils.checkInterrupted(e);
                    log.error("An error occurred checking resetting leadership.", e);
                }
            }
        }
    

    Next, reset():

        @VisibleForTesting
        void reset() throws Exception
        {
            setLeadership(false);
            setNode(null);
    
            BackgroundCallback callback = new BackgroundCallback()
            {
                @Override
                public void processResult(CuratorFramework client, CuratorEvent event) throws Exception
                {
                    if ( debugResetWaitLatch != null )
                    {
                        debugResetWaitLatch.await();
                        debugResetWaitLatch = null;
                    }
    
                    if ( event.getResultCode() == KeeperException.Code.OK.intValue() )
                    {
                        setNode(event.getName());
                        if ( state.get() == State.CLOSED )
                        {
                            setNode(null);
                        }
                        else
                        {
                            getChildren();
                        }
                    }
                    else
                    {
                        log.error("getChildren() failed. rc = " + event.getResultCode());
                    }
                }
            };
            client.create().creatingParentContainersIfNeeded().withProtection().withMode(CreateMode.EPHEMERAL_SEQUENTIAL).inBackground(callback).forPath(ZKPaths.makePath(latchPath, LOCK_NAME), LeaderSelector.getIdBytes(id));
        }
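    reset() creates an EPHEMERAL_SEQUENTIAL node under latchPath, and its callback then calls getChildren(). In Curator's LeaderLatch, the participant whose node has the lowest sequence number becomes leader and calls setLeadership(true). Below is a simplified sketch of that rule. It assumes node names end with the zero-padded 10-digit counter that ZooKeeper appends to sequential nodes. `LowestSequenceSketch` and the sample names are made up.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical sketch of "lowest sequence number wins" leader election.
final class LowestSequenceSketch {
    // ZooKeeper appends a zero-padded 10-digit counter to sequential node names.
    static long sequenceOf(String nodeName) {
        return Long.parseLong(nodeName.substring(nodeName.length() - 10));
    }

    // True if `ourNode` has the lowest sequence number among `children`.
    static boolean isLeader(List<String> children, String ourNode) {
        List<String> sorted = new ArrayList<>(children);
        sorted.sort(Comparator.comparingLong(LowestSequenceSketch::sequenceOf));
        return !sorted.isEmpty() && sorted.get(0).equals(ourNode);
    }
}
```

    In the real recipe, a participant that is not first also watches the node just before its own and re-checks when that node goes away. The sketch leaves that out.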
    

    Then setLeadership(false), which notifies the listeners only when the leadership flag actually changes:

        private synchronized void setLeadership(boolean newValue)
        {
            boolean oldValue = hasLeadership.getAndSet(newValue);
    
            if ( oldValue && !newValue )
            { // Lost leadership, was true, now false
                listeners.forEach(new Function<LeaderLatchListener, Void>()
                    {
                        @Override
                        public Void apply(LeaderLatchListener listener)
                        {
                            listener.notLeader();
                            return null;
                        }
                    });
            }
            else if ( !oldValue && newValue )
            { // Gained leadership, was false, now true
                listeners.forEach(new Function<LeaderLatchListener, Void>()
                    {
                        @Override
                        public Void apply(LeaderLatchListener input)
                        {
                            input.isLeader();
                            return null;
                        }
                    });
            }
    
            notifyAll();
        }
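    setLeadership is edge-triggered. Listeners hear isLeader() only on a false-to-true change and notLeader() only on a true-to-false change, so repeated calls with the same value stay silent. Below is a minimal sketch of that behavior with hypothetical names (`LeadershipFlagSketch`, `Listener`). The real method also calls notifyAll() to wake threads waiting in await(); that part is omitted here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical sketch of LeaderLatch.setLeadership's edge-triggered notifications.
final class LeadershipFlagSketch {
    interface Listener {
        void isLeader();
        void notLeader();
    }

    private final AtomicBoolean hasLeadership = new AtomicBoolean(false);
    private final List<Listener> listeners = new ArrayList<>();

    void addListener(Listener l) { listeners.add(l); }

    synchronized void setLeadership(boolean newValue) {
        boolean oldValue = hasLeadership.getAndSet(newValue);
        if (oldValue && !newValue) {
            listeners.forEach(Listener::notLeader); // lost leadership
        } else if (!oldValue && newValue) {
            listeners.forEach(Listener::isLeader); // gained leadership
        }
        // Same value as before: no callback fires.
    }
}
```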
    

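    reset() first drops any leadership with setLeadership(false) and then creates an ephemeral sequential node. Its callback calls getChildren(). If this instance's node has the lowest sequence number, the instance is elected leader, and setLeadership(true) calls input.isLeader() on each listener.

    The call chain from here on is fairly deep, so instead of quoting every method, only the calls along the way are listed. isLeader() lands in ZooKeeperLeaderElectionService, which registered itself with leaderLatch.addListener(this). That service calls leaderContender.grantLeadership(issuedLeaderSessionID). The contender is the JobManagerRunner, so this ends up in JobManagerRunner's grantLeadership method: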

        @Override
        public void grantLeadership(final UUID leaderSessionID) {
            synchronized (lock) {
                if (shutdown) {
                    log.info("JobManagerRunner already shutdown.");
                    return;
                }
    
                leadershipOperation = leadershipOperation.thenCompose(
                    (ignored) -> {
                        synchronized (lock) {
                            return verifyJobSchedulingStatusAndStartJobManager(leaderSessionID);
                        }
                    });
    
                handleException(leadershipOperation, "Could not start the job manager.");
            }
        }
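    grantLeadership does not start anything directly. Under the lock, it appends the work to leadershipOperation with thenCompose, so each new leadership operation waits for the previous one to finish. Below is a minimal sketch of that serialization. The names are hypothetical (`LeadershipChainSketch`, its `startJobMaster` stand-in, and `log`), and the constructor argument plays the role of an operation that is still in flight.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of how grantLeadership serializes work on leadershipOperation.
final class LeadershipChainSketch {
    private final Object lock = new Object();
    private final List<String> log = new ArrayList<>();
    private CompletableFuture<Void> leadershipOperation;
    private boolean shutdown;

    // `previous` stands in for a leadership operation that may still be running.
    LeadershipChainSketch(CompletableFuture<Void> previous) {
        this.leadershipOperation = previous;
    }

    void grantLeadership(UUID leaderSessionId) {
        synchronized (lock) {
            if (shutdown) {
                return; // nothing to do once the runner is shut down
            }
            // The new step only runs after every earlier step has completed.
            leadershipOperation = leadershipOperation.thenCompose(ignored -> {
                synchronized (lock) {
                    return startJobMaster(leaderSessionId);
                }
            });
        }
    }

    void shutdown() {
        synchronized (lock) {
            shutdown = true;
        }
    }

    List<String> log() {
        synchronized (lock) {
            return new ArrayList<>(log);
        }
    }

    // Stand-in for verifyJobSchedulingStatusAndStartJobManager.
    private CompletableFuture<Void> startJobMaster(UUID leaderSessionId) {
        log.add("start " + leaderSessionId);
        return CompletableFuture.completedFuture(null);
    }
}
```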
    
    4. From JobManagerRunner.grantLeadership to JobMaster.start
    

    grantLeadership chains verifyJobSchedulingStatusAndStartJobManager(leaderSessionID) onto leadershipOperation:

        private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
            final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();
    
            return jobSchedulingStatusFuture.thenCompose(
                jobSchedulingStatus -> {
                    if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                        return jobAlreadyDone();
                    } else {
                        return startJobMaster(leaderSessionId);
                    }
                });
        }
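    verifyJobSchedulingStatusAndStartJobManager looks up the job's scheduling status. It starts a JobMaster only if the job is not already DONE, so a job that has already finished is not run again when leadership is granted. Below is a minimal sketch of that branch. The enum mirrors RunningJobsRegistry.JobSchedulingStatus (PENDING, RUNNING, DONE). `SchedulingStatusSketch` and the string results of its two stand-in methods are hypothetical.

```java
import java.util.concurrent.CompletableFuture;

// Hypothetical sketch of the DONE check before starting a JobMaster.
final class SchedulingStatusSketch {
    enum JobSchedulingStatus { PENDING, RUNNING, DONE }

    static CompletableFuture<String> verifyAndStart(CompletableFuture<JobSchedulingStatus> statusFuture) {
        return statusFuture.thenCompose(status -> {
            if (status == JobSchedulingStatus.DONE) {
                return jobAlreadyDone();
            } else {
                return startJobMaster();
            }
        });
    }

    // Stand-in for JobManagerRunner.jobAlreadyDone().
    static CompletableFuture<String> jobAlreadyDone() {
        return CompletableFuture.completedFuture("already done, JobMaster not started");
    }

    // Stand-in for JobManagerRunner.startJobMaster(leaderSessionId).
    static CompletableFuture<String> startJobMaster() {
        return CompletableFuture.completedFuture("JobMaster started");
    }
}
```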
    

    When the job is only now being scheduled (its status is not DONE), JobManagerRunner.startJobMaster(leaderSessionId) is called:

        private CompletionStage<Void> startJobMaster(UUID leaderSessionId) {
            log.info("JobManager runner for job {} ({}) was granted leadership with session id {} at {}.",
                jobGraph.getName(), jobGraph.getJobID(), leaderSessionId, getAddress());
    
            try {
                runningJobsRegistry.setJobRunning(jobGraph.getJobID());
            } catch (IOException e) {
                return FutureUtils.completedExceptionally(
                    new FlinkException(
                        String.format("Failed to set the job %s to running in the running jobs registry.", jobGraph.getJobID()),
                        e));
            }
    
            final CompletableFuture<Acknowledge> startFuture;
            try {
                startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
            } catch (Exception e) {
                return FutureUtils.completedExceptionally(new FlinkException("Failed to start the JobMaster.", e));
            }
    
            final CompletableFuture<JobMasterGateway> currentLeaderGatewayFuture = leaderGatewayFuture;
            return startFuture.thenAcceptAsync(
                (Acknowledge ack) -> confirmLeaderSessionIdIfStillLeader(leaderSessionId, currentLeaderGatewayFuture),
                executor);
        }
    

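    startJobMaster first marks the job as running in runningJobsRegistry. It then calls jobMasterService.start(new JobMasterId(leaderSessionId)). JobMasterService is an interface implemented by JobMaster, so this ends up in JobMaster.start: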

        /**
         * Start the rpc service and begin to run the job.
         *
         * @param newJobMasterId The necessary fencing token to run the job
         * @return Future acknowledge if the job could be started. Otherwise the future contains an exception
         */
        public CompletableFuture<Acknowledge> start(final JobMasterId newJobMasterId) throws Exception {
            // make sure we receive RPC and async calls
            start();
    
            return callAsyncWithoutFencing(() -> startJobExecution(newJobMasterId), RpcUtils.INF_TIMEOUT);
        }
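    The Javadoc calls newJobMasterId "the necessary fencing token". Because the token is derived from the leader session ID, calls that carry the token of an earlier session can be told apart and rejected. This presumably also explains why start() uses callAsyncWithoutFencing: the new token is only installed by startJobExecution itself. The class below is a conceptual sketch of fencing with hypothetical names (`FencedEndpointSketch`, `handleFenced`). It is not Flink's fenced RPC implementation.

```java
import java.util.Objects;
import java.util.UUID;

// Hypothetical, conceptual sketch of a fencing token check.
final class FencedEndpointSketch {
    private UUID fencingToken; // null until a leadership session starts

    // Like the unfenced call made from JobMaster.start: installs the new token.
    void startJobExecution(UUID newToken) {
        fencingToken = Objects.requireNonNull(newToken);
    }

    // A fenced call: accepted only if the caller's token matches the current one.
    String handleFenced(UUID callerToken, String message) {
        if (fencingToken == null || !fencingToken.equals(callerToken)) {
            return "rejected: stale or missing fencing token";
        }
        return "accepted: " + message;
    }
}
```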
    

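    Finally, startJobExecution(newJobMasterId) is called. At this point the JobMaster actually starts its services and begins scheduling the job.
    Interested readers can continue with the startJobMasterServices and resetAndStartScheduler methods called from startJobExecution, since that is where most of the work happens.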


        Article link: https://www.haomeiwen.com/subject/ekqytltx.html