1. Objective
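The data source is Kafka. A Flink time window with an AggregateFunction is used to compute, for each key, the number of message events and their cumulative value within a given window.
In this example: event time (Event Time) is used, the window is a tumbling window (TumblingEventTimeWindows) of 5 s, and the aggregate function computes, for each key, the number of events in the window and their total amount.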
Main contents:
- The event class SimpleEvent used in this example
- Specifying event time and creating watermarks
- Counting events and accumulating their values with a window AggregateFunction
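To make "a 5 s tumbling event-time window" concrete: every event falls into exactly one window, determined only by its timestamp. The following is a plain-Java sketch with no Flink dependency; the class name TumblingWindowMath is hypothetical, and windowStart mirrors the arithmetic of Flink's TimeWindow.getWindowStartWithOffset for non-negative timestamps.

```java
public class TumblingWindowMath {

    // Start of the tumbling window that contains `timestamp` (all values in ms).
    // Same arithmetic as Flink's TimeWindow.getWindowStartWithOffset for non-negative timestamps.
    public static long windowStart(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }

    public static void main(String[] args) {
        long size = 5_000L; // 5 s, as in TumblingEventTimeWindows.of(Time.seconds(5))
        for (long ts : new long[] {0L, 4_999L, 5_000L, 12_345L}) {
            long start = windowStart(ts, 0L, size);
            // Windows are half-open intervals: [start, start + size)
            System.out.println(ts + " -> [" + start + ", " + (start + size) + ")");
        }
    }
}
```

With a zero offset, 4999 still belongs to [0, 5000) while 5000 opens the next window [5000, 10000), so the windows never overlap.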
2. The Event class
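The event used in the example is a custom class, SimpleEvent. It carries the key (ID_NO), the amount (AMT) and its creation timestamp, from which the event count and cumulative value are computed: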
import java.io.Serializable;
import java.math.BigDecimal;

public class SimpleEvent implements Serializable {
    private String ID_NO;           // key used by keyBy
    private BigDecimal AMT;         // amount that is summed per window
    private long CREATE_TIMESTAMP;  // creation time in ms, later used as the event time

    public SimpleEvent(String ID_NO, BigDecimal AMT) {
        this.CREATE_TIMESTAMP = System.currentTimeMillis();
        this.ID_NO = ID_NO;
        this.AMT = AMT;
    }

    public String getID_NO() {
        return ID_NO;
    }

    public BigDecimal getAMT() {
        return AMT;
    }

    public long getCreationTime() {
        return this.CREATE_TIMESTAMP;
    }
}
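The article does not show how Kafka records become SimpleEvent objects. As a sketch only, assuming each Kafka message value is a line such as "525992,350" (a hypothetical format), a parser could look like this. SimpleEventParser is a hypothetical name, and it embeds a minimal copy of SimpleEvent so the file compiles on its own.

```java
import java.io.Serializable;
import java.math.BigDecimal;

public class SimpleEventParser {

    // Minimal copy of the article's SimpleEvent so this file is self-contained
    public static class SimpleEvent implements Serializable {
        private final String ID_NO;
        private final BigDecimal AMT;
        private final long CREATE_TIMESTAMP;

        public SimpleEvent(String ID_NO, BigDecimal AMT) {
            this.CREATE_TIMESTAMP = System.currentTimeMillis();
            this.ID_NO = ID_NO;
            this.AMT = AMT;
        }

        public String getID_NO() { return ID_NO; }
        public BigDecimal getAMT() { return AMT; }
        public long getCreationTime() { return CREATE_TIMESTAMP; }
    }

    // Hypothetical record format "ID_NO,AMT"; the amount is parsed exactly as a BigDecimal
    public static SimpleEvent parse(String line) {
        String[] parts = line.split(",");
        if (parts.length != 2) {
            throw new IllegalArgumentException("expected ID_NO,AMT but got: " + line);
        }
        return new SimpleEvent(parts[0].trim(), new BigDecimal(parts[1].trim()));
    }

    public static void main(String[] args) {
        SimpleEvent e = parse("525992, 350.50");
        System.out.println(e.getID_NO() + " " + e.getAMT() + " @ " + e.getCreationTime());
    }
}
```

In a Flink job the same logic would sit in a MapFunction<String, SimpleEvent> applied to the Kafka stream before assignTimestampsAndWatermarks. Because the constructor stamps System.currentTimeMillis(), an event built this way inside Flink gets the parse time as its event time, not the time the producer wrote the message.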
3. Event-time processing
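The stream time characteristic must be set to event time with env.setStreamTimeCharacteristic. This applies to the Flink versions this example was written for: since Flink 1.12 event time is the default and this method is deprecated, and since Flink 1.11 WatermarkStrategy supersedes AssignerWithPeriodicWatermarks.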
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
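Because every event carries its creation timestamp, SimpleEvent's own timestamp is used directly as the event time. Watermarks follow a bounded out-of-orderness strategy: the watermark trails the largest event time seen so far by a fixed amount (3.5 s here), so events may arrive up to that much out of order. The implementation is largely based on the example in the official Flink documentation.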
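// Nested in the job class; needs:
// import javax.annotation.Nullable;
// import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
// import org.apache.flink.streaming.api.watermark.Watermark;
private static class MyTimestampsAndWatermarks implements AssignerWithPeriodicWatermarks<SimpleEvent> {
    private final long maxOutOfOrderness = 3500; // 3.5 seconds
    private long currentMaxTimestamp;             // largest event time seen so far (starts at 0)

    // Called periodically by Flink: the watermark trails the largest timestamp by 3.5 s
    @Nullable
    @Override
    public Watermark getCurrentWatermark() {
        return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
    }

    // Called for every element: the creation time becomes the event time
    @Override
    public long extractTimestamp(SimpleEvent element, long previousElementTimestamp) {
        long timestamp = element.getCreationTime();
        currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
        return timestamp;
    }
}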
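What the 3.5 s bound means for the 5 s windows can be simulated without Flink. This is a plain-Java sketch under two stated simplifications: a new watermark (largest timestamp minus 3.5 s) is assumed to be emitted after every element, whereas Flink emits periodic watermarks on a timer, and allowed lateness is zero, so an element is dropped when the last millisecond of its window is already at or below the watermark. WatermarkSimulation is a hypothetical name.

```java
import java.util.ArrayList;
import java.util.List;

public class WatermarkSimulation {

    static final long MAX_OUT_OF_ORDERNESS = 3_500L; // same bound as MyTimestampsAndWatermarks
    static final long WINDOW_SIZE = 5_000L;          // 5 s tumbling windows

    // Returns the timestamps a window operator with zero allowed lateness would drop.
    public static List<Long> lateEvents(long[] timestamps) {
        List<Long> late = new ArrayList<>();
        long currentMax = 0L; // same start value as the currentMaxTimestamp field
        for (long ts : timestamps) {
            long watermark = currentMax - MAX_OUT_OF_ORDERNESS;      // watermark before this element
            long windowStart = ts - (ts % WINDOW_SIZE);              // non-negative timestamps assumed
            long windowMaxTimestamp = windowStart + WINDOW_SIZE - 1; // last ms of the window
            if (windowMaxTimestamp <= watermark) {
                late.add(ts); // its window has already been evaluated
            }
            currentMax = Math.max(currentMax, ts); // extractTimestamp runs for every element
        }
        return late;
    }

    public static void main(String[] args) {
        long[] arrivals = {1_000L, 4_000L, 9_000L, 7_000L, 3_000L, 12_000L};
        System.out.println("late: " + lateEvents(arrivals)); // late: [3000]
    }
}
```

Here 7000 arrives after 9000 but is only 2 s behind, so its window [5000, 10000) is still open; 3000 arrives when the watermark is already 5500, after its window [0, 5000) has closed.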
4. Counting events and accumulating values with a window AggregateFunction
The official AggregateFunction interface:
public interface AggregateFunction<IN, ACC, OUT> extends Function, Serializable
Its three type parameters:
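@param <IN> The type of the values that are aggregated (input values): the element type of the input stream, SimpleEvent in this example.
@param <ACC> The type of the accumulator (intermediate aggregate state): here a composite class holding key, count and sum, which correspond to ID_NO, the number of events and the cumulative event value (total amount).
@param <OUT> The type of the aggregated result: in this example also AverageAccumulator.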
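How the three type parameters fit together can be seen in isolation. The sketch below is an illustration only: Agg is a hypothetical local interface with the same four methods as Flink's AggregateFunction, and runWindow imitates how one (key, window) is aggregated incrementally before the window fires.

```java
import java.util.Arrays;
import java.util.List;

public class AggregateLifecycle {

    // Local stand-in with the same four methods as Flink's AggregateFunction<IN, ACC, OUT>
    public interface Agg<IN, ACC, OUT> {
        ACC createAccumulator();            // once per key and window
        ACC add(IN value, ACC accumulator); // for every element in the window
        OUT getResult(ACC accumulator);     // when the window fires
        ACC merge(ACC a, ACC b);            // only when windows merge, e.g. session windows
    }

    // IN = Integer, ACC = long[]{count, sum}, OUT = String
    public static final Agg<Integer, long[], String> COUNT_AND_SUM = new Agg<Integer, long[], String>() {
        public long[] createAccumulator() { return new long[] {0L, 0L}; }
        public long[] add(Integer value, long[] acc) { acc[0] += 1; acc[1] += value; return acc; }
        public String getResult(long[] acc) { return "count=" + acc[0] + ", sum=" + acc[1]; }
        public long[] merge(long[] a, long[] b) { a[0] += b[0]; a[1] += b[1]; return a; }
    };

    // Feeds the values of one (key, window) through the function element by element
    public static <IN, ACC, OUT> OUT runWindow(Agg<IN, ACC, OUT> fn, List<IN> values) {
        ACC acc = fn.createAccumulator();
        for (IN v : values) {
            acc = fn.add(v, acc);
        }
        return fn.getResult(acc);
    }

    public static void main(String[] args) {
        System.out.println(runWindow(COUNT_AND_SUM, Arrays.asList(350, 450, 800))); // count=3, sum=1600
    }
}
```

Only the accumulator is kept in state while the window is open, which is why an AggregateFunction does not need to buffer every event of the window.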
The AverageAggregate class implemented in this example:
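// Needs: import org.apache.flink.api.common.functions.AggregateFunction;
// Counts events and sums AMT per key and window (IN = SimpleEvent, ACC = OUT = AverageAccumulator)
private static class AverageAggregate implements AggregateFunction<SimpleEvent, AverageAccumulator, AverageAccumulator> {
    // A fresh accumulator for each key and window
    @Override
    public AverageAccumulator createAccumulator() {
        return new AverageAccumulator();
    }

    // Per event: remember the key, increment the count, add the amount
    @Override
    public AverageAccumulator add(SimpleEvent value, AverageAccumulator accumulator) {
        if (accumulator.key.isEmpty()) {
            accumulator.key = value.getID_NO();
        }
        accumulator.count += 1;
        accumulator.sum = accumulator.sum.add(value.getAMT());
        return accumulator;
    }

    // The accumulator itself is emitted as the window result.
    // Note: the commented-out average fails with NumberFormatException once sum has a fractional part.
    @Override
    public AverageAccumulator getResult(AverageAccumulator accumulator) {
        //return Long.valueOf(accumulator.sum.toString())/(double)accumulator.count;
        return accumulator;
    }

    // Only called when windows merge (e.g. session windows); the key is carried over as well
    @Override
    public AverageAccumulator merge(AverageAccumulator a, AverageAccumulator b) {
        if (a.key.isEmpty()) {
            a.key = b.key;
        }
        a.count = a.count + b.count;
        a.sum = a.sum.add(b.sum);
        return a;
    }
}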
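If getResult should return an average instead of the accumulator, the division can be done on BigDecimal directly, which avoids converting the sum through Long. This is a sketch: the class name, the scale of 2 and HALF_UP rounding are assumptions, not part of the original code.

```java
import java.math.BigDecimal;
import java.math.RoundingMode;

public class AverageFromAccumulator {

    // sum / count rounded to `scale` decimal places; count must be positive
    public static BigDecimal average(BigDecimal sum, long count, int scale) {
        if (count <= 0) {
            throw new IllegalArgumentException("count must be positive");
        }
        return sum.divide(BigDecimal.valueOf(count), scale, RoundingMode.HALF_UP);
    }

    public static void main(String[] args) {
        System.out.println(average(new BigDecimal("2150"), 5, 2));   // 430.00
        System.out.println(average(new BigDecimal("350.50"), 2, 2)); // 175.25
    }
}
```

An explicit scale and rounding mode are required because BigDecimal.divide without them throws ArithmeticException for non-terminating results such as 1000 / 3. Returning the average would also change OUT, e.g. to AggregateFunction<SimpleEvent, AverageAccumulator, BigDecimal>.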
5. Main code
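// sourceStream: the DataStream<SimpleEvent> read from Kafka (its setup is not shown here)
// Attach the timestamp assigner and watermark mechanism to the stream
DataStream<SimpleEvent> withTimestampsAndWatermarks = sourceStream.assignTimestampsAndWatermarks(new MyTimestampsAndWatermarks());

// Key by ID_NO, group into 5 s tumbling event-time windows, aggregate each window
DataStream<AverageAccumulator> averageAccumulatorStream = withTimestampsAndWatermarks
        .keyBy(SimpleEvent::getID_NO)
        .window(TumblingEventTimeWindows.of(Time.seconds(5)))
        .aggregate(new AverageAggregate());

// Print each window result (the map is used only for its side effect)
averageAccumulatorStream.map(new MapFunction<AverageAccumulator, String>() {
    @Override
    public String map(AverageAccumulator value) throws Exception {
        System.out.println("TEMP RESULT :" + value.key + " ," + value.count + " , " + value.sum);
        return "";
    }
});

// The job only starts running when execute() is called
env.execute();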
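The effect of keyBy, the tumbling window and the aggregate can be modelled on a finite in-memory list. This plain-Java sketch ignores watermarks and lateness; Event is a hypothetical stand-in for SimpleEvent with an explicit event time, and KeyedTumblingAggregate is a hypothetical name. It only shows which (key, window) buckets the job produces.

```java
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KeyedTumblingAggregate {

    // Hypothetical stand-in for SimpleEvent with an explicit event time
    public static final class Event {
        final String id;
        final BigDecimal amt;
        final long ts;

        public Event(String id, String amt, long ts) {
            this.id = id;
            this.amt = new BigDecimal(amt);
            this.ts = ts;
        }
    }

    // Per (key, window) state, playing the role of AverageAccumulator
    private static final class Acc {
        final String key;
        final long windowStart;
        long count;
        BigDecimal sum = BigDecimal.ZERO;

        Acc(String key, long windowStart) {
            this.key = key;
            this.windowStart = windowStart;
        }
    }

    // keyBy(id) + tumbling window + count/sum; returns one line per (key, window)
    public static List<String> aggregate(List<Event> events, long windowSize) {
        Map<String, Acc> buckets = new LinkedHashMap<>(); // order of first event per bucket
        for (Event e : events) {
            long start = e.ts - (e.ts % windowSize); // non-negative timestamps assumed
            Acc acc = buckets.computeIfAbsent(e.id + "|" + start, k -> new Acc(e.id, start));
            acc.count += 1;
            acc.sum = acc.sum.add(e.amt);
        }
        List<String> out = new ArrayList<>();
        for (Acc acc : buckets.values()) {
            out.add(acc.key + " [" + acc.windowStart + "," + (acc.windowStart + windowSize)
                    + ") count=" + acc.count + " sum=" + acc.sum);
        }
        return out;
    }

    public static void main(String[] args) {
        List<Event> events = Arrays.asList(
                new Event("525992", "350", 1_000L),
                new Event("525997", "400", 2_000L),
                new Event("525992", "600", 3_000L),
                new Event("525992", "1000", 6_000L));
        aggregate(events, 5_000L).forEach(System.out::println);
    }
}
```

Key 525992 yields two results because its events fall into two different 5 s windows; one result is emitted per key and window.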
The custom AverageAccumulator class:
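import java.math.BigDecimal;

// Accumulator and result type: key, event count and total amount for one key and window
public class AverageAccumulator {
    String key;
    long count;
    BigDecimal sum;
    long createTime;  // not used in this example
    long updateTime;  // not used in this example

    AverageAccumulator() {
        key = "";
        count = 0L;
        sum = BigDecimal.ZERO;
        createTime = 0L;
        updateTime = 0L;
    }
}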
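AverageAccumulator has a package-private constructor and package-private fields, so Flink cannot analyse it as a POJO (that requires a public class, a public no-argument constructor, and fields that are public or exposed through getters and setters) and serializes it as a generic type with Kryo. A sketch of a POJO-compliant variant follows; the name AverageAccumulatorPojo is hypothetical so it does not clash, while the field names match the original, so AverageAggregate's field accesses would stay the same.

```java
import java.math.BigDecimal;

public class AverageAccumulatorPojo {
    // Public, non-final fields with the same names as AverageAccumulator
    public String key = "";
    public long count = 0L;
    public BigDecimal sum = BigDecimal.ZERO;
    public long createTime = 0L;
    public long updateTime = 0L;

    // Public no-argument constructor, as required for Flink POJO types
    public AverageAccumulatorPojo() {}

    // Same layout as the article's "TEMP RESULT" line
    @Override
    public String toString() {
        return key + " ," + count + " , " + sum;
    }

    public static void main(String[] args) {
        AverageAccumulatorPojo acc = new AverageAccumulatorPojo();
        acc.key = "525992";
        acc.count = 1;
        acc.sum = acc.sum.add(new BigDecimal("350"));
        System.out.println("TEMP RESULT :" + acc); // TEMP RESULT :525992 ,1 , 350
    }
}
```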
Sample output (each line is one key and one window: ID_NO, number of events, total amount):
TEMP RESULT :525992 ,1 , 350
TEMP RESULT :525997 ,5 , 2150
TEMP RESULT :525996 ,2 , 1000
TEMP RESULT :525992 ,2 , 1600
TEMP RESULT :525995 ,2 , 700
TEMP RESULT :525991 ,4 , 1900
TEMP RESULT :525994 ,2 , 1500
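Each line above is the result of one key in one window, so a key that was active in several windows (525992 here) appears several times. A small plain-Java sketch that parses these printed lines and rolls them up per key across windows; SampleResultRollup is a hypothetical name, and only the sample lines themselves are used as input.

```java
import java.math.BigDecimal;
import java.util.LinkedHashMap;
import java.util.Map;

public class SampleResultRollup {

    // Parses "TEMP RESULT :<key> ,<count> , <sum>" and adds up counts and sums per key
    public static Map<String, String> rollup(String[] lines) {
        Map<String, Long> counts = new LinkedHashMap<>();
        Map<String, BigDecimal> sums = new LinkedHashMap<>();
        for (String line : lines) {
            String[] parts = line.substring(line.indexOf(':') + 1).split(",");
            String key = parts[0].trim();
            counts.merge(key, Long.parseLong(parts[1].trim()), Long::sum);
            sums.merge(key, new BigDecimal(parts[2].trim()), BigDecimal::add);
        }
        Map<String, String> totals = new LinkedHashMap<>();
        for (Map.Entry<String, Long> entry : counts.entrySet()) {
            totals.put(entry.getKey(), entry.getValue() + " events, total " + sums.get(entry.getKey()));
        }
        return totals;
    }

    public static void main(String[] args) {
        String[] sample = {
            "TEMP RESULT :525992 ,1 , 350",
            "TEMP RESULT :525997 ,5 , 2150",
            "TEMP RESULT :525996 ,2 , 1000",
            "TEMP RESULT :525992 ,2 , 1600",
            "TEMP RESULT :525995 ,2 , 700",
            "TEMP RESULT :525991 ,4 , 1900",
            "TEMP RESULT :525994 ,2 , 1500"
        };
        rollup(sample).forEach((key, total) -> System.out.println(key + ": " + total));
    }
}
```

For 525992 the two window results (1 event, 350 and 2 events, 1600) combine to 3 events with a total of 1950.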
The complete example will be shared on GitHub once it has been tidied up.