美文网首页
Flink-sql自定义UDAF函数

Flink-sql自定义UDAF函数

作者: wudl | 来源:发表于2021-08-17 23:56 被阅读0次

1. 用自定义的函数在Flink Sql 中使用

1.1 官网也说的很详细

https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/functions/udfs.html#scalar-functions

1.1.1 官网上面的例子:

import org.apache.flink.table.api.*;
import org.apache.flink.table.functions.AggregateFunction;
import static org.apache.flink.table.api.Expressions.*;

// mutable accumulator of structured type for the aggregate function
public static class WeightedAvgAccumulator {
  public long sum = 0;
  public int count = 0;
}

// function that takes (value BIGINT, weight INT), stores intermediate results in a structured
// type of WeightedAvgAccumulator, and returns the weighted average as BIGINT
public static class WeightedAvg extends AggregateFunction<Long, WeightedAvgAccumulator> {

  @Override
  public WeightedAvgAccumulator createAccumulator() {
    return new WeightedAvgAccumulator();
  }

  @Override
  public Long getValue(WeightedAvgAccumulator acc) {
    if (acc.count == 0) {
      return null;
    } else {
      return acc.sum / acc.count;
    }
  }

  public void accumulate(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
    acc.sum += iValue * iWeight;
    acc.count += iWeight;
  }

  public void retract(WeightedAvgAccumulator acc, Long iValue, Integer iWeight) {
    acc.sum -= iValue * iWeight;
    acc.count -= iWeight;
  }

  public void merge(WeightedAvgAccumulator acc, Iterable<WeightedAvgAccumulator> it) {
    for (WeightedAvgAccumulator a : it) {
      acc.count += a.count;
      acc.sum += a.sum;
    }
  }

  public void resetAccumulator(WeightedAvgAccumulator acc) {
    acc.count = 0;
    acc.sum = 0L;
  }
}

TableEnvironment env = TableEnvironment.create(...);

// call function "inline" without registration in Table API
env
  .from("MyTable")
  .groupBy($("myField"))
  .select($("myField"), call(WeightedAvg.class, $("value"), $("weight")));

// register function
env.createTemporarySystemFunction("WeightedAvg", WeightedAvg.class);

// call registered function in Table API
env
  .from("MyTable")
  .groupBy($("myField"))
  .select($("myField"), call("WeightedAvg", $("value"), $("weight")));

// call registered function in SQL
env.sqlQuery(
  "SELECT myField, WeightedAvg(`value`, weight) FROM MyTable GROUP BY myField"
);

2.自己实现

package com.wudl.flink.sql;

import com.wudl.flink.bean.WaterSensor;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.table.annotation.DataTypeHint;
import org.apache.flink.table.annotation.FunctionHint;
import org.apache.flink.table.api.Table;
import org.apache.flink.table.api.bridge.java.StreamTableEnvironment;
import org.apache.flink.table.functions.AggregateFunction;
import org.apache.flink.table.functions.ScalarFunction;
import org.apache.flink.table.functions.TableFunction;
import org.apache.flink.types.Row;

import static org.apache.flink.table.api.Expressions.$;
import static org.apache.flink.table.api.Expressions.call;

/**
 * @ClassName : Flink_Sql_Function_UDTF
 * @Description : Flink自定义udtf 函数
 * @Author :wudl
 * @Date: 2021-08-17 22:55
 */

public class Flink_Sql_Function_UDTF {
    public static void main(String[] args) throws Exception {


        //1. 获取执行环境
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);
        StreamTableEnvironment tableEnv = StreamTableEnvironment.create(env);
        //2. 读取端口中的数据并且转化为javaBean
        SingleOutputStreamOperator<WaterSensor> waterSensorDs = env.socketTextStream("192.168.1.180", 9999)
                .map(line -> {
                    String[] split = line.split(",");
                    return new WaterSensor(split[0], Long.parseLong(split[1]), Integer.parseInt(split[2]));
                });

        // 3. 讲流 转化为动态表
        Table table = tableEnv.fromDataStream(waterSensorDs);
        // 5. 先注册在使用
        tableEnv.createTemporarySystemFunction("MyAvg", MyAvg.class);
        //5.1 使用table api 实现的方式
//        table.groupBy($("id")).select($("id"),call("MyAvg",$("vc")))
//                .execute()
//                .print();
        // 5.2 采用sql 的写法
        tableEnv.sqlQuery("select id , MyAvg(vc) from "+table+" group by id")
                .execute()
                .print();

        //5. 执行任务
        env.execute();


    }

    // 自定义函数类Udtf 求平均数
    public static class MyAvg extends AggregateFunction<Double, SumCount> {

        public void accumulate(SumCount acc, Integer vc) {
            acc.setVcSum(acc.getVcSum() + vc);
            acc.setCount(acc.getCount() + 1);
        }


        @Override
        public Double getValue(SumCount sumCount) {
            return sumCount.getVcSum() * 1D / sumCount.getCount();
        }

        @Override
        public SumCount createAccumulator() {
            return new SumCount();
        }
    }

}

一个应用类

package com.wudl.flink.sql;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
 * @ClassName : SumCount
 * @Description : 自定义UDAF函数的 的bean
 * @Author :wudl
 * @Date: 2021-08-17 23:33
 */

@Data
@AllArgsConstructor
@NoArgsConstructor
public class SumCount {

    private int vcSum;
    private int count;
}

打印结果


Flink-sql-udtf.png

相关文章

  • Flink-sql自定义UDAF函数

    1. 用自定义的函数在Flink Sql 中使用 1.1 官网也说的很详细 https://ci.apache.o...

  • Hive函数

    自定义函数 自定义函数包括三种:UDF、UDAF、UDTF。 UDF(User-Defined-Function)...

  • Hive-UDAF

    UDAF 前两节分别介绍了基础UDF和UDTF,这一节我们将介绍最复杂的用户自定义聚合函数(UDAF)。用户自定义...

  • Hive 用户自定义函数 UDF,UDAF

    Hive有UDF:(普通)UDF,用户自定义聚合函数(UDAF)以及用户自定义生表函数(UDTF)。它们所接受的输...

  • Hive开发自定义函数UDF

    Hive 内置函数 Hive自定义函数 UDF(User-Defined-Function) 一进一出 UDAF(...

  • 添加 Hive 自定义函数

    使用 Java 编写好 UDF 或 UDAF 函数后,Hive 要如何使用这些自定义函数呢? 1 在 HDFS 上...

  • Spark SQL实战之UDF与UDAF的使用

    1.概念:UDF就是用户自定义的函数UDAF就是用户自定义的聚合函数 2.代码:(1)pom.xml (2)Spa...

  • 【Hive】注册UDF的过程

    实现步骤 对比SparkSQL注册UDF的过程SparkSQL用户自定义函数UDF和UDAF、UDTF[https...

  • spark的UDAF使用

    什么是UDAF? UDAF(User Defined Aggregate Function),即用户定义的聚合函数...

  • 案例解析丨 Spark Hive 自定义函数应用

    摘要:Spark目前支持UDF,UDTF,UDAF三种类型的自定义函数。 1. 简介 Spark目前支持UDF,U...

网友评论

      本文标题:Flink-sql自定义UDAF函数

      本文链接:https://www.haomeiwen.com/subject/xcpwbltx.html