Flink流式计算WordCountTopN可以采用流处理编程和FlinkSql自定义UDATF函数的方式
流处理编程方法:
public class Flink05_WC_TOPN {
public static void main(String[] args)throws Exception {
//TODO 1.获取执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
// StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());
//并行度1
env.setParallelism(1);
//读取无界数据
DataStreamSource streamSource = env.socketTextStream("hadoop102", 8888);
//TODO 2.切分数据
SingleOutputStreamOperator outputStreamOperator = streamSource.flatMap(new FlatMapFunction() {
@Override
public void flatMap(String value, Collector out)throws Exception {
String[] strings = value.split(" ");
for (String string : strings) {
out.collect(string);
}
}
});
//转为tuple
SingleOutputStreamOperator> streamOperator = outputStreamOperator.map(new MapFunction>() {
@Override
public Tuple2map(String value)throws Exception {
return Tuple2.of(value, 1);
}
});
ArrayList> top3list =new ArrayList<>();
//TODO 3. 聚合求sum
KeyedStream, Tuple> tuple2TupleKeyedStream = streamOperator.keyBy(0);
SingleOutputStreamOperator> KVstream = tuple2TupleKeyedStream.sum(1);
SingleOutputStreamOperator TOP3 = KVstream.process(new ProcessFunction, String>() {
@Override
//除去此String在List中保存的上一个状态
public void processElement(Tuple2 value, Context ctx, Collector out)throws Exception {
for (int i =0; i
Tuple2 stringIntegerTuple2 =top3list.get(i);
if(stringIntegerTuple2.f0.equals(value.f0)){
top3list.remove(i);
}
}
top3list.add(value);
top3list.sort(new Comparator>() {
@Override
public int compare(Tuple2 o1, Tuple2 o2) {
return Integer.valueOf(o2.f1) - Integer.valueOf(o1.f1);
}
});
if (top3list.size() >3) {
top3list.remove(3);
}
out.collect(top3list.toString());
}
});
TOP3.print();
env.execute();
}
}
FlinkSql自定义UDATF函数的使用方式:
public class Flink26_Fun_UDATF {
public static void main(String[] args) {
//TODO 1.获取流执行环境
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);
//TODO 2.获取表的执行环境
StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
//TODO 3.获取数据
DataStreamSource streamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),
new WaterSensor("sensor_1", 2000L, 20),
new WaterSensor("sensor_2", 3000L, 30),
new WaterSensor("sensor_1", 4000L, 40),
new WaterSensor("sensor_1", 5000L, 50),
new WaterSensor("sensor_2", 6000L, 60));
//2.从端口读取数据
/* DataStreamSource Source= env.socketTextStream("hadoop102", 8888);
//3.将数据转为WaterSensor
SingleOutputStreamOperator streamSource = Source.map(new MapFunction() {
@Override
public WaterSensor map(String value) throws Exception {
String[] split = value.split(" ");
return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
}
});
*/
//TODO 4.将流转为表
Table table = tableEnv.fromDataStream(streamSource, $("id"), $("ts"), $("vc"));
//TODO 5.使用UDF
//不注册直接使用
table
.groupBy("id")
.flatAggregate(call(MyUDATF.class,$("vc")).as("value","top"))
.select($("id"),$("value"),$("top"))
// .select($("id"),$("f0"),$("f1")) //不as calue 和top写法
.execute().print();
//注册一个定义函数
/* tableEnv.createTemporarySystemFunction("MyUDATF", MyUDATF.class);
//TableAPi
table
.groupBy("id")
.flatAggregate(call("MyUDATF",$("vc")).as("value","top"))
.select($("id"),$("value"),$("top"))
.execute().print();*/
}
//定义一个类当做累加器
public static class top2Vc{
public Integerfirst=Integer.MIN_VALUE;;
public Integersecond = Integer.MIN_VALUE;
}
//自定义UDATF函数,实现TOP2
public static class MyUDATFextends TableAggregateFunction,top2Vc>{
//创建累加器
@Override
public top2VccreateAccumulator() {
top2Vc top2Vc =new top2Vc();
return top2Vc;
}
//累加操作
public void accumulate(top2Vc acc,Integer value){
//先比较当前数据是否大于第一
if(value>acc.first){
acc.second=acc.first;
acc.first=value;
}else if(value>acc.second) {
acc.second=value;
}
}
//将结果发送出去
public void emitValue(top2Vc acc, Collector> out){
if(acc.first!=Integer.MIN_VALUE){
out.collect(Tuple2.of(acc.first,1));
}if(acc.second!=Integer.MIN_VALUE){
out.collect(Tuple2.of(acc.second,2));
}
}
}
}
网友评论