美文网首页
Flink流式计算WordCountTopN

Flink流式计算WordCountTopN

作者: 大空翼123 | 来源:发表于2022-01-23 09:48 被阅读0次

    Flink流式计算WordCountTopN可以采用两种方式实现:流处理编程(DataStream API),以及FlinkSql自定义UDATF(表聚合函数)。

    流处理编程方法:

    public class Flink05_WC_TOPN {

    public static void main(String[] args)throws Exception {

    //TODO 1.获取执行环境

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

          // StreamExecutionEnvironment env = StreamExecutionEnvironment.createLocalEnvironmentWithWebUI(new Configuration());

    //并行度1

            env.setParallelism(1);

            //读取无界数据

            DataStreamSource streamSource = env.socketTextStream("hadoop102", 8888);

            //TODO 2.切分数据

            SingleOutputStreamOperator outputStreamOperator = streamSource.flatMap(new FlatMapFunction() {

    @Override

                public void flatMap(String value, Collector out)throws Exception {

    String[] strings = value.split(" ");

                    for (String string : strings) {

    out.collect(string);

                    }

    }

    });

            //转为tuple

            SingleOutputStreamOperator> streamOperator = outputStreamOperator.map(new MapFunction>() {

    @Override

                public Tuple2map(String value)throws Exception {

    return Tuple2.of(value, 1);

                }

    });

            ArrayList> top3list =new ArrayList<>();

            //TODO 3. 聚合求sum

            KeyedStream, Tuple> tuple2TupleKeyedStream = streamOperator.keyBy(0);

            SingleOutputStreamOperator> KVstream = tuple2TupleKeyedStream.sum(1);

            SingleOutputStreamOperator TOP3 = KVstream.process(new ProcessFunction, String>() {

    @Override

                //除去此String在List中保存的上一个状态

                public void processElement(Tuple2 value, Context ctx, Collector out)throws Exception {

    for (int i =0; i

    Tuple2 stringIntegerTuple2 =top3list.get(i);

                        if(stringIntegerTuple2.f0.equals(value.f0)){

    top3list.remove(i);

                        }

    }

    top3list.add(value);

                    top3list.sort(new Comparator>() {

    @Override

                        public int compare(Tuple2 o1, Tuple2 o2) {

    return Integer.valueOf(o2.f1) - Integer.valueOf(o1.f1);

                        }

    });

                    if (top3list.size() >3) {

    top3list.remove(3);

                    }

    out.collect(top3list.toString());

                }

    });

            TOP3.print();

            env.execute();

        }

    }


    使用FlinkSql自定义UDATF函数的方式:

    public class Flink26_Fun_UDATF {

    public static void main(String[] args) {

    //TODO 1.获取流执行环境

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

            env.setParallelism(1);

            //TODO 2.获取表的执行环境

            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            //TODO 3.获取数据

            DataStreamSource streamSource = env.fromElements(new WaterSensor("sensor_1", 1000L, 10),

                    new WaterSensor("sensor_1", 2000L, 20),

                    new WaterSensor("sensor_2", 3000L, 30),

                    new WaterSensor("sensor_1", 4000L, 40),

                    new WaterSensor("sensor_1", 5000L, 50),

                    new WaterSensor("sensor_2", 6000L, 60));

            //2.从端口读取数据

    /*      DataStreamSource Source= env.socketTextStream("hadoop102", 8888);

    //3.将数据转为WaterSensor

    SingleOutputStreamOperator streamSource = Source.map(new MapFunction() {

    @Override

    public WaterSensor map(String value) throws Exception {

    String[] split = value.split(" ");

    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));

    }

    });

    */

            //TODO 4.将流转为表

            Table table = tableEnv.fromDataStream(streamSource, $("id"), $("ts"), $("vc"));

            //TODO 5.使用UDF

            //不注册直接使用

            table

    .groupBy("id")

    .flatAggregate(call(MyUDATF.class,$("vc")).as("value","top"))

    .select($("id"),$("value"),$("top"))

    //  .select($("id"),$("f0"),$("f1"))  //不as calue 和top写法

                    .execute().print();

            //注册一个定义函数

    /*  tableEnv.createTemporarySystemFunction("MyUDATF", MyUDATF.class);

    //TableAPi

    table

    .groupBy("id")

    .flatAggregate(call("MyUDATF",$("vc")).as("value","top"))

    .select($("id"),$("value"),$("top"))

    .execute().print();*/

        }

    //定义一个类当做累加器

        public static class top2Vc{

    public Integerfirst=Integer.MIN_VALUE;;

            public Integersecond = Integer.MIN_VALUE;

        }

    //自定义UDATF函数,实现TOP2

      public  static class MyUDATFextends TableAggregateFunction,top2Vc>{

    //创建累加器

            @Override

            public top2VccreateAccumulator() {

    top2Vc top2Vc =new top2Vc();

                return top2Vc;

            }

    //累加操作

            public  void accumulate(top2Vc acc,Integer value){

    //先比较当前数据是否大于第一

                if(value>acc.first){

    acc.second=acc.first;

                    acc.first=value;

                }else if(value>acc.second) {

    acc.second=value;

                }

    }

    //将结果发送出去

            public void emitValue(top2Vc acc, Collector> out){

    if(acc.first!=Integer.MIN_VALUE){

    out.collect(Tuple2.of(acc.first,1));

                }if(acc.second!=Integer.MIN_VALUE){

    out.collect(Tuple2.of(acc.second,2));

                }

    }

    }

    }

    相关文章

      网友评论

          本文标题:Flink流式计算WordCountTopN

          本文链接:https://www.haomeiwen.com/subject/qrryhrtx.html