前言
在Flink中,基于不同的Time Notion来处理流数据,具有不同的意义和结果,官网给出的一张图,非常形象地展示了Process Time、Event Time、Ingestion Time这三个时间分别所处的位置,如下图所示:
下面,分别对这3个Time Notion进行说明如下:
ProcessTime
Flink中有对数据处理的操作进行抽象,称为Transformation Operator,而对于整个Dataflow的开始和结束分别对应着Source Operator和Sink Operator,这些Operator都是在Flink集群系统所在的主机节点上,所以在基于ProcessTime的Notion进行与时间相关的数据处理时,数据处理依赖于Flink程序运行所在的主机节点系统时钟(System Clock)。
因为我们关心的是数据处理时间(Process Time),比如进行Time Window操作,对Window的指派就是基于当前Operator所在主机节点的系统时钟。也就是说,每次创建一个Window,计算Window对应的起始时间和结束时间都使用Process Time,它与外部进入的数据元素的事件时间无关。那么,后续作用于Window的操作(Function)都是基于具有Process Time特性的Window进行的。
使用ProcessTime的场景,比如,我们需要对某个App应用的用户行为进行实时统计分析与监控,由于用户可能使用不同的终端设备,这样可能会造成数据并非是实时的(如用户手机没电,导致2小时以后才会将操作行为记录批量上传上来)。而此时,如果我们按照每分钟的时间粒度做实时统计监控,那么这些数据记录延迟的太严重,如果为了等到这些记录上传上来(无法预测,具体什么时间能获取到这些数据)再做统计分析,对每分钟之内的数据进行统计分析的结果恐怕要到几个小时甚至几天后才能计算并输出结果,这不是我们所希望的。而且,数据处理系统可能也没有这么大的容量来处理海量数据的情况。结合业务需求,其实我们只需要每分钟时间内进入的数据记录,依赖当前数据处理系统的处理时间(Process Time)生成每分钟的Window,指派数据记录到指定Window并计算结果,这样就不用考虑数据元素本身自带的事件时间了。
EventTime
流数据中的数据元素可能会具有不变的事件时间(Event Time)属性,该事件时间是数据元素所代表的行为发生时就不会改变。最简单的情况下,这也最容易理解:所有进入到Flink处理系统的流数据,都是在外部的其它系统中产生的,它们产生后具有了事件时间,经过传输后,进入到Flink处理系统,理论上(如果所有系统都具有相同系统时钟)该事件时间对应的时间戳要早于进入到Flink处理系统中进行处理的时间戳,但实际应用中会出现数据记录乱序、延迟到达等问题,这也是非常普遍的。
基于EventTime的Notion,处理数据的进度(Progress)依赖于数据本身,而不是当前Flink处理系统中Operator所在主机节点的系统时钟。所以,需要有一种机制能够控制数据处理的进度,比如一个基于事件时间的Time Window创建后,具体怎么确定属于该Window的数据元素都已经到达?如果确定都到达了,然后就可以对属于这个Window的所有数据元素做满足需要的处理(如汇总、分组等)。这就要用到WaterMark机制,它能够衡量数据处理进度(表达数据到达的完整性)。
WaterMark带有一个时间戳,假设为X,进入到数据处理系统中的数据元素具有事件时间,记为Y,如果Y<X,则所有的数据元素均已到达,可以计算并输出结果。反过来说,可能更容易理解一些:要想触发对当前Window中的数据元素进行计算,必须保证对所有进入到系统的数据元素,其事件时间Y>=X。如果数据元素的事件时间是有序的,那么当出现一个数据元素的事件时间Y<X,则触发对当前Window计算,并创建另一个新的Window来指派事件时间Y<X的数据元素到该新的Window中。
可以看到,有了WaterMark机制,对基于事件时间的流数据处理会变得特别灵活,可以根据实际业务需要选择各种组件和处理策略。比如,上面我们说到,当Y<X则触发当前Window计算,记为t1时刻,如果流数据元素是乱序的,经过一段时间,假设t2时刻有一个数据元素的事件时间Y>=X,这时该怎么办呢?如果t1时刻的Window已经不存在了,但我们还是希望新出现的乱序数据元素加入到t1时刻Window的计算中,这时可以实现自定义的Trigger来满足各种业务场景的需要。
IngestionTime
IngestionTime是数据进入到Flink流数据处理系统的时间,该时间依赖于Source Operator所在主机节点的系统时钟,会为到达的数据记录指派Ingestion Time。基于IngestionTime的Notion,存在多个Source Operator的情况下,每个Source Operator会使用自己本地系统时钟指派Ingestion Time。后续基于时间相关的各种操作,都会使用数据记录中的Ingestion Time。
与EventTime相比,IngestionTime不能处理乱序、延迟到达事件的应用场景,它也就不用必须指定如何生成WaterMark。
使用EventTime与WaterMark来完成不同类型内容操作行为的统计分析
方法一 调用assignTimestampsAndWatermarks()进行指派
具体操作: TimeWindow的大小设置为1分钟(60000ms),允许延迟到达时间设置为50秒(50000ms),并且为了模拟流数据元素事件时间早于当前处理系统的系统时间,设置延迟时间为2分钟(120000ms)。
首先完成操作行为数据模拟包括数据延迟 乱序等情况: 自定义实现一个source:
import org.apache.commons.lang3.StringUtils;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.time.Instant;
import java.util.*;
import java.util.concurrent.TimeUnit;
/**
* @Auther: dalan
* @Date: 19-3-21 16:44
* @Description:
*/
public class StringLineEventSource extends RichParallelSourceFunction<String> {
/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(StringLineEventSource.class);
public Long latenessMills;
private volatile boolean running = true;
public StringLineEventSource(){super();}
public StringLineEventSource(Long latenessMills){super(); this.latenessMills = latenessMills;}
private List<String> channelSet = Arrays.asList("a", "b", "c", "d"); // 操作内容: 模拟数据
private List<String> behaviorTypes = Arrays.asList("INSTALL", "OPEN",
"BROWSE", "CLICK",
"PURCHASE", "CLOSE", "UNINSTALL"); // 操作类型
private Random rand = new Random(9527); // 通过随机函数生产结果
@Override
public void run(SourceContext<String> ctx) throws Exception {
long numElements = Long.MAX_VALUE;
long count = 0L;
while (running && count < numElements){
String channel = channelSet.get(rand.nextInt(channelSet.size()));
List<String> event = generateEvent(); // 生产数据: eventtime使用的当前时间
LOGGER.info(event.toString());
String ts = event.get(0);
String id = event.get(1);
String behaviorType = event.get(2);
String result = StringUtils.join(Arrays.asList(ts, channel, id, behaviorType),"\t");
ctx.collect(result);
count += 1;
TimeUnit.MILLISECONDS.sleep(5L);
}
}
private List<String> generateEvent() {
Long delayedTimestamp = Instant.ofEpochMilli(System.currentTimeMillis())
.minusMillis(latenessMills)
.toEpochMilli(); // 延迟时间
// timestamp, id, behaviorType
return Arrays.asList(delayedTimestamp.toString(),
UUID.randomUUID().toString(),
behaviorTypes.get(rand.nextInt(behaviorTypes.size()))); // <ts,uuid,type>
}
@Override
public void cancel() {
this.running = false;
}
}
流数据中的数据元素为字符串记录行的格式,包含字段:事件时间、渠道、用户编号、用户行为类型。在Flink程序中,通过调用stream: DataStream[T]的assignTimestampsAndWatermarks()进行时间戳的指派,并生成WaterMark。然后,基于Keyed Window生成Tumbling Window(不存在Window重叠)来操作数据记录。最后,将计算结果输出到Kafka中去。
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.math.NumberUtils;
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.api.java.tuple.*;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.streaming.connectors.kafka.FlinkKafkaProducer011;
import org.apache.flink.util.Collector;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.Arrays;
/**
* @Auther: dalan
* @Date: 19-3-21 16:43
* @Description:
*/
public class UserDefineWaterMark {
/** logger */
private static final Logger LOGGER = LoggerFactory.getLogger(UserDefineWaterMark.class);
// main
public static void main(String[] args) throws Exception {
final ParameterTool params = ParameterTool.fromArgs(args);
Long sourceLatenessMillis = (Long)params.getLong("source-lateness-millis");
Long maxLaggedTimeMillis = params.getLong("window-lagged-millis");
Long windowSizeMillis = params.getLong("window-size-millis");
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime); // 基于event time
DataStream<String> streams = env.addSource(new StringLineEventSource(sourceLatenessMillis));
//解析输入的数据
DataStream<String> inputMap = ((DataStreamSource<String>) streams)
.setParallelism(1)
.assignTimestampsAndWatermarks( // 指派时间戳,并生成WaterMark
new BoundedOutOfOrdernessTimestampExtractor<String>(Time.milliseconds(maxLaggedTimeMillis)){
@Override
public long extractTimestamp(String s) {
return NumberUtils.toLong(s.split("\t")[0]);
}
})
.setParallelism(2)
.map(new MapFunction<String, Tuple2<Tuple2<String,String>, Long>>() {
@Override
public Tuple2<Tuple2<String,String>, Long> map(String value) throws Exception {
String[] arr = value.split("\t");
String channel = arr[1];
return new Tuple2<Tuple2<String,String>, Long>(Tuple2.of(channel, arr[3]), 1L);
}
})
.setParallelism(2)
.keyBy(0)
.window(TumblingEventTimeWindows.of(Time.milliseconds(windowSizeMillis)))
.process(new ProcessWindowFunction<Tuple2<Tuple2<String, String>, Long>, Object, Tuple, TimeWindow>() {
@Override
public void process(Tuple tuple, Context context, Iterable<Tuple2<Tuple2<String, String>, Long>> iterable, Collector<Object> collector) throws Exception {
long count = 0;
Tuple2<String,String> tuple2 = null;
for (Tuple2<Tuple2<String, String>, Long> in : iterable){
tuple2 = in.f0;
count++;
}
LOGGER.info("window===" + tuple.toString());
collector.collect(new Tuple6<String, Long, Long,String,String,Long>(tuple2.getField(0).toString(),context.window().getStart(), context.window().getEnd(),tuple.getField(0).toString(),tuple.getField(1).toString(),count));
}
})
.setParallelism(4)
.map(t -> {
Tuple6<String, Long, Long,String,String,Long> tt = (Tuple6<String, Long, Long,String,String,Long>)t;
Long windowStart = tt.f1;
Long windowEnd = tt.f2;
String channel = tt.f3;
String behaviorType = tt.f4;
Long count = tt.f5;
return StringUtils.join(Arrays.asList(windowStart, windowEnd, channel, behaviorType, count) ,"\t");
})
.setParallelism(3);
inputMap.addSink((new FlinkKafkaProducer011<String>("localhost:9092,localhost:9092","windowed-result-topic",new SimpleStringSchema())))
.setParallelism(3);
//inputMap.print();
env.execute("EventTime and WaterMark Demo");
}
}
通过执行 mvn clean package -DskipTests 生产jar包,或者直接执行main方法,需要添加如下启动参数
--window-result-topic windowed-result-topic
--zookeeper.connect localhost:2181
--bootstrap.servers localhost:9092
--source-lateness-millis 120000 # 源延迟时间
--window-lagged-millis 50000 # 窗口延迟时间
--window-size-millis 60000 # 窗口大小
输出如下内容:
1553169300000 1553169360000 b UNINSTALL 440
1553169300000 1553169360000 c CLOSE 389
1553169300000 1553169360000 b CLICK 445
1553169300000 1553169360000 b OPEN 423
1553169300000 1553169360000 a OPEN 363
1553169300000 1553169360000 b CLOSE 431
1553169300000 1553169360000 a PURCHASE 433
1553169300000 1553169360000 d PURCHASE 427
1553169300000 1553169360000 a INSTALL 388
1553169300000 1553169360000 c BROWSE 381
1553169300000 1553169360000 d CLOSE 446
1553169300000 1553169360000 c UNINSTALL 427
1553169300000 1553169360000 a CLICK 391
1553169300000 1553169360000 b BROWSE 429
1553169300000 1553169360000 a UNINSTALL 377
1553169300000 1553169360000 d INSTALL 397
1553169300000 1553169360000 a CLOSE 423
1553169300000 1553169360000 c PURCHASE 437
1553169300000 1553169360000 a BROWSE 388
1553169300000 1553169360000 d UNINSTALL 379
1553169300000 1553169360000 d BROWSE 431
1553169300000 1553169360000 b INSTALL 434
1553169300000 1553169360000 c CLICK 415
1553169300000 1553169360000 c INSTALL 455
1553169300000 1553169360000 d OPEN 426
1553169300000 1553169360000 c OPEN 399
1553169300000 1553169360000 d CLICK 402
1553169300000 1553169360000 b PURCHASE 442
在上面的代码实现中我们直接使用了Flink内建实现的BoundedOutOfOrdernessTimestampExtractor来指派时间戳和生成WaterMark。从而实现了从事件记录中提取时间戳的逻辑,实际生成WaterMark的逻辑使用BoundedOutOfOrdernessTimestampExtractor提供的默认逻辑.
public abstract class BoundedOutOfOrdernessTimestampExtractor<T> implements AssignerWithPeriodicWatermarks<T> {
private static final long serialVersionUID = 1L;
private long currentMaxTimestamp;
private long lastEmittedWatermark = Long.MIN_VALUE;
private final long maxOutOfOrderness;
public BoundedOutOfOrdernessTimestampExtractor(Time maxOutOfOrderness) {
if (maxOutOfOrderness.toMilliseconds() < 0) {
throw new RuntimeException("Tried to set the maximum allowed " +
"lateness to " + maxOutOfOrderness + ". This parameter cannot be negative.");
}
this.maxOutOfOrderness = maxOutOfOrderness.toMilliseconds();
this.currentMaxTimestamp = Long.MIN_VALUE + this.maxOutOfOrderness; // 初始设置当前最大事件时间戳
}
public long getMaxOutOfOrdernessInMillis() {
return maxOutOfOrderness;
}
public abstract long extractTimestamp(T element);
@Override
public final Watermark getCurrentWatermark() {
long potentialWM = currentMaxTimestamp - maxOutOfOrderness; // 当前最大事件时间戳,减去允许最大延迟到达时间
if (potentialWM >= lastEmittedWatermark) { // 检查上一次emit的WaterMark时间戳,如果比lastEmittedWatermark大则更新其值
lastEmittedWatermark = potentialWM;
}
return new Watermark(lastEmittedWatermark);
}
@Override
public final long extractTimestamp(T element, long previousElementTimestamp) {
long timestamp = extractTimestamp(element);
if (timestamp > currentMaxTimestamp) { // 检查新到达的数据元素的事件时间,用currentMaxTimestamp记录下当前最大的
currentMaxTimestamp = timestamp;
}
return timestamp;
}
}
在getCurrentWatermark()和extractTimestamp()方法中,lastEmittedWatermark是WaterMark中的时间戳,计算它时,总是根据当前进入Flink处理系统的数据元素的最大的事件时间currentMaxTimestamp,然后再减去一个maxOutOfOrderness(外部配置的支持最大延迟到达的时间),也就说,这里面实现的WaterMark中的时间戳序列是非严格单调递增的。
网友评论