Flink 开发一个简单的应用程序只需要构建环境、构建数据源、构建数据处理方案、构建数据输出及执行程序这五个步骤,但每个步骤都有对应其他强大的API,所以本文一一举例学习。
构建环境
流处理
StreamExecutionEnvironment env = null;
// 构建流环境,如果在本地则创建本地环境,如果是集群,则创建集群环境
env = StreamExecutionEnvironment.getExecutionEnvironment();
// 创建本地执行环境并设置并行数
env = StreamExecutionEnvironment.createLocalEnvironment(3);
// 创建远程执行环境,jobmanager的IP,端口,并行度,运行程序的位置
env = StreamExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");
第三种方式可以直接从本地代码中构建与远程集群的Flink JobManager 的RPC连接,通过指定应用程序所在的jar包,将运行程序远程拷贝到 JobManager 节点上,然后将Flink 应用运行在远程的环境中,本地程序相当于一个客户端。
批处理
ExecutionEnvironment env = null;
// 构建流环境,如果在本地则创建本地环境,如果是集群,则创建集群环境
env = ExecutionEnvironment.getExecutionEnvironment();
// 创建本地执行环境并设置并行数
env = ExecutionEnvironment.createLocalEnvironment(3);
// 创建远程执行环境,jobmanager的IP,端口,并行度,运行程序的位置
env = ExecutionEnvironment.createRemoteEnvironment("10.xxx.xx.103",6123,5,"D:/test/abc.jar");
构建数据源
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// 从集合中读取数据
DataStream<Integer> ds = env.fromCollection(Arrays.asList(1,2,3,4,5,6));
// 直接读取数据
DataStream<Integer> ds1 = env.fromElements(1,2,3,4,5);
// 从文件读取
DataStream<String> ds2 = env.readTextFile("D:\\workspace\\spring\\middleware\\flink\\flink-test\\src\\main\\resources\\hello.txt");
// 从 kafka 读取
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "10.240.30.104:9092");
properties.setProperty("group.id", "test");
properties.setProperty("enable.auto.commit", "true");
properties.setProperty("auto.commit.interval.ms", "0");
FlinkKafkaConsumer<String> myConsumer = new FlinkKafkaConsumer<String>("topic1", new SimpleStringSchema(),properties);
DataStream<String> ds3 = env.addSource(myConsumer);
// 自定义数据源
DataStream<String> ds4 = env.addSource(new CustomizeSource());
自定义数据源
package com.example.demo;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
/**
* @author big uncle
* @date 2021/6/3 13:54
* @module
**/
public class CustomizeSource implements SourceFunction<String> {
private boolean running = true;
/**
* 读取数据
**/
@Override
public void run(SourceContext<String> ctx) throws Exception {
String str = "a-b-c:";
int i =0;
while (running){
i++;
ctx.collect(str+i);
Thread.sleep(1000);
}
}
/**
* 关闭
**/
@Override
public void cancel() {
running = false;
}
}
Transform
map fliter flatmap
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.fromElements("a,b,c,d,e,1,2,3,4");
// 读取值是 "a,b,c,d,e,1,2,3,4" 转变为 一个个的元素
dataStream.flatMap(new FlatMapFunction<String, Object>() {
@Override
public void flatMap(String s, Collector<Object> collector) throws Exception {
String[] str = s.split(",");
for(String ss : str) {
collector.collect(ss);
}
}
})
// 为 true 才会输出,过滤掉不是 a 的值
.filter(i -> i.equals("a"))
// 转黄给 a 添加 字符串 0
.map(i -> i+"0")
// 输出结果为 a0
.print();
env.execute("a");
}
key
所有的聚合操作只能在 key 分组(分区)之后。sum()、min()、max()、minBy()、maxBy()、reduce() 比较器
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
dataStream.keyBy(new KeySelector<Integer, Object>() {
@Override
// 把所有数据放到一个分区 0
public Object getKey(Integer integer) throws Exception {
return 0;
}
})
// 对 分区进行聚合,聚合下标为 0,注意这里是下标,最后输出数据139,会显示累加值
.sum(0).print();
env.execute("test");
}
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
// keyBy 可以根据位置,也可以根据字段
dataStream.keyBy(new KeySelector<Integer, Object>() {
@Override
// 把所有数据放到一个分区 0
public Object getKey(Integer integer) throws Exception {
return 0;
}
})
// 注意这里是下标,也可以是字段名
.min(0).print();
env.execute("test");
}
min()、minBy() ,针对元祖和对象而言,min 会对比某个字段的大小,但不会更新其他字段值,minBy() 会更新其他字段值。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
// keyBy 可以根据位置,也可以根据字段
dataStream.keyBy(new KeySelector<Integer, Object>() {
@Override
// 把所有数据放到一个分区 0
public Object getKey(Integer integer) throws Exception {
return 0;
}
})
.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer t1, Integer t2) throws Exception {
return t2 - t1 > 0 ? t2 : t1;
}
}).print();
env.execute("test");
}
reduce 可以比较两个值,更自由一些
split 和 Select (OutputTag)Connect和CoMap以及Union
从代码中可以看出帮我们收集了所有数据,我们可以在做其他操作,多用于做数据标签。
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
final OutputTag<String> outputTag = new OutputTag<String>("side-output"){};
SingleOutputStreamOperator<Object> stream = dataStream.process(new ProcessFunction<Integer, Object>() {
@Override
public void processElement(Integer value, Context ctx, Collector<Object> out) throws Exception {
if(value > 10) {
out.collect(value);
}
ctx.output(outputTag,"其他类型的流"+value.toString());
}
});
stream.print("原始数据进行过滤后:");
DataStream<String> sideOutputStream = stream.getSideOutput(outputTag);
sideOutputStream.print("标签处理:");
// 合流
sideOutputStream.connect(stream).flatMap(new CoFlatMapFunction<String,Object,Integer>() {
// 第一个流如何处理
@Override
public void flatMap1(String value, Collector out) throws Exception {
String str = value.toString();
str = str.replaceAll("其他类型的流","");
out.collect(Integer.valueOf(str));
}
// 第二流如何处理
@Override
public void flatMap2(Object value, Collector out) throws Exception {
out.collect(Integer.valueOf(value.toString()));
}
}).keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return 0;
}
}).sum(0).print("合并后输出:");
env.execute("test");
}
标签处理::1> 其他类型的流3
标签处理::2> 其他类型的流7
标签处理::3> 其他类型的流1
标签处理::7> 其他类型的流10
合并后输出::6> 1
合并后输出::6> 8
合并后输出::6> 11
合并后输出::6> 21
原始数据进行过滤后::8> 27
原始数据进行过滤后::6> 19
标签处理::8> 其他类型的流27
原始数据进行过滤后::5> 22
原始数据进行过滤后::4> 50
标签处理::5> 其他类型的流22
标签处理::6> 其他类型的流19
标签处理::4> 其他类型的流50
合并后输出::6> 48
合并后输出::6> 75
合并后输出::6> 97
合并后输出::6> 119
合并后输出::6> 138
合并后输出::6> 157
合并后输出::6> 207
合并后输出::6> 257
该方法只能在以下处理时使用。
- ProcessFunction
- KeyedProcessFunction
- CoProcessFunction
- KeyedCoProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
flink 1.12 没有 Split 和 Select,换成了 OutputTag
Union 要求多条流合并,必须是相同的数据类型,比较简单,这里就不说了。
分区
keyBy 默认是按照 hash 分区,除了KeyBy 还有其他的分区方式。
- broadcast() 广播,给下游分区全部广播同一份数据
- shuffle() 随机把当前数据发配个下游分区
- foeward() 只放在当前分区做计算
- rebalance() 轮询给下游分区发数据
- rescale() 给相同分组的下游分区轮询发数据
- global() 丢给下游分区的第一个实例
- partitionCustom() 用户自定义重分区方式
Sink 输出
kafka
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
dataStream.addSink(new FlinkKafkaProducer("topic-test",new SimpleStringSchema(),new Properties()));
env.execute("test");
}
自定义
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
dataStream.addSink(new MySink()).setParallelism(1);
env.execute("test");
}
public static class MySink extends RichSinkFunction<Integer> {
// 不建议 在open 和 clos 写类似数据库连接 把握不好..死翘翘,但可以在 open 和 clous 检查连接状态,或有链接池 获取也行
@Override
public void open(Configuration parameters) throws Exception {
System.out.println("open 有多少数据呗调用多少次");
}
@Override
public void close() throws Exception {
System.out.println("close 有多少数据呗调用多少次");
}
@Override
public void invoke(Integer value, Context context) throws Exception {
// 输出数据 可以用来写 到 redis kafka hdfs 等
System.out.println(value);
}
}
想要直到 flink 支持更多的 source 或 sink 可以去 maven库里 搜索 flink connector
window
window 就是将无限流切割为有限流的一种方式,它会将流数据分发到有限大小的桶(bucket)中进行分析。window 不是等有数据才去做窗口,而是有一个桶,等数据,桶可以存在多个。只要桶不关闭,哪怕晚迟到的数据,依然可以被分配到对应的桶里。
基本类型
时间窗口(Time Window)
按时间段做为窗口,如8:01~8:02 为一个窗口
时间窗口分为:滚动时间窗口(TumBlingWindows)、滑动时间窗口(SlidingWindows)、会话窗口(Session Windows)
-
滚动时间窗口:时间是对齐的,如8:00~9:00 下一个窗口就是 9:00~10:00,数据不会重叠(不重复数据),如果数据刚好在整点,按照 [ ) 左开右闭,会属于新时间。
-
滑动时间窗口:窗口长度固定,可以有重叠(重复数据)
-
会话窗口:一定时间没有收到数据会话结束,有新的数据来,开启新的会话。session gap 为最小的间隔。
计数窗口(Count Window)
按多少各数做为一个窗口,如10个数 为一个窗口。
计数窗口分为:滚动计数窗口、滑动计数窗口
代码演示
如果想要调用时间窗口,必须在 keyBy 之后,但也可以直接使用 windowALL() 方法,该方法把所有数据放到一个分区,并行度为一。
public static void main(String[] args) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Integer> dataStream = env.fromElements(10,27,3,7,1,50,22,19);
// 滚动窗口,可以看这个方法的实现类
dataStream.windowAll(TumblingProcessingTimeWindows.of(Time.seconds(15)));
// session 窗口
dataStream.keyBy(0).window(EventTimeSessionWindows.withGap(Time.minutes(1)));
// 滚动窗口
dataStream.keyBy(0).timeWindow(Time.seconds(15));
// 滑动窗口
dataStream.keyBy(0).timeWindow(Time.seconds(20),Time.seconds(5));
// 滑动窗口
dataStream.keyBy(0).window(SlidingProcessingTimeWindows.of(Time.seconds(20),Time.seconds(5)));
// 滚动计数窗口
dataStream.keyBy(0).countWindow(10);
// 滑动计数窗口
dataStream.keyBy(0).countWindow(10,3);
}
窗口函数
增量聚合函数
每条数据到来进行计算,保持一个简单的状态。一定会等到时间点(窗口结束点)到才会输出结果。这种窗口可以做一些max、min等比较,累计的函数
- ResuceFunction
- AggregateFunction
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
// 滑动窗口
dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return 0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
// .reduce(new ReduceFunction<Integer>() {
// // 比较两个数大小等 可以实现 max min sum 等操作
// @Override
// public Integer reduce(Integer value1, Integer value2) throws Exception {
// return null;
// }
// });
// 实现一个计数器,统计有多少个数据
.aggregate(new AggregateFunction<Integer, Integer, Integer>() {
// 创建一个累加器,初始值
@Override
public Integer createAccumulator() {
return 0;
}
// 累加规则
@Override
public Integer add(Integer value, Integer accumulator) {
return accumulator + 1;
}
// 输出结果
@Override
public Integer getResult(Integer accumulator) {
return accumulator;
}
// merge一般使用的是 session window 做合并操作
@Override
public Integer merge(Integer a, Integer b) {
return null;
}
}).print();
env.execute("test");
}
全窗口函数
先把窗口所有数据收集起来,等到计算的时候会便利所有数据。这种窗口可以做一些中位数或平均数等操作。
- ProcessWindowFunction
- WindowFunction
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<String> dataStream = env.socketTextStream("192.168.200.58",7777);
// 滑动窗口
dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return 0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5))) // 攒齐5s 的数据,求平均值
// 参数1 输入类型,2 输出类型,3 key的类型,4 就是个TimeWindow
.apply(new WindowFunction<Integer, Integer, Object, TimeWindow>() {
// 参数1 key的类型,2 就是window,3 所有输入的数据,4 输出收集器
@Override
public void apply(Object o, TimeWindow window, Iterable<Integer> input, Collector<Integer> out) throws Exception {
List<Integer> list = IteratorUtils.toList(input.iterator());
int count = list.size();
int sum = list.stream().reduce((i,y) -> i+y).get();
out.collect(sum/count);
}
}).print();
env.execute("test");
}
把 apply 换成 process 是一样的。
.process(new ProcessWindowFunction<Integer, Integer, Object, TimeWindow>() {
// context 会拿到的信息比 window 多,包含了 window
@Override
public void process(Object o, Context context, Iterable<Integer> elements, Collector<Integer> out) throws Exception {
List<Integer> list = IteratorUtils.toList(elements.iterator());
int count = list.size();
int sum = list.stream().reduce((i,y) -> i+y).get();
out.collect(sum/count);
}
}).print();
在滑动时间窗口,或滑动计数中,如下:
我们会想滑动计数,应该是10个数,来2个滑动一次,但实际流操作中,当你输入2个数,就会输出一次,如下:
假设我们的程序是算平均值,第一次滑动是 (27+10)/2 第二次滑动则是 (27+10+3+7)/4 依此类推。这种方式是等补齐滑动窗口的长度,才会全额计算,就是10个一算。
其他可选API
- trigger() 触发器:定义windows什么时候关闭,触发计算并输出结果,分两步的原因是,考虑数据会迟到,迟到的数据到windows关闭的时间,可以先输出结果,在等等认为会迟到的数据,然后关闭。
- evictor 移除器:定义某些数据的逻辑。
- allowedLateness() 允许处理迟到的数据,5秒后输出结果,等1分钟才关闭窗口,但1分钟内该窗口会实时更新数据,不断输出新逻辑数据。
window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.minutes(1));
- sideOutputLateData() 将迟到的数据放入侧输出流
- getSideOutput() 获取侧输出流
// 标记输出流(侧输出流)
OutputTag<Integer> outputTag = new OutputTag<Integer>("late"){};
SingleOutputStreamOperator<Integer> ds = dataStream.map(i -> Integer.valueOf(i)).keyBy(new KeySelector<Integer, Object>() {
@Override
public Object getKey(Integer value) throws Exception {
return 0;
}
}).window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
.allowedLateness(Time.minutes(1))
// 如果1分钟后还有迟到数据,就放到一个侧输出流里,给一个标记,也可以不搭配 allowedLateness() 使用
.sideOutputLateData(outputTag)
.sum(0);
ds.getSideOutput(outputTag).print();
这种等的方式,迟到数据底属于 1窗口 还是属于 2窗口呢?那就需要和数据本身的时间作比较了,所以以上只是API的调用,但不建议这样使用,针对迟到数据我们一定会用迟到数据的本身时间,来决定他在哪个窗口。
ProcessFunction API
ProcessFunction 可以访问时间戳、watermark以及注册定时事件。还可以输出特定的一些事件,例如超时事件等。ProcessFunction用来构建事件驱动的应用以及实现自定义的业务逻辑(使用之前的window函数和转换算子、Richfunction都无法实现)。例如,flink sql 就是使用ProcessFunction 实现的。
flink 提供了8个 ProcessFunction
- ProcessFunction
- KeyedProcessFunction 分组之后得到KeyedStream,调用该 ProcessFunction
- CoProcessFunction
- ProcessJooinFunction
- BroadcastProcessFunction
- KeyedBroadcastProcessFunction
- ProcessWindowFunction
- ProcessAllWindowFunction
KeyedProcessFunction
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
DataStream<String> dataStream = env.socketTextStream("192.168.200.58", 7777);
// 数据转换
DataStream<EventData> stream = dataStream.map(new MapFunction<String, EventData>() {
@Override
public EventData map(String value) throws Exception {
String[] strs = value.split(",");
return new EventData(
strs[0],
Long.valueOf(strs[1]),
String.valueOf((Long.valueOf(strs[1])-5)),
Integer.valueOf(strs[3])
);
}
});
stream.keyBy(new KeySelector<EventData, String>() {
@Override
public String getKey(EventData value) throws Exception {
return value.getId();
}
}).process(new MyProcess()).print();
env.execute("test");
}
public static class MyProcess extends KeyedProcessFunction<String,EventData,Integer> {
@Override
public void processElement(EventData value, Context ctx, Collector<Integer> out) throws Exception {
out.collect(value.hashCode());
// context
ctx.timestamp(); //获取数据时间戳
ctx.getCurrentKey();// 获取key
//ctx.output();//测输出流,可以通过构造函数吧 测输出流传入
ctx.timerService().currentProcessingTime();// 获取处理时间(属于时间语义的一种)
ctx.timerService().currentWatermark();// 获取水位(事件时间)(属于时间语义的一种)
ctx.timerService().registerProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);//处理时间
// 事件时间,让定时器 延时10s 处理
ctx.timerService().registerEventTimeTimer((value.getEventTime() + 10) * 1000L);
// 删除处理定时器,根据相同的时间为标准,有多个定时器,每个定时器的时间肯定不同,根据时间进行删除
ctx.timerService().deleteProcessingTimeTimer(ctx.timerService().currentProcessingTime() + 1000);
ctx.timerService().deleteEventTimeTimer((value.getEventTime() + 10) * 1000L);
}
// 触发定时器 timestamp,就是触发时间
@Override
public void onTimer(long timestamp, OnTimerContext ctx, Collector<Integer> out) throws Exception {
super.onTimer(timestamp, ctx, out);
ctx.getCurrentKey();
// ctx.output();
// 查看基于什么类型的时间
ctx.timeDomain();
}
}
(1)处理时间——调⽤ ctx.timerService().registerProcessingTimeTimer()注册;onTimer()在系统时间戳达到Timer设定的时间戳时触发。
(2)事件时间——调⽤ ctx.timerService().registerEventTimeTimer()注册;onTimer()在Flink内部⽔印达到或超过Timer设定的时间戳时触发。
网友评论