美文网首页
HIVE 自定义UDF、UDTF函数

HIVE 自定义UDF、UDTF函数

作者: 无来无去_A | 来源:发表于2020-07-27 21:03 被阅读0次

    HIVE自定义函数类型

    1)Hive 自带了一些函数,比如:max/min等,但是数量有限,自己可以通过自定义UDF来方便的扩展。

    2)当Hive提供的内置函数无法满足你的业务处理需要时,此时就可以考虑使用用户自定义函数(UDF:user-defined function)。

    3)根据用户自定义函数类别分为以下三种:

    (1)UDF(User-Defined-Function)

    一进一出

    (2)UDAF(User-Defined Aggregation Function)

    聚集函数,多进一出

    类似于:count/max/min

    (3)UDTF(User-Defined Table-Generating Functions)

    一进多出

    如lateral view explore()

    4)官方文档地址

    https://cwiki.apache.org/confluence/display/Hive/HivePlugins

    5)编程步骤:

    (1)继承org.apache.hadoop.hive.ql.exec.UDF

    (2)需要实现evaluate函数;evaluate函数支持重载;

    (3)在hive的命令行窗口创建函数

    添加jar

    add jar linux_jar_path

    创建function

    create [temporary] function [dbname.]function_name AS class_name;

    (4)在hive的命令行窗口删除函数

    Drop [temporary] function [if exists] [dbname.]function_name;

    6)注意事项:UDF必须要有返回类型,可以返回null,但是返回类型不能为void;

    1)在项目中是否自定义过UDF、UDTF函数,以及用他们处理了什么问题,及自定义步骤?
    (1)用UDF函数解析公共字段;用UDTF函数解析事件字段。
    (2)自定义UDF:继承UDF,重写evaluate方法
    (3)自定义UDTF:继承自GenericUDTF,重写3个方法:initialize(自定义输出的列名和类型),process(将结果返回forward(result)),close
    2)为什么要自定义UDF/UDTF?
    因为自定义函数,可以自己埋点Log打印日志,出错或者数据异常,方便调试。

    示例:

    1.Maven 依赖

    <dependencies>
            <!-- https://mvnrepository.com/artifact/org.apache.hive/hive-exec -->
            <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>3.1.2</version>
            </dependency>
    </dependencies>
    
    

    UDF 一进一出

    进行数据结构的转换
    把JSON数组字符串转换为 hive 中结构体格式
    
     jsonstr  ==>  array<struct<action_id:string,item:string,item_type:string,ts:bigint>>
    
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.json.JSONArray;
    import org.json.JSONObject;
    
    import java.util.ArrayList;
    import java.util.List;
    
    @Description(name = "json_array_to_struct_array", value = "-- convert json_array to struct_array")
    public class JsonArrayToStructArray extends GenericUDF {
        /*
            对输入检测
            返回输出的值的对象检测器
         */
        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            // 1. 对输入检测
            if (arguments.length < 3) {
                throw new UDFArgumentException("参数个数必须执行3个");
            }
            for (ObjectInspector argument : arguments) {
                if (!"string".equals(argument.getTypeName())) {
                    throw new UDFArgumentException("参数必须是 string");
                }
            }
            // 2. 返回输出的值的对象检测器
            // array(struct(k:v, k:v), struct(...))
            List<String> fieldNames = new ArrayList<>();  // 结构体的每个k的名字
            List<ObjectInspector> oiList = new ArrayList<>();  // 结构体的每个k的名字
    
            int size = arguments.length;
            /*for (int i = 1; i < (size - 1) / 2 + 1; i++) {
                String fieldName = getConstantStringValue(arguments, i).split(":")[0];
                fieldNames.add(fieldName);
            }*/
    
            for (int i = (size - 1) / 2 + 1; i < size; i++) {
    
                String fieldName = getConstantStringValue(arguments, i).split(":")[0];
                fieldNames.add(fieldName);
    
                // 不同的类型, 使用不同的检测器
                String type = getConstantStringValue(arguments, i).split(":")[1];
                switch (type) {
                    case "string":
                        oiList.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
                        break;
                    case "int":
                        oiList.add(PrimitiveObjectInspectorFactory.javaIntObjectInspector);
                        break;
                    case "bigint":
                        oiList.add(PrimitiveObjectInspectorFactory.javaLongObjectInspector);
                        break;
    
                    default:
                        throw new UDFArgumentException("未知的不支持的类型....");
                }
            }
    
            return ObjectInspectorFactory
                    .getStandardListObjectInspector(ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, oiList));
        }
    
        /*
           对传入的数据做计算, 返回函数最终的值
         */
        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
    
            if (arguments[0].get() == null) {
                return null;
            }
    
            // 1.获取传入的json数组
            String jsonArrayString = arguments[0].get().toString();
            JSONArray jsonArray = new JSONArray(jsonArrayString);
    
            // 2. 解析数组中的数据
            // 2.1 最终的数组
    
            List<List<Object>> result = new ArrayList<>();
            // 2.2 解析出来需要的每个结构体
            for(int i = 0; i < jsonArray.length(); i++){
                List<Object> struct = new ArrayList<>();
                result.add(struct);
    
                JSONObject obj = jsonArray.getJSONObject(i);
    
                // 表示结构体应该有多个少个字段
                for(int j = 1; j < (arguments.length - 1)/2 + 1; j++){
                    // 获取字段名
                    String name = arguments[j].get().toString();
                    if(obj.has(name)){
                        struct.add(obj.get(name));
                    }else{
                        struct.add(null);
                    }
                }
                /*
                {
                        "displayType":"promotion",
                        "item":"3",
                        "item_type":"sku_id",
                        "order":1
                }
    
                 json_array_to_struct_array(
                           get_json_object(line,'$.actions'),
                           'action_id',
                          'item',
                          'item_type',
                          'ts',
                          'action_id:string',
                          'item:string',
                          'item_type:string',
                          'ts:bigint')
            array(struct(..), struct(....))
    
                 */
    
            }
    
            return result;
        }
    
        /*
        select  a(...)
        返回要展示的字符串
         */
        @Override
        public String getDisplayString(String[] children) {
            return getStandardDisplayString("json_array_to_struct_array", children);
        }
    }
    /*
    [
            {
                "displayType":"promotion",
                "item":"3",
                "item_type":"sku_id",
                "order":1
            },
            {
                "displayType":"promotion",
                "item":"1",
                "item_type":"sku_id",
                "order":2
            },
            {
                "displayType":"query",
                "item":"7",
                "item_type":"sku_id",
                "order":3
            },
            {
                "displayType":"promotion",
                "item":"5",
                "item_type":"sku_id",
                "order":4
            }
    ]
     */
    

    UDTF 一进多出

    例如:把JSON数组:
    [{"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610},
    {"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}]
    
    转换为多行JSON对象:
    {"action_id":"favor_add","item":"6","item_type":"sku_id","ts":1592841743610}
    {"action_id":"cart_add","item":"6","item_type":"sku_id","ts":1592841746886}
    
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde2.objectinspector.*;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    import org.json.JSONArray;
    
    import java.util.ArrayList;
    import java.util.List;
    
    /**
     * udtf:  一进多出
     *  1进:   [{}, {}]
     *  多出:  "{}", "{}"
     */
    @Description(name = "explode_json_array", value = "explode json array ....")
    public class ExplodeJsonArray extends GenericUDTF {
        /*
          作用:
          1.检测输入
          2. 返回期望的数据类型的检测器
         */
        @Override
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
            // 1. 检测输入
            // 1.1 获取到传入的参数
            List<? extends StructField> inputFields = argOIs.getAllStructFieldRefs();
            if(inputFields.size() != 1)
                throw new UDFArgumentException("函数 explode_json_array 需要正好1个参数");
    
            ObjectInspector inspector = inputFields.get(0).getFieldObjectInspector();
            if(inspector.getCategory() != ObjectInspector.Category.PRIMITIVE || !inspector.getTypeName().equals("string")){
                throw new UDFArgumentException("函数 explode_json_array 参数类型不匹配, 必须是一个字符串");
            }
            // 2. 返回期望数据类型的检测器
            List<String> names = new ArrayList<>();
            names.add("action");
            List<ObjectInspector> inspectors = new ArrayList<>();
            inspectors.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
    
            return ObjectInspectorFactory.getStandardStructObjectInspector(names, inspectors);
    
        }
        /*
            处理数据   [{}, {}, ...]
         */
        @Override
        public void process(Object[] args) throws HiveException {
            String jsonArrayString = args[0].toString();
            JSONArray jsonArray = new JSONArray(jsonArrayString);
            for (int i = 0; i < jsonArray.length(); i++) {
                String col = jsonArray.getString(i);
                String[] cols = new String[1];
                cols[0] = col;
                forward(cols);  // 为什么要传递数组? 有可能炸裂出来多列数据, 所以才需要传递数字
            }
        }
        /**
         * 关闭资源
         * 不用实现
         */
        @Override
        public void close() throws HiveException {
    
        }
    }
    

    相关文章

      网友评论

          本文标题:HIVE 自定义UDF、UDTF函数

          本文链接:https://www.haomeiwen.com/subject/pijtrktx.html