1. Transformation
map
import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class mapTransformation {
    /**
     * Demonstrates three equivalent ways of handing a {@link MapFunction} to {@code map}:
     * a dedicated named class, an anonymous class, and a lambda expression.
     */
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        final DataStreamSource<Event> events = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0001", 200000L));

        // 1. A dedicated implementation class: emits the user of every event.
        final MapFunction<Event, String> userMapper = new myMap();
        events.map(userMapper).print();

        // 2. An anonymous class: emits the url of every event.
        final MapFunction<Event, String> urlMapper = new MapFunction<Event, String>() {
            @Override
            public String map(Event value) throws Exception {
                return value.url;
            }
        };
        events.map(urlMapper).print();

        // 3. A lambda expression. Its result type (String) is not generic, so Flink can
        //    infer it on its own and no returns(...) hint is required here.
        events.map(e -> e.timestamp.toString()).print();

        env.execute();
    }

    /** Maps each {@link Event} to its {@code user} field. */
    public static class myMap implements MapFunction<Event, String> {
        @Override
        public String map(Event event) throws Exception {
            final String user = event.user;
            return user;
        }
    }
}
filter
import org.apache.flink.api.common.functions.FilterFunction;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class filterTransformation {
    /**
     * Demonstrates three equivalent ways of handing a {@link FilterFunction} to {@code filter}:
     * a named class, an anonymous class, and a lambda expression. Each pipeline keeps only the
     * events of a single user.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L),
                new Event("小黄", "0003", 200000L)
        );
        // 1. A named implementation class.
        source.filter(new myFilter()).print();
        // 2. An anonymous class.
        source.filter(new FilterFunction<Event>() {
            @Override
            public boolean filter(Event event) throws Exception {
                // Constant-first equals: an event whose user is null is dropped instead of
                // throwing a NullPointerException that would fail the job.
                return "小红".equals(event.user);
            }
        }).print();
        // 3. A lambda expression (same null-safe comparison).
        source.filter(data -> "小黄".equals(data.user)).print();
        env.execute();
    }

    /** Keeps only the events whose {@code user} is "小米"; events with a null user are rejected. */
    public static class myFilter implements FilterFunction<Event> {
        @Override
        public boolean filter(Event s) throws Exception {
            return "小米".equals(s.user);
        }
    }
}
flatmap
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.util.Collector;
public class flatmapTransformation {
    /**
     * Demonstrates three ways of handing a {@link FlatMapFunction} to {@code flatMap}: a named
     * class, an anonymous class, and a lambda expression. A flatMap may emit zero, one or many
     * records for each input element.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小红", "0002", 200000L)
        );
        // 1. A named implementation class: emits user, url and timestamp of every event.
        source.flatMap(new myflatMap()).print("1");

        // 2. An anonymous class: emits only the user of every event. The output type is taken
        //    from the FlatMapFunction<Event, String> type arguments, so no type hint is needed.
        source.flatMap(new FlatMapFunction<Event, String>() {
            @Override
            public void flatMap(Event event, Collector<String> collector) throws Exception {
                collector.collect(event.user);
            }
        }).print("2");

        // 3. A lambda expression. The generic parameter of Collector<String> is erased, so Flink
        //    cannot infer the output type and it must be declared with returns(...).
        //    returns(new TypeHint<String>() {}) is an equivalent way to declare it.
        source.flatMap((Event event, Collector<String> collector) -> {
            // Constant-first equals keeps a null user from throwing a NullPointerException.
            if ("小米".equals(event.user)) {
                collector.collect(event.user);
                collector.collect(event.url);
                collector.collect(event.timestamp.toString());
            } else if ("小红".equals(event.user)) {
                collector.collect("空");
            }
        }).returns(TypeInformation.of(String.class)).print("3");

        env.execute();
    }

    /** Emits the user, the url and the timestamp (as a string) of every event, in that order. */
    public static class myflatMap implements FlatMapFunction<Event, String> {
        @Override
        public void flatMap(Event event, Collector<String> collector) throws Exception {
            collector.collect(event.user);
            collector.collect(event.url);
            collector.collect(event.timestamp.toString());
        }
    }
}
keyBy
DataStream → KeyedStream:逻辑地将一个流拆分成不相交的分区,每个分区包含具有相同 key 的元素;在内部是通过对 key 做 hash 来实现分区的。
max&maxBy
区别:max 只返回指定字段的最大值(其余字段保留该 key 第一条数据的值);maxBy 则返回该字段取最大值时对应的完整元素。
public class keyByAggregation {
    /**
     * Contrasts {@code max} and {@code maxBy} on a stream keyed by user. Both track the largest
     * {@code timestamp} per key: max reports the maximum value of that field, while maxBy emits
     * the complete element that holds the maximum.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        DataStreamSource<Event> events = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0002", 200000L),
                new Event("小米", "0003", 150000L),
                new Event("小红", "0001", 200000L));

        // Key selector written as an anonymous class: partitions events by user.
        KeySelector<Event, String> byUser = new KeySelector<Event, String>() {
            @Override
            public String getKey(Event value) throws Exception {
                return value.user;
            }
        };
        events.keyBy(byUser).max("timestamp").print("max");

        // The same key selector written as a lambda expression.
        events.keyBy(e -> e.user).maxBy("timestamp").print("maxBy");

        env.execute();
    }
}
reduce
import org.apache.flink.api.common.typeinfo.Types;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
public class reduceAggregation {
    /**
     * Counts events per user with a keyed reduce, then routes every running count to one
     * constant key to keep track of the most active user seen so far.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0002", 200000L),
                new Event("小米", "0003", 150000L),
                new Event("小红", "0001", 200000L)
        );

        // 1. Count per user: map every event to (user, 1), then sum the counts per user.
        //    A Tuple2 result is generic, so the lambda's output type is declared via returns(...).
        SingleOutputStreamOperator<Tuple2<String, Integer>> ones = source
                .map(event -> Tuple2.of(event.user, 1))
                .returns(Types.TUPLE(Types.STRING, Types.INT));
        SingleOutputStreamOperator<Tuple2<String, Integer>> reduce = ones
                .keyBy(pair -> pair.f0)
                .reduce((acc, next) -> Tuple2.of(acc.f0, acc.f1 + next.f1));
        reduce.print();

        // 2. Most active user so far: send every running count to the same constant key and
        //    keep the first tuple only when its count is strictly larger; otherwise the second
        //    one wins (including on a tie).
        reduce.keyBy(pair -> "key")
                .reduce((best, candidate) -> best.f1 > candidate.f1 ? best : candidate)
                .print();

        env.execute();
    }
}
用户自定义函数(UDF)
函数类(Function Classes)
匿名函数(Lambda Functions)
富函数(Rich Functions)
“富函数”是 DataStream API 提供的一个函数类的接口,所有 Flink 函数类都有其 Rich 版本。它与常规函数的不同在于,可以获取运行环境的上下文,并拥有一些生命周期方法,所以可以实现更复杂的功能。
Rich Function 有一个生命周期的概念。典型的生命周期方法有:
- open() 方法是 rich function 的初始化方法:在算子(例如 map 或 filter)的处理方法被调用之前,open() 会先被调用。
- close()方法是生命周期中的最后一个调用的方法,做一些清理工作。
- getRuntimeContext() 方法提供了函数的 RuntimeContext 的一些信息,例如函数执行的并行度、任务的名字,以及状态(state)。
public class richMap {
    /**
     * Runs a {@link RichMapFunction} with parallelism 2 so the lifecycle messages printed by
     * open() and close() show up once for each parallel instance.
     */
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);
        DataStreamSource<Event> source = env.fromElements(
                new Event("小米", "0001", 100000L),
                new Event("小米", "0002", 200000L),
                new Event("小米", "0003", 150000L),
                new Event("小红", "0001", 200000L)
        );
        source.map(new myRichMap()).print();
        env.execute();
    }

    /** Maps every event to (user, 1) and prints a message from its open() and close() hooks. */
    public static class myRichMap extends RichMapFunction<Event, Tuple2<String, Integer>> {
        /** Initialization hook, invoked before map() is called. */
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            System.out.println("open " + getRuntimeContext().getTaskNameWithSubtasks());
        }

        @Override
        public Tuple2<String, Integer> map(Event event) throws Exception {
            return Tuple2.of(event.user, 1);
        }

        /** Final lifecycle hook, used for cleanup. */
        @Override
        public void close() throws Exception {
            super.close();
            // Print the task name with the subtask index, like open() does. With plain
            // getTaskName() the close messages of the two parallel instances were identical.
            System.out.println("close " + getRuntimeContext().getTaskNameWithSubtasks());
        }
    }
}
网友评论