美文网首页
HIVE 自定义UDF、UDTF函数

HIVE 自定义UDF、UDTF函数

作者: 无来无去_A | 来源:发表于2020-07-27 21:03 被阅读0次

HIVE自定义函数类型

1)Hive 自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展。

2)当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。

3)根据用户自定义函数类别分为以下三种:

(1)UDF(User-Defined-Function)

一进一出

(2)UDAF(User-Defined Aggregation Function)

聚集函数,多进一出

类似于:count/max/min

(3)UDTF(User-Defined Table-Generating Functions)

一进多出

如lateral view explore()

4)官方文档地址

https://cwiki.apache.org/confluence/display/Hive/HivePlugins

5)编程步骤:

(1)继承org.apache.hadoop.hive.ql.exec.UDF

(2)需要实现evaluate函数;evaluate函数支持重载;

(3)在hive的命令行窗口创建函数

添加jar

add jar linux_jar_path

创建function

create [temporary] function [dbname.]function_name AS class_name;

(4)在hive的命令行窗口删除函数

Drop [temporary] function [if exists] [dbname.]function_name;

6)注意事项:UDF必须要有返回类型,可以返回null,但是返回类型不能为void;

1)在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?
(1)用UDF函数解析公共字段;用UDTF函数解析事件字段。
(2)自定义UDF:继承UDF,重写evaluate方法
(3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
2)为什么要自定义UDF/UDTF?
因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。

示例:

1.Maven 依赖

<dependencies>
        <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
        <dependency>
            <groupId>org.apache.hive</groupId>
            <artifactId>hive-exec</artifactId>
            <version>3.1.2</version>
        </dependency>
</dependencies>

UDF 一进一出

进行数据结构的转换
把JSON数组字符串转换为 hive 中结构体格式

 jsonstr  ==>  array<struct<action_id:string,item:string,item_type:string,ts:bigint>>

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;
import org.json.JSONObject;

import java.util.ArrayList;
import java.util.List;

@Description(name = "json_array_to_struct_array", value = "-- convert json_array to struct_array")
public class JsonArrayToStructArray extends GenericUDF {
    /*
        对输入检测
        返回输出的值的对象检测器
     */
    @Override
    public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
        // 1. 对输入检测
        if (arguments.length < 3) {
            throw new UDFArgumentException("参数个数必须执行3个");
        }
        for (ObjectInspector argument : arguments) {
            if (!"string".equals(argument.getTypeName())) {
                throw new UDFArgumentException("参数必须是 string");
            }
        }
        // 2. 返回输出的值的对象检测器
        // array(struct(k:v, k:v), struct(...))
        List<String> fieldNames = new ArrayList<>();  // 结构体的每个k的名字
        List<ObjectInspector> oiList = new ArrayList<>();  // 结构体的每个k的名字

        int size = arguments.length;
        /*for (int i = 1; i < (size - 1) / 2 + 1; i++) {
            String fieldName = getConstantStringValue(arguments, i).split(":")[0];
            fieldNames.add(fieldName);
        }*/

