前言
Flink从1.12.0
上对流式API
新增一项特性:可以根据你的使用情况和Job的特点, 可以选择不同的运行时执行模式(runtime execution modes).
流式API的传统执行模式我们称之为STREAMING
执行模式, 这种模式一般用于无界流, 需要持续的在线处理
1.12.0新增了一个BATCH
执行模式, 这种执行模式在执行方式上类似于MapReduce框架. 这种执行模式一般用于有界数据.
默认是使用的STREAMING
执行模式
选择执行模式
BATCH
执行模式仅仅用于有界数据
, 而STREAMING
执行模式可以用在有界数据和无界数据
.
一个公用的规则就是: 当你处理的数据是有界
的就应该使用BATCH执行模式
, 因为它更加高效. 当你的数据是无界
的, 则必须使用STREAMING 执行模式
, 因为只有这种模式才能处理持续的数据流.
配置BATH执行模式
执行模式有3个选择可配:
- STREAMING(默认):有界数据和无界数据
- BATCH:有界数据
- AUTOMATIC:
传统的方式:
- 批处理:
// 获取环境
ExecutionEnvironment executionEnvironment = ExecutionEnvironment.getExecutionEnvironment();
// 读取资源
DataSource<String> dataSource = executionEnvironment.readTextFile("文件地址");
- 有界流
StreamExecutionEnvironment env= StreamExecutionEnvironment.getExecutionEnvironment();
// 读取资源
DataStreamSource<String> streamSource = env.readTextFile("文件地址");
env.execute();
- 无界流
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 监听端口
DataStreamSource<String> source = env.socketTextStream("hadoop102", 9999)
env.execute();
批处理与流处理的区别:
- 批处理处理数据,是一批一批对数据处理,spark就是一个微批数据处理引擎,可以理解成先对数据积压,然后达到一定量再一块处理。
- 流处理,有数据就处理,不需要积压数据
- 批处理无需保留数据状态,处理完就输出。
- 流处理需要保留数据状态,因为也有可能还有该数据。
- 批处理完成,程序就停止。
- 流处理,需要一直等待,即使后面不会有数据产生,程序依然保存运行状态。
有界与无界的理解:
有界流与无界流的区别在于读取的数据是否有尽头,若读取的数据类似于文件(知道开始的位置,结束的位置),无界流就是知道开始但不知道什么时候结束,如网络,Kafka,需要不同的监听着,等待处理数据。
案例(wordcount)
流式处理
程序比较简单,就没加注释
@Test
public void wordCount1() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStreamSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
-> Arrays.stream(value.split(" "))
.forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> result = flatMap.keyBy(e -> e.f0).sum(1);
result.print();
env.execute();
}
结果
5> (python,1)
12> (word,1)
3> (java,1)
13> (xml,1)
1> (pon,1)
11> (log,1)
7> (txt,1)
1> (pon,2)
11> (exe,1)
3> (java,2)
11> (log,2)
5> (python,2)
5> (hello,1)
5> (python,3)
5> (hello,2)
3> (java,3)
13> (xml,2)
14> (count,1)
11> (log,3)
13> (xml,3)
14> (batch,1)
批处理
@Test
public void wordCount2() throws Exception {
ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment();
// 和流式处理,是两套完全不同的api
DataSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
FlatMapOperator<String, Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
-> Arrays.stream(value.split(" "))
.forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));
AggregateOperator<Tuple2<String, Integer>> result = flatMap.groupBy(0).sum(1);
result.print();
}
结果
(pon,2)
(hello,2)
(log,3)
(xml,3)
(exe,1)
(java,3)
(python,3)
(txt,1)
(batch,1)
(count,1)
(word,1)
设置执行模式
传统上的批处理和流处理,需要两套不同的API来处理,不太符合Flink中
流批一体
的理念,此时执行模式的出现完美的解决了问题。只需要指定一个执行模式
,就可以完成流与批
之间的相互转换,其他代码都不用修改。
执行模式所支持的模式:
@PublicEvolving
public enum RuntimeExecutionMode {
/**
* The Pipeline will be executed with Streaming Semantics. All tasks will be deployed before
* execution starts, checkpoints will be enabled, and both processing and event time will be
* fully supported.
*/
STREAMING,
/**
* The Pipeline will be executed with Batch Semantics. Tasks will be scheduled gradually based
* on the scheduling region they belong, shuffles between regions will be blocking, watermarks
* are assumed to be "perfect" i.e. no late data, and processing time is assumed to not advance
* during execution.
*/
BATCH,
/**
* Flink will set the execution mode to {@link RuntimeExecutionMode#BATCH} if all sources are
* bounded, or {@link RuntimeExecutionMode#STREAMING} if there is at least one source which is
* unbounded.
*/
AUTOMATIC
}
转换成批处理
@Test
public void wordCount1() throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 转成批处理,其他都不用改
env.setRuntimeMode(RuntimeExecutionMode.BATCH);
DataStreamSource<String> source = env.readTextFile("D:\\project\\idea\\flink\\input\\wordcount.txt");
SingleOutputStreamOperator<Tuple2<String, Integer>> flatMap = source.flatMap((FlatMapFunction<String, Tuple2<String, Integer>>) (value, out)
-> Arrays.stream(value.split(" "))
.forEach(s -> out.collect(Tuple2.of(s, 1)))).returns(Types.TUPLE(Types.STRING, Types.INT));
SingleOutputStreamOperator<Tuple2<String, Integer>> result = flatMap.keyBy(e -> e.f0).sum(1);
result.print();
env.execute();
}
结果
1> (pon,2)
5> (hello,2)
5> (python,3)
3> (java,3)
7> (txt,1)
14> (batch,1)
14> (count,1)
13> (xml,3)
11> (exe,1)
11> (log,3)
12> (word,1)
注意:
- 在13版本之前不要使用
执行模式
,若数据只有一个(如: (txt,1)),那么该数据不会被输出,13版本修复了该问题。 - 批处理不会存状态(处理完就直接输出了,所以没有必要保留状态)
网友评论