1. Process Functions
Test case (the demo class below):
The first watermark is -9223372036854775808 (Long.MIN_VALUE).
Here the watermark always trails the latest event timestamp by 1 ms.
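Both observations fall out of how the bounded-out-of-orderness watermark generator works. A minimal sketch of that logic, paraphrased from Flink's BoundedOutOfOrdernessWatermarks (class and field names here are illustrative):
import java.time.Duration;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class boundedOutOfOrdernessSketch<T> implements WatermarkGenerator<T> {
    private final long outOfOrdernessMillis; // 0 when using Duration.ZERO
    private long maxTimestamp;

    public boundedOutOfOrdernessSketch(Duration maxOutOfOrderness) {
        this.outOfOrdernessMillis = maxOutOfOrderness.toMillis();
        // start here so the first emitted watermark is exactly Long.MIN_VALUE
        this.maxTimestamp = Long.MIN_VALUE + outOfOrdernessMillis + 1;
    }

    @Override
    public void onEvent(T event, long eventTimestamp, WatermarkOutput output) {
        // track the largest event timestamp seen so far
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        // called every autoWatermarkInterval (200 ms by default); the watermark
        // always trails the largest timestamp by outOfOrderness + 1 ms
        output.emitWatermark(new Watermark(maxTimestamp - outOfOrdernessMillis - 1));
    }
}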
public class processDemo1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default auto-watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(2);
        // randomly generated event data
        DataStreamSource<Event> addSource = env.addSource(new Source());
        SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        outputStreamOperator.process(new ProcessFunction<Event, String>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, String>.Context ctx, Collector<String> out) throws Exception {
                long watermark = ctx.timerService().currentWatermark();
                long processingTime = ctx.timerService().currentProcessingTime();
                Long timestamp = ctx.timestamp();
                int thisSubtask = getRuntimeContext().getIndexOfThisSubtask();
                System.out.println(processingTime);
                System.out.println("--------");
                System.out.println(timestamp);
                out.collect(value.user + " " + value.timestamp + " " + watermark + " " + thisSubtask);
            }

            @Override
            public void open(Configuration parameters) throws Exception {
                super.open(parameters);
            }

            @Override
            public void onTimer(long timestamp, ProcessFunction<Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                super.onTimer(timestamp, ctx, out);
            }
        }).print();
        env.execute();
    }
}
A simple timer application: processing time
public class processProcessingTime {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default auto-watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(2);
        DataStreamSource<Event> addSource = env.addSource(new Source());
        // timer firing based on processing time
        addSource.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                // current processing time of this element
                long processingTime = ctx.timerService().currentProcessingTime();
                out.collect(ctx.getCurrentKey() + " processing time " + new Timestamp(processingTime));
                // register a timer 10 seconds from now
                ctx.timerService().registerProcessingTimeTimer(processingTime + 10 * 1000L);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                out.collect(ctx.getCurrentKey() + " timer fired at " + new Timestamp(timestamp));
            }
        }).print();
        env.execute();
    }
}
A simple timer application: event time
public class processEventTime {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default auto-watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        operator.keyBy(data -> data.user).process(new KeyedProcessFunction<String, Event, String>() {
            @Override
            public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                Long timestamp = ctx.timestamp();
                System.out.println(new Timestamp(value.timestamp));
                out.collect(ctx.getCurrentKey() + " timestamp " + new Timestamp(timestamp) + " watermark " + ctx.timerService().currentWatermark());
                // register an event-time timer 10 seconds after this event
                ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
            }

            @Override
            public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                out.collect(ctx.getCurrentKey() + " timer fired at " + new Timestamp(timestamp) + " watermark " + ctx.timerService().currentWatermark());
            }
        }).print();
        env.execute();
    }
}
2022-05-23 23:54:50.305
大 timestamp 2022-05-23 23:54:50.305 watermark -9223372036854775808
2022-05-23 23:54:51.309
大 timestamp 2022-05-23 23:54:51.309 watermark 1653321290304
2022-05-23 23:54:52.309
中 timestamp 2022-05-23 23:54:52.309 watermark 1653321291308
2022-05-23 23:54:53.309
中 timestamp 2022-05-23 23:54:53.309 watermark 1653321292308
2022-05-23 23:54:54.31
小 timestamp 2022-05-23 23:54:54.31 watermark 1653321293308
2022-05-23 23:54:55.31
中 timestamp 2022-05-23 23:54:55.31 watermark 1653321294309
2022-05-23 23:54:56.31
中 timestamp 2022-05-23 23:54:56.31 watermark 1653321295309
2022-05-23 23:54:57.311
小 timestamp 2022-05-23 23:54:57.311 watermark 1653321296309
2022-05-23 23:54:58.311
小 timestamp 2022-05-23 23:54:58.311 watermark 1653321297310
2022-05-23 23:54:59.311
中 timestamp 2022-05-23 23:54:59.311 watermark 1653321298310
2022-05-23 23:55:00.312
中 timestamp 2022-05-23 23:55:00.305 watermark 1653321299310
大 timer fired at 2022-05-23 23:55:00.305 watermark 1653321300311
A timer fires only once its timestamp is less than or equal to the current watermark.
ProcessWindowFunction
Implementing Top-N with windowAll
public class processTop_N {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        // the default auto-watermark interval is 200 ms
        env.getConfig().setAutoWatermarkInterval(200L);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new Source()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        operator.map(data -> data.url)
                // sliding window: size 10 s, slide 5 s
                .windowAll(SlidingProcessingTimeWindows.of(Time.seconds(10), Time.seconds(5)))
                .aggregate(new AggregateFunction<String, HashMap<String, Long>, ArrayList<Tuple2<String, Long>>>() {
                    @Override
                    public HashMap<String, Long> createAccumulator() {
                        return new HashMap<String, Long>();
                    }

                    @Override
                    public HashMap<String, Long> add(String value, HashMap<String, Long> accumulator) {
                        // for each record, increment the count of its url if seen before, else start at 1
                        if (accumulator.containsKey(value)) {
                            accumulator.put(value, accumulator.get(value) + 1);
                        } else {
                            accumulator.put(value, 1L);
                        }
                        return accumulator;
                    }

                    @Override
                    public ArrayList<Tuple2<String, Long>> getResult(HashMap<String, Long> accumulator) {
                        ArrayList<Tuple2<String, Long>> list = new ArrayList<>();
                        for (Map.Entry<String, Long> entry : accumulator.entrySet()) {
                            list.add(Tuple2.of(entry.getKey(), entry.getValue()));
                        }
                        // sort by count, descending; Long.compare avoids the overflow
                        // risk of casting (o2.f1 - o1.f1) down to int
                        list.sort(new Comparator<Tuple2<String, Long>>() {
                            @Override
                            public int compare(Tuple2<String, Long> o1, Tuple2<String, Long> o2) {
                                return Long.compare(o2.f1, o1.f1);
                            }
                        });
                        return list;
                    }

                    @Override
                    public HashMap<String, Long> merge(HashMap<String, Long> a, HashMap<String, Long> b) {
                        // only needed for session windows; unused here
                        return null;
                    }
                }, new ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>() {
                    @Override
                    public void process(ProcessAllWindowFunction<ArrayList<Tuple2<String, Long>>, String, TimeWindow>.Context context, Iterable<ArrayList<Tuple2<String, Long>>> elements, Collector<String> out) throws Exception {
                        long start = context.window().getStart();
                        long end = context.window().getEnd();
                        StringBuilder builder = new StringBuilder();
                        ArrayList<Tuple2<String, Long>> list = elements.iterator().next();
                        // guard against windows with fewer than 2 distinct urls
                        for (int i = 0; i < Math.min(2, list.size()); i++) {
                            builder.append("N").append(i + 1).append(" url ").append(list.get(i).f0).append(" count ").append(list.get(i).f1).append("\n");
                        }
                        out.collect("window start " + new Timestamp(start) + " window end " + new Timestamp(end) + "\n" + builder);
                    }
                }).print();
        env.execute();
    }
}
Be clear about which overridden method runs when: add() runs once per incoming record, while getResult() and process() run once per window firing.
2. Multi-Stream Transformations
Side output streams
public class outTagDemo {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        // one OutputTag per side output; the {} anonymous subclass preserves the generic type
        OutputTag<Tuple2<String, String>> tag_小明 = new OutputTag<>("1"){};
        OutputTag<Tuple2<String, String>> tag_小红 = new OutputTag<>("2"){};
        DataStreamSource<Event> source = env.addSource(new Source());
        SingleOutputStreamOperator<Event> operator = source.process(new ProcessFunction<Event, Event>() {
            @Override
            public void processElement(Event value, ProcessFunction<Event, Event>.Context ctx, Collector<Event> out) throws Exception {
                if (value.user.equals("小明")) {
                    ctx.output(tag_小明, Tuple2.of(value.user, value.url));
                } else if (value.user.equals("小红")) {
                    ctx.output(tag_小红, Tuple2.of(value.user, value.url));
                } else {
                    out.collect(value);
                }
            }
        });
        operator.getSideOutput(tag_小明).print();
        operator.getSideOutput(tag_小红).print();
        operator.print();
        env.execute();
    }
}
Union (union)
Connect (connect)
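Neither subsection carries an example in these notes, so here is a minimal sketch (class and stream names are my own): union merges any number of streams of the same element type into one, while connect pairs exactly two streams of possibly different types and handles each side with its own callback.
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

public class unionConnectDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStream<String> s1 = env.fromElements("a", "b");
        DataStream<String> s2 = env.fromElements("c", "d");
        DataStream<Integer> s3 = env.fromElements(1, 2);
        // union: merge streams of the SAME type into a single stream
        s1.union(s2).print("union");
        // connect: pair two streams of DIFFERENT types; map1/map2 handle each side
        s1.connect(s3).map(new CoMapFunction<String, Integer, String>() {
            @Override
            public String map1(String value) {
                return "left: " + value;
            }

            @Override
            public String map2(Integer value) {
                return "right: " + value;
            }
        }).print("connect");
        env.execute();
    }
}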
Two-stream join: window join
public class windowJoinDemo1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env.fromElements(
                Tuple2.of("1", 1000L),
                Tuple2.of("2", 1001L),
                Tuple2.of("1", 2000L),
                Tuple2.of("2", 2001L),
                Tuple2.of("1", 5001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));
        SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env.fromElements(
                Tuple2.of("1", 4000L),
                Tuple2.of("2", 4001L),
                Tuple2.of("1", 5000L),
                Tuple2.of("2", 6001L),
                Tuple2.of("1", 7001L),
                Tuple2.of("1", 4001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));
        source1.join(source2)
                .where(data -> data.f0)
                .equalTo(data -> data.f0)
                .window(TumblingEventTimeWindows.of(Time.seconds(5)))
                .apply(new JoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public String join(Tuple2<String, Long> first, Tuple2<String, Long> second) throws Exception {
                        return first + " ---> " + second;
                    }
                }).print();
        env.execute();
    }
}
Two-stream join: interval join
public class windowJoinDemo2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Tuple2<String, Long>> source1 = env.fromElements(
                Tuple2.of("1", 1000L),
                Tuple2.of("2", 1001L),
                Tuple2.of("1", 2000L),
                Tuple2.of("2", 2001L),
                Tuple2.of("1", 5001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));
        SingleOutputStreamOperator<Tuple2<String, Long>> source2 = env.fromElements(
                Tuple2.of("1", 4000L),
                Tuple2.of("2", 4001L),
                Tuple2.of("1", 5000L),
                Tuple2.of("2", 6001L),
                Tuple2.of("1", 7001L),
                Tuple2.of("1", 4001L),
                Tuple2.of("1", 8001L),
                Tuple2.of("1", 9001L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));
        source2.keyBy(data -> data.f0)
                .intervalJoin(source1.keyBy(data -> data.f0))
                // for each source2 element, match source1 elements within ±2 seconds of it
                .between(Time.seconds(-2), Time.seconds(2))
                .process(new ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    @Override
                    public void processElement(Tuple2<String, Long> left, Tuple2<String, Long> right, ProcessJoinFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        out.collect(left + "-->" + right + " " + new Timestamp(ctx.getTimestamp()));
                    }
                }).print();
        env.execute();
    }
}
Two-stream join: coGroup
source1.coGroup(source2)
        .where(data -> data.f0)
        .equalTo(data -> data.f0)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .apply(new RichCoGroupFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
            @Override
            public void coGroup(Iterable<Tuple2<String, Long>> first, Iterable<Tuple2<String, Long>> second, Collector<String> out) throws Exception {
                // called once per key per window, even if one side is empty
                out.collect(first + "-->" + second);
            }
        }).print();
[(1,1000), (1,2000)]-->[(1,4000), (1,4001)]
[(2,1001), (2,2001)]-->[(2,4001)]
[(1,5001)]-->[(1,5000), (1,7001)]
[]-->[(2,6001)]
3. State Programming
State in Flink
- access scope of state
- fault tolerance
- horizontal scalability of distributed applications
For background on tasks in Flink, see: Flink TaskManager & slots & parallelism & operator chains & task assignment explained
Keyed state (partitioned by key)
ValueState: report the PV count once every 10 s
public class stateDemo1 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        operator.keyBy(data -> data.user)
                .process(new KeyedProcessFunction<String, Event, String>() {
                    // two pieces of state: 1. the PV count 2. the timestamp of the registered timer
                    ValueState<Long> valueState;
                    ValueState<Long> timeState;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        valueState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("valueState", Long.class));
                        timeState = getRuntimeContext().getState(new ValueStateDescriptor<Long>("timeState", Long.class));
                    }

                    @Override
                    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
                        // update the count for this key
                        valueState.update(valueState.value() == null ? 1 : valueState.value() + 1);
                        // register a timer only if none exists for this key: we want one output per 10 s
                        if (timeState.value() == null) {
                            ctx.timerService().registerEventTimeTimer(ctx.timestamp() + 10 * 1000L);
                            timeState.update(ctx.timestamp() + 10 * 1000L);
                        }
                        out.collect(ctx.getCurrentKey() + " PV " + valueState.value() + " watermark " + new Timestamp(ctx.timerService().currentWatermark()) + " " + new Timestamp(ctx.timestamp()));
                    }

                    @Override
                    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
                        out.collect("onTimer " + ctx.getCurrentKey() + " PV " + valueState.value() + " watermark " + new Timestamp(ctx.timerService().currentWatermark()) + " " + new Timestamp(ctx.timestamp()));
                        // clear the timer state
                        timeState.clear();
                        // valueState.clear(); clearing the PV here as well would make this behave like a tumbling window
                        // without re-registering, the next timer would only be created when the next record for this key
                        // arrives, which is closer to a session window and not what we want, so register the next one by hand
                        ctx.timerService().registerEventTimeTimer(timestamp + 10 * 1000L);
                        timeState.update(timestamp + 10 * 1000L);
                    }
                })
                .print();
        env.execute();
    }
}
One thing I have never fully sorted out here is the relationship between event time and the watermark... print the event timestamps and rerun the test with Duration.ofSeconds(2) to compare.
ListState example
public class stateDemo2 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Tuple2<String, Long>> operator1 = env.fromElements(
                Tuple2.of("a", 100L),
                Tuple2.of("a", 100L),
                Tuple2.of("b", 200L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));
        SingleOutputStreamOperator<Tuple2<String, Long>> operator2 = env.fromElements(
                Tuple2.of("a", 1000L),
                Tuple2.of("b", 2000L),
                Tuple2.of("c", 2000L)
        ).assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ZERO)
                .withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {
                    @Override
                    public long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {
                        return element.f1;
                    }
                }));
        operator1.keyBy(data -> data.f0)
                .connect(operator2.keyBy(data -> data.f0))
                .process(new CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>() {
                    // buffer every element of each side, per key
                    ListState<Tuple2<String, Long>> left;
                    ListState<Tuple2<String, Long>> right;

                    @Override
                    public void open(Configuration parameters) throws Exception {
                        left = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("left", Types.TUPLE(Types.STRING, Types.LONG)));
                        right = getRuntimeContext().getListState(new ListStateDescriptor<Tuple2<String, Long>>("right", Types.TUPLE(Types.STRING, Types.LONG)));
                    }

                    @Override
                    public void processElement1(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        // join this left element against everything buffered on the right
                        for (Tuple2<String, Long> r : right.get()) {
                            out.collect(value + " " + r);
                            System.out.println("----");
                        }
                        System.out.println("11111");
                        left.add(Tuple2.of(value.f0, value.f1));
                    }

                    @Override
                    public void processElement2(Tuple2<String, Long> value, CoProcessFunction<Tuple2<String, Long>, Tuple2<String, Long>, String>.Context ctx, Collector<String> out) throws Exception {
                        // join this right element against everything buffered on the left
                        for (Tuple2<String, Long> l : left.get()) {
                            out.collect(value + " " + l);
                        }
                        System.out.println("22222");
                        right.add(Tuple2.of(value.f0, value.f1));
                    }
                }).print();
        env.execute();
    }
}
I ran it, and at first the output below looked very strange... on closer inspection it seems to line up with the two fromElements sources being consumed alternately, one record at a time, with "----" printing only when the right-side buffer loop actually iterates:
11111
(a,1000) (a,100)
22222
(a,100) (a,1000)
----
11111
22222
(b,200) (b,2000)
----
11111
22222
Tuple2.of("a", 1000L),
Tuple2.of("a", 3000L),
Tuple2.of("b", 2000L),
Tuple2.of("c", 2000L)
After changing operator2's test data as above, the result is indeed wrong for a join: the 3000L record matched 100L twice, because the buffered state is never cleared....
Implementing a window with MapState
public class stateDemo3 {
    public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
        // custom port for the local web UI
        conf.setInteger(RestOptions.PORT, 8050);
        StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
        env.setParallelism(1);
        SingleOutputStreamOperator<Event> operator = env.addSource(new clickSource()).assignTimestampsAndWatermarks(
                WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO)
                        .withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
                            @Override
                            public long extractTimestamp(Event element, long recordTimestamp) {
                                return element.timestamp;
                            }
                        }));
        operator.print();
        // count PV per url
        operator.keyBy(data -> data.url)
                .process(new keyProcess(10 * 1000L))
                .print();
        env.execute();
    }
}

class keyProcess extends KeyedProcessFunction<String, Event, String> {
    private Long tumbleTime;
    // window start timestamp -> count
    MapState<Long, Long> mapState;

    public keyProcess(Long tumbleTime) {
        this.tumbleTime = tumbleTime;
    }

    @Override
    public void open(Configuration parameters) throws Exception {
        mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Long, Long>("mapState", Long.class, Long.class));
    }

    @Override
    public void processElement(Event value, KeyedProcessFunction<String, Event, String>.Context ctx, Collector<String> out) throws Exception {
        // assign the event to its tumbling window by truncating the timestamp
        long startTime = value.timestamp / tumbleTime * tumbleTime;
        long endTime = startTime + tumbleTime;
        if (mapState.contains(startTime)) {
            mapState.put(startTime, mapState.get(startTime) + 1);
        } else {
            mapState.put(startTime, 1L);
        }
        // will registering the same timer again duplicate it? no: Flink keeps at most
        // one timer per key and timestamp, so repeated registrations are deduplicated
        ctx.timerService().registerEventTimeTimer(endTime - 1L);
    }

    @Override
    public void onTimer(long timestamp, KeyedProcessFunction<String, Event, String>.OnTimerContext ctx, Collector<String> out) throws Exception {
        long endTime = timestamp + 1L;
        long startTime = endTime - tumbleTime;
        out.collect("window " + new Timestamp(startTime) + "--" + new Timestamp(endTime) + " " + ctx.getCurrentKey() + " PV: " + mapState.get(startTime));
        mapState.remove(startTime);
    }
}
Why the -1 when registering (and the matching +1 in onTimer)? Windows are left-closed, right-open, so the last millisecond that belongs to the window is end - 1; registering the timer there makes it fire exactly when the watermark covers the whole window, which mirrors what Flink's built-in event-time windows do (maxTimestamp() = end - 1).
It looks like one round of onTimer calls emits several records, but the stream is keyed, so each key gets its own onTimer call per timestamp... is that right? A single watermark advance can therefore trigger a burst of onTimer calls, one per key; does that perform poorly?
State time-to-live (TTL)
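This subsection is empty in the notes; as a minimal sketch, state like the never-cleared ListState buffers above can be bounded with a TTL so stale entries expire instead of accumulating forever. The config is attached to the state descriptor inside open(); the one-hour value and descriptor name are illustrative:
ValueStateDescriptor<Long> descriptor = new ValueStateDescriptor<>("valueState", Long.class);
StateTtlConfig ttlConfig = StateTtlConfig
        .newBuilder(Time.hours(1))                                 // expire 1 hour after the last write
        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // refresh the TTL on create and on write
        .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // never hand back expired values
        .build();
descriptor.enableTimeToLive(ttlConfig);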
Operator state
Operator state also comes in different structural types, mainly three: ListState, UnionListState, and BroadcastState.
Broadcast state
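Another empty subsection; a minimal sketch (stream contents and descriptor names are my own): a rule stream is broadcast to every parallel task and matched against a data stream, with the broadcast side writing the shared state and the data side reading it.
import org.apache.flink.api.common.state.MapStateDescriptor;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.streaming.api.datastream.BroadcastStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.BroadcastProcessFunction;
import org.apache.flink.util.Collector;

public class broadcastDemo {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStream<String> data = env.fromElements("a", "b", "c");
        DataStream<String> rules = env.fromElements("b");
        MapStateDescriptor<String, String> ruleDesc =
                new MapStateDescriptor<>("rules", Types.STRING, Types.STRING);
        BroadcastStream<String> broadcastRules = rules.broadcast(ruleDesc);
        data.connect(broadcastRules)
                .process(new BroadcastProcessFunction<String, String, String>() {
                    @Override
                    public void processElement(String value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                        // the data side gets a read-only view of the broadcast state
                        if (ctx.getBroadcastState(ruleDesc).contains(value)) {
                            out.collect("matched rule: " + value);
                        }
                    }

                    @Override
                    public void processBroadcastElement(String rule, Context ctx, Collector<String> out) throws Exception {
                        // the broadcast side may write the state; every parallel task gets the same update
                        ctx.getBroadcastState(ruleDesc).put(rule, rule);
                    }
                }).print();
        env.execute();
    }
}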
State persistence
Checkpoints
State backends
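A minimal sketch of choosing a state backend (paths illustrative), using the same legacy API as the Kafka example later in these notes:
// keep working state on the TaskManager heap; write checkpoints to the file system
env.setStateBackend(new FsStateBackend("file:///tmp/flink-checkpoints"));
// or keep state in RocksDB on local disk, which suits very large state
// (requires the flink-statebackend-rocksdb dependency):
// env.setStateBackend(new RocksDBStateBackend("file:///tmp/flink-checkpoints"));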
4. Fault Tolerance
Checkpoints (checkpoint)
- saving a checkpoint
- restoring state from a checkpoint
- the checkpointing algorithm
- checkpoint configuration (see the sketch after this list)
- savepoints (savepoint)
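A minimal configuration sketch for the items above (the concrete values are illustrative, not recommendations):
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// take an exactly-once checkpoint every 60 s
env.enableCheckpointing(60 * 1000L, CheckpointingMode.EXACTLY_ONCE);
CheckpointConfig config = env.getCheckpointConfig();
config.setMinPauseBetweenCheckpoints(500L);    // leave at least 500 ms between checkpoints
config.setCheckpointTimeout(2 * 60 * 1000L);   // abort a checkpoint that takes over 2 min
config.setMaxConcurrentCheckpoints(1);         // never run two checkpoints at once
config.setTolerableCheckpointFailureNumber(3); // tolerate 3 checkpoint failures before failing the job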
State consistency
What state consistency means
- for the stream processor itself, state consistency simply means the computed results must be correct
- no record should be lost, and none should be counted twice
- after a failure the state can be restored, and the recomputation after the restore must also produce fully correct results
Levels of state consistency
- At most once: when a task fails, the simplest reaction is to do nothing; neither restore the lost state nor replay the lost data. at-most-once semantics means each event is processed at most once.
- At least once: in most real applications we do not want to lose events. at-least-once means every event is processed, though some may be processed more than once.
- Exactly once: the strictest guarantee and the hardest to achieve. Exactly-once semantics means not only that no event is lost, but also that for every record the internal state is updated exactly once (logically).
End-to-end exactly-once
Idempotent writes (idempotent writes)
An idempotent operation is one that can be executed many times but changes the result only once; repeating it after that has no further effect.
Consider this consumption scenario:
10, 20, 30, 40, 50
We took a checkpoint while consuming 10, and the job crashed while consuming 30, so it must recover from the last checkpoint. The sequence an observer sees is therefore 10, 20, 30, 10, 20, 30... The data ends up consistent, but what a user sees in between is wrong, which is why we need transactional writes.
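A tiny illustration of the difference (my own example, not from the notes): a keyed put is idempotent under replay, while an increment is not.
import java.util.HashMap;
import java.util.Map;

Map<String, Long> store = new HashMap<>();
// idempotent: replaying this write any number of times leaves the same result
store.put("pv:userA", 42L);
store.put("pv:userA", 42L); // still 42
// NOT idempotent: replaying this after recovery double-counts
store.merge("pv:userA", 1L, Long::sum); // 43, then 44 on replay, ...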
Transactional writes (transactional writes)
- Transaction (transaction): a tightly coupled series of operations in an application; all of them must complete successfully, otherwise every change made by each operation is rolled back. It is atomic: the operations in a transaction either all succeed or none take effect.
- Idea: build a transaction that corresponds to a checkpoint, and only when that checkpoint has truly completed write the corresponding results into the sink system.
- Implementations: write-ahead log; two-phase commit.
Write-ahead log (write-ahead log)
First buffer the result data as state, then write it to the sink system in one batch when the notification arrives that the checkpoint has completed.
Simple and easy to implement; since the data is cached in the state backend beforehand, this works with any sink system.
In most scenarios, however, it only provides at-least-once semantics.
Two-phase commit (two-phase commit)
See also: Chapter 9, Consistency and Consensus.
For every checkpoint, the sink task starts a transaction and adds all records it receives from then on to that transaction.
It writes these records to the sink system but does not commit them -- this is only the pre-commit. (Can the data be read at this point? With Kafka, a consumer using isolation.level=read_committed cannot see it until the commit.)
Only when the notification arrives that the checkpoint has completed does it actually commit the transaction, making the results truly visible.
This approach genuinely achieves exactly-once, and it requires an external sink system that supports transactions.
Exactly-once guarantees between Flink and Kafka
public class KafkaToKafka {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // enable checkpointing in exactly-once mode
        env.enableCheckpointing(30000, CheckpointingMode.EXACTLY_ONCE);
        env.setStateBackend(new FsStateBackend("file:///Users/xing/Desktop/flinkck20210123"));
        // Kafka connection parameters
        Properties properties = new Properties();
        // Kafka brokers (host and port)
        properties.setProperty("bootstrap.servers", "linux03:9092,linux04:9092,linux05:9092");
        // offset reset strategy: with no committed offset, read from the beginning; otherwise resume from it
        properties.setProperty("auto.offset.reset", "earliest");
        // consumer group ID
        properties.setProperty("group.id", "g1");
        // disable Kafka's own periodic auto-commit; offsets are managed through Flink's checkpoints
        properties.setProperty("enable.auto.commit", "false");
        // create the FlinkKafkaConsumer
        FlinkKafkaConsumer<String> kafkaConsumer = new FlinkKafkaConsumer<>(
                "kafka2021",              // topic to read from
                new SimpleStringSchema(), // deserialization schema
                properties                // Kafka parameters
        );
        // do not write offsets back to Kafka's special offsets topic on checkpoints
        kafkaConsumer.setCommitOffsetsOnCheckpoints(false);
        DataStreamSource<String> lines = env.addSource(kafkaConsumer);
        SingleOutputStreamOperator<String> filtered = lines.filter(e -> !e.startsWith("error"));
        // Kafka topic to write to
        String topic = "out2021";
        // transaction timeout; must not exceed the broker's transaction.max.timeout.ms
        properties.setProperty("transaction.timeout.ms", 1000 * 60 * 5 + "");
        // create the FlinkKafkaProducer
        FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
                topic,                                     // target topic
                new KafkaStringSerializationSchema(topic), // serialization schema for writing to Kafka
                properties,                                // Kafka parameters
                FlinkKafkaProducer.Semantic.EXACTLY_ONCE   // write with EXACTLY_ONCE semantics
        );
        filtered.addSink(kafkaProducer);
        env.execute();
    }
}
/**
 * Custom Kafka serialization schema for String data
 */
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {
    private String topic;
    private String charset;

    // constructors take the target topic and charset; UTF-8 by default
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }

    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }

    // serialize each record into a ProducerRecord
    @Override
    public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
        // encode the string into a byte array
        byte[] bytes = element.getBytes(Charset.forName(charset));
        // wrap it in a ProducerRecord for the target topic
        return new ProducerRecord<>(topic, bytes);
    }
}