
Flink 1.8 JobGraph Generation Flow

Author: todd5167 | Published 2019-09-27 20:13

    The user's executable jar is turned into a JobGraph on the local client. Let's first look at the overall flow Flink uses to generate and submit the JobGraph.

    The submission process

    1. First, build a PackagedProgram.
    2. Then, extract the JobGraph from the PackagedProgram via PackagedProgramUtils.
    3. Finally, submit the JobGraph to the cluster through a ClusterDescriptor.

    For the submission part you can also refer to the job submission code in flinkStreamSQL, which is much easier to follow than reading the Flink source directly. Here, though, I walk through the Flink 1.8 source itself.
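    A condensed sketch of these three steps, written against the Flink 1.8 client API, is shown below. The jar path, entry class and program arguments are made-up placeholders, and the ClusterDescriptor, ClusterSpecification and Configuration are assumed to be supplied by the surrounding client code (for example by the active CustomCommandLine):

    import java.io.File;
    import java.net.URL;
    import java.util.Collections;

    import org.apache.flink.client.deployment.ClusterDescriptor;
    import org.apache.flink.client.deployment.ClusterSpecification;
    import org.apache.flink.client.program.ClusterClient;
    import org.apache.flink.client.program.PackagedProgram;
    import org.apache.flink.client.program.PackagedProgramUtils;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.jobgraph.JobGraph;

    public class ClientSubmitSketch {

        static <T> void submit(ClusterDescriptor<T> descriptor,
                               ClusterSpecification spec,
                               Configuration flinkConf,
                               int parallelism) throws Exception {
            // 1. wrap the user jar in a PackagedProgram (entry class and arguments are placeholders)
            PackagedProgram program = new PackagedProgram(
                    new File("/path/to/user-job.jar"),
                    Collections.<URL>emptyList(),
                    "com.example.MyStreamingJob",
                    "--input", "hdfs:///data/in");

            // 2. extract the JobGraph on the client
            JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, flinkConf, parallelism);

            // 3. deploy a per-job cluster and submit the JobGraph in detached mode
            ClusterClient<T> client = descriptor.deployJobCluster(spec, jobGraph, true);
            client.shutdown();
        }
    }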

    Generating the PackagedProgram

    buildProgram reads the command-line options and passes the user's jarFile, the program arguments programArgs, and the classpaths the program depends on to PackagedProgram. If entryPointClass is not supplied, it can be resolved from the jar itself.

    CliFrontend#buildProgram
    
    PackagedProgram buildProgram(ProgramOptions options) throws FileNotFoundException, ProgramInvocationException {
        String[] programArgs = options.getProgramArgs();
        String jarFilePath = options.getJarFilePath();
        List<URL> classpaths = options.getClasspaths();
    
        File jarFile = new File(jarFilePath);
        // the class containing the main() method
        String entryPointClass = options.getEntryPointClassName();
    
        PackagedProgram program = entryPointClass == null ?
                new PackagedProgram(jarFile, classpaths, programArgs) :
                new PackagedProgram(jarFile, classpaths, entryPointClass, programArgs);
        return program;
    }
    
    
    public PackagedProgram(File jarFile, List<URL> classpaths, @Nullable String entryPointClassName, String... args) throws ProgramInvocationException {
        // URL of the user jar file
        URL jarFileUrl = jarFile.getAbsoluteFile().toURI().toURL();
        this.jarFile = jarFileUrl;
        this.args = args == null ? new String[0] : args;
    
        // if no entryPointClassName was passed, resolve it from the jar manifest (see the sketch after this constructor)
        if (entryPointClassName == null) {
            entryPointClassName = getEntryPointClassNameFromJar(jarFileUrl);
        }
        
        // extract the jars nested inside this jar (entries under lib/ whose names end in .jar)
        this.extractedTempLibraries = extractContainedLibraries(jarFileUrl);
        this.classpaths = classpaths;
        // build a FlinkUserCodeClassLoaders class loader over the jars returned by getAllLibraries()
        this.userCodeClassLoader = JobWithJars.buildUserCodeClassLoader(getAllLibraries(), classpaths, getClass().getClassLoader());
    
        // load the entry class with that user-code class loader, then switch back to the parent loader
        this.mainClass = loadMainClass(entryPointClassName, userCodeClassLoader);
    
        // if the entry point is a program, instantiate the class and get the plan
        if (Program.class.isAssignableFrom(this.mainClass)) {
            Program prg = null;
            try {
                prg = InstantiationUtil.instantiate(this.mainClass.asSubclass(Program.class), Program.class);
            } catch (Exception e) {
                // validate that the class has a main method at least.
                // the main method possibly instantiates the program properly
                if (!hasMainMethod(mainClass)) {
                    throw new ProgramInvocationException("The given program class implements the " +
                            Program.class.getName() + " interface, but cannot be instantiated. " +
                            "It also declares no main(String[]) method as alternative entry point", e);
                }
            } catch (Throwable t) {
                throw new ProgramInvocationException("Error while trying to instantiate program class.", t);
            }
            this.program = prg;
        } else if (hasMainMethod(mainClass)) {
            // the user class is not a subclass of Program; only its main() method will be used
            this.program = null;
        } else {
            throw new ProgramInvocationException("The given program class neither has a main(String[]) method, nor does it implement the " +
                    Program.class.getName() + " interface.");
        }
    }
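    When no entry class is given, getEntryPointClassNameFromJar falls back to the jar manifest. The snippet below is only a simplified illustration of that idea, checking the Flink-specific program-class attribute first and then the standard Main-Class attribute; the real method reports a missing entry point through a ProgramInvocationException instead of returning null:

    import java.io.IOException;
    import java.net.URL;
    import java.util.jar.JarFile;
    import java.util.jar.Manifest;

    static String readEntryPointFromManifest(URL jarFileUrl) throws IOException {
        try (JarFile jar = new JarFile(jarFileUrl.getFile())) {
            Manifest manifest = jar.getManifest();
            if (manifest == null) {
                return null;
            }
            // "program-class" is the Flink assembler attribute, "Main-Class" the standard one
            String assemblerClass = manifest.getMainAttributes().getValue("program-class");
            return assemblerClass != null
                    ? assemblerClass
                    : manifest.getMainAttributes().getValue("Main-Class");
        }
    }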
    
    Extracting the JobGraph

    The jobGraph is generated from the PackagedProgram built above, the settings in flink-conf, and the job parallelism, and is then submitted through the clusterDescriptor.

    CliFrontend#runProgram
    
    ## Deploy the job to a new cluster
    if (clusterId == null && runOptions.getDetachedMode()) {
        int parallelism = runOptions.getParallelism() == -1 ? defaultParallelism : runOptions.getParallelism();
        // build the jobGraph
        final JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, configuration, parallelism);
    
        final ClusterSpecification clusterSpecification = customCommandLine.getClusterSpecification(commandLine);
        // deploy the job cluster
        client = clusterDescriptor.deployJobCluster(
            clusterSpecification,
            jobGraph,
            runOptions.getDetachedMode());
    
        logAndSysout("Job has been submitted with JobID " + jobGraph.getJobID());
    
        try {
            client.shutdown();
        } catch (Exception e) {
            LOG.info("Could not properly shut down the client.", e);
        }
    }
    
    ## createJobGraph (the three-argument overload called above delegates to this method)
    
    public static JobGraph createJobGraph(
            PackagedProgram packagedProgram,
            Configuration configuration,
            int defaultParallelism,
            @Nullable JobID jobID) throws ProgramInvocationException {
        Thread.currentThread().setContextClassLoader(packagedProgram.getUserCodeClassLoader());
        // the optimizer
        final Optimizer optimizer = new Optimizer(new DataStatistics(), new DefaultCostEstimator(), configuration);
        final FlinkPlan flinkPlan;
    
        final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
    
        optimizerPlanEnvironment.setParallelism(defaultParallelism);
        
        // form the StreamingPlan by running the user's main()
        flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
    
        final JobGraph jobGraph;
    
        if (flinkPlan instanceof StreamingPlan) {
            // extract the jobGraph from the StreamingPlan
            jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(jobID);
            jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
        } else {
            final JobGraphGenerator jobGraphGenerator = new JobGraphGenerator(configuration);
            jobGraph = jobGraphGenerator.compileJobGraph((OptimizedPlan) flinkPlan, jobID);
        }
        // pass the dependency jars extracted from the user jar on to the jobGraph
        for (URL url : packagedProgram.getAllLibraries()) {
            try {
                jobGraph.addJar(new Path(url.toURI()));
            } catch (URISyntaxException e) {
                throw new ProgramInvocationException("Invalid URL for jar file: " + url + '.', jobGraph.getJobID(), e);
            }
        }
        // classpaths can be passed in several ways:
        // 1. directly through the PackagedProgram constructor;
        // 2. by generating the jobGraph first and then calling setClasspaths on it
        //    (sketched after this method);
        // 3. by adding URLs via ContextEnvironment.getClasspaths().add(url).
        jobGraph.setClasspaths(packagedProgram.getClasspaths());
    
        return jobGraph;
    }
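    Option 2 from the comment above, binding the classpath after the JobGraph has been produced, looks roughly like the following sketch; the plugin jar URL is a made-up example:

    import java.net.MalformedURLException;
    import java.net.URL;
    import java.util.ArrayList;
    import java.util.List;

    import org.apache.flink.runtime.jobgraph.JobGraph;

    static void bindExtraClasspaths(JobGraph jobGraph) throws MalformedURLException {
        List<URL> extraClasspaths = new ArrayList<>();
        // hypothetical plugin jar; in practice the URL would come from the job's own configuration
        extraClasspaths.add(new URL("file:///opt/flink-plugins/sql-side-redis.jar"));
        jobGraph.setClasspaths(extraClasspaths);
    }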
    
    
    ## How flinkStreamSQL passes the classpath
    In the main() method, env.registerCachedFile(url.getPath(), classFileName, true); stores each plugin jar's URL among the job's cached-file entries; at submission time they are read back out and bound to the jobGraph (see the sketch after fillJobGraphClassPath below).
    
    
    private static void fillJobGraphClassPath(JobGraph jobGraph) throws MalformedURLException {
        Map<String, String> jobCacheFileConfig = jobGraph.getJobConfiguration().toMap();
        Set<String> classPathKeySet = Sets.newHashSet();
    
        for(Map.Entry<String, String> tmp : jobCacheFileConfig.entrySet()){
            if(Strings.isNullOrEmpty(tmp.getValue())){
                continue;
            }
    
            if(tmp.getValue().startsWith("class_path")){
                //DISTRIBUTED_CACHE_FILE_NAME_1
                //DISTRIBUTED_CACHE_FILE_PATH_1
                String key = tmp.getKey();
                String[] array = key.split("_");
                if(array.length < 5){
                    continue;
                }
    
                // turn DISTRIBUTED_CACHE_FILE_NAME_n into the matching DISTRIBUTED_CACHE_FILE_PATH_n key
                array[3] = "PATH";
                classPathKeySet.add(StringUtils.join(array, "_"));
            }
        }
    
        for(String key : classPathKeySet){
            String pathStr = jobCacheFileConfig.get(key);
            jobGraph.getClasspaths().add(new URL("file:" + pathStr));
        }
    }
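    The registration side described above would look roughly like this in the job's main() method; the plugin jar location is a made-up example, and the cached-file name only needs to start with the class_path prefix that fillJobGraphClassPath matches via startsWith("class_path"):

    import java.net.URL;

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class RegisterPluginSketch {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            // register the plugin jar as an "executable" cached file; the name is what
            // fillJobGraphClassPath later recognizes
            URL pluginJar = new URL("file:///opt/flink-plugins/kafka09-source.jar");
            env.registerCachedFile(pluginJar.getPath(), "class_path_0", true);

            // ... build the actual pipeline and call env.execute(...) as usual
        }
    }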
    
    
    Forming the StreamingPlan

    To obtain the StreamingPlan, an OptimizerPlanEnvironment is built first; this environment runs the user's main class via reflection. Inside main(), the call to execute() builds the StreamingPlan and then deliberately throws a ProgramAbortException, which is caught by the caller.

         final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
         optimizerPlanEnvironment.setParallelism(defaultParallelism);
         ## extract the StreamingPlan
         flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
    
        public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
                   
             // install an ExecutionEnvironmentFactory whose created environment is this OptimizerPlanEnvironment
            setAsContext();
            try {
             // run the user's main() under the OptimizerPlanEnvironment, which fills in optimizerPlan
                prog.invokeInteractiveModeForExecution();
            }
            catch (ProgramInvocationException e) {
                throw e;
            }
            catch (Throwable t) {
                // catch the exception thrown from main() and return the captured StreamingPlan
                if (optimizerPlan != null) {
                    return optimizerPlan;
                } else {
                    throw new ProgramInvocationException("The program caused an error: ", t);
                }
            }
            finally {
                unsetAsContext();
            }
            // note: the rest of the real method, which throws a ProgramInvocationException when
            // main() returns without setting a plan, is omitted from this excerpt
        }
    

    The execute() method called inside main():

    • builds the streamGraph
    • stores it in the surrounding Environment
    • deliberately throws an exception for the caller above to catch

        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
            // build the streamGraph
            StreamGraph streamGraph = getStreamGraph();
            streamGraph.setJobName(jobName);
        
            transformations.clear();
    
            if (env instanceof OptimizerPlanEnvironment) {
                ((OptimizerPlanEnvironment) env).setPlan(streamGraph);
            } else if (env instanceof PreviewPlanEnvironment) {
                ((PreviewPlanEnvironment) env).setPreview(streamGraph.getStreamingPlanAsJSON());
            }
    
            throw new OptimizerPlanEnvironment.ProgramAbortException();
        }
    

    StreamExecutionEnvironment.getExecutionEnvironment() selects the execution environment:

    • ContextEnvironment (wrapped by StreamContextEnvironment): the execution environment for remote execution with the Client, i.e. the real execution environment.
    • LocalEnvironment: the execution environment used for local debugging.
    • StreamPlanEnvironment: mainly used to build the plan of a streaming job.

    Because setAsContext was called in getOptimizedPlan, the installed ExecutionEnvironmentFactory creates the OptimizerPlanEnvironment, so getExecutionEnvironment ends up returning a StreamPlanEnvironment wrapping it.

        public static StreamExecutionEnvironment getExecutionEnvironment() {
            if (contextEnvironmentFactory != null) {
                return contextEnvironmentFactory.createExecutionEnvironment();
            }
    
            // because the streaming project depends on "flink-clients" (and not the other way around)
            // we currently need to intercept the data set environment and create a dependent stream env.
            // this should be fixed once we rework the project dependencies
    
            ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
            if (env instanceof ContextEnvironment) {
                return new StreamContextEnvironment((ContextEnvironment) env);
            } else if (env instanceof OptimizerPlanEnvironment || env instanceof PreviewPlanEnvironment) {
                return new StreamPlanEnvironment(env);
            } else {
                return createLocalEnvironment();
            }
        }
      // ExecutionEnvironment side: created by the contextEnvironmentFactory when one has been
      // installed, otherwise a local environment is used
       public static ExecutionEnvironment getExecutionEnvironment() {
            return contextEnvironmentFactory == null
                    ? createLocalEnvironment()
                    : contextEnvironmentFactory.createExecutionEnvironment();
        }
    

    In short, an execution environment matching the way the job is launched is constructed; the user's main() method is then executed via reflection and the StreamPlan is extracted from it. While it runs, main() uses the execution environment that was created for it from the outside.
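    For illustration, here is a minimal user program (MyStreamingJob is a made-up class); the very same main() runs unchanged whether getExecutionEnvironment() hands back a StreamContextEnvironment, a local environment, or the StreamPlanEnvironment used for plan extraction:

    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class MyStreamingJob {
        public static void main(String[] args) throws Exception {
            // returns whichever environment the caller installed before invoking main()
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements("a", "b", "c").print();

            // under StreamPlanEnvironment this builds the StreamGraph, hands it to the
            // OptimizerPlanEnvironment and then throws ProgramAbortException instead of running
            env.execute("my-streaming-job");
        }
    }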

    Original post: https://www.haomeiwen.com/subject/mwozyctx.html