Preface
This article takes a look at Flink's Execution Plan Visualization.
Example
Code
@Test
public void testExecutionPlan(){
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<Tuple2<String,Integer>> dataStream = env.fromElements(WORDS)
            .flatMap(new WordCountTest.Tokenizer())
            .keyBy(0)
            .sum(1);
    dataStream.print();
    System.out.println(env.getExecutionPlan());
}
JSON
{
  "nodes": [
    {
      "id": 1,
      "type": "Source: Collection Source",
      "pact": "Data Source",
      "contents": "Source: Collection Source",
      "parallelism": 1
    },
    {
      "id": 2,
      "type": "Flat Map",
      "pact": "Operator",
      "contents": "Flat Map",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 1,
          "ship_strategy": "REBALANCE",
          "side": "second"
        }
      ]
    },
    {
      "id": 4,
      "type": "Keyed Aggregation",
      "pact": "Operator",
      "contents": "Keyed Aggregation",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 2,
          "ship_strategy": "HASH",
          "side": "second"
        }
      ]
    },
    {
      "id": 5,
      "type": "Sink: Print to Std. Out",
      "pact": "Data Sink",
      "contents": "Sink: Print to Std. Out",
      "parallelism": 4,
      "predecessors": [
        {
          "id": 4,
          "ship_strategy": "FORWARD",
          "side": "second"
        }
      ]
    }
  ]
}
Visualization
Open the flink plan visualizer, paste the JSON above into the text box, and click Draw to render the plan as a graph.
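The edges of the drawn graph come from the predecessors entries in the plan JSON. As a rough, text-only complement to the visualizer, the sketch below lists those edges using only the JDK. The class name PlanEdges and its edges method are hypothetical, and the regular expression assumes the key order seen in the printed plan ("id" directly followed by "type" for a node, and by "ship_strategy" for a predecessor); a real tool would be better served by a proper JSON parser.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.regex.Matcher;
import java.util.regex.Pattern;

// Hypothetical helper, not part of Flink: lists "source -> target [strategy]" edges of a plan.
final class PlanEdges {
    // Assumption: "id" is immediately followed by "type" (a node)
    // or by "ship_strategy" (a predecessor entry), as in the printed plan.
    private static final Pattern ID_THEN_KEY = Pattern.compile(
            "\"id\"\\s*:\\s*(\\d+)\\s*,\\s*\"(type|ship_strategy)\"\\s*:\\s*\"([^\"]*)\"");

    static List<String> edges(String planJson) {
        List<String> result = new ArrayList<>();
        String currentNode = null; // id of the node whose predecessors are being read
        Matcher m = ID_THEN_KEY.matcher(planJson);
        while (m.find()) {
            if ("type".equals(m.group(2))) {
                currentNode = m.group(1);
            } else if (currentNode != null) {
                result.add(m.group(1) + " -> " + currentNode + " [" + m.group(3) + "]");
            }
        }
        return result;
    }

    public static void main(String[] args) {
        // A shortened two-node plan in the same shape as the JSON output.
        String plan = "{\"nodes\":["
                + "{\"id\":1,\"type\":\"Source: Collection Source\",\"parallelism\":1},"
                + "{\"id\":2,\"type\":\"Flat Map\",\"parallelism\":4,"
                + "\"predecessors\":[{\"id\":1,\"ship_strategy\":\"REBALANCE\",\"side\":\"second\"}]}"
                + "]}";
        edges(plan).forEach(System.out::println);
    }
}
```

Each predecessor is attributed to the most recently seen node, so the output reads in the direction the data flows.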
StreamExecutionEnvironment.getExecutionPlan
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/environment/StreamExecutionEnvironment.java
@Public
public abstract class StreamExecutionEnvironment {
    //......
    /**
     * Creates the plan with which the system will execute the program, and
     * returns it as a String using a JSON representation of the execution data
     * flow graph. Note that this needs to be called, before the plan is
     * executed.
     *
     * @return The execution plan of the program, as a JSON String.
     */
    public String getExecutionPlan() {
        return getStreamGraph().getStreamingPlanAsJSON();
    }
    /**
     * Getter of the {@link org.apache.flink.streaming.api.graph.StreamGraph} of the streaming job.
     *
     * @return The streamgraph representing the transformations
     */
    @Internal
    public StreamGraph getStreamGraph() {
        if (transformations.size() <= 0) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return StreamGraphGenerator.generate(this, transformations);
    }
    //......
}
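- StreamExecutionEnvironment's getExecutionPlan method calls getStreamGraph; getStreamGraph throws an IllegalStateException if no transformations have been defined, and otherwise uses StreamGraphGenerator.generate to build the StreamGraph; getExecutionPlan then calls StreamGraph.getStreamingPlanAsJSON to obtain the execution plan in JSON format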
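The call chain can be illustrated with a toy model that uses only the JDK. ToyEnvironment and ToyGraph are hypothetical stand-ins invented for this sketch, not Flink classes; they only mirror the shape of the code above: a guard against an empty topology, a graph built from the recorded transformations, and a plan obtained by asking that graph for its JSON.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the environment: records operator names as "transformations".
final class ToyEnvironment {
    private final List<String> transformations = new ArrayList<>();

    ToyEnvironment add(String operatorName) {
        transformations.add(operatorName);
        return this;
    }

    // Mirrors the guard in getStreamGraph: without operators there is nothing to build.
    ToyGraph getGraph() {
        if (transformations.isEmpty()) {
            throw new IllegalStateException("No operators defined in streaming topology. Cannot execute.");
        }
        return new ToyGraph(transformations);
    }

    // Mirrors getExecutionPlan: build the graph, then delegate serialization to it.
    String getExecutionPlan() {
        return getGraph().toJson();
    }

    public static void main(String[] args) {
        System.out.println(new ToyEnvironment().add("Source").add("Flat Map").getExecutionPlan());
    }
}

// Hypothetical stand-in for the graph: numbers the operators and prints them as JSON nodes.
final class ToyGraph {
    private final List<String> nodeTypes;

    ToyGraph(List<String> nodeTypes) {
        this.nodeTypes = new ArrayList<>(nodeTypes);
    }

    String toJson() {
        StringBuilder sb = new StringBuilder("{\"nodes\":[");
        for (int i = 0; i < nodeTypes.size(); i++) {
            if (i > 0) {
                sb.append(',');
            }
            sb.append("{\"id\":").append(i + 1)
              .append(",\"type\":\"").append(nodeTypes.get(i)).append("\"}");
        }
        return sb.append("]}").toString();
    }
}
```

Because the graph is rebuilt on every call in this model, asking for the plan has no side effects on the recorded operators, and an environment with no operators fails fast with the same exception type as in the source above.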
StreamGraph.getStreamingPlanAsJSON
flink-streaming-java_2.11-1.7.1-sources.jar!/org/apache/flink/streaming/api/graph/StreamGraph.java
@Internal
public class StreamGraph extends StreamingPlan {
    private static final Logger LOG = LoggerFactory.getLogger(StreamGraph.class);
    private String jobName = StreamExecutionEnvironment.DEFAULT_JOB_NAME;
    private final StreamExecutionEnvironment environment;
    private final ExecutionConfig executionConfig;
    private final CheckpointConfig checkpointConfig;
    private boolean chaining;
    private Map<Integer, StreamNode> streamNodes;
    private Set<Integer> sources;
    private Set<Integer> sinks;
    private Map<Integer, Tuple2<Integer, List<String>>> virtualSelectNodes;
    private Map<Integer, Tuple2<Integer, OutputTag>> virtualSideOutputNodes;
    private Map<Integer, Tuple2<Integer, StreamPartitioner<?>>> virtualPartitionNodes;
    protected Map<Integer, String> vertexIDtoBrokerID;
    protected Map<Integer, Long> vertexIDtoLoopTimeout;
    private StateBackend stateBackend;
    private Set<Tuple2<StreamNode, StreamNode>> iterationSourceSinkPairs;
    //......
    public String getStreamingPlanAsJSON() {
        try {
            return new JSONGenerator(this).getJSON();
        }
        catch (Exception e) {
            throw new RuntimeException("JSON plan creation failed", e);
        }
    }
    //......
}
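- StreamGraph's getStreamingPlanAsJSON method uses JSONGenerator to serialize the graph itself and returns the execution plan in JSON format; any exception raised during generation is wrapped in a RuntimeException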
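To make the shape of the generated JSON more concrete, here is a minimal sketch of a serializer that emits nodes together with their predecessors. It is not Flink's JSONGenerator, whose internals are not shown in this article; ToyPlanJson and its methods are hypothetical, and only the "id", "type", "predecessors" and "ship_strategy" keys are taken from the output above.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical serializer, not Flink's JSONGenerator: prints nodes with their incoming edges.
final class ToyPlanJson {
    private final Map<Integer, String> nodeTypes = new LinkedHashMap<>();
    // Each edge is stored as {sourceId, targetId, shipStrategy}, in insertion order.
    private final List<String[]> edges = new ArrayList<>();

    ToyPlanJson node(int id, String type) {
        nodeTypes.put(id, type);
        return this;
    }

    ToyPlanJson edge(int from, int to, String shipStrategy) {
        edges.add(new String[] {String.valueOf(from), String.valueOf(to), shipStrategy});
        return this;
    }

    String toJson() {
        StringBuilder sb = new StringBuilder("{\"nodes\":[");
        boolean firstNode = true;
        for (Map.Entry<Integer, String> n : nodeTypes.entrySet()) {
            if (!firstNode) {
                sb.append(',');
            }
            firstNode = false;
            sb.append("{\"id\":").append(n.getKey())
              .append(",\"type\":\"").append(n.getValue()).append('"');
            String preds = predecessorsOf(n.getKey());
            if (!preds.isEmpty()) {
                // Only nodes with incoming edges get a "predecessors" array.
                sb.append(",\"predecessors\":[").append(preds).append(']');
            }
            sb.append('}');
        }
        return sb.append("]}").toString();
    }

    // Collects the incoming edges of one node as comma-separated JSON objects.
    private String predecessorsOf(int nodeId) {
        StringBuilder sb = new StringBuilder();
        for (String[] e : edges) {
            if (!e[1].equals(String.valueOf(nodeId))) {
                continue;
            }
            if (sb.length() > 0) {
                sb.append(',');
            }
            sb.append("{\"id\":").append(e[0])
              .append(",\"ship_strategy\":\"").append(e[2]).append("\"}");
        }
        return sb.toString();
    }

    public static void main(String[] args) {
        System.out.println(new ToyPlanJson()
                .node(1, "Source").node(2, "Flat Map")
                .edge(1, 2, "REBALANCE")
                .toJson());
    }
}
```

As in the plan printed earlier, the source node carries no predecessors key, while every downstream node names the node it reads from and how records are shipped to it.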
Summary
- Flink provides an online flink plan visualizer for visualizing execution plans; it takes the execution plan in JSON form
- StreamExecutionEnvironment's getExecutionPlan method calls getStreamGraph; getStreamGraph uses StreamGraphGenerator.generate to build the StreamGraph
- StreamGraph's getStreamingPlanAsJSON method uses JSONGenerator to serialize the graph itself and returns the execution plan in JSON format