Talking about Flink's Execution Plan Visualization

Author: go4it | Published 2019-02-13 14:09

    This article takes a look at Execution Plan Visualization in Flink.

    Example

    Code

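        @Test
        public void testExecutionPlan(){
            final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            // WORDS and WordCountTest.Tokenizer come from the author's word-count test and are not shown here
            DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(WORDS)
                    .flatMap(new WordCountTest.Tokenizer())   // tokenize each element into (word, count) tuples
                    .keyBy(0)                                 // partition by the word (tuple field 0)
                    .sum(1);                                  // running sum per word (tuple field 1)
            dataStream.print();
            // Only builds the plan and prints it as JSON; env.execute() is never called, so the job does not run
            System.out.println(env.getExecutionPlan());
        }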
    

    JSON output

    {
      "nodes": [
        {
          "id": 1,
          "type": "Source: Collection Source",
          "pact": "Data Source",
          "contents": "Source: Collection Source",
          "parallelism": 1
        },
        {
          "id": 2,
          "type": "Flat Map",
          "pact": "Operator",
          "contents": "Flat Map",
          "parallelism": 4,
          "predecessors": [
            {
              "id": 1,
              "ship_strategy": "REBALANCE",
              "side": "second"
            }
          ]
        },
        {
          "id": 4,
          "type": "Keyed Aggregation",
          "pact": "Operator",
          "contents": "Keyed Aggregation",
          "parallelism": 4,
          "predecessors": [
            {
              "id": 2,
              "ship_strategy": "HASH",
              "side": "second"
            }
          ]
        },
        {
          "id": 5,
          "type": "Sink: Print to Std. Out",
          "pact": "Data Sink",
          "contents": "Sink: Print to Std. Out",
          "parallelism": 4,
          "predecessors": [
            {
              "id": 4,
              "ship_strategy": "FORWARD",
              "side": "second"
            }
          ]
        }
      ]
    }
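    The ship_strategy values in this plan can be read off from the node parallelisms. The sketch below encodes a simplified version of the default rule as I understand Flink's StreamGraph applies it when connecting two operators (that edge-wiring code is not quoted in this article, so treat the rule as an assumption): an explicitly requested partitioner wins, which is where the HASH introduced by keyBy comes from; otherwise FORWARD is chosen when upstream and downstream parallelism are equal, and REBALANCE when they differ. ShipStrategyDemo, shipStrategy and exampleEdges are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;

public class ShipStrategyDemo {

    /**
     * Hypothetical, simplified default rule (an assumption, not Flink's code):
     * an explicit partitioner wins; otherwise FORWARD for equal parallelism, REBALANCE if it differs.
     */
    static String shipStrategy(String explicitPartitioner, int upstreamParallelism, int downstreamParallelism) {
        if (explicitPartitioner != null) {
            return explicitPartitioner;
        }
        return upstreamParallelism == downstreamParallelism ? "FORWARD" : "REBALANCE";
    }

    /** Recomputes the three edges of the example plan from the parallelisms in the JSON. */
    static List<String> exampleEdges() {
        List<String> edges = new ArrayList<>();
        // Source (parallelism 1) -> Flat Map (parallelism 4), no explicit partitioner
        edges.add("1->2 " + shipStrategy(null, 1, 4));
        // Flat Map -> Keyed Aggregation, keyBy supplies a hash-based partitioner
        edges.add("2->4 " + shipStrategy("HASH", 4, 4));
        // Keyed Aggregation (4) -> Sink (4), same parallelism, no explicit partitioner
        edges.add("4->5 " + shipStrategy(null, 4, 4));
        return edges;
    }

    public static void main(String[] args) {
        exampleEdges().forEach(System.out::println);
    }
}
```

    Running main prints the three edges labelled REBALANCE, HASH and FORWARD, which matches the predecessors entries in the JSON above.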
    

    Visualization

    Open the Flink plan visualizer, paste the JSON above into the text box, and click Draw to render the plan as a graph.
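    If you would rather render the plan offline, the same nodes and edges can be written out as Graphviz DOT text. The sketch below is an assumption, not a Flink feature: PlanToDot, Node, examplePlan and toDot are hypothetical, the node data is copied by hand from the JSON above instead of being parsed (the JDK has no built-in JSON parser), and each node is assumed to have at most one predecessor, which holds for this example.

```java
import java.util.Arrays;
import java.util.List;

public class PlanToDot {

    /** Hypothetical, minimal view of one entry of the plan's "nodes" array (single predecessor only). */
    static final class Node {
        final int id;
        final String contents;
        final int parallelism;
        final int predecessorId;   // -1 when the node has no predecessor (a source)
        final String shipStrategy; // null when the node has no predecessor

        Node(int id, String contents, int parallelism, int predecessorId, String shipStrategy) {
            this.id = id;
            this.contents = contents;
            this.parallelism = parallelism;
            this.predecessorId = predecessorId;
            this.shipStrategy = shipStrategy;
        }
    }

    /** The example plan, copied by hand from the JSON output. */
    static List<Node> examplePlan() {
        return Arrays.asList(
                new Node(1, "Source: Collection Source", 1, -1, null),
                new Node(2, "Flat Map", 4, 1, "REBALANCE"),
                new Node(4, "Keyed Aggregation", 4, 2, "HASH"),
                new Node(5, "Sink: Print to Std. Out", 4, 4, "FORWARD"));
    }

    /** Renders nodes as boxes and predecessor links as edges labelled with the ship strategy. */
    static String toDot(List<Node> nodes) {
        StringBuilder sb = new StringBuilder("digraph plan {\n");
        for (Node n : nodes) {
            String label = n.contents.replace("\"", "\\\""); // escape quotes for DOT
            sb.append("  n").append(n.id)
              .append(" [shape=box, label=\"").append(label)
              .append("\\nparallelism=").append(n.parallelism).append("\"];\n");
        }
        for (Node n : nodes) {
            if (n.predecessorId >= 0) {
                sb.append("  n").append(n.predecessorId).append(" -> n").append(n.id)
                  .append(" [label=\"").append(n.shipStrategy).append("\"];\n");
            }
        }
        return sb.append("}\n").toString();
    }

    public static void main(String[] args) {
        System.out.print(toDot(examplePlan()));
    }
}
```

    Piping the output through Graphviz, for example `java PlanToDot | dot -Tpng -o plan.png`, draws one box per operator and labels each edge with its ship_strategy.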

    StreamExecutionEnvironment.getExecutionPlan

    flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java

    @Public
    public abstract class StreamExecutionEnvironment {
        //......
    
        /**
         * Creates the plan with which the system will execute the program, and
         * returns it as a String using a JSON representation of the execution data
         * flow graph. Note that this needs to be called, before the plan is
         * executed.
         *
         * @return The execution plan of the program, as a JSON String.
         */
        public String getExecutionPlan() {
            return getStreamGraph().getStreamingPlanAsJSON();
        }
    
        /**
         * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
         *
         * @return The streamgraph representing the transformations
         */
        @Internal
        public StreamGraph getStreamGraph() {
            if (transformations.size() <= 0) {
                throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
            }
            return StreamGraphGenerator.generate(this, transformations);
        }
    
        //......
    }
    
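    • StreamExecutionEnvironment's getExecutionPlan method calls getStreamGraph; getStreamGraph uses StreamGraphGenerator.generate to build a StreamGraph, and getExecutionPlan then calls StreamGraph.getStreamingPlanAsJSON to obtain the execution plan in JSON format. If no transformations have been defined, getStreamGraph throws an IllegalStateException instead.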
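    The delegation chain can be mirrored in a few lines of plain Java. Everything below (MiniEnvironment, MiniGraph, add, getGraph, getPlan, toJson) is hypothetical and only copies the shape of the Flink code above: the environment collects transformations, refuses to build a graph when there are none, hands them to a generator step, and then asks the resulting graph to serialize itself.

```java
import java.util.ArrayList;
import java.util.List;

public class MiniEnvironment {

    /** Hypothetical stand-in for StreamGraph: it only holds operator names. */
    static final class MiniGraph {
        private final List<String> operators;

        MiniGraph(List<String> operators) {
            this.operators = operators;
        }

        /** Stand-in for getStreamingPlanAsJSON: a flat JSON array of operator names. */
        String toJson() {
            StringBuilder sb = new StringBuilder("[");
            for (int i = 0; i < operators.size(); i++) {
                if (i > 0) {
                    sb.append(",");
                }
                sb.append("\"").append(operators.get(i)).append("\"");
            }
            return sb.append("]").toString();
        }
    }

    private final List<String> transformations = new ArrayList<>();

    /** Registers a transformation, like calling an operator on a DataStream. */
    MiniEnvironment add(String transformation) {
        transformations.add(transformation);
        return this;
    }

    /** Mirrors getStreamGraph: fail fast on an empty topology, otherwise "generate" a graph. */
    MiniGraph getGraph() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        // Stand-in for StreamGraphGenerator.generate(this, transformations)
        return new MiniGraph(new ArrayList<>(transformations));
    }

    /** Mirrors getExecutionPlan: build the graph, then serialize it. */
    String getPlan() {
        return getGraph().toJson();
    }

    public static void main(String[] args) {
        System.out.println(new MiniEnvironment().add("Source").add("Flat Map").add("Sink").getPlan());
    }
}
```

    As with the real getStreamGraph, calling getPlan() on an environment with no transformations throws IllegalStateException, so the plan can only be requested after the pipeline has been defined.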

    StreamGraph.getStreamingPlanAsJSON

    flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java

    @Internal
    public class StreamGraph extends StreamingPlan {
    
        private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
    
        private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
    
        private final StreamExecutionEnvironment environment;
        private final ExecutionConfig executionConfig;
        private final CheckpointConfig checkpointConfig;
    
        private boolean chaining;
    
        private Map<Integer, StreamNode> streamNodes;
        private Set<Integer> sources;
        private Set<Integer> sinks;
        private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
        private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
        private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;
    
        protected Map<Integer, String> vertexIDtoBrokerID;
        protected Map<Integer, Long> vertexIDtoLoopTimeout;
        private StateBackend stateBackend;
        private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
    
        //......
    
        public String getStreamingPlanAsJSON() {
            try {
                return new JSONGenerator(this).getJSON();
            }
            catch (Exception e) {
                throw new RuntimeException("JSON plan creation failed", e);
            }
        }
    
        //......
    }
    
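    • StreamGraph's getStreamingPlanAsJSON method serializes the graph with JSONGenerator and returns the execution plan as a JSON string; any exception raised during serialization is wrapped in a RuntimeException ("JSON plan creation failed").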
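    JSONGenerator itself is not quoted here, so the sketch below only illustrates the idea of serializing the graph node by node. It writes the same keys that appear in the example output (id, type, pact, contents, parallelism, and predecessors with ship_strategy and side). MiniJsonGenerator, PlanNode and Predecessor are hypothetical; contents simply repeats type and side is fixed to "second" because that is what the example shows, and string escaping covers only quotes and backslashes.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

public class MiniJsonGenerator {

    /** Hypothetical input edge: upstream node id plus the ship strategy of the connection. */
    static final class Predecessor {
        final int id;
        final String shipStrategy;

        Predecessor(int id, String shipStrategy) {
            this.id = id;
            this.shipStrategy = shipStrategy;
        }
    }

    /** Hypothetical node carrying the fields seen in the example JSON. */
    static final class PlanNode {
        final int id;
        final String type;
        final String pact;
        final int parallelism;
        final List<Predecessor> predecessors;

        PlanNode(int id, String type, String pact, int parallelism, List<Predecessor> predecessors) {
            this.id = id;
            this.type = type;
            this.pact = pact;
            this.parallelism = parallelism;
            this.predecessors = predecessors;
        }
    }

    /** Minimal string escaping: backslashes and double quotes only. */
    static String quote(String s) {
        return "\"" + s.replace("\\", "\\\\").replace("\"", "\\\"") + "\"";
    }

    static String toJson(List<PlanNode> nodes) {
        StringBuilder sb = new StringBuilder("{\"nodes\":[");
        for (int i = 0; i < nodes.size(); i++) {
            PlanNode n = nodes.get(i);
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"id\":").append(n.id)
              .append(",\"type\":").append(quote(n.type))
              .append(",\"pact\":").append(quote(n.pact))
              .append(",\"contents\":").append(quote(n.type)) // same as type in the example output
              .append(",\"parallelism\":").append(n.parallelism);
            // Sources have no inputs, so "predecessors" is omitted for them, as in the example output
            if (!n.predecessors.isEmpty()) {
                sb.append(",\"predecessors\":[");
                for (int j = 0; j < n.predecessors.size(); j++) {
                    Predecessor p = n.predecessors.get(j);
                    if (j > 0) {
                        sb.append(',');
                    }
                    sb.append("{\"id\":").append(p.id)
                      .append(",\"ship_strategy\":").append(quote(p.shipStrategy))
                      .append(",\"side\":\"second\"}");
                }
                sb.append(']');
            }
            sb.append('}');
        }
        return sb.append("]}").toString();
    }

    public static void main(String[] args) {
        List<PlanNode> nodes = Arrays.asList(
                new PlanNode(1, "Source: Collection Source", "Data Source", 1, Collections.<Predecessor>emptyList()),
                new PlanNode(2, "Flat Map", "Operator", 4,
                        Collections.singletonList(new Predecessor(1, "REBALANCE"))));
        System.out.println(toJson(nodes));
    }
}
```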

    Summary

    • Flink offers an online Flink plan visualizer for visualizing execution plans; it takes the execution plan in JSON form as input
    • StreamExecutionEnvironment's getExecutionPlan method calls getStreamGraph, which builds the StreamGraph with StreamGraphGenerator.generate
    • StreamGraph's getStreamingPlanAsJSON method serializes the graph with JSONGenerator and returns the execution plan as JSON
