美文网首页
Flink-sql自定义UDTFA函数

Flink-sql自定义UDTFA函数

作者: wudl | 来源:发表于2021-08-19 00:24 被阅读0次

    1. 在 Flink SQL(Table API)中使用自定义函数

    1.1 官网文档说得很详细

    https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/tableApi.html

    1.1.1 官网上面的例子:

    /**
     * Accumulator for the Top2 table aggregate function: holds the largest ({@code first})
     * and second-largest ({@code second}) values seen so far in a group.
     */
    public class Top2Accum {
        /** Largest value seen so far. */
        public Integer first;
        /** Second-largest value seen so far. */
        public Integer second;
    }
    
    /**
     * Table aggregate function that emits, per group, the two largest input values,
     * each paired with its rank (1 = largest, 2 = runner-up).
     */
    public class Top2 extends TableAggregateFunction<Tuple2<Integer, Integer>, Top2Accum> {

        @Override
        public Top2Accum createAccumulator() {
            final Top2Accum fresh = new Top2Accum();
            // MIN_VALUE doubles as the "slot still empty" marker checked in emitValue.
            fresh.first = Integer.MIN_VALUE;
            fresh.second = Integer.MIN_VALUE;
            return fresh;
        }

        /** Folds one input value into the accumulator, keeping only the top two. */
        public void accumulate(Top2Accum acc, Integer v) {
            if (v > acc.first) {
                // New leader: the previous leader is demoted to second place.
                acc.second = acc.first;
                acc.first = v;
                return;
            }
            if (v > acc.second) {
                acc.second = v;
            }
        }

        /** Combines partial accumulators by re-feeding both of their candidates. */
        public void merge(Top2Accum acc, Iterable<Top2Accum> iterable) {
            for (Top2Accum partial : iterable) {
                accumulate(acc, partial.first);
                accumulate(acc, partial.second);
            }
        }

        /** Emits (value, rank) pairs for every slot that received a real value. */
        public void emitValue(Top2Accum acc, Collector<Tuple2<Integer, Integer>> out) {
            emitIfPresent(acc.first, 1, out);
            emitIfPresent(acc.second, 2, out);
        }

        // Emits (value, rank) unless the slot still holds the MIN_VALUE marker.
        private void emitIfPresent(Integer value, int rank, Collector<Tuple2<Integer, Integer>> out) {
            if (value != Integer.MIN_VALUE) {
                out.collect(Tuple2.of(value, rank));
            }
        }
    }
    
    // Register the function under "top2" (legacy registerFunction API, as in the 1.12 docs),
    // then call it per key via flatAggregate; each call yields (v, rank) rows.
    tableEnv.registerFunction("top2", new Top2());
    Table orders = tableEnv.from("Orders");
    Table result = orders
        .groupBy($("key"))
        .flatAggregate(call("top2", $("a")).as("v", "rank"))
        .select($("key"), $("v"), $("rank"));
    

    2.自己实现

    个人理解:按 id 分组,对实时输入的数据求出每组中最大的两个值(Top2),并分别标记排名

    package com.wudl.flink.sql;
    
    import com.wudl.flink.bean.WaterSensor;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.table.api.Table;
    import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
    import org.apache.flink.table.functions.AggregateFunction;
    import org.apache.flink.table.functions.TableAggregateFunction;
    import org.apache.flink.table.planner.plan.nodes.calcite.LogicalWindowTableAggregate;
    import org.apache.flink.util.Collector;
    
    import static org.apache.flink.table.api.Expressions.$;
    import static org.apache.flink.table.api.Expressions.call;
    
    
    /**
     * @ClassName : Flink_Sql_Function_UDTFA
     * @Description : Demo of a user-defined table aggregate function in the Flink Table API.
     *     Reads comma-separated lines from a socket (field 0 = sensor id, field 1 parsed as a
     *     long, field 2 = integer vc), converts them to WaterSensor beans and, for every id,
     *     emits its two largest vc values.
     * @Author :wudl
     * @Date: 2021-08-18 23:55
     */

    public class Flink_Sql_Function_UDTFA {

        /** Single name used both to register and to call {@link Top2}, so the two cannot drift. */
        private static final String TOP2_FUNCTION = "Top2";

        /**
         * Runs the demo job.
         *
         * @param args optional {@code [host [port]]} of the socket source; defaults to
         *     {@code 192.168.1.161} and {@code 9999}
         * @throws Exception declared for compatibility with existing callers
         */
        public static void main(String[] args) throws Exception {
            // Socket source location, overridable from the command line.
            String host = args.length > 0 ? args[0] : "192.168.1.161";
            int port = args.length > 1 ? Integer.parseInt(args[1]) : 9999;

            // 1. Create the execution environments.
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setParallelism(1);
            StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);

            // 2. Read lines from the socket and parse them into WaterSensor beans.
            SingleOutputStreamOperator<WaterSensor> waterSensorDs = env.socketTextStream(host, port)
                    .map(line -> {
                        String[] split = line.split(",");
                        return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                    });

            // 3. Convert the stream into a dynamic table.
            Table table = tableEnv.fromDataStream(waterSensorDs);

            // 4. Register the function before using it.
            tableEnv.createTemporarySystemFunction(TOP2_FUNCTION, Top2.class);

            // 5. Table API: for each id, emit up to two (top, rank) rows.
            // Table#execute() submits the job itself and print() blocks while the unbounded
            // socket stream runs. The former trailing env.execute() was removed: it is not
            // needed for Table API output and, with no DataStream sink defined, it does not
            // run this query.
            table.groupBy($("id"))
                    .flatAggregate(call(TOP2_FUNCTION, $("vc")).as("top", "rank"))
                    .select($("id"), $("top"), $("rank"))
                    .execute()
                    .print();

            // NOTE(review): the planned SQL variant was never written. As far as the Flink UDF
            // docs state, table aggregate functions can only be invoked via the Table API
            // (flatAggregate), not from SQL. Confirm for the Flink version in use.
        }

        /**
         * User-defined table aggregate function that emits the largest and second-largest
         * non-null values of a group, tagged {@code "Top1"} and {@code "Top2"}.
         *
         * <p>The accumulator starts with both slots at {@code Integer.MIN_VALUE}, which serves
         * as the "no value yet" marker.
         */
        public static class Top2 extends TableAggregateFunction<Tuple2<Integer, String>, VcTop2> {

            @Override
            public VcTop2 createAccumulator() {
                return new VcTop2(Integer.MIN_VALUE, Integer.MIN_VALUE);
            }

            /**
             * Folds one value into the running top-two state.
             *
             * @param acc running top-two state
             * @param value input value; {@code null} is ignored instead of failing on unboxing
             */
            public void accumulate(VcTop2 acc, Integer value) {
                if (value == null) {
                    return;
                }
                if (value > acc.getTopOne()) {
                    // New leader: the previous leader becomes the runner-up.
                    acc.setTopTwo(acc.getTopOne());
                    acc.setTopOne(value);
                } else if (value > acc.getTopTwo()) {
                    acc.setTopTwo(value);
                }
            }

            /**
             * Emits the current top values, skipping any slot still holding the marker
             * (for example, when a group has only seen null inputs).
             */
            public void emitValue(VcTop2 acc, Collector<Tuple2<Integer, String>> out) {
                if (acc.getTopOne() > Integer.MIN_VALUE) {
                    out.collect(Tuple2.of(acc.getTopOne(), "Top1"));
                }
                if (acc.getTopTwo() > Integer.MIN_VALUE) {
                    out.collect(Tuple2.of(acc.getTopTwo(), "Top2"));
                }
            }
        }

    }
    
    

    实体类

    package com.wudl.flink.sql;
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;
    
    /**
     * @ClassName : VcTop2
     * @Description : Accumulator for the Top2 table aggregate function: the largest
     *     (topOne) and second-largest (topTwo) values seen so far.
     * @Author :wudl
     * @Date: 2021-08-18 23:46
     */
    
    // Lombok supplies the accessors used by Top2 (getTopOne/setTopOne, getTopTwo/setTopTwo),
    // a no-arg constructor and a (topOne, topTwo) constructor.
    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class VcTop2 {
        // Largest value seen; Top2.createAccumulator initialises it to Integer.MIN_VALUE.
        private int topOne;
        // Second-largest value seen; also starts at Integer.MIN_VALUE ("no value yet").
        private int topTwo;
    }
    
    
    Flink-UDTFA函数.png

    相关文章

      网友评论

          本文标题:Flink-sql自定义UDTFA函数

          本文链接:https://www.haomeiwen.com/subject/tyfobltx.html