A look at the split operation of Flink's DataStream

Author: go4it | Published 2019-01-14 10:36

    This article takes a look at the split operation of Flink's DataStream.

    Example

    SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
        @Override
        public Iterable<String> select(Integer value) {
            List<String> output = new ArrayList<String>();
            if (value % 2 == 0) {
                output.add("even");
            }
            else {
                output.add("odd");
            }
            return output;
        }
    });
    
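    • This example splits the dataStream into two logical streams by tagging every element with an output name: even numbers get the outputName even, odd numbers get the outputName odd. Each logical stream is then picked out of the SplitStream with select("even") or select("odd").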
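    To see the tagging and selection at work without a Flink cluster, here is a plain-Java simulation (Java 9+ for List.of). The Selector interface, EVEN_ODD and selectByName are hypothetical stand-ins, not Flink API; selectByName only approximates what split(...).select(name) would deliver downstream.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

// Plain-Java simulation of split/select; it does NOT use Flink.
public class EvenOddSplitSketch {

    // Hypothetical stand-in for Flink's OutputSelector<OUT>.
    public interface Selector<T> {
        Iterable<String> select(T value);
    }

    // Same tagging logic as the OutputSelector in the example.
    public static final Selector<Integer> EVEN_ODD = value ->
            Collections.singletonList(value % 2 == 0 ? "even" : "odd");

    // Keeps the elements whose tags contain the requested output name,
    // roughly what split(...).select(name) yields downstream.
    public static List<Integer> selectByName(List<Integer> input, Selector<Integer> selector, String name) {
        List<Integer> out = new ArrayList<>();
        for (Integer value : input) {
            for (String tag : selector.select(value)) {
                if (tag.equals(name)) {
                    out.add(value);
                    break; // emit each element at most once
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        List<Integer> input = List.of(1, 2, 3, 4, 5, 6);
        System.out.println(selectByName(input, EVEN_ODD, "even")); // [2, 4, 6]
        System.out.println(selectByName(input, EVEN_ODD, "odd"));  // [1, 3, 5]
    }
}
```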

    DataStream.split

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/DataStream.java

    @Public
    public class DataStream<T> {
    
        //......
    
        public SplitStream<T> split(OutputSelector<T> outputSelector) {
            return new SplitStream<>(this, clean(outputSelector));
        }
    
        //......
    }
    
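    • DataStream's split operation takes an OutputSelector argument, passes it through clean(...), and creates and returns a SplitStream that wraps it; no element is processed at this point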
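    Because the selector only runs later inside the tasks, it has to be serialized and shipped there, which is why OutputSelector extends Serializable (as far as we know, clean(...) also checks serializability, so a non-serializable selector would typically be rejected already when split is called). The sketch below uses plain Java serialization as an assumed stand-in for Flink's shipping mechanism; Selector, EVEN_ODD and roundTrip are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.List;

// Plain-Java sketch (no Flink): Java serialization stands in for shipping the selector to tasks.
public class SelectorShippingSketch {

    // Hypothetical stand-in for OutputSelector, which likewise extends Serializable.
    public interface Selector<T> extends Serializable {
        Iterable<String> select(T value);
    }

    // A lambda whose target type is Serializable and which captures nothing is serializable.
    public static final Selector<Integer> EVEN_ODD = v -> List.of(v % 2 == 0 ? "even" : "odd");

    // Serializes the selector and reads it back, as if it had been sent to a task.
    @SuppressWarnings("unchecked")
    public static <T> Selector<T> roundTrip(Selector<T> selector) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(selector);
            }
            try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (Selector<T>) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("selector cannot be shipped", e);
        }
    }

    public static void main(String[] args) {
        System.out.println(roundTrip(EVEN_ODD).select(4)); // [even]

        // Capturing a non-serializable object makes the lambda itself non-serializable.
        Object notSerializable = new Object();
        Selector<Integer> bad = v -> List.of(notSerializable.toString());
        try {
            roundTrip(bad);
        } catch (IllegalStateException e) {
            // expected: the cause is a java.io.NotSerializableException
            System.out.println("rejected: " + e.getCause());
        }
    }
}
```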

    OutputSelector

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/collector/selector/OutputSelector.java

    @PublicEvolving
    public interface OutputSelector<OUT> extends Serializable {
    
        Iterable<String> select(OUT value);
    
    }
    
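    • OutputSelector defines a select method that assigns outputNames to each element; since it returns an Iterable<String>, a single element may carry several names, or none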
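    Since select returns an Iterable<String> rather than a single String, the even/odd case is only the simplest form. The plain-Java sketch below uses a hypothetical FIZZ_BUZZ selector (with Selector again a stand-in for OutputSelector, not Flink API) to show an element carrying two names and an element carrying none; as we understand it, in Flink the former would reach every stream that selected either name, and the latter none of the select()-ed streams.

```java
import java.util.ArrayList;
import java.util.List;

// Plain-Java sketch (no Flink): a selector may return zero, one, or several names.
public class MultiNameSelectorSketch {

    // Hypothetical stand-in for Flink's OutputSelector<OUT>.
    public interface Selector<T> {
        Iterable<String> select(T value);
    }

    // Hypothetical selector: multiples of 3 get "fizz", multiples of 5 get "buzz".
    public static final Selector<Integer> FIZZ_BUZZ = value -> {
        List<String> names = new ArrayList<>();
        if (value % 3 == 0) {
            names.add("fizz");
        }
        if (value % 5 == 0) {
            names.add("buzz");
        }
        return names; // may be empty: the element then matches no selected name
    };

    public static void main(String[] args) {
        for (int v : new int[] {3, 5, 15, 7}) {
            System.out.println(v + " -> " + FIZZ_BUZZ.select(v));
        }
        // 3 -> [fizz], 5 -> [buzz], 15 -> [fizz, buzz], 7 -> []
    }
}
```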

    SplitStream

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/datastream/SplitStream.java

    @PublicEvolving
    public class SplitStream<OUT> extends DataStream<OUT> {
    
        protected SplitStream(DataStream<OUT> dataStream, OutputSelector<OUT> outputSelector) {
            super(dataStream.getExecutionEnvironment(), new SplitTransformation<OUT>(dataStream.getTransformation(), outputSelector));
        }
    
        public DataStream<OUT> select(String... outputNames) {
            return selectOutput(outputNames);
        }
    
        private DataStream<OUT> selectOutput(String[] outputNames) {
            for (String outName : outputNames) {
                if (outName == null) {
                    throw new RuntimeException("Selected names must not be null");
                }
            }
    
            SelectTransformation<OUT> selectTransform = new SelectTransformation<OUT>(this.getTransformation(), Lists.newArrayList(outputNames));
            return new DataStream<OUT>(this.getExecutionEnvironment(), selectTransform);
        }
    
    }
    
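    • SplitStream extends DataStream; its constructor wraps the upstream transformation and the OutputSelector in a SplitTransformation. It defines a select method for picking out the split streams by their outputNames; select rejects null names and creates a SelectTransformation holding the selected names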
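    select itself only records names; the filtering happens later at runtime. The sketch below models that split of responsibilities in plain Java: SelectSpec is a hypothetical stand-in for SelectTransformation, reusing the null check from selectOutput, and accepts is an assumed approximation of runtime routing in which an element passes if any of its tags was selected and is emitted at most once.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

// Plain-Java sketch (no Flink) of what SplitStream.select records and how the names are applied.
public class SelectNamesSketch {

    // Hypothetical stand-in for SelectTransformation: it only stores the selected names.
    public static final class SelectSpec {
        public final List<String> selectedNames;

        public SelectSpec(String... outputNames) {
            // Same validation as SplitStream.selectOutput: null names are rejected eagerly.
            for (String name : outputNames) {
                if (name == null) {
                    throw new RuntimeException("Selected names must not be null");
                }
            }
            this.selectedNames = new ArrayList<>(Arrays.asList(outputNames));
        }

        // Assumed routing rule: pass if any tag is among the selected names.
        public boolean accepts(Iterable<String> tags) {
            for (String tag : tags) {
                if (selectedNames.contains(tag)) {
                    return true;
                }
            }
            return false;
        }
    }

    public static void main(String[] args) {
        System.out.println(new SelectSpec("even", "odd").accepts(List.of("even"))); // true
        System.out.println(new SelectSpec("even").accepts(List.of("odd")));         // false
        try {
            new SelectSpec("even", null);
        } catch (RuntimeException e) {
            System.out.println(e.getMessage()); // Selected names must not be null
        }
    }
}
```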

    StreamGraphGenerator

    flink-streaming-java_2.11-1.7.0-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraphGenerator.java

    @Internal
    public class StreamGraphGenerator {
    
        //......
    
        private Collection<Integer> transform(StreamTransformation<?> transform) {
    
            if (alreadyTransformed.containsKey(transform)) {
                return alreadyTransformed.get(transform);
            }
    
            LOG.debug("Transforming " + transform);
    
            if (transform.getMaxParallelism() <= 0) {
    
                // if the max parallelism hasn't been set, then first use the job wide max parallelism
                // from theExecutionConfig.
                int globalMaxParallelismFromConfig = env.getConfig().getMaxParallelism();
                if (globalMaxParallelismFromConfig > 0) {
                    transform.setMaxParallelism(globalMaxParallelismFromConfig);
                }
            }
    
            // call at least once to trigger exceptions about MissingTypeInfo
            transform.getOutputType();
    
            Collection<Integer> transformedIds;
            if (transform instanceof OneInputTransformation<?, ?>) {
                transformedIds = transformOneInputTransform((OneInputTransformation<?, ?>) transform);
            } else if (transform instanceof TwoInputTransformation<?, ?, ?>) {
                transformedIds = transformTwoInputTransform((TwoInputTransformation<?, ?, ?>) transform);
            } else if (transform instanceof SourceTransformation<?>) {
                transformedIds = transformSource((SourceTransformation<?>) transform);
            } else if (transform instanceof SinkTransformation<?>) {
                transformedIds = transformSink((SinkTransformation<?>) transform);
            } else if (transform instanceof UnionTransformation<?>) {
                transformedIds = transformUnion((UnionTransformation<?>) transform);
            } else if (transform instanceof SplitTransformation<?>) {
                transformedIds = transformSplit((SplitTransformation<?>) transform);
            } else if (transform instanceof SelectTransformation<?>) {
                transformedIds = transformSelect((SelectTransformation<?>) transform);
            } else if (transform instanceof FeedbackTransformation<?>) {
                transformedIds = transformFeedback((FeedbackTransformation<?>) transform);
            } else if (transform instanceof CoFeedbackTransformation<?>) {
                transformedIds = transformCoFeedback((CoFeedbackTransformation<?>) transform);
            } else if (transform instanceof PartitionTransformation<?>) {
                transformedIds = transformPartition((PartitionTransformation<?>) transform);
            } else if (transform instanceof SideOutputTransformation<?>) {
                transformedIds = transformSideOutput((SideOutputTransformation<?>) transform);
            } else {
                throw new IllegalStateException("Unknown transformation: " + transform);
            }
    
            // need this check because the iterate transformation adds itself before
            // transforming the feedback edges
            if (!alreadyTransformed.containsKey(transform)) {
                alreadyTransformed.put(transform, transformedIds);
            }
    
            if (transform.getBufferTimeout() >= 0) {
                streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
            }
            if (transform.getUid() != null) {
                streamGraph.setTransformationUID(transform.getId(), transform.getUid());
            }
            if (transform.getUserProvidedNodeHash() != null) {
                streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
            }
    
            if (transform.getMinResources() != null && transform.getPreferredResources() != null) {
                streamGraph.setResources(transform.getId(), transform.getMinResources(), transform.getPreferredResources());
            }
    
            return transformedIds;
        }
    
        private <T> Collection<Integer> transformSelect(SelectTransformation<T> select) {
            StreamTransformation<T> input = select.getInput();
            Collection<Integer> resultIds = transform(input);
    
            // the recursive transform might have already transformed this
            if (alreadyTransformed.containsKey(select)) {
                return alreadyTransformed.get(select);
            }
    
            List<Integer> virtualResultIds = new ArrayList<>();
    
            for (int inputId : resultIds) {
                int virtualId = StreamTransformation.getNewNodeId();
                streamGraph.addVirtualSelectNode(inputId, virtualId, select.getSelectedNames());
                virtualResultIds.add(virtualId);
            }
            return virtualResultIds;
        }
    
        private <T> Collection<Integer> transformSplit(SplitTransformation<T> split) {
    
            StreamTransformation<T> input = split.getInput();
            Collection<Integer> resultIds = transform(input);
    
            // the recursive transform call might have transformed this already
            if (alreadyTransformed.containsKey(split)) {
                return alreadyTransformed.get(split);
            }
    
            for (int inputId : resultIds) {
                streamGraph.addOutputSelector(inputId, split.getOutputSelector());
            }
    
            return resultIds;
        }
    
        //......
    }
    
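    • The transform method of StreamGraphGenerator dispatches on the transformation type and handles SelectTransformation and SplitTransformation in dedicated methods
    • transformSelect creates a virtual node for each upstream node id via addVirtualSelectNode, passing select.getSelectedNames()
    • transformSplit creates no new node: it registers split.getOutputSelector() on each upstream node via addOutputSelector and returns the upstream ids unchanged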
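    The bookkeeping in transformSplit and transformSelect can be modeled with a few maps. The class below is a heavily simplified, hypothetical model, not Flink's StreamGraph API: split attaches a selector label to each upstream node and hands back the same ids, while select allocates one fresh virtual id per upstream id (the counter starting at 100 is arbitrary). resolve mimics, as we understand it, what StreamGraph does later when a downstream operator is connected to a virtual select node: it maps back to the real upstream node, with the selected names carried on the edge.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Much-simplified, hypothetical model of how StreamGraphGenerator handles split/select.
public class SplitSelectGraphSketch {

    private int nextId = 100; // stand-in for StreamTransformation.getNewNodeId()
    public final Map<Integer, List<String>> outputSelectors = new HashMap<>(); // node -> selector labels
    public final Map<Integer, Integer> virtualInput = new HashMap<>();         // virtual -> upstream node
    public final Map<Integer, List<String>> virtualNames = new HashMap<>();    // virtual -> selected names

    // Like transformSplit: no new node; attach the selector to each upstream node.
    public Collection<Integer> split(Collection<Integer> inputIds, String selectorLabel) {
        for (int id : inputIds) {
            outputSelectors.computeIfAbsent(id, k -> new ArrayList<>()).add(selectorLabel);
        }
        return inputIds;
    }

    // Like transformSelect: one new virtual node per upstream id, remembering the names.
    public Collection<Integer> select(Collection<Integer> inputIds, List<String> names) {
        List<Integer> virtualIds = new ArrayList<>();
        for (int id : inputIds) {
            int virtualId = nextId++;
            virtualInput.put(virtualId, id);
            virtualNames.put(virtualId, names);
            virtualIds.add(virtualId);
        }
        return virtualIds;
    }

    // Maps a virtual select node back to its real upstream node; real ids map to themselves.
    public int resolve(int id) {
        return virtualInput.getOrDefault(id, id);
    }

    public static void main(String[] args) {
        SplitSelectGraphSketch g = new SplitSelectGraphSketch();
        Collection<Integer> split = g.split(List.of(1), "evenOddSelector"); // pretend node 1 is the source
        Collection<Integer> evens = g.select(split, List.of("even"));
        Collection<Integer> odds = g.select(split, List.of("odd"));
        System.out.println(split + " " + evens + " " + odds); // [1] [100] [101]
        System.out.println(g.outputSelectors);                // {1=[evenOddSelector]}
        System.out.println(g.resolve(101));                   // 1
    }
}
```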

    Summary

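    • DataStream's split operation takes an OutputSelector argument and creates and returns a SplitStream
    • OutputSelector defines a select method that assigns outputNames to each element
    • SplitStream extends DataStream and defines a select method for picking out the split streams by their outputNames
    • When the StreamGraph is built, a SplitTransformation registers its OutputSelector on the upstream node, while a SelectTransformation becomes a virtual select node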

    Link to this article: https://www.haomeiwen.com/subject/rlkddqtx.html