1. Data Partitioning
Shuffle (random) and round-robin partitioning, broadcast, global partitioning. Note the difference between the two round-robin variants: rebalance distributes records round-robin across all downstream subtasks, while rescale round-robins only within local groups of subtasks.
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.RichParallelSourceFunction;

public class richRescale {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        // parallel source: with parallelism 2, subtask 0 emits the even numbers and subtask 1 the odd ones
        DataStreamSource<Integer> source = env.addSource(new RichParallelSourceFunction<Integer>() {
            @Override
            public void run(SourceContext<Integer> ctx) throws Exception {
                for (int i = 1; i < 10; i++) {
                    // each subtask only emits the numbers whose parity matches its subtask index
                    if (i % 2 == getRuntimeContext().getIndexOfThisSubtask()) {
                        System.out.println(getRuntimeContext().getIndexOfThisSubtask());
                        ctx.collect(i);
                    }
                }
            }
            @Override
            public void cancel() {
            }
        });
        source.setParallelism(2);
        // rescale: round-robin within local groups (each source subtask only feeds "its" 2 of the 4 printers)
        source.rescale().print().setParallelism(4);
        // rebalance: round-robin across all 4 downstream subtasks
        source.rebalance().print().setParallelism(4);
        // broadcast: every record is replicated to all 4 downstream subtasks
        source.broadcast().print().setParallelism(4);
        // global: every record goes to the first downstream subtask only
        source.global().print().setParallelism(4);
        env.execute();
    }
}
Custom repartitioning (partitionCustom); a minimal sketch follows below.
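The DataStream API exposes partitionCustom, which takes a Partitioner and a KeySelector. A minimal sketch, assuming the same environment as above; the class name customPartition and the modulo routing rule are illustrative:

import org.apache.flink.api.common.functions.Partitioner;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class customPartition {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3, 4, 5, 6, 7, 8)
                // route each element to downstream subtask (key % numPartitions)
                .partitionCustom(
                        (Partitioner<Integer>) (key, numPartitions) -> key % numPartitions,
                        (KeySelector<Integer, Integer>) value -> value) // the element itself is the key
                .print()
                .setParallelism(2); // even numbers go to subtask 0, odd numbers to subtask 1
        env.execute();
    }
}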
2. Sink
Writing to the file system
import DAY2.Event;
import org.apache.flink.api.common.serialization.SimpleStringEncoder;
import org.apache.flink.core.fs.Path;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.filesystem.StreamingFileSink;
import org.apache.flink.streaming.api.functions.sink.filesystem.rollingpolicies.DefaultRollingPolicy;
import java.util.concurrent.TimeUnit;

public class toFile {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L),
                new Event("小黄", "0003", 200000L)
        );
        // row-format sink writing UTF-8 strings under ./output; note that part files
        // are only finalized on checkpoints, so enable checkpointing in a real job
        StreamingFileSink<String> fileSink = StreamingFileSink
                .<String>forRowFormat(new Path("./output"), new SimpleStringEncoder<>("UTF-8"))
                .withRollingPolicy(
                        DefaultRollingPolicy.builder()
                                .withMaxPartSize(1024 * 1024 * 1024)                   // roll at 1 GB
                                .withRolloverInterval(TimeUnit.MINUTES.toMillis(15))   // roll every 15 minutes
                                .withInactivityInterval(TimeUnit.MINUTES.toMillis(15)) // roll after 15 minutes of inactivity
                                .build()
                ).build();
        source.map(Event::toString).addSink(fileSink);
        env.execute();
    }
}
Writing to Kafka
In real production environments there are many scenarios where data processed by Flink must be written quickly into a distributed, high-throughput, highly available message middleware that can guarantee Exactly-Once semantics, so that other applications can consume the processed data. Kafka is Flink's best partner here: Flink can not only consume data from Kafka, it can also write processed data back to Kafka, with high throughput, data safety, and Exactly-Once guarantees.
Flink integrates with several Kafka versions, such as 0.11.x, 1.x, and 2.x. Since Flink 1.9 the Kafka 2.2 client is used, so the Kafka version used here is 2.2.2, together with the latest API.
The following example writes data to Kafka. First define a class that implements the KafkaSerializationSchema interface with a type parameter; String means the data written to Kafka is of type String. This class is the serialization schema for the data written to Kafka: override the serialize method to convert each record into a byte array, wrap it in a ProducerRecord, and return it.
import org.apache.flink.streaming.connectors.kafka.KafkaSerializationSchema;
import org.apache.kafka.clients.producer.ProducerRecord;
import javax.annotation.Nullable;
import java.nio.charset.Charset;

//custom serialization schema for writing String data to Kafka
public class KafkaStringSerializationSchema implements KafkaSerializationSchema<String> {
    private String topic;
    private String charset;
    //the constructor takes the target topic and the charset; UTF-8 by default
    public KafkaStringSerializationSchema(String topic) {
        this(topic, "UTF-8");
    }
    public KafkaStringSerializationSchema(String topic, String charset) {
        this.topic = topic;
        this.charset = charset;
    }
    //called to serialize each record
    @Override
    public ProducerRecord<byte[], byte[]> serialize(
            String element, @Nullable Long timestamp) {
        //convert the record into a byte array
        byte[] bytes = element.getBytes(Charset.forName(charset));
        //wrap it in a ProducerRecord
        return new ProducerRecord<>(topic, bytes);
    }
}
Then put the Kafka-related parameters into a Properties object and create a FlinkKafkaProducer, passing the target topic name, the Kafka serialization schema, the Properties, and the Semantic for the write as constructor arguments. Finally, call addSink and pass the FlinkKafkaProducer to it. Although the code below specifies EXACTLY_ONCE semantics, this cannot actually be achieved without checkpointing enabled.
DataStream<String> dataStream = …
//the Kafka topic to write to
String topic = "test";
//set the Kafka-related parameters
Properties properties = new Properties();
properties.setProperty("bootstrap.servers", "node-1:9092,node-2:9092,node-3:9092");
//create the FlinkKafkaProducer
FlinkKafkaProducer<String> kafkaProducer = new FlinkKafkaProducer<String>(
        topic, //target topic
        new KafkaStringSerializationSchema(topic), //serialization schema for writing to Kafka
        properties, //Kafka parameters
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE //write with EXACTLY_ONCE semantics
);
//add the Kafka sink
dataStream.addSink(kafkaProducer);
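To make EXACTLY_ONCE actually take effect, checkpointing must be enabled on the environment; a minimal sketch (the 5-second interval is an arbitrary choice):

import org.apache.flink.streaming.api.CheckpointingMode;

//enable checkpointing every 5 seconds in exactly-once mode
env.enableCheckpointing(5000, CheckpointingMode.EXACTLY_ONCE);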
Writing to Redis
<dependency>
<groupId>org.apache.bahir</groupId>
<artifactId>flink-connector-redis_2.12</artifactId>
<version>1.1-SNAPSHOT</version>
</dependency>
Define a class (or a static inner class) that implements RedisMapper with a type parameter, here Tuple2<String, Integer>, the type of the data written to Redis, and implement three methods. The first, getCommandDescription, returns a RedisCommandDescription instance whose constructor specifies the Redis write command, HSET here, and the additionalKey, i.e., the outer key when the value is of HASH type. The second, getKeyFromData, supplies the inner key of the HASH value; the third, getValueFromData, supplies the inner value of the HASH value.
public static class RedisWordCountMapper implements RedisMapper<Tuple2<String, Integer>> {
    @Override
    public RedisCommandDescription getCommandDescription() {
        //write with HSET, i.e. the value is a HASH; the second argument is the outer key of the hash
        return new RedisCommandDescription(RedisCommand.HSET, "WORD_COUNT");
    }
    @Override
    public String getKeyFromData(Tuple2<String, Integer> data) {
        return data.f0; //the inner key of the hash value
    }
    @Override
    public String getValueFromData(Tuple2<String, Integer> data) {
        return data.f1.toString(); //the inner value of the hash value
    }
}
DataStream<Tuple2<String, Integer>> result = wordAndOne.keyBy(0).sum(1);
//configure the Redis parameters, such as the host and port
FlinkJedisPoolConfig conf = new FlinkJedisPoolConfig.Builder().setHost("localhost").setPassword("123456").build();
//write the data to Redis
result.addSink(new RedisSink<>(conf, new RedisWordCountMapper()));
Writing to Elasticsearch (a sketch follows below)
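No code was given here in the original; below is a minimal sketch using the flink-connector-elasticsearch7 connector, assuming the Event POJO from earlier and a DataStream<Event> named stream. The host, port, and the index name "events" are placeholders:

import org.apache.flink.streaming.connectors.elasticsearch.ElasticsearchSinkFunction;
import org.apache.flink.streaming.connectors.elasticsearch7.ElasticsearchSink;
import org.apache.http.HttpHost;
import org.elasticsearch.action.index.IndexRequest;
import org.elasticsearch.client.Requests;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

List<HttpHost> httpHosts = new ArrayList<>();
httpHosts.add(new HttpHost("localhost", 9200, "http"));
ElasticsearchSink.Builder<Event> esBuilder = new ElasticsearchSink.Builder<>(
        httpHosts,
        (ElasticsearchSinkFunction<Event>) (element, ctx, indexer) -> {
            //build one index request per record
            Map<String, String> json = new HashMap<>();
            json.put("user", element.user);
            json.put("url", element.url);
            IndexRequest request = Requests.indexRequest()
                    .index("events")
                    .source(json);
            indexer.add(request);
        });
//flush after every record so results are visible immediately; raise this in production
esBuilder.setBulkFlushMaxActions(1);
stream.addSink(esBuilder.build());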
Writing to MySQL (a sketch follows below)
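Also left empty in the original; a minimal sketch using Flink's JDBC connector (flink-connector-jdbc plus the MySQL driver on the classpath), again assuming a DataStream<Event> named stream. The URL, table, columns, and credentials are placeholders:

import org.apache.flink.connector.jdbc.JdbcConnectionOptions;
import org.apache.flink.connector.jdbc.JdbcSink;

stream.addSink(JdbcSink.sink(
        "INSERT INTO events (user_name, url, ts) VALUES (?, ?, ?)",
        (statement, event) -> {
            //bind one row per record
            statement.setString(1, event.user);
            statement.setString(2, event.url);
            statement.setLong(3, event.timestamp);
        },
        new JdbcConnectionOptions.JdbcConnectionOptionsBuilder()
                .withUrl("jdbc:mysql://localhost:3306/test")
                .withDriverName("com.mysql.cj.jdbc.Driver")
                .withUsername("root")
                .withPassword("123456")
                .build()
));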
Custom sinks (a sketch follows below)
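When no ready-made connector fits, extend RichSinkFunction: open/close manage the connection lifecycle once per parallel instance, and invoke is called once per record. A minimal sketch, where the println stands in for a real external write:

import DAY2.Event;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

public class MyCustomSink extends RichSinkFunction<Event> {
    @Override
    public void open(Configuration parameters) throws Exception {
        //called once per parallel instance: create clients/connections here
    }
    @Override
    public void invoke(Event value, Context context) throws Exception {
        //called once per record: write it to the external system
        System.out.println("sink got: " + value);
    }
    @Override
    public void close() throws Exception {
        //release connections and other resources here
    }
}
//usage: stream.addSink(new MyCustomSink());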
3. Time and Windows
Time semantics
- Processing time
- Event time
Watermarks
In practice, when the data volume is large, many records will carry the same timestamp, so extracting a timestamp and inserting a watermark for every single record is wasteful. The usual strategy is therefore to insert watermarks periodically.
Periodic watermark generation in out-of-order streams
For efficiency when large amounts of data arrive at once, we can likewise generate watermarks periodically. With out-of-order data, however, this can cause calculation errors: a late record falls into a window it does not belong to, so the earlier window loses data while the later window counts records that are not its own.
To let windows collect late data correctly, we can wait for a while, say two seconds: the timestamp of the watermark to insert is the maximum timestamp seen so far minus 2 seconds. (This assumes records are delayed by at most two seconds; a record delayed longer than that can still be lost by its window.)
Properties of watermarks
- A watermark is a marker inserted into the data stream and can be seen as a special record
- A watermark carries a timestamp, indicating the current progress of event time
- Watermarks are generated from the timestamps in the data
- Watermark timestamps must increase monotonically, so that the task's event-time clock always moves forward
- Watermarks can carry a configured delay to handle out-of-order data correctly
- A watermark Watermark(t) asserts that event time in the stream has reached timestamp t: all data up to t has arrived, and no record with timestamp t' <= t will appear afterwards
Watermark generation strategies
- Watermark generation for ordered streams
import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class demo1 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
env.getConfig().setAutoWatermarkInterval(100L); // emit watermarks every 100 ms (the default is 200 ms)
env.fromElements(
new Event("小米", "0001", 100000L),
new Event("小红", "0002", 200000L),
new Event("小黄", "0003", 200000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forMonotonousTimestamps().withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})).print();
env.execute();
}
}
- Watermark generation for out-of-order streams
import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import java.time.Duration;
public class demo2 {
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(new Configuration());
env.getConfig().setAutoWatermarkInterval(100L); // emit watermarks every 100 ms (the default is 200 ms)
env.fromElements(
new Event("小米", "0001", 100000L),
new Event("小红", "0002", 200000L),
new Event("小黄", "0003", 200000L)
).assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofMinutes(1L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
})).print();
env.execute();
}
}
- Assigner with periodic watermarks
- Assigner with punctuated watermarks (both variants are sketched below)
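Under the new WatermarkStrategy API, both variants are written as a WatermarkGenerator: a periodic generator only observes timestamps in onEvent and emits in onPeriodicEmit (driven by setAutoWatermarkInterval), while a punctuated one emits directly in onEvent. A minimal periodic sketch, assuming the Event POJO from above; the 2-second delay is chosen for illustration:

import DAY2.Event;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkOutput;

public class CustomPeriodicGenerator implements WatermarkGenerator<Event> {
    private final long delay = 2000L; //tolerated out-of-orderness (assumption)
    private long maxTs = Long.MIN_VALUE + delay + 1; //guard against underflow
    @Override
    public void onEvent(Event event, long eventTimestamp, WatermarkOutput output) {
        //periodic variant: only track the max timestamp here;
        //a punctuated variant would call output.emitWatermark(...) directly instead
        maxTs = Math.max(maxTs, event.timestamp);
    }
    @Override
    public void onPeriodicEmit(WatermarkOutput output) {
        //called on the setAutoWatermarkInterval timer
        output.emitWatermark(new Watermark(maxTs - delay - 1));
    }
}
//usage:
//stream.assignTimestampsAndWatermarks(
//        WatermarkStrategy.forGenerator(ctx -> new CustomPeriodicGenerator())
//                         .withTimestampAssigner((e, ts) -> e.timestamp));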
Watermark propagation
A task takes the minimum of the current watermarks across all of its input partitions as its own event-time clock, and whenever that minimum advances it broadcasts the new watermark to all downstream partitions.
4. Windows
Basic concepts
A window is a way of slicing an unbounded stream into finite buckets for processing.
Window classification
- Time windows and count windows
- Tumbling windows, sliding windows, and session windows (session windows merge)
- Global windows (require a custom trigger); the matching assigners are sketched below
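A minimal sketch of the corresponding assigners in the DataStream API, assuming the Event POJO and an already keyed stream; all window sizes are illustrative:

import DAY2.Event;
import org.apache.flink.streaming.api.datastream.KeyedStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.assigners.GlobalWindows;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class windowAssigners {
    static void sketch(KeyedStream<Event, String> keyed) {
        keyed.window(TumblingEventTimeWindows.of(Time.hours(1)));                 //tumbling time window
        keyed.window(SlidingEventTimeWindows.of(Time.hours(1), Time.minutes(5))); //sliding time window
        keyed.window(EventTimeSessionWindows.withGap(Time.seconds(10)));          //session window, merged by gap
        keyed.countWindow(100);                                                   //tumbling count window
        keyed.countWindow(100, 10);                                               //sliding count window
        keyed.window(GlobalWindows.create());                                     //global window, needs a custom trigger
    }
}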
Window API types
- Non-keyed (Non-Keyed Windows)
- Keyed (Keyed Windows); the two styles are contrasted in the sketch below
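A minimal sketch of the two API styles, assuming the Event POJO; the trivial reduce is only there to complete each pipeline:

import DAY2.Event;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;

public class windowApiStyles {
    static void sketch(DataStream<Event> stream) {
        //keyed windows: the stream is logically split by key and windows are evaluated in parallel
        stream.keyBy(e -> e.user)
              .window(TumblingEventTimeWindows.of(Time.seconds(5)))
              .reduce((a, b) -> b);
        //non-keyed windows: all records flow through a single subtask (parallelism 1)
        stream.windowAll(TumblingEventTimeWindows.of(Time.seconds(5)))
              .reduce((a, b) -> b);
    }
}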
Window functions
- Incremental aggregation functions
Reduce function (reduce)
import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.ReduceFunction;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Random;
public class reduceWindow {
public static void main(String[] args) throws Exception {
Configuration conf= new Configuration();
conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
//custom web UI port
conf.setInteger(RestOptions.PORT, 8050);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// the default watermark interval is 200 ms
env.getConfig().setAutoWatermarkInterval(1000L);
env.setParallelism(1);
// randomly generate event data
DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
private Boolean flag = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
Random random = new Random();
String[] uid = new String[]{"小", "中", "大"};
String[] url = new String[]{"./click", "./show", "./slide"};
while (flag) {
ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
Thread.sleep(100L);
}
}
@Override
public void cancel() {
flag = false;
}
});
// out-of-order data: tolerate up to 2 seconds of lateness
SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
// map and keyBy
outputStreamOperator.map(data -> Tuple2.of(data.user,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG))
.keyBy(data -> data.f0)
// sliding window (alternative)
//.window(SlidingEventTimeWindows.of(Time.milliseconds(1000L), Time.seconds(100L)))
// tumbling window
.window(TumblingEventTimeWindows.of(Time.seconds(1)))
.reduce(new ReduceFunction<Tuple2<String, Long>>() {
@Override
public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
return Tuple2.of(value1.f0,value1.f1+ value2.f1);
}
}).print();
env.execute();
}
}
- Aggregate function (aggregate): unlike reduce, the input, accumulator, and result types can all differ; the example below computes the average event timestamp per user
import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import java.time.Duration;
import java.util.Date;
import java.util.Random;
public class windowAggr_AVG {
public static void main(String[] args) throws Exception {
Configuration conf= new Configuration();
conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
//custom web UI port
conf.setInteger(RestOptions.PORT, 8050);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// the default watermark interval is 200 ms
env.getConfig().setAutoWatermarkInterval(1000L);
env.setParallelism(1);
// randomly generate event data
DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
private Boolean flag = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
Random random = new Random();
String[] uid = new String[]{"小", "中", "大"};
String[] url = new String[]{"./click", "./show", "./slide"};
while (flag) {
ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
Thread.sleep(200L);
}
}
@Override
public void cancel() {
flag = false;
}
});
// out-of-order data: tolerate up to 2 seconds of lateness
SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ofSeconds(2L)).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
outputStreamOperator.keyBy(data -> data.user)
.window(TumblingEventTimeWindows.of(Time.seconds(2)))
.aggregate(new AggregateFunction<Event, Tuple3<Long,Integer,String>, Tuple2<String,Date>>() {
@Override
public Tuple3<Long, Integer, String> createAccumulator() {
//initialize the accumulator: (timestamp sum, count, user)
return Tuple3.of(0L,0,"");
}
@Override
public Tuple3<Long, Integer, String> add(Event value, Tuple3<Long, Integer, String> accumulator) {
//incremental accumulator update
return Tuple3.of(accumulator.f0+value.timestamp , accumulator.f1+1,value.user);
}
@Override
public Tuple2<String, Date> getResult(Tuple3<Long, Integer, String> accumulator) {
return Tuple2.of(accumulator.f2, new Date(accumulator.f0 / accumulator.f1));
}
//only required for merging (session) windows
@Override
public Tuple3<Long, Integer, String> merge(Tuple3<Long, Integer, String> a, Tuple3<Long, Integer, String> b) {
return Tuple3.of(a.f0+b.f0,a.f1+b.f1,b.f2);
}
}).print();
env.execute();
}
}
- Full window functions (full window functions)
Unlike incremental aggregation functions, a full window function first collects the window's data and buffers it internally, and only runs the computation when the window is about to emit its result.
There are two kinds: the window function (WindowFunction) and the process window function (ProcessWindowFunction); the example below uses the latter.
Process window function
import DAY2.Event;
import org.apache.flink.api.common.eventtime.SerializableTimestampAssigner;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.configuration.ConfigConstants;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.RestOptions;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;
import java.time.Duration;
import java.util.HashSet;
import java.util.Random;
public class processWindow {
public static void main(String[] args) throws Exception {
Configuration conf= new Configuration();
conf.setBoolean(ConfigConstants.TASK_MANAGER_LOG_PATH_KEY, true);
//custom web UI port
conf.setInteger(RestOptions.PORT, 8050);
StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(conf);
// the default watermark interval is 200 ms
env.getConfig().setAutoWatermarkInterval(200L);
env.setParallelism(1);
// randomly generate event data
DataStreamSource<Event> addSource = env.addSource(new SourceFunction<Event>() {
private Boolean flag = true;
@Override
public void run(SourceContext<Event> ctx) throws Exception {
Random random = new Random();
String[] uid = new String[]{"小", "中", "大"};
String[] url = new String[]{"./click", "./show", "./slide"};
while (flag) {
ctx.collect(new Event(uid[random.nextInt(uid.length)], url[random.nextInt(url.length)], System.currentTimeMillis()));
Thread.sleep(1000L);
}
}
@Override
public void cancel() {
flag = false;
}
});
addSource.print();
// watermarks with zero tolerated out-of-orderness (Duration.ZERO)
SingleOutputStreamOperator<Event> outputStreamOperator = addSource.assignTimestampsAndWatermarks(WatermarkStrategy.<Event>forBoundedOutOfOrderness(Duration.ZERO).withTimestampAssigner(new SerializableTimestampAssigner<Event>() {
@Override
public long extractTimestamp(Event element, long recordTimestamp) {
return element.timestamp;
}
}));
// map and keyBy: the constant key true sends all records into a single logical window
outputStreamOperator.map(data -> Tuple2.of(data.user,1L)).returns(Types.TUPLE(Types.STRING,Types.LONG))
.keyBy(data -> true)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProcessWindowFunction<Tuple2<String, Long>, String, Boolean, TimeWindow>() {
@Override
public void process(Boolean aBoolean, ProcessWindowFunction<Tuple2<String, Long>, String, Boolean, TimeWindow>.Context context, Iterable<Tuple2<String, Long>> elements, Collector<String> out) throws Exception {
HashSet<String> uidSet = new HashSet<>(); //collect the distinct user ids seen in this window
for (Tuple2<String, Long> element : elements) {
uidSet.add(element.f0);
}
long start = context.window().getStart();
long end = context.window().getEnd();
out.collect("window: distinct users = " + uidSet.size() + " " + new Timestamp(start) + " " + new Timestamp(end));
}
}).print();
env.execute();
}
}
Effect of the watermark delay on output order
A larger bounded-out-of-orderness delay holds watermarks back, so windows fire later and the windowed results appear further behind the raw records printed by addSource.print().
- Combining the two kinds of window functions (a sketch follows below)
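reduce and aggregate both have overloads that take a ProcessWindowFunction as a second argument: the incremental function aggregates records as they arrive, and at firing time the process function receives just that single pre-aggregated value plus the window metadata. A minimal sketch counting events per user in 10-second tumbling windows, assuming the Event POJO:

import DAY2.Event;
import org.apache.flink.api.common.functions.AggregateFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;
import java.sql.Timestamp;

public class combineWindowFunctions {
    static void sketch(DataStream<Event> stream) {
        stream.keyBy(e -> e.user)
              .window(TumblingEventTimeWindows.of(Time.seconds(10)))
              .aggregate(
                      //incremental part: count events on the fly, no buffering
                      new AggregateFunction<Event, Long, Long>() {
                          @Override
                          public Long createAccumulator() { return 0L; }
                          @Override
                          public Long add(Event value, Long acc) { return acc + 1; }
                          @Override
                          public Long getResult(Long acc) { return acc; }
                          @Override
                          public Long merge(Long a, Long b) { return a + b; }
                      },
                      //full-window part: receives exactly one element (the count) plus window metadata
                      new ProcessWindowFunction<Long, String, String, TimeWindow>() {
                          @Override
                          public void process(String key, Context context, Iterable<Long> counts, Collector<String> out) {
                              long count = counts.iterator().next();
                              out.collect(key + " count=" + count
                                      + " window=[" + new Timestamp(context.window().getStart())
                                      + ", " + new Timestamp(context.window().getEnd()) + ")");
                          }
                      })
              .print();
    }
}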