美文网首页
HIVE 之UDF

HIVE 之UDF

作者: aaron1993 | 来源:发表于2017-10-08 17:43 被阅读0次

    1. 前言

    因为最近工作中有需要自定义udf,所以本文记录下最近所了解到的udf的知识。主要讲述hive中如何自定义udf,至于udf一些原理性的东西,比如udf在mr过程中怎么起作用的,这个涉及到hive的细节,我也不清楚,所以本文不会涉及,知道多少写多少吧。

    2. UDF分类

    hive中udf主要分为三类:

    1. 标准UDF
      这种类型的udf每次接受的输入是一行数据中的一个列或者多个列(下面我把这个一行的一列叫着一个单元吧,类似表格中的一个单元格),然后输出是一个单元。比如abs, array,asin这种都是标准udf。
      自定义标准函数需要继承实现抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDF
    2. 自定义聚合函数(UDAF)
      比如max,min这种函数都是hive内置聚合函数。聚合函数和标准udf的区别是:聚合函数需要接收多行输入才能计算出结果,比如max就需要接收表中所有数据(或者group by中分组内所有数据)才能计算出最大值。
      自定义聚合函数需要实现抽象类org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver
    3. 自定义表生成函数(UDTF)
      上面1,2中的udf都只输出一个标量的数据(一个单元)。表生成函数故名思义,其输出有点像子查询,可以一次输出多行多列。
      自定义表生成函数需要实现抽象类org.apache.hadoop.hive.ql.udf.generic.GenericUDTF

    2. 自定义UDF

    引入maven依赖

    <dependency>
                <groupId>org.apache.hive</groupId>
                <artifactId>hive-exec</artifactId>
                <version>2.3.0</version>
    </dependency>
    

    2.1 自定义标准UDF

    2.1.1 实现抽象类GenericUDF

    该类的全路径为:org.apache.hadoop.hive.ql.udf.generic.GenericUDF
    1. 抽象类GenericUDF解释
    GenericUDF类如下:

    public abstract class GenericUDF implements Closeable {
         ...
         /* 实例化后initialize方法只会调用一次
            - 参数arguments即udf接收的参数列表对应的objectinspector
            - 返回的ObjectInspector对象就是udf返回值的对应的objectinspector
          initialize方法中往往做的工作是检查一下arguments是否和你udf需要的参数个数以及类型是否匹配。
         */
         
         public abstract ObjectInspector initialize(ObjectInspector[] arguments)
          throws UDFArgumentException;
         ...
          // 真正的udf逻辑在这里实现
          // - 参数arguments即udf函数输入数据,这个数组的长度和initialize的参数长度一样
          // 
          public abstract Object evaluate(DeferredObject[] arguments)
          throws HiveException;
    }
    

    GenericUDF有很多的方法,但是只有上面两个抽象方法需要自己实现。
    关于ObjectInspector,HIVE在传递数据时会包含数据本身以及对应的ObjectInspector,ObjectInspector中包含数据类型信息,通过oi去解析获得数据。

    2.1.2 实例

    假设这里要实现下面这种功能标准udf:

    cycle_range(col_name, num)
    它的接收一列,以及一了整数值为参数,然后将这列转换为一个index(index 属于[0,num))到列值的映射,像下面这样:
    > SELECT cycle_range(name, 3) FROM src_table;
       INDEX NAME
       {1, "eric"}
       {2, "aaron"}
       {0, "john"}
       {1, "marry"}
       {2, "hellen"}
       {0, "jerry"}
       {1, "ellen"}
       ...
    

    这里定义一个叫cycle_range的标准udf去实现列值的转换,实现如下:

    /**
      这里使用注解描述udf信息,当使用beeline命令'describe function cycle_range'时,会输出value中的介绍信息,其中_FUNC_会被替换成真实的udf名称。
     */
    @Description(name = "cycle_range",
        value = "_FUNC_(x, num) - return a map containes an index as key and x as value",
        extended = "Example:\n"
            + " > SELECT _FUNC_(x, 3) FROM src;\n"
            + "{i,x}, i in [0 - 3)\n"
    )
    public class GenericUDFRange extends GenericUDF {
        // 第二个参数是一个整型常量,放在这里
        private static LongWritable rangeNum = null;
        // index 递增并对rangeNum取模的结果
        private static Long index = 0L;
        // udf 返回值
        private transient Map<Object,Object> ret = new HashMap<Object,Object>();
        // udf参数整型常量可以是BYTE/SHORT/INT/LONG 这个converter将它们都转换成long处理
        private transient ObjectInspectorConverters.Converter rangeConverter;
        // 在inittialize里检查一下参数个数与类型
        public ObjectInspector initialize(ObjectInspector[] arguments) throws UDFArgumentException {
            // 只接受两个参数
            if(arguments.length != 2){
                throw new UDFArgumentException(
                        "RANGE() requires 2 arguments, got " + arguments.length
                );
            }
             
            // 第二个参数必须是PRIMITIVE这一类的,这是hive sql内置的类型,可以对应到java的primitive type,此外还必须是BYTE/SHORT/INT/LONG之一
            if(arguments[1].getCategory() != ObjectInspector.Category.PRIMITIVE){
                throw new UDFArgumentException(
                        "RANGE() only take primitive Integer type, got " + arguments[1].getTypeName()
                );
            }
            
            PrimitiveObjectInspector poi = (PrimitiveObjectInspector)arguments[1];
            // 获取到第二个参数的具体类型枚举
            PrimitiveObjectInspector.PrimitiveCategory rangeNumType = poi.getPrimitiveCategory();
            ObjectInspector outputInspector = null;
            switch (rangeNumType){
                case BYTE:
                case SHORT:
                case INT:
                case LONG:
                    // 以上4个case是合法类型,获得一个converter将这四类都转换成WritableLong处理
                    rangeConverter = ObjectInspectorConverters.getConverter(
                            arguments[1], PrimitiveObjectInspectorFactory.writableLongObjectInspector
                    );
                    // udf的输出值的oi,输出的是一个map对应的ObjectInspector, key是long,value还是原来的列的oi
                    outputInspector = ObjectInspectorFactory.getStandardMapObjectInspector(
                            PrimitiveObjectInspectorFactory.javaLongObjectInspector, arguments[0]);
                    return outputInspector;
                default:
                    throw new UDFArgumentException(
                            "RANGE only takes BYTE/SHORT/INT/LONG types as the second arguments type, got " + arguments[1].getTypeName()
                    );
            }
        }
        
        // 这里开始接收实际的一行一行的输入数据,然后返回处理后的值
        // deferredObjects应该包含两个值,第一个值是列的值,第二个值是那个整型常量range值
        public Object evaluate(DeferredObject[] deferredObjects) throws HiveException {
            // 拿到range值
            Object rangeObject = deferredObjects[1].get();
            if(rangeNum == null){
                rangeNum = new LongWritable();
                // 用coverter都转换成LongWritable,然后保存起来.
                rangeObject = rangeConverter.convert(rangeObject);
                rangeNum.set(Math.abs(((LongWritable)rangeObject).get()));
            }
            // 计算index 对range的模
            index = (index + 1) %    rangeNum.get();
            // 由于ret是这个udf实例的成员,用来保存返回的map,而evaluate又会不停的调用,所以这里put前都会clear一下,保证始终只有当前处理后的返回值。
            ret.clear();
            // 设置返回值,返回
            ret.put(index, deferredObjects[0].get());
            return ret;
        }
    
        public String getDisplayString(String[] strings) {
            return getStandardDisplayString("range", strings,",");
        }
    }
    

    编写好后:

    1. 打jar包,最好打fat jar,把依赖都打进去,假设我的jar包的路径:"/Users/eric/udf-1.0-SNAPSHOT.jar"
    2. 在beeline 终端将jar加入hive的classpath:
      add jar /Users/eric/udf-1.0-SNAPSHOT.jar
    3. 创建udf
      create temporary function cycle_range as 'me.eric.udfs.GenericUDFRange'

    成功后就可以使用了。

    2.2 自定义聚合函数UDAF

    2.2.1 实现抽象类AbstractGenericUDAFResolver

    实现自定义UDAF首先要继承并实现类AbstractGenericUDAFResolver,有下面两个方法:

    public abstract class AbstractGenericUDAFResolver
        implements GenericUDAFResolver2
    {
    
      @SuppressWarnings("deprecation")
      @Override
      public GenericUDAFEvaluator getEvaluator(GenericUDAFParameterInfo info)
        throws SemanticException {
    
        if (info.isAllColumns()) {
          throw new SemanticException(
              "The specified syntax for UDAF invocation is invalid.");
        }
    
        return getEvaluator(info.getParameters());
      }
    
      /**
        由于上面的getEvaluator也是调用的这个方法实现,所以只需要重写着这个
        getEvaluator即可。 udaf函数的主要逻辑不是getEvaluator方法里里完成的。
        而是在其返回的GenericUDAFEvaluator中实现的,那么在getEvaluator方法中往往只需要根据参数info(info中保存了传递给udaf的实际参数信息)做一下udaf的参数类型检查即可,
        然后返回用户自定义的GenericUDAFEvaluator。
       */
      @Override
      public GenericUDAFEvaluator getEvaluator(TypeInfo[] info) 
        throws SemanticException {
        throw new SemanticException(
              "This UDAF does not support the deprecated getEvaluator() method.");
      }
    

    上面介绍中说到GenericUDAFEvaluator才是真正实现udaf业务逻辑的地方,下面是GenericUDAFEvaluator抽象类的的实现:

    public abstract class GenericUDAFEvaluator implements Closeable {
    
      @Retention(RetentionPolicy.RUNTIME)
      public static @interface AggregationType {
        boolean estimable() default false;
      }
      ...
      public static enum Mode {
        /**
         * PARTIAL1: from original data to partial aggregation data: iterate() and
         * terminatePartial() will be called.
         */
        PARTIAL1,
            /**
         * PARTIAL2: from partial aggregation data to partial aggregation data:
         * merge() and terminatePartial() will be called.
         */
        PARTIAL2,
            /**
         * FINAL: from partial aggregation to full aggregation: merge() and
         * terminate() will be called.
         */
        FINAL,
            /**
         * COMPLETE: from original data directly to full aggregation: iterate() and
         * terminate() will be called.
         */
        COMPLETE
      };
    
    Mode mode;
    
    public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
        // This function should be overriden in every sub class
        // And the sub class should call super.init(m, parameters) to get mode set.
        mode = m;
        return null;
      }
    
    public abstract AggregationBuffer getNewAggregationBuffer() throws HiveException;
    
    public abstract void reset(AggregationBuffer agg) throws HiveException;
    
     public void aggregate(AggregationBuffer agg, Object[] parameters) throws HiveException {
        if (mode == Mode.PARTIAL1 || mode == Mode.COMPLETE) {
          iterate(agg, parameters);
        } else {
          assert (parameters.length == 1);
          merge(agg, parameters[0]);
        }
      }
    
      public Object evaluate(AggregationBuffer agg) throws HiveException {
        if (mode == Mode.PARTIAL1 || mode == Mode.PARTIAL2) {
          return terminatePartial(agg);
        } else {
          return terminate(agg);
        }
      }
    
    public abstract void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException;
    
    public abstract Object terminatePartial(AggregationBuffer agg) throws HiveException;
    
      public abstract void merge(AggregationBuffer agg, Object partial) throws HiveException;
    
      public abstract Object terminate(AggregationBuffer agg) throws HiveException;
    

    首先是上面枚举类型Mode的几个枚举值:PARTIAL1,PARTIAL2,FINAL,COMPLETE, 同时mode也是方法init的参数。这几个枚举值跟聚合涉及到的过程有关系, map-reduce中聚合往往涉及到shuffle的过程,这其中又可能涉及到map端的combine,然后map到reduce过程中数据的shuffle,然后在在reduce端merge。
    下面这张图大概的描述了一下各个阶段的对应关系:


    这张图中没有包含COMPLETE,从上面代码中COMPLETE的注释可以看出来,COMPLETE表示直接从原始数据聚合到最终结果,也就是说不存在中间需要先在map端完成部分聚合结果,然后再到reduce端完成最终聚合一个过程,COMPLETE出现在一个完全map only的任务中,所以没有和其他三个阶段一起出现。

    上图描述了三个阶段调用的方法,这也就是需要自己实现的方法:

    1. PARTIAL1
      • iterate(AggregationBuffer agg, Object[] parameters)
        AggregationBuffer是一个需要你实现的数据结构,用来临时保存聚合的数据,parameters是传递给udaf的实际参数,这个方法的功能可以描述成: 拿到一条条数据记录方法在parameters里,然后聚合到agg中,怎么聚合自己实现,比如agg就是一个数组,你把所有迭代的数据保存到数组中都可以。agg相当于返回结果,
      • terminatePartial(AggregationBuffer agg)
        iterate迭代了map中的数据并保存到agg中,并传递给terminatePartial,接下来terminatePartial完成计算,terminatePartial返回Object类型结果显然还是要传递给下一个阶段PARTIAL2的,但是PARTIAL2怎么知道Object到底是什么?前面提到HIVE都是通过ObjectInspector来获取数据类型信息的,但是PARTIAL2的输入数据ObjectInspector怎么来的?显然每个阶段输出数据对应的ObjectInspector只有你自己知道,上面代码中还有一个init()方法是需要你实现了(init在每一个阶段都会调用一次 ),init的参数m表明了当前阶段(当前处于PARTIAL1),你需要在init中根据当前阶段m,设置一个ObjectInspector表示当前的输出oi就行了,init返回一个ObjectInspcetor表示当前阶段的输出数据类信息(也就是下一阶段的输入数据信息)。
    2. PARTIAL2
      PARTIAL2的输入是基于PARTIAL1的输出的,PARTIAL1输出即terminatePartial的返回值。
      • merge(AggregationBuffer agg, Object partial)
        agg和partial1中的一样,既是参数,也是返回值。partial就是partial1中terminatePartial的返回值,partial的具体数据信息需要你根据ObjectInspector获取了。merger就表示把partial值先放到agg里,待会计算。
      • terminatePartial
        和partial1一样。
    3. FINAL
      FINAL进入到reduce阶段,也就是要完成最终结果的计算,和PARTIAL2不同的是它调用terminate,没什么好说的,输出最终结果而已。

    关于init方法,方法原型:

    public ObjectInspector init(Mode m, ObjectInspector[] parameters)
    

    这个方法会在每个阶段都会调用一次,参数m表示当前调用的阶段,parameters表示当前阶段输入数据的oi。前面提到partial1的terminatePartial的输出就是partial2的输入数据,那么此时partial1的输出数据对应的oi,应该和partial2时调用init的参数parameters对应起来才能保存不出错。

    2.2.1 UDAF实例

    这里实现的udaf的实例,他完成如下功能:

    > SELECT col_concat(id,  '<' ,  '>',  ',' ) FROM person;
    输出:
    <1,2,3,4,5,6>
    udaf实现将某一个使用特定符号连接起来,并使用另外的字符包围左右。
    第一个参数就是列名,然后open,close,seperator
    

    代码如下:

    public class GenericUDAFColConcat extends AbstractGenericUDAFResolver{
    
        public GenericUDAFColConcat() {
        }
    
       /**
         在getEvaluator中做一些类型检查,
        */
        @Override
        public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
             // col_concat这个udaf需要接收4个参数
            if(parameters.length != 4){
                throw new UDFArgumentTypeException(parameters.length - 1,
                        "COL_CONCAT requires 4 argument, got " + parameters.length);
            }
             
            // 且只能用于连接一下PRIMITIVE类型的列
            if(parameters[0].getCategory() != ObjectInspector.Category.PRIMITIVE){
                throw new UDFArgumentTypeException(0,
                        "COL_CONCAT can only be used to concat PRIMITIVE type column, got " + parameters[0].getTypeName());
            }
            // 分隔符和包围符,只能时char或者STRING
            for(int i = 1; i < parameters.length; ++i){
                if(parameters[i].getCategory() != ObjectInspector.Category.PRIMITIVE){
                    throw new UDFArgumentTypeException(i,
                            "COL_CONCAT only receive type CHAR/STRING as its 2nd to 4th argument's type, got " + parameters[i].getTypeName());
                }
    
                PrimitiveObjectInspector poi = (PrimitiveObjectInspector) TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[i]);
                if(poi.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.CHAR &&
                        poi.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING){
                    throw new UDFArgumentTypeException(i,
                            "COL_CONCAT only receive type CHAR/STRING as its 2nd to 4th argument's type, got " + parameters[i].getTypeName());
                }
            }
            
            // 返回自定义的XXXEvaluator
            return new GenericUDAFCOLCONCATEvaluator();
        }
    
         // 前一节也说过需要实现AbstractAggregationBuffer用来保存聚合的值
        private static class ColCollectAggregationBuffer extends GenericUDAFEvaluator.AbstractAggregationBuffer{
            // 遍历的列值暂时都方放到列表中保存起来。
            private List<String> colValueList ;
            private String open;
            private String close;
            private String seperator;
            private boolean isInit;
    
            public ColCollectAggregationBuffer() {
                colValueList = new LinkedList<>();
                this.isInit = false;
            }
    
            public void init(String open, String close, String seperator){
                this.open = open;
                this.close = close;
                this.seperator = seperator;
                this.isInit = true;
            }
    
            public boolean isInit(){
                return isInit;
            }
    
            public String concat(){
                String c = StringUtils.join(colValueList,seperator);
                return open + c + close;
            }
    
        }
    
        public static class GenericUDAFCOLCONCATEvaluator extends GenericUDAFEvaluator{
            // transient避免序列化,因为这些成员其实都是在init中初始化了,没有序列化的意义
            // inputOIs用来保存PARTIAL1和COMPELE输入数据的oi,这个各个阶段都可能不一样
            private transient List<ObjectInspector> inputOIs = new LinkedList<>();
            private transient Mode m;
            private transient String pString;
            // soi保存PARTIAL2和FINAL的输入数据的oi
            private transient StructObjectInspector soi;
            private transient ListObjectInspector valueFieldOI;
            private transient PrimitiveObjectInspector openFieldOI;
            private transient PrimitiveObjectInspector closeFieldOI;
            private transient PrimitiveObjectInspector seperatorFieldOI;
            private transient StructField valueField;
            private transient StructField openField;
            private transient StructField closeField;
            private transient StructField seperatorField;
            @Override
            public ObjectInspector init(Mode m, ObjectInspector[] parameters) throws HiveException {
                // 父类的init必须调用
                super.init(m,parameters);
                this.m = m;
                pString = "";
    
                for(ObjectInspector p : parameters){
                    pString += p.getTypeName();
                }
               
                
                if(m == Mode.PARTIAL1 || m == Mode.COMPLETE){
                    // 在PARTIAL1和COMPLETE阶段,输入数据都是原始表中数据,而不是中间聚合数据,这里初始化inputOIs
                    inputOIs.clear();
                    for(ObjectInspector p : parameters){
                        inputOIs.add((PrimitiveObjectInspector)p);
                    }
                }else {
                    // FINAL和PARTIAL2的输入数据OI都是上一阶段的输出,而不是原始表数据,这里parameter[0]其实就是上一阶段的输出oi,具体情况看下面
                    soi = (StructObjectInspector)parameters[0];
                    valueField = soi.getStructFieldRef("values");
                    valueFieldOI = (ListObjectInspector)valueField.getFieldObjectInspector();
                    openField = soi.getStructFieldRef("open");
                    openFieldOI = (PrimitiveObjectInspector) openField.getFieldObjectInspector();
                    closeField = soi.getStructFieldRef("close");
                    closeFieldOI = (PrimitiveObjectInspector)closeField.getFieldObjectInspector();
                    seperatorField = soi.getStructFieldRef("seperator");
                    seperatorFieldOI = (PrimitiveObjectInspector)seperatorField.getFieldObjectInspector();
                }
    
                // 这里开始返回各个阶段的输出OI
                if(m == Mode.PARTIAL1 || m == Mode.PARTIAL2){
                    // 后面的terminatePartial实现中,PARTIAL1 PARTIAL2的输出数据都是一个列表,我把中间聚合和结果values, 以及open,close, seperator
                   // 按序方到列表中,所以这个地方返回的oi是一个StructObjectInspector的实现类,它能够获取list中的值。
                    ArrayList<ObjectInspector> foi = new ArrayList<>();
                    foi.add(ObjectInspectorFactory.getStandardListObjectInspector(PrimitiveObjectInspectorFactory.javaStringObjectInspector));
                    foi.add(
                            PrimitiveObjectInspectorFactory.javaStringObjectInspector
                    );
                    foi.add(
                            PrimitiveObjectInspectorFactory.javaStringObjectInspector
                    );
                    foi.add(
                            PrimitiveObjectInspectorFactory.javaStringObjectInspector
                    );
    
                    ArrayList<String> fname = new ArrayList<String>();
                    fname.add("values");
                    fname.add("open");
                    fname.add("close");
                    fname.add("seperator");
                    return ObjectInspectorFactory.getStandardStructObjectInspector(fname, foi);
                }else{
                    // COMPLETE和FINAL都是返回最终聚合结果了,也就是String,所以这里返回javaStringObjectInspector即可
                    return PrimitiveObjectInspectorFactory.javaStringObjectInspector;
                }
            }
    
            @Override
            public AggregationBuffer getNewAggregationBuffer() throws HiveException {
                return new ColCollectAggregationBuffer();
            }
    
            @Override
            public void reset(AggregationBuffer aggregationBuffer) throws HiveException {
                ((ColCollectAggregationBuffer)aggregationBuffer).colValueList.clear();
    
            }
            
            // PARTIAL1和COMPLETE调用,iterate里就是把原始数据(参数objects[0])中的值保存到aggregationBuffer的列表中
            @Override
            public void iterate(AggregationBuffer aggregationBuffer, Object[] objects) throws HiveException {
                assert objects.length == 4;
                ColCollectAggregationBuffer ccAggregationBuffer = (ColCollectAggregationBuffer)aggregationBuffer;
                ccAggregationBuffer.colValueList.add(
                        PrimitiveObjectInspectorUtils.getString(objects[0], (PrimitiveObjectInspector)inputOIs.get(0)));
    
    
                if(!ccAggregationBuffer.isInit()){
                    ccAggregationBuffer.init(
                            PrimitiveObjectInspectorUtils.getString(objects[1], (PrimitiveObjectInspector)inputOIs.get(1)),
                            PrimitiveObjectInspectorUtils.getString(objects[2],(PrimitiveObjectInspector)inputOIs.get(2)),
                            PrimitiveObjectInspectorUtils.getString(objects[3],(PrimitiveObjectInspector)inputOIs.get(3))
                    );
                }
            }
    
             // PARTIAL1和PARTIAL2调用,没做什么,但是返回的值的一个‘List<Object> partialRet’ 和init中返回的StructObjectInspector对应,
            @Override
            public Object terminatePartial(AggregationBuffer aggregationBuffer) throws HiveException {
                ColCollectAggregationBuffer ccAggregationBuffer = (ColCollectAggregationBuffer)aggregationBuffer;
                List<Object> partialRet = new ArrayList<>();
                partialRet.add(ccAggregationBuffer.colValueList);
                partialRet.add(ccAggregationBuffer.open);
                partialRet.add(ccAggregationBuffer.close);
                partialRet.add(ccAggregationBuffer.seperator);
                return partialRet;
            }
    
             // PARTIAL2和FINAL调用,参数partial对应上面terminatePartial返回的列表,
            @Override
            public void merge(AggregationBuffer aggregationBuffer, Object partial) throws HiveException {
                ColCollectAggregationBuffer ccAggregationBuffer = (ColCollectAggregationBuffer)aggregationBuffer;
                if(partial != null){
                    // soi在init中初始化了,用它来获取partial中数据。
                    List<Object> partialList = soi.getStructFieldsDataAsList(partial);
                    // terminalPartial中数据被保存在list中,这个地方拿出来只是简单了合并两个list,其他不变。
                    List<String> values = (List<String>)valueFieldOI.getList(partialList.get(0));
                    ccAggregationBuffer.colValueList.addAll(values);
                    if(!ccAggregationBuffer.isInit){
                        ccAggregationBuffer.open = PrimitiveObjectInspectorUtils.getString(partialList.get(1), openFieldOI);
                        ccAggregationBuffer.close = PrimitiveObjectInspectorUtils.getString(partialList.get(2), closeFieldOI);
                        ccAggregationBuffer.seperator = PrimitiveObjectInspectorUtils.getString(partialList.get(3), seperatorFieldOI);
                    }
                }
            }
    
           // FINAL和COMPLETE调用,此时aggregationBuffer中用list保存了原始表表中一列的所有值,这里完成连接操作,返回一个string类型的连接结果。
            @Override
            public Object terminate(AggregationBuffer aggregationBuffer) throws HiveException {
                return ((ColCollectAggregationBuffer)aggregationBuffer).concat();
            }
        }
    }
    

    2.3 自定义表生成函数

    待完成。。。

    相关文章

      网友评论

          本文标题:HIVE 之UDF

          本文链接:https://www.haomeiwen.com/subject/alolextx.html