Hive-UDTF

作者: raincoffee | 来源:发表于2017-04-06 15:25 被阅读2457次

    UDTF

    上一篇介绍了基础的UDF——UDF和GenericUDF的实现,这一篇将介绍更复杂的用户自定义表生成函数(UDTF)。用户自定义表生成函数(UDTF)接受零个或多个输入,然后产生多列或多行的输出,如explode()。要实现UDTF,需要继承org.apache.hadoop.hive.ql.udf.generic.GenericUDTF,同时实现三个方法

    // 该方法指定输入输出参数:输入的Object Inspectors和输出的Struct。  
    abstract StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException;   
      
    // 该方法处理输入记录,然后通过forward()方法返回输出结果。  
    abstract void process(Object[] record) throws HiveException;  
      
    // 该方法用于通知UDTF没有行可以处理了。可以在该方法中清理代码或者附加其他处理输出。  
    abstract void close() throws HiveException;  
    

    其中:自 Hive 0.13.0 起,initialize(ObjectInspector[]) 已被标记为 @Deprecated,不再需要实现;新代码可以改为重写 initialize(StructObjectInspector)。

    定义如下:

    public abstract class GenericUDTF {
        Collector collector;
    
        public GenericUDTF() {
            this.collector = null;
        }
    
        public void configure(MapredContext mapredContext) {
        }
    
        public StructObjectInspector initialize(StructObjectInspector argOIs) throws UDFArgumentException {
            List inputFields = argOIs.getAllStructFieldRefs();
            ObjectInspector[] udtfInputOIs = new ObjectInspector[inputFields.size()];
            for (int i = 0; i < inputFields.size(); ++i) {
                udtfInputOIs[i] = ((StructField) inputFields.get(i)).getFieldObjectInspector();
            }
            return initialize(udtfInputOIs);
        }
    
        @Deprecated
        public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
            throw new IllegalStateException("Should not be called directly");
        }
    
        public abstract void process(Object[] paramArrayOfObject) throws HiveException;
    
        public abstract void close() throws HiveException;
    
        public final void setCollector(Collector collector) {
            this.collector = collector;
        }
    
        protected final void forward(Object o) throws HiveException {
            this.collector.collect(o);
        }
    

    看一个例子

    _FUNC_(a) - separates the elements of array a into multiple rows, or the elements of a map into multiple rows and columns

    /*** Eclipse Class Decompiler plugin, copyright (c) 2016 Chen Chao (cnfree2000@hotmail.com) ***/
    package org.apache.hadoop.hive.ql.udf.generic;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    import java.util.Map;
    import java.util.Map.Entry;
    import org.apache.hadoop.hive.ql.exec.Description;
    import org.apache.hadoop.hive.ql.exec.TaskExecutionException;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.serde2.objectinspector.ListObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.MapObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    
    @Description(name = "explode", value = "_FUNC_(a) - separates the elements of array a into multiple rows, or the elements of a map into multiple rows and columns ")
    public class GenericUDTFExplode extends GenericUDTF {
        private transient ObjectInspector inputOI;
        private final transient Object[] forwardListObj;
        private final transient Object[] forwardMapObj;
    
        public GenericUDTFExplode() {
            this.inputOI = null;
    
            this.forwardListObj = new Object[1];
            this.forwardMapObj = new Object[2];
        }
    
        public void close() throws HiveException {
        }
    
        public StructObjectInspector initialize(ObjectInspector[] args)
        throws UDFArgumentException
      {
        if (args.length != 1) {
          throw new UDFArgumentException("explode() takes only one argument");
        }
    
        ArrayList fieldNames = new ArrayList();
        ArrayList fieldOIs = new ArrayList();
    
        switch (1.$SwitchMap$org$apache$hadoop$hive$serde2$objectinspector$ObjectInspector$Category[args[0].getCategory().ordinal()])
        {
        case 1:
          this.inputOI = args[0];
          fieldNames.add("col");
          fieldOIs.add(((ListObjectInspector)this.inputOI).getListElementObjectInspector());
          break;
        case 2:
          this.inputOI = args[0];
          fieldNames.add("key");
          fieldNames.add("value");
          fieldOIs.add(((MapObjectInspector)this.inputOI).getMapKeyObjectInspector());
          fieldOIs.add(((MapObjectInspector)this.inputOI).getMapValueObjectInspector());
          break;
        default:
          throw new UDFArgumentException("explode() takes an array or a map as a parameter");
        }
    
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
      }
    
        public void process(Object[] o)
        throws HiveException
      {
        Iterator i$;
        switch (1.$SwitchMap$org$apache$hadoop$hive$serde2$objectinspector$ObjectInspector$Category[this.inputOI.getCategory().ordinal()])
        {
        case 1:
          ListObjectInspector listOI = (ListObjectInspector)this.inputOI;
          List list = listOI.getList(o[0]);
          if (list == null) {
            return;
          }
          for (i$ = list.iterator(); i$.hasNext(); ) { Object r = i$.next();
            this.forwardListObj[0] = r;
            forward(this.forwardListObj);
          }
          break;
        case 2:
          MapObjectInspector mapOI = (MapObjectInspector)this.inputOI;
          Map map = mapOI.getMap(o[0]);
          if (map == null) {
            return;
          }
          for (Map.Entry r : map.entrySet()) {
            this.forwardMapObj[0] = r.getKey();
            this.forwardMapObj[1] = r.getValue();
            forward(this.forwardMapObj);
          }
          break;
        default:
          throw new TaskExecutionException("explode() can only operate on an array or a map");
        }
      }
    
        public String toString() {
            return "explode";
        }
    }
    

    一个分割字符串的例子:

    @Description(
        name = "explode_name",
        value = "_FUNC_(col) - The parameter is a column name."
            + " The return value is two strings.",
        extended = "Example:\n"
            + " > SELECT _FUNC_(col) FROM src;"
            + " > SELECT _FUNC_(col) AS (name, surname) FROM src;"
            + " > SELECT adTable.name,adTable.surname"
            + " > FROM src LATERAL VIEW _FUNC_(col) adTable AS name, surname;"
    )
    /**
     * Example UDTF that splits a single string column on the first space
     * into two output columns: (name, surname).
     */
    public class ExplodeNameUDTF extends GenericUDTF {

        /**
         * Validates that the single argument is a primitive STRING and
         * declares the two-column (name, surname) output schema.
         */
        @Override
        public StructObjectInspector initialize(ObjectInspector[] argOIs)
                throws UDFArgumentException {

            if (argOIs.length != 1) {
                throw new UDFArgumentException("ExplodeStringUDTF takes exactly one argument.");
            }
            // BUG FIX: the original used && here, so the check only fired when the
            // argument was BOTH non-primitive AND non-string. A non-primitive
            // argument slipped past the guard and the PrimitiveObjectInspector
            // cast then failed with a ClassCastException. The correct logic is:
            // reject if NOT primitive, OR primitive but NOT a string.
            if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE
                    || ((PrimitiveObjectInspector) argOIs[0]).getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.STRING) {
                throw new UDFArgumentTypeException(0, "ExplodeStringUDTF takes a string as a parameter.");
            }

            ArrayList<String> fieldNames = new ArrayList<String>();
            ArrayList<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>();
            fieldNames.add("name");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
            fieldNames.add("surname");
            fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

            return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
        }

        /**
         * Splits the input on the first space and forwards the pieces as one
         * output row.
         */
        @Override
        public void process(Object[] args) throws HiveException {
            String input = args[0].toString();
            // Limit the split to 2 so inputs containing more than one space
            // still produce an array matching the declared two-column schema
            // (the original unbounded split could forward 3+ values).
            // NOTE(review): a single-token input still yields a 1-element
            // array, as in the original — confirm desired behavior for that case.
            String[] name = input.split(" ", 2);
            forward(name);
        }

        @Override
        public void close() throws HiveException {
            // Nothing to clean up.
        }

    }
    

    记住:在 process() 方法中,必须通过调用 forward() 方法输出每一行结果,否则 UDTF 不会产生任何输出。

    相关文章

      网友评论

        本文标题:Hive-UDTF

        本文链接:https://www.haomeiwen.com/subject/darqattx.html