        for (int i = (size - 1) / 2 + 1; i < size; i++) {

            String fieldName = getConstantStringValue(arguments, i).split(":")[0];
            fieldNames.add(fieldName);

            // 不同的类型, 使用不同的检测器
            String type = getConstantStringValue(arguments, i).split(":")[1];
            switch (type) {
                case "string":
                    oiList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
                    break;
                case "int":
                    oiList.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
                    break;
                case "bigint":
                    oiList.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector);
                    break;

                default:
                    throw new UDFArgumentException("未知的不支持的类型....");
            }
        }

        return ObjectInspectorFactory
                .getStandardListObjectInspector(ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, oiList));
    }

    /*
       对传入的数据做计算, 返回函数最终的值
     */
    @Override
    public Object evaluate(DeferredObject[] arguments) throws HiveException {

        if (arguments[0].get() == null) {
            return null;
        }

        // 1.获取传入的json数组
        String jsonArrayString = arguments[0].get().toString();
        JSONArray jsonArray = new JSONArray(jsonArrayString);

        // 2. 解析数组中的数据
        // 2.1 最终的数组

        List<List<Object>> result = new ArrayList<>();
        // 2.2 解析出来需要的每个结构体
        for(int i = 0; i < jsonArray.length(); i++){
            List<Object> struct = new ArrayList<>();
            result.add(struct);

            JSONObject obj = jsonArray.getJSONObject(i);

            // 表示结构体应该有多个少个字段
            for(int j = 1; j < (arguments.length - 1)/2 + 1; j++){
                // 获取字段名
                String name = arguments[j].get().toString();
                if(obj.has(name)){
                    struct.add(obj.get(name));
                }else{
                    struct.add(null);
                }
            }
            /*
            {
                    "displayType":"promotion",
                    "item":"3",
                    "item_type":"sku_id",
                    "order":1
            }

             json_array_to_struct_array(
                       get_json_object(line,'$.actions'),
                       'action_id',
                      'item',
                      'item_type',
                      'ts',
                      'action_id:string',
                      'item:string',
                      'item_type:string',
                      'ts:bigint')
        array(struct(..), struct(....))

             */

        }

        return result;
    }

    /*
    select  a(...)
    返回要展示的字符串
     */
    @Override
    public String getDisplayString(String[] children) {
        return getStandardDisplayString("json_array_to_struct_array", children);
    }
}
/*
[
        {
            "displayType":"promotion",
            "item":"3",
            "item_type":"sku_id",
            "order":1
        },
        {
            "displayType":"promotion",
            "item":"1",
            "item_type":"sku_id",
            "order":2
        },
        {
            "displayType":"query",
            "item":"7",
            "item_type":"sku_id",
            "order":3
        },
        {
            "displayType":"promotion",
            "item":"5",
            "item_type":"sku_id",
            "order":4
        }
]
 */

UDTF 一进多出

例如:把JSON数组:
[{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610},
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}]

转换为多行JSON对象:
{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610}
{"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}

import org.apache.hadoop.hive.ql.exec.Description;
import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
import org.apache.hadoop.hive.serde2.objectinspector.*;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.json.JSONArray;

import java.util.ArrayList;
import java.util.List;

/**
 * udtf:  一进多出
 *  1进:   [{}, {}]
 *  多出:  "{}", "{}"
 */
@Description(name = "explode_json_array", value = "explode json array ....")
public class ExplodeJsonArray extends GenericUDTF {
    /*
      作用:
      1.检测输入
      2. 返回期望的数据类型的检测器
     */
    @Override
    public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
        // 1. 检测输入
        // 1.1 获取到传入的参数
        List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
        if(inputFields.size() != 1)
            throw new UDFArgumentException("函数 explode_json_array 需要正好1个参数");

        ObjectInspector inspector = inputFields.get(0).getFieldObjectInspector();
        if(inspector.getCategory() != ObjectInspector.Category.PRIMITIVE || !inspector.getTypeName().equals("string")){
            throw new UDFArgumentException("函数 explode_json_array 参数类型不匹配, 必须是一个字符串");
        }
        // 2. 返回期望数据类型的检测器
        List<String> names = new ArrayList<>();
        names.add("action");
        List<ObjectInspector> inspectors = new ArrayList<>();
        inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

        return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);

    }
    /*
        处理数据   [{}, {}, ...]
     */
    @Override
    public void process(Object[] args) throws HiveException {
        String jsonArrayString = args[0].toString();
        JSONArray jsonArray = new JSONArray(jsonArrayString);
        for (int i = 0; i < jsonArray.length(); i++) {
            String col = jsonArray.getString(i);
            String[] cols = new String[1];
            cols[0] = col;
            forward(cols);  // 为什么要传递数组? 有可能炸裂出来多列数据, 所以才需要传递数字
        }
    }
    /**
     * 关闭资源
     * 不用实现
     */
    @Override
    public void close() throws HiveException {

    }
}

相关文章

网友评论

      本文标题:HIVE 自定义UDF、UDTF函数

      本文链接:https://www.haomeiwen.com/subject/pijtrktx.html