概述
- 系统的学习一下flink的streaming 算子操作,学习一下新的技术,增加一下个人的技术栈储备,java/scala的api;
- Flink流式算子api官网连接:DataStream Transformations
- 建议多阅读官网,Flink在阿里的推动下,发展很猛,在官网已经有可中文版,可以尝试阅读一下,边阅读边总结;
- flink 1.9.0官网释义
数据准备
-
先准备一份测试数据,flink消费kafka_010的数据,不采用batch的方式;
-
scala的api练习详见github;
-
java代码中flink消费kafka中的单个topic,scla代码中消费的是两个topic
-
java代码中先运行mockup包下方法,产生测试数据,即可运行算子操作的demo;
streaming算子
Map
- 处理DataStream中的每一个元素;
- 官网释义如下:
DataStream → DataStream
Takes one element and produces one element. A map function that doubles the values of the input stream:
DataStream<Integer> dataStream = //...
dataStream.map(new MapFunction<Integer, Integer>() {
@Override
public Integer map(Integer value) throws Exception {
return 2 * value;
}
});
- demo
/**
* 获取的是整个pojo
* MockUpModel(name=yahui, gender=female, timestamp=1563090399305, age=34)
* @param kafkaData
* @return
*/
private static SingleOutputStreamOperator<MockUpModel> getAddAgePojo(SingleOutputStreamOperator<MockUpModel> kafkaData) {
// 返回的是MockUpModel pojo类,其中age字段均+5
return kafkaData.map(new MapFunction<MockUpModel, MockUpModel>() {
@Override
public MockUpModel map(MockUpModel value) throws Exception {
MockUpModel mockUpModel = new MockUpModel();
mockUpModel.name = value.name;
mockUpModel.gender = value.gender;
mockUpModel.age = value.age + 5;
mockUpModel.timestamp = value.timestamp;
return mockUpModel;
}
});
}
FlatMap
- 将DataStream中的每一个元素展开返回0到多个元素,以iterator的形式返回;
- 官网释义
DataStream → DataStream
Takes one element and produces zero, one, or more elements. A flatmap function that splits sentences to words:
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
- demo
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<MockUpModel> kafkaData = getKafka010Data(env);
// 这个操作不能反应flatMap的算子作用,下面的作用相当于filter,输出结果为MockUpModel(name=liyahui-0, gender=male, timestamp=1561516105296, age=0)
kafkaData.flatMap(new FlatMapFunction<MockUpModel, MockUpModel>() {
@Override
public void flatMap(MockUpModel value, Collector<MockUpModel> out) throws Exception {
if (value.age % 2 == 0) {
out.collect(value);
}
}
}).print().setParallelism(1);
// flatmap,是将嵌套集合转换并平铺成非嵌套集合。最好的解释详见官网的释义
/*
dataStream.flatMap(new FlatMapFunction<String, String>() {
@Override
public void flatMap(String value, Collector<String> out)
throws Exception {
for(String word: value.split(" ")){
out.collect(word);
}
}
});
*/
env.execute("flink kafka010 demo");
}
Fliter
- 过滤,用于DataStream的每一个元素,返回的是true或者false
DataStream → DataStream
Evaluates a boolean function for each element and retains those for which the function returns true. A filter that filters out zero values:
dataStream.filter(new FilterFunction<Integer>() {
@Override
public boolean filter(Integer value) throws Exception {
return value != 0;
}
});
- demo
// method 1
/**
* 过滤出年龄是偶数的人
*
* @param kafkaData
* @return
*/
private static SingleOutputStreamOperator<MockUpModel> getFilterDS2(SingleOutputStreamOperator<MockUpModel> kafkaData) {
return kafkaData.filter(new FilterFunction<MockUpModel>() {
@Override
public boolean filter(MockUpModel value) throws Exception {
if (value.age % 2 == 0) {
return true;
}
return false;
}
});
}
// method-2
/**
* lambda 的方式
*
* @param kafkaData
* @return
*/
private static SingleOutputStreamOperator<MockUpModel> getFilterDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
return kafkaData.filter(line -> line.age % 2 == 0);
}
KeyBy
- 对DataStream进行按照指定的key进行分区
- 注意:pojo不能重写hashCode方法;
DataStream → KeyedStream
Logically partitions a stream into disjoint partitions. All records with the same key are assigned to the same partition. Internally, *keyBy()* is implemented with hash partitioning. There are different ways to [specify keys](https://ci.apache.org/projects/flink/flink-docs-
master/dev/api_concepts.html#specifying-keys).
This transformation returns a *KeyedStream*, which is, among other things, required to use [keyed state](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/state/state.html#keyed-state).
dataStream.keyBy("someKey") // Key by field "someKey"
dataStream.keyBy(0) // Key by the first element of a Tuple
Attention A type **cannot be a key** if:
1. it is a POJO type but does not override the *hashCode()* method and relies on the *Object.hashCode()* implementation.
2. it is an array of any type.
- demo
/**
* 以年龄为分组条件进行keyBy
*
* @param kafkaData
* @return
*/
private static KeyedStream<MockUpModel, Integer> getKeyedDS(SingleOutputStreamOperator<MockUpModel> kafkaData) {
// lambda 表达式
//kafkaData.keyBy(line -> line.age).print().setParallelism(1);
return kafkaData.keyBy(new KeySelector<MockUpModel, Integer>() {
@Override
public Integer getKey(MockUpModel value) throws Exception {
return value.age;
}
});
}
Reduce
- 叠加操作
Reduce
KeyedStream → DataStream
A "rolling" reduce on a keyed data stream. Combines the current element with the last reduced value and emits the new value.
A reduce function that creates a stream of partial sums:
keyedStream.reduce(new ReduceFunction<Integer>() {
@Override
public Integer reduce(Integer value1, Integer value2)
throws Exception {
return value1 + value2;
}
});
- demo
public static void main(String[] args) throws Exception {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
SingleOutputStreamOperator<MockUpModel> kafka010Data = getKafka010Data(env);
// lambda +java
kafka010Data.keyBy(line -> line.gender).reduce(new ReduceFunction<MockUpModel>() {
@Override
public MockUpModel reduce(MockUpModel value1, MockUpModel value2) throws Exception {
MockUpModel mockUpModel = new MockUpModel();
mockUpModel.name = value1.name + "--" + value2.name;
mockUpModel.gender = value1.gender;
mockUpModel.age = (value1.age + value2.age) / 2;
return mockUpModel;
}
}).print().setParallelism(1);
env.execute("flink kafka010 demo");
}
Fold
- 将keyedDS转成DS;在V1.9中是过期函数
- 官网释义
KeyedStream → DataStream
A "rolling" fold on a keyed data stream with an initial value. Combines the current element with the last folded value and emits the new value.
A fold function that, when applied on the sequence (1,2,3,4,5), emits the sequence "start-1", "start-1-2", "start-1-2-3", ...
DataStream<String> result =
keyedStream.fold("start", new FoldFunction<Integer, String>() {
@Override
public String fold(String current, Integer value) {
return current + "-" + value;
}
});
Aggregations
- 聚合函数,获取keyedStream中的最大/最小/sum值, max 和 maxBy 之间的区别在于 max 返回流中的最大值,但 maxBy 返回具有最大值的键, min 和 minBy 同理。
KeyedStream → DataStream
Rolling aggregations on a keyed data stream. The difference between min and minBy is that min returns the minimum value, whereas minBy returns the element that has the minimum value in this field (same for max and maxBy).
keyedStream.sum(0);
keyedStream.sum("key");
keyedStream.min(0);
keyedStream.min("key");
keyedStream.max(0);
keyedStream.max("key");
keyedStream.minBy(0);
keyedStream.minBy("key");
keyedStream.maxBy(0);
keyedStream.maxBy("key");
union
- 聚合算子,可以将多个结构相同的DataStream进行union;
DataStream* → DataStream
Union of two or more data streams creating a new stream containing all the elements from all the streams. Note: If you union a data stream with itself you will get each element twice in the resulting stream.
dataStream.union(otherStream1, otherStream2, ...);
connect
- 用来将两个dataStream组装成一个ConnectedStreams,而且这个connectedStream的组成结构就是保留原有的dataStream的结构体;这样我们就可以把不同的数据组装成同一个结构;
Connect
DataStream,DataStream → ConnectedStreams
"Connects" two data streams retaining their types. Connect allowing for shared state between the two streams.
DataStream<Integer> someStream = //...
DataStream<String> otherStream = //...
ConnectedStreams<Integer, String> connectedStreams = someStream.connect(otherStream);
split
- Split就是将一个DataStream分成两个或者多个DataStream
DataStream → SplitStream
Split the stream into two or more streams according to some criterion.
SplitStream<Integer> split = someDataStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
select
- Select就是获取分流后对应的数据
SplitStream → DataStream
Select one or more streams from a split stream.
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
iterate
DataStream → IterativeStream → DataStream
Creates a "feedback" loop in the flow, by redirecting the output of one operator to some previous operator. This is especially useful for defining algorithms that continuously update a model. The following code starts with a stream and applies the iteration body continuously. Elements that are greater than 0 are sent back to the feedback channel, and the rest of the elements are forwarded downstream. See [iterations](https://ci.apache.org/projects/flink/flink-docs-master/dev/stream/operators/index.html#iterations) for a complete description.
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
总结
- Flink的stream算子学习第一部分,简单完成。后期优化;
李小李可不能落后啊
网友评论