A Survey of UDAF Implementation Approaches in Hive

Author: 分裂四人组 | Published 2019-04-01 14:17

    Similar to UDFs, there are two main ways to implement a UDAF in Hive:

    • Extend org.apache.hadoop.hive.ql.exec.UDAF: relatively simple to implement, but already deprecated in Hive;
    • Extend org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver: more involved to implement, but more powerful.

    Example: implement a UDAF that, given multiple rows, returns the value in a column that contains the most characters.

    Implementation based on AbstractGenericUDAFResolver

    package com.alipay.mbaprod.spark.udaf;
    
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.parse.SemanticException;
    import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorUtils;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.StringObjectInspector;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
    import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
    
    
    @Description(name = "longest_record", value = "_FUNC_(expr) - Returns the value of expr with the greatest number of characters")
    public class LongestRecordUDAF extends AbstractGenericUDAFResolver {
    
      @Override
      public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        // Only a single parameter is supported for now.
        if (parameters.length != 1) {
          throw new UDFArgumentTypeException(parameters.length - 1,
              "Exactly one argument is expected.");
        }
    
        ObjectInspector oi = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);
        if (!ObjectInspectorUtils.compareSupported(oi)) {
          throw new UDFArgumentTypeException(0, "Cannot support comparison of map<> type or complex type containing map<>.");
        }
    
        return new GenericUDAFMaxLenRowEvaluator();
      }
    
      // @UDFType(distinctLike=true)
      public static class GenericUDAFMaxLenRowEvaluator extends GenericUDAFEvaluator {
        // ObjectInspector for the incoming values (original rows or partial aggregates).
        ObjectInspector inputOI;
        // ObjectInspector for the value returned by terminatePartial()/terminate().
        ObjectInspector outputOI;
    
        @Override
        public ObjectInspector init(Mode mode, ObjectInspector[] parameters) throws HiveException {
          super.init(mode, parameters);

          // For a max-like aggregation the partial result has the same type as the input,
          // so all modes (PARTIAL1, PARTIAL2, FINAL, COMPLETE) can be handled uniformly.
          inputOI = parameters[0];
          outputOI = ObjectInspectorUtils.getStandardObjectInspector(inputOI,
              ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
          return outputOI;
        }
    
        static class MaxLenAgg implements AggregationBuffer {
          // Longest value seen so far, stored as a standard Java object.
          Object o;
        }
    
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
          MaxLenAgg result = new MaxLenAgg();
          return result;
        }
    
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
          MaxLenAgg maxagg = (MaxLenAgg) agg;
          maxagg.o = null;
        }
    
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters) throws HiveException {
          // Per-row processing is the same as merging a single partial value.
          merge(agg, parameters[0]);
        }
    
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
          // The partial result is simply the longest value seen so far.
          return terminate(agg);
        }
    
        @Override
        public void merge(AggregationBuffer agg, Object partial) throws HiveException {
          if (partial != null) {
            MaxLenAgg maxagg = (MaxLenAgg) agg;
    
            if (maxagg.o != null) {
              // Keep whichever value has more characters (the column is assumed to be a string).
              String oldV = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(maxagg.o);
              String newV = ((StringObjectInspector) inputOI).getPrimitiveJavaObject(partial);
              if (newV.length() > oldV.length()) {
                // Copy the value out, since Hive may reuse the underlying object.
                maxagg.o = ObjectInspectorUtils.copyToStandardObject(partial, inputOI,
                    ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
              }
            } else {
              maxagg.o = ObjectInspectorUtils.copyToStandardObject(partial, inputOI,
                  ObjectInspectorUtils.ObjectInspectorCopyOption.JAVA);
            }
          }
        }
    
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
          MaxLenAgg maxagg = (MaxLenAgg) agg;
          return maxagg.o;
        }
      }
    }
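
    To sanity-check the resolver/evaluator pair without running a query, the evaluator lifecycle can be driven directly from plain Java. The sketch below is a minimal example (the class name LongestRecordUDAFDemo is made up for this illustration); it exercises COMPLETE mode, i.e. raw rows in and the final result out with no partial/merge phase, and uses only the standard Hive serde2 ObjectInspector factories.

    package com.alipay.mbaprod.spark.udaf;

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator.AggregationBuffer;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    public class LongestRecordUDAFDemo {

      public static void main(String[] args) throws Exception {
        // In a real query the evaluator is obtained via LongestRecordUDAF.getEvaluator();
        // instantiating it directly is enough for a local smoke test.
        GenericUDAFEvaluator eval = new LongestRecordUDAF.GenericUDAFMaxLenRowEvaluator();

        // COMPLETE mode: original rows in, final result out, no merge step.
        ObjectInspector stringOI = PrimitiveObjectInspectorFactory.javaStringObjectInspector;
        eval.init(GenericUDAFEvaluator.Mode.COMPLETE, new ObjectInspector[] { stringOI });

        AggregationBuffer buf = eval.getNewAggregationBuffer();
        eval.iterate(buf, new Object[] { "ab" });
        eval.iterate(buf, new Object[] { "abcd" });
        eval.iterate(buf, new Object[] { "abc" });

        // Prints "abcd", the value with the most characters.
        System.out.println(eval.terminate(buf));
      }
    }

    Inside Hive itself the jar would typically be added with ADD JAR and the function registered with CREATE TEMPORARY FUNCTION longest_record AS 'com.alipay.mbaprod.spark.udaf.LongestRecordUDAF', after which it can be used like any built-in aggregate in a GROUP BY query.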
    

    Implementation based on UDAF

    package com.alipay.mbaprod.spark.udaf;
    
    import org.apache.hadoop.hive.ql.exec.UDAF;
    import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    
    /**
     * @author shuming.lsm
     * @version 2019/03/29
     */
    public class MaxLenRowUDAF extends UDAF {
      // Logger for this UDAF.
      private static final Logger LOG = LoggerFactory.getLogger(MaxLenRowUDAF.class);
    
      public static class MaxLenRowUDAFEvaluator implements UDAFEvaluator {
        private String result;
    
        // A - Initialize the evaluator, indicating that no values have been
        // aggregated yet.
        public void init() {
          LOG.debug("Initialize evaluator");
          result = null;
        }
    
        // B - Iterate: called for every new value to be aggregated.
        public boolean iterate(String value) throws HiveException {
          LOG.debug("Iterating over each value for aggregation: " + value);
          if (value == null) {
            return true;
          }

          if (result == null || value.length() > result.length()) {
            result = value;
          }
          return true;
        }
    
        // C - Called when Hive wants partially aggregated results.
        public String terminatePartial() {
          LOG.debug("Return partially aggregated results");
          return result;
        }
    
        // D - Called when Hive decides to combine one partial aggregation with another
        public boolean merge(String other) {
          LOG.debug("merging by combining partial aggregation");
          if(other == null) {
            return true;
          }
    
          if (result == null) {
            result = other;
            return true;
          }
    
          if (other.length() > result.length()) {
            result = other;
          }
    
          return true;
        }
    
        // E - Called when the final result of the aggregation is needed.
        public String terminate() {
          LOG.debug("At the end of last record of the group - returning final result");
          return result;
        }
      }
    }
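
    Because the old UDAF bridge locates init/iterate/terminatePartial/merge/terminate by reflection, this evaluator can likewise be exercised with plain Java. The hypothetical driver below (the class name MaxLenRowUDAFDemo is made up for this sketch) simulates two map-side partial aggregations whose results are then merged on the reduce side.

    package com.alipay.mbaprod.spark.udaf;

    import org.apache.hadoop.hive.ql.metadata.HiveException;

    public class MaxLenRowUDAFDemo {

      public static void main(String[] args) throws HiveException {
        // Two "map-side" evaluators, each aggregating its own slice of the rows.
        MaxLenRowUDAF.MaxLenRowUDAFEvaluator map1 = new MaxLenRowUDAF.MaxLenRowUDAFEvaluator();
        map1.init();
        map1.iterate("a");
        map1.iterate("abc");

        MaxLenRowUDAF.MaxLenRowUDAFEvaluator map2 = new MaxLenRowUDAF.MaxLenRowUDAFEvaluator();
        map2.init();
        map2.iterate("ab");

        // A "reduce-side" evaluator combines the partial results.
        MaxLenRowUDAF.MaxLenRowUDAFEvaluator reducer = new MaxLenRowUDAF.MaxLenRowUDAFEvaluator();
        reducer.init();
        reducer.merge(map1.terminatePartial());
        reducer.merge(map2.terminatePartial());

        // Prints "abc", the longest value across both slices.
        System.out.println(reducer.terminate());
      }
    }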
    
