Flink Source Code Analysis: Job Submission (Part 2)


Author: 小C菜鸟 | Published 2018-03-17 17:50

    In the previous article we walked through the overall flow of submitting a job from the command line all the way to the TaskManagers, but we skipped over some details along the way: which cluster nodes does the job get submitted to? What is a JobGraph, and how is it generated? How does the JobClient actually deliver the job to the cluster? This article answers these questions one by one.

    The Flink Runtime Environment

    The Flink runtime consists of two types of processes:

    • JobManager: coordinates the distributed execution. It schedules tasks, coordinates checkpoints, and coordinates recovery on failure.
    • TaskManager: executes the tasks (or, more precisely, the subtasks) of a dataflow, and buffers and exchanges the data streams.

    As the names and this division of labor suggest, the JobClient submits a job to the JobManager, which then distributes the subtasks across the TaskManagers for execution.
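    To make this concrete, a program can be pointed at a specific JobManager explicitly. Below is a minimal sketch using the public createRemoteEnvironment API; the host name and jar path are hypothetical, and 6123 is the default JobManager RPC port:

        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class RemoteSubmit {
            public static void main(String[] args) throws Exception {
                // hypothetical host and jar path; 6123 is the default JobManager RPC port
                StreamExecutionEnvironment env = StreamExecutionEnvironment
                    .createRemoteEnvironment("jobmanager-host", 6123, "/path/to/wordcount.jar");

                env.fromElements("to", "be", "or", "not", "to", "be").print();

                // submits the job to the JobManager, which schedules the
                // subtasks onto the TaskManagers
                env.execute("remote WordCount");
            }
        }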

    Submitting a Job in Interactive Mode

    When a job is submitted from the command line, the run method of ClusterClient carries out the submission logic, in one of two modes, non-interactive or interactive:

        public JobSubmissionResult run(PackagedProgram prog, int parallelism) throws ProgramInvocationException, ProgramMissingJobException {
            Thread.currentThread().setContextClassLoader(prog.getUserCodeClassLoader());
        if (prog.isUsingProgramEntryPoint()) { // the program has an entry class: non-interactive submission
    
            // JobWithJars is a Flink dataflow plan bundled with all the classes in the jar and the ClassLoader for loading the user code
                final JobWithJars jobWithJars;
                if (hasUserJarsInClassPath(prog.getAllLibraries())) {
                    jobWithJars = prog.getPlanWithoutJars();
                } else {
                    jobWithJars = prog.getPlanWithJars();
                }
    
                return run(jobWithJars, parallelism, prog.getSavepointSettings());
        } else if (prog.isUsingInteractiveMode()) { // interactive-mode submission
                log.info("Starting program in interactive mode");
    
                final List<URL> libraries;
                if (hasUserJarsInClassPath(prog.getAllLibraries())) {
                    libraries = Collections.emptyList();
                } else {
                    libraries = prog.getAllLibraries();
                }
    
                ContextEnvironmentFactory factory = new ContextEnvironmentFactory(this, libraries,
                    prog.getClasspaths(), prog.getUserCodeClassLoader(), parallelism, isDetached(),
                    prog.getSavepointSettings());
                ContextEnvironment.setAsContext(factory);
    
                try {
                // invoke the user program's main method
                    prog.invokeInteractiveModeForExecution();
                    if (lastJobExecutionResult == null && factory.getLastEnvCreated() == null) {
                        throw new ProgramMissingJobException("The program didn't contain a Flink job.");
                    }
                    if (isDetached()) {
                        // in detached mode, we execute the whole user code to extract the Flink job, afterwards we run it here
                        return ((DetachedEnvironment) factory.getLastEnvCreated()).finalizeExecute();
                    } else {
                        // in blocking mode, we execute all Flink jobs contained in the user code and then return here
                        return this.lastJobExecutionResult;
                    }
                } finally {
                    ContextEnvironment.unsetContext();
                }
            } else {
                throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
            }
        }
    

    In practice, jobs are usually submitted in interactive mode, with a main class declared in the submitted jar (through the jar's Main-Class manifest attribute, or through the -c option of the flink CLI). Take Flink's streaming WordCount example:

        public static void main(String[] args) throws Exception {
    
            // Checking input parameters
            final ParameterTool params = ParameterTool.fromArgs(args);
    
            // set up the execution environment
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
            // make parameters available in the web interface
            env.getConfig().setGlobalJobParameters(params);
    
            // get input data
            DataStream<String> text;
            if (params.has("input")) {
                // read the text file from given input path
                text = env.readTextFile(params.get("input"));
            } else {
                System.out.println("Executing WordCount example with default input data set.");
                System.out.println("Use --input to specify file input.");
                // get default test text data
                text = env.fromElements(WordCountData.WORDS);
            }
    
        DataStream<Tuple2<String, Integer>> counts =
                // split up the lines in pairs (2-tuples) containing: (word,1)
                text.flatMap(new Tokenizer())
                // group by the tuple field "0" and sum up tuple field "1"
                .keyBy(0).sum(1);
    
            // emit result
            if (params.has("output")) {
                counts.writeAsText(params.get("output"));
            } else {
                System.out.println("Printing result to stdout. Use --output to specify output path.");
                counts.print();
            }
    
            // execute program
            env.execute("Streaming WordCount");
        }
    

    The prog.invokeInteractiveModeForExecution() call in ClusterClient simply invokes WordCount's main method. The logic of main is straightforward and splits into two parts: building the dataflow and executing it. This section focuses on the execution, i.e. the last line, env.execute("Streaming WordCount").
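    Under the hood, invokeInteractiveModeForExecution boils down to a plain reflective call (the real logic lives in PackagedProgram#callMainMethod). A minimal sketch, with hypothetical parameter names:

        import java.lang.reflect.Method;

        public class MainInvoker {
            static void invokeMain(ClassLoader userCodeClassLoader, String mainClassName, String[] args) throws Exception {
                Class<?> entryClass = userCodeClassLoader.loadClass(mainClassName);
                Method mainMethod = entryClass.getMethod("main", String[].class);
                // main is static, hence the null receiver; the Object cast keeps
                // varargs from splitting the array into separate arguments
                mainMethod.invoke(null, (Object) args);
            }
        }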
    Let's take the local stream execution environment (LocalStreamEnvironment) as an example.
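    You typically end up in this environment when running a program straight from the IDE: with no context environment set, getExecutionEnvironment() falls back to a local environment. A minimal sketch of that path:

        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class LocalRun {
            public static void main(String[] args) throws Exception {
                // explicitly request a local environment with parallelism 2
                StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironment(2);
                env.fromElements("to", "be", "or", "not").print();
                // enters LocalStreamEnvironment#execute, shown below
                env.execute("local demo");
            }
        }

    With that context, here is what the execute method does: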

        @Override
        public JobExecutionResult execute(String jobName) throws Exception {
            // transform the streaming program into a JobGraph
        // step 1: build the StreamGraph
            StreamGraph streamGraph = getStreamGraph();
            streamGraph.setJobName(jobName);
    
        // step 2: translate the StreamGraph into a JobGraph
            JobGraph jobGraph = streamGraph.getJobGraph();
    
            Configuration configuration = new Configuration();
            configuration.addAll(jobGraph.getJobConfiguration());
    
            configuration.setLong(TaskManagerOptions.MANAGED_MEMORY_SIZE, -1L);
            configuration.setInteger(ConfigConstants.TASK_MANAGER_NUM_TASK_SLOTS, jobGraph.getMaximumParallelism());
    
            // add (and override) the settings with what the user defined
            configuration.addAll(this.conf);
    
            if (LOG.isInfoEnabled()) {
                LOG.info("Running job on local embedded Flink mini cluster");
            }
    
            LocalFlinkMiniCluster exec = new LocalFlinkMiniCluster(configuration, true);
            try {
                exec.start();
            // step 3: submit the JobGraph and wait for the result
                return exec.submitJobAndWait(jobGraph, getConfig().isSysoutLoggingEnabled());
            }
            finally {
                transformations.clear();
                exec.stop();
            }
        }
    

    The method clearly breaks down into three steps: build the StreamGraph, translate it into a JobGraph, and submit the job. Let's look at the submission logic first:

      @throws(classOf[JobExecutionException])
      def submitJobAndWait(
          jobGraph: JobGraph,
          printUpdates: Boolean)
        : JobExecutionResult = {
        submitJobAndWait(jobGraph, printUpdates, timeout)
      }
    
      @throws(classOf[JobExecutionException])
      def submitJobAndWait(
          jobGraph: JobGraph,
          printUpdates: Boolean,
          timeout: FiniteDuration)
        : JobExecutionResult = {
    
        val clientActorSystem = startJobClientActorSystem(jobGraph.getJobID)
    
        val userCodeClassLoader =
          try {
            createUserCodeClassLoader(
              jobGraph.getUserJars,
              jobGraph.getClasspaths,
              Thread.currentThread().getContextClassLoader)
          } catch {
            case e: Exception => throw new JobExecutionException(
              jobGraph.getJobID,
              "Could not create the user code class loader.",
              e)
          }
    
        try {
          JobClient.submitJobAndWait(
           clientActorSystem,
           configuration,
           highAvailabilityServices,
           jobGraph,
           timeout,
           printUpdates,
           userCodeClassLoader)
        } finally {
           if(!useSingleActorSystem) {
             // we have to shutdown the just created actor system
             shutdownJobClientActorSystem(clientActorSystem)
           }
         }
      }
    

    Following the call chain, we eventually reach the JobClient.submitJobAndWait(...) method described in the previous article, so it is not repeated here. JobClient starts its own actor system; although it is not part of the Flink runtime, it can either disconnect after submission or stay connected to receive progress reports. The overall job submission flow is shown in the figure below:

    Of the three submission steps above, the first two generate the StreamGraph and the JobGraph; let's look at each of these graphs in turn.

    The StreamGraph

    A StreamGraph is the data structure that represents the topology of a streaming program, and it carries all the information needed to generate a JobGraph.
    A stream graph is composed of nodes and edges, backed by the StreamNode and StreamEdge classes respectively. A possible StreamGraph is shown in the figure below:


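    To see a concrete StreamGraph for yourself, the public getExecutionPlan() method renders the graph of the current program as JSON; a minimal sketch:

        import org.apache.flink.api.common.functions.MapFunction;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

        public class PlanDump {
            public static void main(String[] args) throws Exception {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                env.fromElements("a", "b", "a")
                    .map(new MapFunction<String, String>() {
                        @Override
                        public String map(String value) {
                            return value.toUpperCase();
                        }
                    })
                    .print();

                // prints the StreamGraph as JSON: its StreamNodes and StreamEdges
                System.out.println(env.getExecutionPlan());
            }
        }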
    Now let's see how the StreamGraph is created, i.e. the logic of the getStreamGraph() method.

        @Internal
        public StreamGraph getStreamGraph() {
            if (transformations.size() <= 0) {
                throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
            }
            return StreamGraphGenerator.generate(this, transformations);
        }
    
        public static StreamGraph generate(StreamExecutionEnvironment env, List<StreamTransformation<?>> transformations) {
            return new StreamGraphGenerator(env).generateInternal(transformations);
        }
    
        /**
         * This starts the actual transformation, beginning from the sinks.
         */
        private StreamGraph generateInternal(List<StreamTransformation<?>> transformations) {
            for (StreamTransformation<?> transformation: transformations) {
                transform(transformation);
            }
            return streamGraph;
        }
    
        /**
         * Transforms one {@code StreamTransformation}.
         *
         * <p>This checks whether we already transformed it and exits early in that case. If not it
         * delegates to one of the transformation specific methods.
         */
        private Collection<Integer> transform(StreamTransformation<?> transform) {
    
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }
    
            LOG.debug("Transforming " + transform);
    
            if (transform.getMaxParallelism() <= 0) {
    
                // if the max parallelism hasn't been set, then first use the job wide max parallelism
            // from the ExecutionConfig.
                int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
                if (globalMaxParallelismFromConfig > 0) {
                    transform.setMaxParallelism(globalMaxParallelismFromConfig);
                }
            }
    
            // call at least once to trigger exceptions about MissingTypeInfo
            transform.getOutputType();
    
            Collection<Integer> transformedIds;
            if (transform instanceof OneInputTransformation<?, ?>) {
                transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
            } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
                transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
            } else if (transform instanceof SourceTransformation<?>) {
                transformedIds = transformSource((SourceTransformation<?>) transform);
            } else if (transform instanceof SinkTransformation<?>) {
                transformedIds = transformSink((SinkTransformation<?>) transform);
            } else if (transform instanceof UnionTransformation<?>) {
                transformedIds = transformUnion((UnionTransformation<?>) transform);
            } else if (transform instanceof SplitTransformation<?>) {
                transformedIds = transformSplit((SplitTransformation<?>) transform);
            } else if (transform instanceof SelectTransformation<?>) {
                transformedIds = transformSelect((SelectTransformation<?>) transform);
            } else if (transform instanceof FeedbackTransformation<?>) {
                transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
            } else if (transform instanceof CoFeedbackTransformation<?>) {
                transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
            } else if (transform instanceof PartitionTransformation<?>) {
                transformedIds = transformPartition((PartitionTransformation<?>) transform);
            } else if (transform instanceof SideOutputTransformation<?>) {
                transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
            } else {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }
    
            // need this check because the iterate transformation adds itself before
            // transforming the feedback edges
            if (!alreadyTransformed.containsKey(transform)) {
                alreadyTransformed.put(transform, transformedIds);
            }
    
            if (transform.getBufferTimeout() > 0) {
                streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
            }
            if (transform.getUid() != null) {
                streamGraph.setTransformationUID(transform.getId(), transform.getUid());
            }
            if (transform.getUserProvidedNodeHash() != null) {
                streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
            }
    
            if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
            }
    
            return transformedIds;
        }
    

    Clearly the core logic sits in the transform(StreamTransformation<?> transform) method. You may be wondering what a StreamTransformation is: it describes the operation that created a DataStream. Every DataStream is backed by a StreamTransformation, which holds the stream's original information, and from these transformations the whole StreamGraph can be built. Let's take OneInputTransformation as an example and see how the transform step works:

        /**
         * Transforms a {@code OneInputTransformation}.
         *
         * <p>This recursively transforms the inputs, creates a new {@code StreamNode} in the graph and
         * wired the inputs to this new node.
         */
        private <IN, OUT> Collection<Integer> transformOneInputTransform(OneInputTransformation<IN, OUT> transform) {
    
        // first (recursively) transform the input StreamTransformation of this OneInputTransformation
            Collection<Integer> inputIds = transform(transform.getInput());
    
            // the recursive call might have already transformed this
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }
    
            String slotSharingGroup = determineSlotSharingGroup(transform.getSlotSharingGroup(), inputIds);
    
        // add a StreamNode to the StreamGraph for this operator
            streamGraph.addOperator(transform.getId(),
                    slotSharingGroup,
                    transform.getOperator(),
                    transform.getInputType(),
                    transform.getOutputType(),
                    transform.getName());
    
            if (transform.getStateKeySelector() != null) {
                TypeSerializer<?> keySerializer = transform.getStateKeyType().createSerializer(env.getConfig());
                streamGraph.setOneInputStateKey(transform.getId(), transform.getStateKeySelector(), keySerializer);
            }
    
            streamGraph.setParallelism(transform.getId(), transform.getParallelism());
            streamGraph.setMaxParallelism(transform.getId(), transform.getMaxParallelism());
    
        // add a StreamEdge from each input node to the new node
            for (Integer inputId: inputIds) {
                streamGraph.addEdge(inputId, transform.getId(), 0);
            }
    
            return Collections.singleton(transform.getId());
        }
    

    The logic is clear: transform the input StreamTransformation of the current OneInputTransformation, build a StreamNode from the OneInputTransformation's operator and related metadata, and then build the StreamEdges from the IDs returned by transforming the inputs.
    Creating a stream and generating the StreamGraph involves quite a few data structures and layers. Taking the stream created by text.flatMap(new Tokenizer()) in the WordCount example above, the concrete data structures and their layering are shown in the figure below:
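    The same layering can also be inspected in code; a hedged sketch (Flink 1.4.x, where DataStream#getTransformation is public but marked @Internal):

        import org.apache.flink.api.common.functions.FlatMapFunction;
        import org.apache.flink.api.java.tuple.Tuple2;
        import org.apache.flink.streaming.api.datastream.DataStream;
        import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
        import org.apache.flink.util.Collector;

        public class TransformationPeek {
            public static void main(String[] args) {
                StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

                DataStream<String> text = env.fromElements("to be or not to be");
                DataStream<Tuple2<String, Integer>> words = text.flatMap(
                    new FlatMapFunction<String, Tuple2<String, Integer>>() {
                        @Override
                        public void flatMap(String line, Collector<Tuple2<String, Integer>> out) {
                            for (String word : line.split(" ")) {
                                out.collect(new Tuple2<>(word, 1));
                            }
                        }
                    });

                // every DataStream wraps a StreamTransformation: the source stream
                // wraps a SourceTransformation, and flatMap produced a
                // OneInputTransformation whose input is that source transformation
                System.out.println(text.getTransformation());
                System.out.println(words.getTransformation());
            }
        }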

    The JobGraph

    The JobGraph is the only representation of a job that Flink's dataflow engine accepts, and it is exactly this shared abstraction that unifies stream processing and batch processing at runtime.

    Compared with the StreamGraph and the batch OptimizedPlan, the JobGraph is no longer a purely "static" data structure, because it introduces the "dynamic" concept of an intermediate data set (IntermediateDataSet).

    The job vertex (JobVertex), intermediate data set (IntermediateDataSet) and job edge (JobEdge) are the basic elements of a JobGraph, and the three depend on one another (see the sketch after this list):

    • a JobVertex is associated with a number of JobEdges as its inputs and a number of IntermediateDataSets as the result sets it produces;
    • an IntermediateDataSet is associated with one JobVertex as its producer and a number of JobEdges as its consumers;
    • a JobEdge is associated with one IntermediateDataSet as its source and one JobVertex as its consuming target.
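    These mutual dependencies are visible in the runtime API itself; a hedged sketch (internal classes from flink-runtime 1.4.x, subject to change):

        import org.apache.flink.runtime.io.network.partition.ResultPartitionType;
        import org.apache.flink.runtime.jobgraph.DistributionPattern;
        import org.apache.flink.runtime.jobgraph.IntermediateDataSet;
        import org.apache.flink.runtime.jobgraph.JobEdge;
        import org.apache.flink.runtime.jobgraph.JobGraph;
        import org.apache.flink.runtime.jobgraph.JobVertex;

        public class JobGraphWiring {
            public static void main(String[] args) {
                JobVertex producer = new JobVertex("producer");
                JobVertex consumer = new JobVertex("consumer");

                // one call creates an IntermediateDataSet on the producer and a
                // JobEdge feeding the consumer, wiring all three elements together
                JobEdge edge = consumer.connectNewDataSetAsInput(
                    producer, DistributionPattern.ALL_TO_ALL, ResultPartitionType.PIPELINED);

                IntermediateDataSet dataSet = edge.getSource();
                System.out.println(dataSet.getProducer().getName()); // producer
                System.out.println(edge.getTarget().getName());      // consumer

                JobGraph jobGraph = new JobGraph("wiring-demo", producer, consumer);
                System.out.println(jobGraph.getNumberOfVertices());  // 2
            }
        }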

    A possible JobGraph therefore looks like the figure below:

