美文网首页
【Hive】Hive UDF

【Hive】Hive UDF

作者: w1992wishes | 来源:发表于2019-08-15 16:33 被阅读0次

    [TOC]

    一、UDF 介绍

    UDF(User-Defined Functions)即是用户自定义的hive函数。当 Hive 自带的函数并不能完全满足业务的需求,这时可以根据具体需求自定义函数。UDF 函数可以直接应用于 select 语句,对查询结构做格式化处理后,再输出内容。

    Hive 自定义函数包括三种:

    • UDF: one to one ,进来一个出去一个,row mapping, 如:upper、substr函数;
    • UDAF(A:aggregation): many to one,进来多个出去一个,row mapping,如sum/min;
    • UDTF(T:table-generating):one to mang,进来一个出去多行,如 lateral view 与 explode 。

    注解使用:

    @Describtion 注解是可选的,用于对函数进行说明,其中的 FUNC 字符串表示函数名,当使用 DESCRIBE FUNCTION 命令时,替换成函数名。@Describtion包含三个属性:

    • name:用于指定 Hive 中的函数名。
    • value:用于描述函数的参数。
    • extended:额外的说明,如,给出示例,当使用 DESCRIBE FUNCTION EXTENDED name 的时候打印。

    二、UDF

    开发自定义 UDF 函数有两种方式:

    • 如果函数读和返回都是基础数据类型,即 Hadoop 和 Hive 的基本类型,如,Text、IntWritable、LongWritable、DoubleWritable 等,那么继承 org.apache.hadoop.hive.ql.exec.UDF ;
    • 如果用来操作内嵌数据结构,如 Map,List 和 Set,则继承 org.apache.hadoop.hive.ql.udf.generic.GenericUDF;

    2.1、简单 UDF

    用简单 UDF API 来构建一个 UDF 只涉及到编写一个类继承实现一个方法(evaluate),下面的例子来自 《Hive 编程指南》,将表中的生日字段转换为星座。

    @UDFType
    @Description(
            name = "zodiac",
            value = "_FUNC_ (date) - " +
                    " from the input date string " +
                    " or separate month and day arguments, \n" +
                    " returns the sign of the Zodiac.",
            extended = "Example :\n" +
                    "> SELECT _FUNC_ (date_string) FROM src;\n" +
                    "> SELECT _FUNC_ (month, day) FROM src;")
    public class UDFZodiacSign extends UDF {
    
        private static final String ERROR_DATE_OF_MONTH = "invalid date of specify month";
    
        private static final String ERROR_MONTH_ARGS = "invalid argument of month";
    
        private static final String ERROR_DATE_STRING = "invalid date format";
    
        public String evaluate(Date bday) {
            return this.evaluate(bday.getMonth() + 1, bday.getDate());
        }
    
        public String evaluate(String dateString) {
            DateTime dateTime;
            try {
                dateTime = new DateTime(dateString);
            } catch (Exception e) {
                return ERROR_DATE_STRING;
            }
            return this.evaluate(dateTime.getMonthOfYear(), dateTime.getDayOfMonth());
        }
    
        public String evaluate(Integer month, Integer day) {
    
            switch (month) {
                //判断是几月
                case 1:
                    //判断是当前月的哪一段时间;然后就可以得到星座了;下面代码都一样的
                    if (day > 0 && day < 20) {
                        return "魔蝎座";
                    } else if (day < 32) {
                        return "水瓶座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 2:
                    if (day > 0 && day < 19) {
                        return "水瓶座";
                    } else if (day < 29) {
                        return "双鱼座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 3:
                    if (day > 0 && day < 21) {
                        return "双鱼座";
                    } else if (day < 32) {
                        return "白羊座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 4:
                    if (day > 0 && day < 20) {
                        return "白羊座";
                    } else if (day < 31) {
                        return "金牛座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 5:
                    if (day > 0 && day < 21) {
                        return "金牛座";
                    } else if (day < 32) {
                        return "双子座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 6:
                    if (day > 0 && day < 22) {
                        return "双子座";
                    } else if (day < 31) {
                        return "巨蟹座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 7:
                    if (day > 0 && day < 23) {
                        return "巨蟹座";
                    } else if (day < 32) {
                        return "狮子座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 8:
                    if (day > 0 && day < 23) {
                        return "狮子座";
                    } else if (day < 32) {
                        return "处女座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 9:
                    if (day > 0 && day < 23) {
                        return "处女座";
                    } else if (day < 31) {
                        return "天平座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 10:
                    if (day > 0 && day < 24) {
                        return "天平座";
                    } else if (day < 32) {
                        return "天蝎座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 11:
                    if (day > 0 && day < 23) {
                        return "天蝎座";
                    } else if (day < 31) {
                        return "射手座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                case 12:
                    if (day > 0 && day < 22) {
                        return "射手座";
                    } else if (day < 32) {
                        return "摩羯座";
                    } else {
                        return ERROR_DATE_OF_MONTH;
                    }
                default:
                    return ERROR_MONTH_ARGS;
            }
    
        }
    
    }
    

    测试一下:

    public class UDFZodiacSignTest {
    
        @Test
        public void testUDFZodiacSign() {
            UDFZodiacSign example = new UDFZodiacSign();
            Assert.assertEquals("魔蝎座", example.evaluate(1, 1));
            Assert.assertEquals("魔蝎座", example.evaluate("2019-01-01"));
        }
    
    }
    

    2.2、复杂 GenericUDF

    GenericUDF API 提供了一种方法去处理那些不是可写类型的对象,例如:struct,map 和 array 类型。

    这个 API 需要用户亲自为函数的参数管理对象存储格式,验证接收的参数的数量与类型。

    这个 API 要求实现以下方法:

    // 这个类似于简单 API 的 evaluate 方法,它可以读取输入数据和返回结果
    abstract Object evaluate(GenericUDF.DeferredObject[] arguments);  
      
    // 该方法应当是描述该 UDF 的字符串,显示函数的提示信息
    abstract String getDisplayString(String[] children);  
      
    // 只调用一次,在任何 evaluate() 调用之前,可以接收到一个可以表示函数输入参数类型的 object inspectors 数组
    // 是用来验证该函数是否接收正确的参数类型和参数个数的地方
    abstract ObjectInspector initialize(ObjectInspector[] arguments);  
    

    例子同样来自 《Hive 编程指南》,编写一个用户自定义函数,称之为nvl(),这个函数传入的值如果是 null,那么就返回一个默认值。

    函数 nvl() 要求有 2 个参数。如果第 1 个参数是非null值,那么就返回这个值;如果第 1 个参数是 null,那么就返回第 2 个参数的值。

    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDF;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDFUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    
    @Description(name = "nvl",
            value = "_FUNC_(value, default_value) - Returns default value if value is null else returns value",
            extended = "Example:\n"
                    + " > SELECT _FUNC_(null, 'bla') FROM src limit 1; \n")
    public class GenericUDFNvl extends GenericUDF {
    
        private GenericUDFUtils.ReturnObjectInspectorResolver returnOIResolver;
        private ObjectInspector[] argumentOIs;
    
        @Override
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            argumentOIs = arguments;
            // 1.检验参数个数
            if (arguments.length != 2) {
                throw new UDFArgumentException("The operator 'NVL' accepts 2 arguments.");
            }
    
            // 2.检验参数类型
            returnOIResolver = new GenericUDFUtils.ReturnObjectInspectorResolver(true);
            if (!(returnOIResolver.update(arguments[0]) && returnOIResolver.update(arguments[1]))) {
                throw new UDFArgumentTypeException(2, "The 1st and 2nd args of function NLV should have the same type, "
                        + "but they are different: \"" + arguments[0].getTypeName() + "\" and \"" + arguments[1].getTypeName() + "\"");
            }
    
            // 3.返回类型,和传入的参数类型一致
            return returnOIResolver.get();
        }
    
        @Override
        public Object evaluate(DeferredObject[] arguments) throws HiveException {
            Object retVal = returnOIResolver.convertIfNecessary(arguments[0].get(), argumentOIs[0]);
            if (retVal == null) {
                retVal = returnOIResolver.convertIfNecessary(arguments[1].get(), argumentOIs[1]);
            }
            return retVal;
        }
    
        @Override
        public String getDisplayString(String[] children) {
            StringBuilder sb = new StringBuilder();
            sb.append("if ");
            sb.append(children[0]);
            sb.append(" is null ");
            sb.append("returns ");
            sb.append(children[1]);
            return sb.toString();
        }
    
    }
    

    测试一下:

    public class GenericUDFNvlTest {
    
        @Test
        public void testGenericUDFNvl() throws HiveException {
            // 建立需要的模型
            GenericUDFNvl example = new GenericUDFNvl();
            ObjectInspector stringOI1 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
            ObjectInspector stringOI2 = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
            StringObjectInspector resultInspector = (StringObjectInspector) example.initialize(new ObjectInspector[]{stringOI1, stringOI2});
    
            // 测试结果
            Object result1 = example.evaluate(new GenericUDF.DeferredObject[]{new GenericUDF.DeferredJavaObject(null), new GenericUDF.DeferredJavaObject("a")});
            Assert.assertEquals("a", resultInspector.getPrimitiveJavaObject(result1));
    
            // 测试结果
            Object result2 = example.evaluate(new GenericUDF.DeferredObject[]{new GenericUDF.DeferredJavaObject("dd"), new GenericUDF.DeferredJavaObject("a")});
            Assert.assertNotEquals("a", resultInspector.getPrimitiveJavaObject(result2));
        }
    
    }
    

    三、UDAF

    PS:该段部分来自 Hive UDAF开发详解

    UDAF 开发主要涉及到以下两个抽象类:

    org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
    org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator
    

    大致上,UDAF 函数读取数据(mapper),聚集一堆 mapper 输出到部分聚集结果(combiner),并且最终创建一个最终的聚集结果(reducer)。因为需要对多个combiner 进行聚集,所以需要保存部分聚集结果。

    3.1、AbstractGenericUDAFResolver

    Resolver 要覆盖实现 getEvaluator 方法,该方法会根据 sql 传人的参数数据格式指定调用哪个 Evaluator 进行处理。

    public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) 
      throws SemanticException {
      throw new SemanticException(
            "This UDAF does not support the deprecated getEvaluator() method.");
    }
    

    3.2、GenericUDAFEvaluator

    UDAF 逻辑处理主要发生在 Evaluator 中,要实现该抽象类的几个方法。理解Evaluator 之前,先介绍 ObjectInspector 接口与 GenericUDAFEvaluator 中的内部类 Model。

    • ObjectInspector:主要是解耦数据使用与数据格式,使数据流在输入输出端可以切换不同的输入输出格式,不同的 Operator上使用不同的格式。

    • Model:Model 代表了 UDAF 在 mapreduce 的各个阶段。

      public static enum Mode {
          /**
           * PARTIAL1: 这个是mapreduce的map阶段:从原始数据到部分数据聚合
           * 将会调用iterate()和terminatePartial()
           */
          PARTIAL1,
              /**
           * PARTIAL2: 这个是mapreduce的map端的Combiner阶段,负责在map端合并map的数据:从部分数据聚合到部分数据聚合
           * 将会调用merge() 和 terminatePartial() 
           */
          PARTIAL2,
              /**
           * FINAL: mapreduce的reduce阶段:从部分数据的聚合到完全聚合 
           * 将会调用merge()和terminate()
           */
          FINAL,
              /**
           * COMPLETE: 如果出现了这个阶段,表示mapreduce只有map,没有reduce,所以map端就直接出结果了:从原始数据直接到完全聚合
            * 将会调用 iterate()和terminate()
           */
          COMPLETE
        };
      

    一般情况下,完整的 UDAF 逻辑是一个 mapreduce 过程,如果有mapper 和reducer,就会经历 PARTIAL1(mapper),FINAL(reducer),如果还有 combiner,那就会经历 PARTIAL1(mapper),PARTIAL2(combiner),FINAL(reducer)。

    而有一些情况下的 mapreduce,只有mapper,而没有 reducer,所以就会只有COMPLETE 阶段,这个阶段直接输入原始数据,出结果。

    3.3、GenericUDAFEvaluator 的方法

    // 确定各个阶段输入输出参数的数据格式 ObjectInspectors,一般负责初始化内部字段,通常初始化用来存放最终结果的变量
    public  ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException;
     
    // 保存数据聚集结果的类
    abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
     
    // 重置聚集结果
    public void reset(AggregationBuffer agg) throws HiveException;
     
    // map阶段,迭代处理输入sql传过来的列数据
    public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
     
    // map与combiner结束返回结果,得到部分数据聚集结果
    public Object terminatePartial(AggregationBuffer agg) throws HiveException;
     
    // combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
    public void merge(AggregationBuffer agg, Object partial) throws HiveException;
     
    // reducer阶段,输出最终结果
    public Object terminate(AggregationBuffer agg) throws HiveException;
    

    3.4、图解Model与Evaluator关系

    Model 各阶段对应 Evaluator 方法调用

    image

    Evaluator 各个阶段下处理 mapreduce 流程

    image

    3.5、编码实例

    下面的函数代码是计算指定列中字符的总数(包括空格):

    /**
     * @author Administrator
     */
    @Description(
            name = "letters",
            value = "_FUNC_(expr) - 返回该列中所有字符串的字符总数")
    public class GenericUDAFTotalNumOfLetters extends AbstractGenericUDAFResolver {
    
        @Override
        public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters)
                throws SemanticException {
            if (parameters.length != 1) {
                throw new UDFArgumentTypeException(parameters.length - 1,
                        "Exactly one argument is expected.");
            }
    
            ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
    
            if (oi.getCategory() != ObjectInspector.Category.PRIMITIVE) {
                throw new UDFArgumentTypeException(0,
                        "Argument must be PRIMITIVE, but "
                                + oi.getCategory().name()
                                + " was passed.");
            }
    
            PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi;
    
            if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                throw new UDFArgumentTypeException(0,
                        "Argument must be String, but "
                                + inputOI.getPrimitiveCategory().name()
                                + " was passed.");
            }
    
            return new TotalNumOfLettersEvaluator();
        }
    
        public static class TotalNumOfLettersEvaluator extends GenericUDAFEvaluator {
    
            PrimitiveObjectInspector inputOI;
            PrimitiveObjectInspector integerOI;
    
            private IntWritable result;
    
            @Override
            public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                    throws HiveException {
    
                super.init(m, parameters);
                result = new IntWritable(0);
                inputOI = (PrimitiveObjectInspector) parameters[0];
                integerOI = PrimitiveObjectInspectorFactory.writableIntObjectInspector;
                // 指定各个阶段输出数据格式都为Integer类型
                return PrimitiveObjectInspectorFactory.writableIntObjectInspector;
    
            }
    
            /**
             * 存储当前字符总数的类
             */
            static class LetterSumAgg implements AggregationBuffer {
                int sum = 0;
    
                void add(int num) {
                    sum += num;
                }
            }
    
            /**
             * 创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的相加总和。
             */
            @Override
            public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                LetterSumAgg sum = new LetterSumAgg();
                reset(sum);
                return sum;
            }
    
            /**
             * mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。
             */
            @Override
            public void reset(AggregationBuffer agg) throws HiveException {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                myagg.sum = 0;
            }
    
            /**
             * map阶段调用,把保存当前和的对象agg,再加上输入的参数传入。
             */
            @Override
            public void iterate(AggregationBuffer agg, Object[] parameters)
                    throws HiveException {
                if (parameters[0] != null) {
                    LetterSumAgg myagg = (LetterSumAgg) agg;
                    Object p1 = inputOI.getPrimitiveJavaObject(parameters[0]);
                    myagg.add(String.valueOf(p1).length());
                }
            }
    
            /**
             * mapper 结束要返回的结果,还有 combiner 结束返回的结果
             */
            @Override
            public Object terminatePartial(AggregationBuffer agg) throws HiveException {
                return terminate(agg);
            }
    
            /**
             * combiner合并map返回的结果,还有reducer合并mapper或combiner返回的结果。
             */
            @Override
            public void merge(AggregationBuffer agg, Object partial)
                    throws HiveException {
                if (partial != null) {
    
                    LetterSumAgg myagg = (LetterSumAgg) agg;
    
                    myagg.sum += PrimitiveObjectInspectorUtils.getInt(partial, integerOI);
                }
            }
    
            /**
             * reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
             */
            @Override
            public Object terminate(AggregationBuffer agg) throws HiveException {
                LetterSumAgg myagg = (LetterSumAgg) agg;
                result.set(myagg.sum);
                return result;
            }
    
        }
    }
    

    测试:

    public class GenericUDAFTotalNumOfLettersTest {
    
        private GenericUDAFTotalNumOfLetters example;
        private GenericUDAFEvaluator evaluator;
        private ObjectInspector[] output;
        private PrimitiveObjectInspector[] poi;
    
        GenericUDAFTotalNumOfLetters.TotalNumOfLettersEvaluator.LetterSumAgg agg;
    
        Object[] param1 = {"tom"};
        Object[] param2 = {"tomT"};
        Object[] param3 = {"wu kong"};
        Object[] param4 = {"wu le"};
    
        @Before
        public void setUp() throws Exception {
    
            example = new GenericUDAFTotalNumOfLetters();
    
            //All the data are String
            String[] typeStrs = {"string"/*, "string", "string"*/};
            TypeInfo[] types = makePrimitiveTypeInfoArray(typeStrs);
    
            evaluator = example.getEvaluator(types);
    
            poi = new PrimitiveObjectInspector[1];
            poi[0] = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                    PrimitiveObjectInspector.PrimitiveCategory.STRING);
    /*        poi[1] =  PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                    PrimitiveObjectInspector.PrimitiveCategory.STRING);
            poi[2] =  PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                    PrimitiveObjectInspector.PrimitiveCategory.STRING);*/
    
            //The output inspector
            output = new ObjectInspector[1];
            output[0] = PrimitiveObjectInspectorFactory.getPrimitiveJavaObjectInspector(
                    PrimitiveObjectInspector.PrimitiveCategory.INT);
            /*output[0] = ObjectInspectorFactory.getStandardListObjectInspector(poi[0]);*/
    
            agg = (GenericUDAFTotalNumOfLetters.TotalNumOfLettersEvaluator.LetterSumAgg) evaluator.getNewAggregationBuffer();
        }
    
        @After
        public void tearDown() throws Exception {
    
        }
    
        @Test(expected = UDFArgumentTypeException.class)
        public void testGetEvaluateorWithComplexTypes() throws Exception {
            TypeInfo[] types = new TypeInfo[1];
            types[0] = TypeInfoFactory.getListTypeInfo(TypeInfoFactory.getPrimitiveTypeInfo("string"));
            example.getEvaluator(types);
        }
    
        @Test(expected = UDFArgumentTypeException.class)
        public void testGetEvaluateorWithNotSupportedTypes() throws Exception {
            TypeInfo[] types = new TypeInfo[1];
            types[0] = TypeInfoFactory.getPrimitiveTypeInfo("boolean");
            example.getEvaluator(types);
        }
    
        @Test(expected = UDFArgumentTypeException.class)
        public void testGetEvaluateorWithMultiParams() throws Exception {
            String[] typeStrs3 = {"double", "int", "string"};
            TypeInfo[] types3 = makePrimitiveTypeInfoArray(typeStrs3);
            example.getEvaluator(types3);
        }
    
        @Test
        public void testIterate() throws HiveException {
            evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
            evaluator.reset(agg);
    
            evaluator.iterate(agg, param1);
            Assert.assertEquals(3, agg.sum);
    
            evaluator.iterate(agg, param2);
            Assert.assertEquals(7, agg.sum);
    
            evaluator.iterate(agg, param3);
            Assert.assertEquals(14, agg.sum);
        }
    
        @Test
        public void testTerminatePartial() throws Exception {
    
            testIterate();
    
            Object partial = evaluator.terminatePartial(agg);
    
            Assert.assertTrue(partial instanceof IntWritable);
            Assert.assertEquals(new IntWritable(14), partial);
        }
    
        @Test
        public void testMerge() throws Exception {
            evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
            evaluator.reset(agg);
            evaluator.iterate(agg, param1);
            evaluator.iterate(agg, param2);
            Object partial1 = evaluator.terminatePartial(agg);
    
            evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
            evaluator.reset(agg);
            evaluator.iterate(agg, param3);
            Object partial2 = evaluator.terminatePartial(agg);
    
            evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL1, poi);
            evaluator.reset(agg);
            evaluator.iterate(agg, param4);
            Object partial3 = evaluator.terminatePartial(agg);
    
            evaluator.init(GenericUDAFEvaluator.Mode.PARTIAL2, output);
            evaluator.reset(agg);
            evaluator.merge(agg, partial1);
            Assert.assertEquals(7, agg.sum);
    
            evaluator.merge(agg, partial2);
            Assert.assertEquals(14, agg.sum);
    
            evaluator.merge(agg, partial3);
            Assert.assertEquals(19, agg.sum);
        }
    
        @Test
        public void testTerminate() throws Exception {
            evaluator.init(GenericUDAFEvaluator.Mode.COMPLETE, poi);
            evaluator.reset(agg);
    
            evaluator.iterate(agg, param1);
            evaluator.iterate(agg, param2);
            evaluator.iterate(agg, param3);
            evaluator.iterate(agg, param4);
            Object term = evaluator.terminate(agg);
    
            Assert.assertTrue(term instanceof IntWritable);
            Assert.assertEquals(term, new IntWritable(19));
        }
    
        /**
         * Generate some TypeInfo from the typeStrs
         */
        private TypeInfo[] makePrimitiveTypeInfoArray(String[] typeStrs) {
            int len = typeStrs.length;
    
            TypeInfo[] types = new TypeInfo[len];
    
            for (int i = 0; i < len; i++) {
                types[i] = TypeInfoFactory.getPrimitiveTypeInfo(typeStrs[i]);
            }
    
            return types;
        }
    }
    

    四、UDTF

    Hive 中 UDTF 可以将一行转成一行多列,也可以将一行转成多行多列,使用频率较高。

    一个 UDTF 必须继承 GenericUDTF 抽象类然后实现抽象类中的 initialize,process,和 close方法。

    • initialize:确定传入参数的类型并确定 UDTF 生成表的每个字段的数据类型(即输入类型和输出类型),主要是判断输入类型并确定返回的字段类型。
    • process:调用了 initialize() 后,Hive 将把 UDTF 参数传给 process() 方法,处理一条输入记录,输出若干条结果记录,该方法中,每一次调用 forward() 产生一行;如果产生多列可以将多个列的值放在一个数组中,然后将该数组传入到 forward() 函数。
    • close:在 process 调用结束后调用,用于进行其它一些额外操作,只执行一次。
    public class GenericUDTFNameParserGeneric extends GenericUDTF {
    
        private PrimitiveObjectInspector stringOI = null;
    
        @Override
        public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
    
            if (args.length != 1) {
                throw new UDFArgumentException("GenericUDTFNameParserGeneric() takes exactly one argument");
            }
    
            if (args[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                    && ((PrimitiveObjectInspector) args[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                throw new UDFArgumentException("GenericUDTFNameParserGeneric() takes a string as a parameter");
            }
    
            // 输入格式(inspectors)
            stringOI = (PrimitiveObjectInspector) args[0];
    
            // 输出格式(inspectors) -- 有两个属性的对象
            List<String> fieldNames = new ArrayList<>(2);
            List<ObjectInspector> fieldOIs = new ArrayList<>(2);
            fieldNames.add("name");
            fieldNames.add("surname");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }
    
        private ArrayList<Object[]> processInputRecord(String name) {
            ArrayList<Object[]> result = new ArrayList<>();
    
            // 忽略null值与空值
            if (name == null || name.isEmpty()) {
                return result;
            }
    
            String[] tokens = name.split("\\s+");
    
            if (tokens.length == 2) {
                result.add(new Object[]{tokens[0], tokens[1]});
            } else if (tokens.length == 4 && tokens[1].equals("and")) {
                result.add(new Object[]{tokens[0], tokens[3]});
                result.add(new Object[]{tokens[2], tokens[3]});
            }
    
            return result;
        }
    
        @Override
        public void process(Object[] record) throws HiveException {
    
            final String name = stringOI.getPrimitiveJavaObject(record[0]).toString();
    
            ArrayList<Object[]> results = processInputRecord(name);
    
            for (Object[] r : results) {
                forward(r);
            }
        }
    
        @Override
        public void close() throws HiveException {
            // do nothing
        }
    }
    

    五、UDF 使用

    5.1、准备步骤

    数据准备:

    cat ./people.txt
    
    John Smith
    John and Ann White
    Ted Green
    Dorothy
    

    把该文件上载到 hdfs 目录 /user/wqf 中:

    hadoop fs -mkdir /user/wqf/people
    hadoop fs -put ./people.txt /user/wqf/people
    

    然后创建 hive 外部表,在 hive shell 中执行:

    CREATE EXTERNAL TABLE people (name string)
    ROW FORMAT DELIMITED 
    FIELDS TERMINATED BY '\t' 
    ESCAPED BY '' 
    LINES TERMINATED BY '\n'
    STORED AS TEXTFILE 
    LOCATION '/user/wqf/people';
    

    maven pom 中添加如下配置,然后运行 mvn assembly:assembly:

    <build>
        <plugins>
            <plugin>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
            </plugin>
        </plugins>
    </build>
    

    将 jar 包上传到 hive 服务器。

    5.2、临时添加 UDF

    进入 hive 中:

    hive> add jar /home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar
    Added [/home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar] to class path
    Added resources: [/home/hadoop/testdir/hive/hive-udf-1.0-SNAPSHOT.jar]
    
    hive > CREATE TEMPORARY FUNCTION myNvl as 'me.w1992wishes.hive.udf.GenericUDFNvl';
    hive> select myNvl(name, 'a') from people limit 1;
    OK
    John Smith
    
    hive> CREATE TEMPORARY FUNCTION myCount as 'me.w1992wishes.hive.udf.GenericUDAFTotalNumOfLetters';
    hive> select myCount(name) from people;
    OK
    44
    
    hive> CREATE TEMPORARY FUNCTION myParser as 'me.w1992wishes.hive.udf.GenericUDTFNameParser';
    hive> select myParser(name) from people;
    OK
    John    Smith
    John    White
    Ann White
    Ted Green
    Time taken: 0.18 seconds, Fetched: 4 row(s)
    

    这种方式在会话结束后,函数自动销毁,因此每次打开新的会话,都需要重新 add jar 并且 CREATE TEMPORARY FUNCTION

    5.3、永久添加 UDF

    不能是本地 jar 包,需要上传 jar 包到 hdfs 目录中:

    hadoop fs -put hive-udf-1.0-SNAPSHOT.jar /user/hive/jars
    

    然后进入 hive 中,创建函数:

    hive> create function myCount as 'me.w1992wishes.hive.udf.GenericUDAFTotalNumOfLetters' using jar 'hdfs:/user/hive/jars/hive-udf-1.0-SNAPSHOT.jar';
    OK
    44
    

    六、参考资料

    1.《Hive 编程指南》
    2.Hive UDAF开发详解

    相关文章

      网友评论

          本文标题:【Hive】Hive UDF

          本文链接:https://www.haomeiwen.com/subject/honzjctx.html