Flink Application Run Mode Explained


Author: 飞_侠 | Published 2022-03-05 15:53

    Purpose of this article:

    To analyze, from the source code, how the user's main() method comes to be executed inside the ApplicationClusterEntryPoint entry class.

    First, a comparison with the other cluster modes.

    Take the StandaloneSessionClusterEntryPoint (session) mode as an example: the main() method runs on the client. When we submit a job with the following command:

    $ ./bin/flink run examples/streaming/WordCount.jar
    

    Executing the flink command with the run action ultimately invokes the main() method of the CliFrontend.java class. Command-line parsing wraps the user program into a PackagedProgram object, which mainly encapsulates the job jar, the entry-point class name, extra classpath URLs, the program arguments and the savepoint restore settings.

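    As a rough, minimal sketch (the jar path and class name below are hypothetical placeholders, not taken from the article), a PackagedProgram can be assembled through its public builder like this:

        import java.io.File;

        import org.apache.flink.client.program.PackagedProgram;
        import org.apache.flink.configuration.Configuration;

        public class PackagedProgramSketch {
            public static void main(String[] args) throws Exception {
                // Hypothetical jar and entry class, for illustration only.
                try (PackagedProgram program =
                        PackagedProgram.newBuilder()
                                .setJarFile(new File("examples/streaming/WordCount.jar"))
                                .setEntryPointClassName(
                                        "org.apache.flink.streaming.examples.wordcount.WordCount")
                                .setArguments("--output", "/tmp/wc-out")
                                .setConfiguration(new Configuration())
                                .build()) {
                    // The client later runs the user code reflectively via
                    // program.invokeInteractiveModeForExecution().
                    System.out.println("entry point: " + program.getMainClassName());
                }
            }
        }

    The core logic of CliFrontend.run() is as follows: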
        /**
         * Executes the run action.
         *
         * @param args Command line arguments for the run action.
         */
        protected void run(String[] args) throws Exception {
            LOG.info("Running 'run' command.");
    
            final Options commandOptions = CliFrontendParser.getRunCommandOptions();
            final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    
            // evaluate help flag
            if (commandLine.hasOption(HELP_OPTION.getOpt())) {
                CliFrontendParser.printHelpForRun(customCommandLines);
                return;
            }
    
            final CustomCommandLine activeCommandLine =
                    validateAndGetActiveCommandLine(checkNotNull(commandLine));
    
            final ProgramOptions programOptions = ProgramOptions.create(commandLine);
    
            final List<URL> jobJars = getJobJarAndDependencies(programOptions);
    
            final Configuration effectiveConfiguration =
                    getEffectiveConfiguration(activeCommandLine, commandLine, programOptions, jobJars);
    
            LOG.debug("Effective executor configuration: {}", effectiveConfiguration);
    
            try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
                /**
                 * Run the user program: the submitted job's main() method is invoked via
                 * reflection, which builds the StreamGraph, generates the JobGraph and
                 * submits it to the cluster.
                 */
                executeProgram(effectiveConfiguration, program);
            }
        }


    CliFrontend.executeProgram() delegates to ClientUtils.executeProgram(), whose core part invokes the submitted job's main() method via reflection:

            try {
                /**
                 * Call the submitted job's main() method via reflection.
                 */
                program.invokeInteractiveModeForExecution();
            } finally {
                ContextEnvironment.unsetAsContext();
                StreamContextEnvironment.unsetAsContext();
            }
        } finally {
            Thread.currentThread().setContextClassLoader(contextClassLoader);
        }
    }
    

    Application mode on YARN

    In Application mode on YARN, by contrast, the client only uploads the jar; the executeProgram() call above runs on the cluster side (more precisely, during the startup of the Dispatcher). The analysis follows.
    First, submit the job, which also starts the cluster (only YARN and Kubernetes support the command below; on a Standalone cluster the job has to be submitted to the JobManager directly):

    $ ./bin/flink run-application -t yarn-application ./examples/streaming/TopSpeedWindowing.jar
    
     protected void runApplication(String[] args) throws Exception {
            LOG.info("Running 'run-application' command.");
    
            final Options commandOptions = CliFrontendParser.getRunCommandOptions();
            final CommandLine commandLine = getCommandLine(commandOptions, args, true);
    
            if (commandLine.hasOption(HELP_OPTION.getOpt())) {
                CliFrontendParser.printHelpForRunApplication(customCommandLines);
                return;
            }
    
            final CustomCommandLine activeCommandLine =
                    validateAndGetActiveCommandLine(checkNotNull(commandLine));
    
            final ApplicationDeployer deployer =
                    new ApplicationClusterDeployer(clusterClientServiceLoader);
    
            final ProgramOptions programOptions;
            final Configuration effectiveConfiguration;
    
            // ...
    
            final ApplicationConfiguration applicationConfiguration =
                    new ApplicationConfiguration(
                            programOptions.getProgramArgs(), programOptions.getEntryPointClassName());
            // Call ApplicationClusterDeployer.run() to deploy the program to the cluster.
            deployer.run(effectiveConfiguration, applicationConfiguration);
        }
    

    ApplicationClusterDeployer:

     public <ClusterID> void run(
                final Configuration configuration,
                final ApplicationConfiguration applicationConfiguration)
                throws Exception {
            checkNotNull(configuration);
            checkNotNull(applicationConfiguration);
    
            LOG.info("Submitting application in 'Application Mode'.");
    
            final ClusterClientFactory<ClusterID> clientFactory =
                    clientServiceLoader.getClusterClientFactory(configuration);
            try (final ClusterDescriptor<ClusterID> clusterDescriptor =
                    clientFactory.createClusterDescriptor(configuration)) {
                final ClusterSpecification clusterSpecification =
                        clientFactory.getClusterSpecification(configuration);
    
                clusterDescriptor.deployApplicationCluster(
                        clusterSpecification, applicationConfiguration);
            }
        }
    

    At this point the client has not performed any StreamGraph/JobGraph translation; it has only uploaded the user program's jar.
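    For reference, the same deployment can also be triggered programmatically. The following is a minimal sketch under assumptions (the execution target, jar path and main class are placeholders, and it presumes a working YARN setup with flink-yarn on the classpath); it mirrors what runApplication() does internally:

        import java.util.Collections;

        import org.apache.flink.client.deployment.DefaultClusterClientServiceLoader;
        import org.apache.flink.client.deployment.application.ApplicationConfiguration;
        import org.apache.flink.client.deployment.application.cli.ApplicationClusterDeployer;
        import org.apache.flink.configuration.Configuration;
        import org.apache.flink.configuration.DeploymentOptions;
        import org.apache.flink.configuration.PipelineOptions;

        public class DeployApplicationSketch {
            public static void main(String[] args) throws Exception {
                Configuration conf = new Configuration();
                // Equivalent to "-t yarn-application" on the command line.
                conf.set(DeploymentOptions.TARGET, "yarn-application");
                // The user jar that will be shipped to the cluster (hypothetical path).
                conf.set(
                        PipelineOptions.JARS,
                        Collections.singletonList("file:///opt/jobs/TopSpeedWindowing.jar"));

                ApplicationConfiguration appConf =
                        new ApplicationConfiguration(
                                new String[0],
                                "org.apache.flink.streaming.examples.windowing.TopSpeedWindowing");

                // Selects the cluster client factory from the TARGET option and calls
                // clusterDescriptor.deployApplicationCluster(...), as in the source above.
                new ApplicationClusterDeployer(new DefaultClusterClientServiceLoader())
                        .run(conf, appConf);
            }
        }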

    Application mode on a Standalone cluster

    1. Step 1: prepare the jar and submit the job to the JobManager
    $ ./bin/standalone-job.sh start --job-classname org.apache.flink.streaming.examples.windowing.WordCount
    
    
    2. Step 2: start the TaskManager
    $ ./bin/taskmanager.sh start
    

    standalone-job.sh:

    USAGE="Usage: standalone-job.sh ((start|start-foreground))|stop [args]"
    
    STARTSTOP=$1
    ENTRY_POINT_NAME="standalonejob"
    
    if [[ $STARTSTOP != "start" ]] && [[ $STARTSTOP != "start-foreground" ]] && [[ $STARTSTOP != "stop" ]]; then
      echo $USAGE
      exit 1
    fi
    
    bin=`dirname "$0"`
    bin=`cd "$bin"; pwd`
    
    . "$bin"/config.sh
    
    # Startup parameters
    ARGS=("${@:2}")
    
    if [[ $STARTSTOP == "start" ]] || [[ $STARTSTOP == "start-foreground" ]]; then
        # Add cluster entry point specific JVM options
        export FLINK_ENV_JAVA_OPTS="${FLINK_ENV_JAVA_OPTS} ${FLINK_ENV_JAVA_OPTS_JM}"
        parseJmArgsAndExportLogs "${ARGS[@]}"
    
        if [ ! -z "${DYNAMIC_PARAMETERS}" ]; then
            ARGS=(${DYNAMIC_PARAMETERS[@]} "${ARGS[@]}")
        fi
    fi
    
    ARGS=("--configDir" "${FLINK_CONF_DIR}" "${ARGS[@]}")
    
    if [[ $STARTSTOP == "start-foreground" ]]; then
        exec "${FLINK_BIN_DIR}"/flink-console.sh ${ENTRY_POINT_NAME} "${ARGS[@]}"
    else
        "${FLINK_BIN_DIR}"/flink-daemon.sh ${STARTSTOP} ${ENTRY_POINT_NAME} "${ARGS[@]}"
    fi
    

    This ultimately invokes flink-daemon.sh (or flink-console.sh when started in the foreground), passing standalonejob as the entry-point name.
    flink-daemon.sh:

    (standalonejob)
            CLASS_TO_RUN=org.apache.flink.container.entrypoint.StandaloneApplicationClusterEntryPoint
    

    StandaloneApplicationClusterEntryPoint is analyzed below:


    StandaloneApplicationClusterEntryPoint.main():

    public static void main(String[] args) {
           // startup checks and logging
           EnvironmentInformation.logEnvironmentInfo(
                   LOG, StandaloneApplicationClusterEntryPoint.class.getSimpleName(), args);
           SignalHandler.register(LOG);
           JvmShutdownSafeguard.installAsShutdownHook(LOG);
    
           final StandaloneApplicationClusterConfiguration clusterConfiguration =
                   ClusterEntrypointUtils.parseParametersOrExit(
                           args,
                           new StandaloneApplicationClusterConfigurationParserFactory(),
                           StandaloneApplicationClusterEntryPoint.class);
    
           Configuration configuration = loadConfigurationFromClusterConfig(clusterConfiguration);
          /***************************/
           PackagedProgram program = null;
           try {
           /**
            * In session mode this PackagedProgram object is created on the client;
            * here, in application mode, it is created on the cluster side.
            */
               program = getPackagedProgram(clusterConfiguration, configuration);
           } catch (Exception e) {
               LOG.error("Could not create application program.", e);
               System.exit(1);
           }
    
           try {
               configureExecution(configuration, program);
           } catch (Exception e) {
               LOG.error("Could not apply application configuration.", e);
               System.exit(1);
           }
           /***************************/
    
           StandaloneApplicationClusterEntryPoint entrypoint =
                   new StandaloneApplicationClusterEntryPoint(configuration, program);
    
           ClusterEntrypoint.runClusterEntrypoint(entrypoint);
       }
    

    As the source analysis above shows, the user program's main() method has not been invoked by the time ClusterEntrypoint.runClusterEntrypoint() starts; it is ultimately invoked while the Dispatcher component of the JobManager is being started.
    After the DefaultDispatcherRunner object is created, leader election takes place; once leadership is won, DefaultDispatcherRunner.grantLeadership() is called back:

     @Override
        public void grantLeadership(UUID leaderSessionID) {
            runActionIfRunning(
                    () -> {
                        LOG.info(
                                "{} was granted leadership with leader id {}. Creating new {}.",
                                getClass().getSimpleName(),
                                leaderSessionID,
                                DispatcherLeaderProcess.class.getSimpleName());
                        /**
                         * Start the dispatcher.
                         */
                        startNewDispatcherLeaderProcess(leaderSessionID);
                    });
        }
    

    startNewDispatcherLeaderProcess() goes on to call AbstractDispatcherLeaderProcess.start(), which invokes the onStart() method of the concrete subclass:

       public final void start() {
            runIfStateIs(State.CREATED, this::startInternal);
        }
    
        private void startInternal() {
            log.info("Start {}.", getClass().getSimpleName());
            state = State.RUNNING;
            /**
             * onStart() is implemented by two subclasses, SessionDispatcherLeaderProcess
             * and JobDispatcherLeaderProcess; which one runs depends on how the cluster
             * was started.
             */
            onStart();
        }
    

    In Application mode, the onStart() method of the JobDispatcherLeaderProcess class is called:

        protected void onStart() {
            final DispatcherGatewayService dispatcherService =
                    /**
                     * Application mode takes a different branch here:
                     * ApplicationDispatcherGatewayServiceFactory is used instead of
                     * DefaultDispatcherGatewayServiceFactory.
                     */
                    dispatcherGatewayServiceFactory.create(
                            DispatcherId.fromUuid(getLeaderSessionId()),
                            Collections.singleton(jobGraph),
                            ThrowingJobGraphWriter.INSTANCE);
    
            completeDispatcherSetup(dispatcherService);
        }
    

    ApplicationDispatcherGatewayServiceFactory:

      public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
                DispatcherId fencingToken,
                Collection<JobGraph> recoveredJobs,
                JobGraphWriter jobGraphWriter) {
    
            final List<JobID> recoveredJobIds = getRecoveredJobIds(recoveredJobs);
    
            final Dispatcher dispatcher;
            try {
                dispatcher =
                        dispatcherFactory.createDispatcher(
                                rpcService,
                                fencingToken,
                                recoveredJobs,
                                /**
                                 * The execution entry point of the user's main() method in
                                 * application mode: this factory creates an
                                 * ApplicationDispatcherBootstrap.
                                 */
                                (dispatcherGateway, scheduledExecutor, errorHandler) ->
                                        new ApplicationDispatcherBootstrap(
                                                application,
                                                recoveredJobIds,
                                                configuration,
                                                dispatcherGateway,
                                                scheduledExecutor,
                                                errorHandler),
                                PartialDispatcherServicesWithJobGraphStore.from(
                                        partialDispatcherServices, jobGraphWriter));
            } catch (Exception e) {
                throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
            }
    
            dispatcher.start();
    
            return DefaultDispatcherGatewayService.from(dispatcher);
        }
    

    The ApplicationDispatcherBootstrap object is then created:

    public ApplicationDispatcherBootstrap(
                final PackagedProgram application,
                final Collection<JobID> recoveredJobIds,
                final Configuration configuration,
                final DispatcherGateway dispatcherGateway,
                final ScheduledExecutor scheduledExecutor,
                final FatalErrorHandler errorHandler) {
            this.configuration = checkNotNull(configuration);
            this.recoveredJobIds = checkNotNull(recoveredJobIds);
            this.application = checkNotNull(application);
            this.errorHandler = checkNotNull(errorHandler);
            /**
             * Kick off execution of the user program (asynchronously).
             */
            this.applicationCompletionFuture =
                    fixJobIdAndRunApplicationAsync(dispatcherGateway, scheduledExecutor);
    
            this.clusterShutdownFuture = runApplicationAndShutdownClusterAsync(dispatcherGateway);
        }
    
    
    fixJobIdAndRunApplicationAsync() eventually schedules runApplicationEntryPoint(), which runs the user program:

    private void runApplicationEntryPoint(
                final CompletableFuture<List<JobID>> jobIdsFuture,
                final Set<JobID> tolerateMissingResult,
                final DispatcherGateway dispatcherGateway,
                final ScheduledExecutor scheduledExecutor,
                final boolean enforceSingleJobExecution) {
            try {
                final List<JobID> applicationJobIds = new ArrayList<>(recoveredJobIds);
    
                final PipelineExecutorServiceLoader executorServiceLoader =
                        new EmbeddedExecutorServiceLoader(
                                applicationJobIds, dispatcherGateway, scheduledExecutor);
                /**
                 * The same ClientUtils.executeProgram() method that the client-side
                 * CliFrontend ultimately calls.
                 */
                ClientUtils.executeProgram(
                        executorServiceLoader,
                        configuration,
                        application,
                        enforceSingleJobExecution,
                        true /* suppress sysout */);
    
                if (applicationJobIds.isEmpty()) {
                    jobIdsFuture.completeExceptionally(
                            new ApplicationExecutionException(
                                    "The application contains no execute() calls."));
                } else {
                    jobIdsFuture.complete(applicationJobIds);
                }
            } catch (Throwable t) {
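                // error handling omitted here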
                
            }
        }
    
    

    At this point the user program's main() method is executed on the cluster side, and that is where the StreamGraph and JobGraph translation takes place.
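    To make the effect concrete, here is a minimal sketch of a user program (the class name and job name are made up, not from the article). When packaged as a jar and submitted with run-application, this entire main() method, including the env.execute() call, runs inside the cluster entry point via the reflection path described above rather than on the client:

        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class ApplicationModeJob {
            public static void main(String[] args) throws Exception {
                // In application mode this whole method is invoked reflectively on the
                // JobManager side (ApplicationDispatcherBootstrap -> ClientUtils.executeProgram).
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                env.fromElements(1, 2, 3, 4, 5)
                        .filter(value -> value % 2 == 0)
                        .print();

                // Triggers the StreamGraph -> JobGraph translation and submits the job to
                // the local Dispatcher through the embedded executor.
                env.execute("application-mode-demo");
            }
        }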


    Original link: https://www.haomeiwen.com/subject/atzvrrtx.html