Parallel Execution
https://ci.apache.org/projects/flink/flink-docs-master/dev/parallel.html
配置Parallel
Operator Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); DataStream<String> text = [...] DataStream<Tuple2<String, Integer>> wordCounts = text .flatMap(new LineSplitter()) .keyBy(0) .timeWindow(Time.seconds(5)) .sum(1).setParallelism(5); wordCounts.print(); env.execute("Word Count Example");
Execution Environment Level
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); env.setParallelism(3);
Client Level
在提交程序的时候设置
./bin/flink run -p 10 ../examples/WordCount-java.jar
System Level
setting the parallelism.default property in ./conf/flink-conf.yaml
坑
Using the parallelism provided by the remote cluster (16). To use another parallelism, set it at the ./bin/flink client.
这个通常表示,集群能够提供的并行度没有达到用户设置的并行度
在运行命令的时候 -yn 4 -ys 4 决定了程序的并行度。
最大并行度= container个数 * 每个container的槽位
在程序中设置的并行度 parallelism 不能大于 最大并行度
网友评论