01-Flink: Generating a StreamGraph from Code

Author: hoose | Published: 2024-07-15 15:33

A typical piece of Flink DataStream API code looks like this:

DataStreamSource<String> kafkaSource =
        env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
        //.setParallelism(3);

SingleOutputStreamOperator<CanalMessage> data =
        kafkaSource.flatMap(new ProcessDdlFunction(dorisMap))
                .uid("ddl");
SingleOutputStreamOperator<Map<String, String>> convertMap =
        data.map(new ProcessBinlogFunction())
                .uid("binlog");
...
env.execute(topics);

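Anyone familiar with Flink knows that the last line, env.execute(topics);, is what actually submits our program to an execution environment, whether local, YARN, or Kubernetes; it is the point at which the job starts running. So what do the earlier calls to map, flatMap and the like do? Let's work through it. Taking map as an example, if we jump into its definition we find that it calls the map method of the DataStream class (the single-argument map(mapper) used above first extracts the output type and then delegates to this overload):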

public <R> SingleOutputStreamOperator<R> map(
        MapFunction<T, R> mapper, TypeInformation<R> outputType) {
    return transform("Map", outputType, new StreamMap<>(clean(mapper)));
}
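
To make the "nothing runs until execute" behavior concrete, here is a toy model in plain Java. It is not Flink's API: LazyEnv, LazyStream, fromList() and the execute() method on the stream are hypothetical stand-ins. map() only records a wrapped function and returns a new stream; the user function runs only when execute() is called, which plays the role of env.execute() here.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical toy environment: records operator names in declaration order.
class LazyEnv {
    final List<String> operatorNames = new ArrayList<>();

    // Hypothetical source: remembers the data but reads nothing yet.
    <T> LazyStream<T> fromList(List<T> data) {
        operatorNames.add("Source");
        return new LazyStream<>(this, () -> data);
    }
}

// Hypothetical toy stream: holds a recipe (Supplier) for its output, not the output itself.
class LazyStream<T> {
    private final LazyEnv env;
    private final Supplier<List<T>> plan;

    LazyStream(LazyEnv env, Supplier<List<T>> plan) {
        this.env = env;
        this.plan = plan;
    }

    // Like DataStream.map: wrap the user function and return a new stream.
    // The mapper is NOT invoked here.
    <R> LazyStream<R> map(Function<T, R> mapper) {
        env.operatorNames.add("Map");
        return new LazyStream<>(env, () -> {
            List<R> out = new ArrayList<>();
            for (T value : plan.get()) {
                out.add(mapper.apply(value));
            }
            return out;
        });
    }

    // Stand-in for env.execute(): only now is the recorded chain evaluated.
    List<T> execute() {
        return plan.get();
    }
}
```

Calling map() leaves the mapper untouched; only execute() walks the chain. Real Flink does not evaluate a chain of closures like this (it turns the recorded transformations into a StreamGraph and then a JobGraph for the cluster), but the split between describing the pipeline and running it is the same.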

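The transform() call in turn delegates to doTransform(), shown below. The key line is getExecutionEnvironment().addOperator(resultTransform);: as the name suggests, it takes the logic we wrote, wrapped as a Transformation (here a OneInputTransformation whose input is the upstream stream's transformation), and appends it to the environment's collection of all transformations. No data is processed at this point.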

protected <R> SingleOutputStreamOperator<R> doTransform(
        String operatorName,
        TypeInformation<R> outTypeInfo,
        StreamOperatorFactory<R> operatorFactory) {

    // read the output type of the input Transform to coax out errors about MissingTypeInfo
    transformation.getOutputType();

    OneInputTransformation<T, R> resultTransform =
            new OneInputTransformation<>(
                    this.transformation,
                    operatorName,
                    operatorFactory,
                    outTypeInfo,
                    environment.getParallelism());

    @SuppressWarnings({"unchecked", "rawtypes"})
    SingleOutputStreamOperator<R> returnStream =
            new SingleOutputStreamOperator(environment, resultTransform);

    // Register the new transformation (our operator logic) in the environment's list
    getExecutionEnvironment().addOperator(resultTransform);

    return returnStream;
}
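
The shape of doTransform() can be mirrored in a small, runnable toy so the bookkeeping is visible without a Flink dependency. ToyTransformation, ToyEnvironment, ToyDataStream, source() and the parallelism field are all hypothetical names. In this toy, the source node is deliberately left out of the list and is reachable only through the input pointer of the first downstream node.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for a Transformation: a name, an upstream pointer, a parallelism.
class ToyTransformation {
    final String name;
    final ToyTransformation input; // upstream node; null for a source
    final int parallelism;

    ToyTransformation(String name, ToyTransformation input, int parallelism) {
        this.name = name;
        this.input = input;
        this.parallelism = parallelism;
    }
}

// Hypothetical stand-in for StreamExecutionEnvironment.
class ToyEnvironment {
    int parallelism = 1; // stands in for environment.getParallelism()
    // Stands in for the environment's "transformations" field.
    final List<ToyTransformation> transformations = new ArrayList<>();

    // Like addOperator(): just append; nothing executes.
    void addOperator(ToyTransformation t) {
        transformations.add(t);
    }

    // Creates the source node without registering it in the list (toy choice).
    ToyDataStream source(String name) {
        return new ToyDataStream(this, new ToyTransformation(name, null, parallelism));
    }
}

// Hypothetical stand-in for DataStream: an environment plus the node producing this stream.
class ToyDataStream {
    final ToyEnvironment environment;
    final ToyTransformation transformation;

    ToyDataStream(ToyEnvironment environment, ToyTransformation transformation) {
        this.environment = environment;
        this.transformation = transformation;
    }

    // Same steps as doTransform(): new node whose input is this stream's node,
    // wrapped in a new stream object, then registered with the environment.
    ToyDataStream doTransform(String operatorName) {
        ToyTransformation resultTransform =
                new ToyTransformation(operatorName, this.transformation, environment.parallelism);
        ToyDataStream returnStream = new ToyDataStream(environment, resultTransform);
        environment.addOperator(resultTransform);
        return returnStream;
    }
}
```

Each call returns a new stream object wrapping the new node, which is why the example assigns the results of flatMap and map to new variables (data, convertMap) and chains further operators from them.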

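The flatMap call in the example goes through the same path, so its transformation lands in the same collection. Now for the key part: submitting the job with env.execute(topics).
In the code below, transformations is a field of StreamExecutionEnvironment, the very collection of transformations described above. execute() first copies it into originalTransformations and then calls getStreamGraph(), which builds the StreamGraph from the registered transformations and clears the list. The copy is what allows the catch block to rebuild the graph without the cache if execution fails with a ClusterDatasetCorruptedException.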

public JobExecutionResult execute(String jobName) throws Exception {
    final List<Transformation<?>> originalTransformations = new ArrayList<>(transformations);
    // Build the StreamGraph from the registered transformations
    StreamGraph streamGraph = getStreamGraph();
    if (jobName != null) {
        streamGraph.setJobName(jobName);
    }

    try {
        return execute(streamGraph);
    } catch (Throwable t) {
        Optional<ClusterDatasetCorruptedException> clusterDatasetCorruptedException =
                ExceptionUtils.findThrowable(t, ClusterDatasetCorruptedException.class);
        if (!clusterDatasetCorruptedException.isPresent()) {
            throw t;
        }

        // Retry without cache if it is caused by corrupted cluster dataset.
        invalidateCacheTransformations(originalTransformations);
        streamGraph = getStreamGraph(originalTransformations);
        return execute(streamGraph);
    }
}
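
What getStreamGraph() has to do with that list can be sketched as a graph walk. This is a heavily simplified stand-in, not Flink's StreamGraphGenerator: ToyNode, ToyStreamGraph and ToyGraphGenerator are hypothetical, and the graph is reduced to "id:name" node strings and "from->to" edge strings. Inputs are visited before the node itself, and a memo map ensures that a node reached from several downstream nodes is added only once.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical node standing in for a recorded Transformation.
class ToyNode {
    final int id;
    final String name;
    final ToyNode input; // upstream node; null for a source

    ToyNode(int id, String name, ToyNode input) {
        this.id = id;
        this.name = name;
        this.input = input;
    }
}

// Hypothetical, heavily simplified graph: nodes as "id:name", edges as "from->to".
class ToyStreamGraph {
    final List<String> nodes = new ArrayList<>();
    final List<String> edges = new ArrayList<>();
}

// Simplified stand-in for the work done inside getStreamGraph().
class ToyGraphGenerator {
    // Memo so a shared upstream node is added to the graph only once.
    private final Map<ToyNode, Integer> alreadyTransformed = new HashMap<>();
    private final ToyStreamGraph graph = new ToyStreamGraph();

    ToyStreamGraph generate(List<ToyNode> transformations) {
        for (ToyNode t : transformations) {
            transform(t);
        }
        return graph;
    }

    // Depth-first: ensure the upstream node exists before adding this node and its edge.
    private int transform(ToyNode t) {
        Integer existing = alreadyTransformed.get(t);
        if (existing != null) {
            return existing;
        }
        int upstreamId = -1;
        if (t.input != null) {
            upstreamId = transform(t.input);
        }
        graph.nodes.add(t.id + ":" + t.name);
        if (t.input != null) {
            graph.edges.add(upstreamId + "->" + t.id);
        }
        alreadyTransformed.put(t, t.id);
        return t.id;
    }
}
```

Passing only the non-source nodes is enough, because the source is found through the input pointers. Flink's real StreamGraph nodes and edges carry much more than a name (operator factories, parallelism, partitioners), but the input-first traversal is the core idea.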


Article link: https://www.haomeiwen.com/subject/rnehhjtx.html