Flink DataStream\StreamExecution


Author: kaiker | Published 2022-03-23 19:34

    StreamExecutionEnvironment

    https://www.cnblogs.com/wangwei0721/p/14063415.html

    • It is the environment in which the program executes.
    • Runtime settings can be configured on it.
    • Several of its methods create DataStreams (more precisely DataStreamSource, a subclass of a DataStream subclass).
    • It records every transformation applied, and from those transformations the StreamGraph (the job topology) can be obtained.
    • The execute method hands the StreamGraph over to the job executor; a minimal usage sketch follows below.
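
    As a rough illustration of that workflow, here is a minimal sketch of a job (the class name and job name are made up for the example). Each API call records a Transformation in the environment; execute() then builds the StreamGraph from them and submits it.

    import org.apache.flink.api.common.functions.MapFunction;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class EnvDemo {
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.fromElements(1, 2, 3)                      // creates a DataStreamSource (a source Transformation)
                .map(new MapFunction<Integer, Integer>() { // each operator call records another Transformation
                    @Override
                    public Integer map(Integer value) {
                        return value * 2;
                    }
                })
                .print();                                  // sinks are recorded as Transformations as well

            env.execute("env-demo");                       // builds the StreamGraph and hands it to the executor
        }
    }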
    public abstract class StreamExecutionEnvironment {
    ...
        // processing time is the default time characteristic
        private static final TimeCharacteristic DEFAULT_TIME_CHARACTERISTIC = TimeCharacteristic.ProcessingTime;
        // default network buffer flush timeout, in milliseconds
        private static final long DEFAULT_NETWORK_BUFFER_TIMEOUT = 100L;
        // execution settings: parallelism, restart attempts, auto-watermark interval and many other properties
        private final ExecutionConfig config = new ExecutionConfig();
        // checkpoint settings
        private final CheckpointConfig checkpointCfg = new CheckpointConfig();
        // all transformations recorded so far
        protected final List<Transformation<?>> transformations = new ArrayList<>();
        // state backend settings
        private StateBackend defaultStateBackend;

    // creates a DataStream from a SourceFunction
        public <OUT> DataStreamSource<OUT> addSource(SourceFunction<OUT> function, String sourceName, TypeInformation<OUT> typeInfo) {
    
            if (function instanceof ResultTypeQueryable) {
                typeInfo = ((ResultTypeQueryable<OUT>) function).getProducedType();
            }
            if (typeInfo == null) {
                try {
                    typeInfo = TypeExtractor.createTypeInfo(
                            SourceFunction.class,
                            function.getClass(), 0, null, null);
                } catch (final InvalidTypesException e) {
                    typeInfo = (TypeInformation<OUT>) new MissingTypeInfo(sourceName, e);
                }
            }
    
            boolean isParallel = function instanceof ParallelSourceFunction;
    
            clean(function);
    
            final StreamSource<OUT, ?> sourceOperator = new StreamSource<>(function);
            return new DataStreamSource<>(this, typeInfo, sourceOperator, isParallel, sourceName);
        }
    
    // creates a stream directly from the given data elements
        public final <OUT> DataStreamSource<OUT> fromElements(Class<OUT> type, OUT... data) {
            if (data.length == 0) {
                throw new IllegalArgumentException("fromElements needs at least one element as argument");
            }
    
            TypeInformation<OUT> typeInfo;
            try {
                typeInfo = TypeExtractor.getForClass(type);
            }
            catch (Exception e) {
                throw new RuntimeException("Could not create TypeInformation for type " + type.getName()
                        + "; please specify the TypeInformation manually via "
                        + "StreamExecutionEnvironment#fromElements(Collection, TypeInformation)", e);
            }
            return fromCollection(Arrays.asList(data), typeInfo);
        }
    
      ...
    
    // uses the transformations list held by the environment to generate the StreamGraph
    // ("Class representing the streaming topology. It contains all the information
    // necessary to build the jobgraph for the execution.")
        public StreamGraph getStreamGraph(boolean clearTransformations) {
            final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
            if (clearTransformations) {
                transformations.clear();
            }
            return streamGraph;
        }
    
    // the method that env.execute() ends up calling in most cases
    public JobExecutionResult execute() throws Exception {
        return execute(getStreamGraph());
    }
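
    Since the environment keeps that transformations list around, the topology can also be inspected before submission. A small sketch (assuming a recent Flink version): getExecutionPlan() builds the StreamGraph from the recorded transformations and returns it as JSON, without submitting the job.

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.fromElements(1, 2, 3).print();

    // builds the StreamGraph from the recorded transformations and returns it as a JSON plan;
    // the job itself is not executed here
    System.out.println(env.getExecutionPlan());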
    

    Transformation

    https://izualzhy.cn/flink-source-transformations

    A Transformation represents the operation that creates a DataStream. Every DataStream has an underlying Transformation that is the origin of said DataStream.
    In other words, every DataStream holds a Transformation that describes how it was produced.

    • It is a record of every transformation that has been applied.
    • The resources required by the operation can be configured on it.
    • Every operation you write ultimately gets wrapped in a Transformation; addOperator appends it to the List<Transformation> held by the StreamExecutionEnvironment (see the sketch after the class skeleton below). Because of this list, Flink knows what transformations you applied to the stream and can eventually generate the execution graph.
    public abstract class Transformation<T> {
        protected final int id;
        protected String name;
        protected String description;
        protected TypeInformation<T> outputType;
        ... 
    }
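
    The addOperator mentioned above is essentially just an append to that list. A simplified sketch, paraphrased from the StreamExecutionEnvironment source (details vary slightly across Flink versions):

    // StreamExecutionEnvironment (simplified)
    public void addOperator(Transformation<?> transformation) {
        Preconditions.checkNotNull(transformation, "transformation must not be null.");
        this.transformations.add(transformation);
    }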
    

    SourceTransformation

    There are many kinds of Transformation; let's use fromElements() from StreamExecutionEnvironment as a first exploration.

    (figure: the Transformation created during fromElements)

    The description of LegacySourceTransformation (from its Javadoc):
    This represents a Source. This does not actually transform anything since it has no inputs but it
    is the root Transformation of any topology.
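
    To see that a fromElements source really is backed by such a source Transformation, one can look at the DataStream's underlying Transformation directly. A small sketch (imports assume a recent Flink version, where Transformation lives in org.apache.flink.api.dag):

    import org.apache.flink.api.dag.Transformation;
    import org.apache.flink.streaming.api.datastream.DataStreamSource;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

    public class SourceTransformationDemo {
        public static void main(String[] args) {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            DataStreamSource<Integer> source = env.fromElements(1, 2, 3);

            // every DataStream exposes the Transformation it was created from;
            // for fromElements this is the source transformation described above
            Transformation<Integer> t = source.getTransformation();
            System.out.println(t.getName() + " / " + t.getClass().getSimpleName());
        }
    }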
    

    Once created, these Transformations are what is actually used when the job executes.

    final StreamGraph streamGraph = getStreamGraphGenerator(transformations).generate();
    
    public StreamGraph generate() {
        ...
            for (Transformation<?> transformation : transformations) {
                transform(transformation);
            }
        ...
            for (StreamNode node : streamGraph.getStreamNodes()) {
                if (node.getInEdges().stream().anyMatch(this::shouldDisableUnalignedCheckpointing)) {
                    for (StreamEdge edge : node.getInEdges()) {
                        edge.setSupportsUnalignedCheckpoints(false);
                    }
                }
            }
    
        ...
    
            return builtStreamGraph;
        }
    

    OneInputTransformation

    map is the most common kind of DataStream transformation; below is the rough flow of a map operation.


    (figure: flow of the map operation)

    For more on the StreamGraph, see https://izualzhy.cn/flink-source-stream-graph

    DataStream

    • A DataStream has two member fields: a StreamExecutionEnvironment and a Transformation<T>.
    • Both types were introduced above. The DataStream API consists of the various operations on a DataStream, such as map, flatMap, filter and keyBy. Note that neither of these two fields holds the data itself; Flink can be understood as simply transforming and computing continuously.
    public class DataStream<T> {
       protected final StreamExecutionEnvironment environment;
       protected final Transformation<T> transformation;
       ...
    }
    

    Taking the map operation as an example:

    • You supply an implementation of MapFunction, which takes input of type T and returns output of type O.
    • The function is handed to an operator (StreamMap<>).
    • The operator defines how the function is used.
    • transform → doTransform, as illustrated in the previous section, wraps the operator into a Transformation again, so that at execution time Flink knows which operation to perform.
        public <R> SingleOutputStreamOperator<R> map(
                MapFunction<T, R> mapper, TypeInformation<R> outputType) {
            return transform("Map", outputType, new StreamMap<>(clean(mapper)));
        }
    
      public interface MapFunction<T, O> extends Function, Serializable {
        O map(T value) throws Exception;
      }
    
    public class StreamMap<IN, OUT> extends AbstractUdfStreamOperator<OUT, MapFunction<IN, OUT>>
            implements OneInputStreamOperator<IN, OUT> {
    
        private static final long serialVersionUID = 1L;
    
        public StreamMap(MapFunction<IN, OUT> mapper) {
        super(mapper); // userFunction is set here: it is the MapFunction you supplied
            chainingStrategy = ChainingStrategy.ALWAYS;
        }
    
        @Override
        public void processElement(StreamRecord<IN> element) throws Exception {
            output.collect(element.replace(userFunction.map(element.getValue())));
        }
    }
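
    The userFunction referenced in processElement comes from the parent class. A simplified sketch of AbstractUdfStreamOperator, paraphrased from the Flink source (most details omitted):

    // AbstractUdfStreamOperator (simplified): stores the user function supplied to the operator
    public abstract class AbstractUdfStreamOperator<OUT, F extends Function> extends AbstractStreamOperator<OUT> {

        // the function you supplied, e.g. the MapFunction passed to StreamMap
        protected final F userFunction;

        public AbstractUdfStreamOperator(F userFunction) {
            this.userFunction = requireNonNull(userFunction);
        }
        ...
    }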
    
        protected <R> SingleOutputStreamOperator<R> doTransform(
                String operatorName,
                TypeInformation<R> outTypeInfo,
                StreamOperatorFactory<R> operatorFactory) {
    
            // read the output type of the input Transform to coax out errors about MissingTypeInfo
            transformation.getOutputType();
    
            OneInputTransformation<T, R> resultTransform =
                    new OneInputTransformation<>(
                            this.transformation,
                            operatorName,
                            operatorFactory,
                            outTypeInfo,
                            environment.getParallelism());
    
            @SuppressWarnings({"unchecked", "rawtypes"})
            SingleOutputStreamOperator<R> returnStream =
                    new SingleOutputStreamOperator(environment, resultTransform);
    
            getExecutionEnvironment().addOperator(resultTransform);
    
            return returnStream;
        }
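
    A usage-level illustration of the same path (a minimal sketch; imports omitted). The anonymous MapFunction below becomes the userFunction inside a StreamMap operator, and doTransform registers the resulting OneInputTransformation with the environment:

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> words = env.fromElements("flink", "datastream");

    // map wraps the function in StreamMap and, via transform/doTransform,
    // records a OneInputTransformation in the environment's transformations list
    SingleOutputStreamOperator<Integer> lengths = words.map(new MapFunction<String, Integer>() {
        @Override
        public Integer map(String value) {
            return value.length();
        }
    });
    lengths.print();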
    

    All DataStream methods follow this same logic (map / flatMap / process / filter / print). Operations such as keyBy and join look more complex, but when apply is finally called they are still wrapped into transformations.

    • Supply a function.
    • An operator uses the function to carry out the actual processing.
    • transform → doTransform ultimately turns it into a transformation, which is how Flink knows what processing to carry out.
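
    For instance, even a keyed aggregation is just more transformations in that list (a minimal sketch; imports omitted, env and the tuple stream are made up as in the earlier examples):

    DataStream<Tuple2<String, Integer>> pairs =
            env.fromElements(Tuple2.of("a", 1), Tuple2.of("b", 2), Tuple2.of("a", 3));

    pairs
        .keyBy(new KeySelector<Tuple2<String, Integer>, String>() {
            @Override
            public String getKey(Tuple2<String, Integer> value) {
                return value.f0;        // keyBy yields a KeyedStream (a partitioning transformation underneath)
            }
        })
        .sum(1)                         // the rolling aggregation is recorded as yet another transformation
        .print();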
