美文网首页
Hive中UDTF的实现方式调研

Hive中UDTF的实现方式调研

作者: 分裂四人组 | 来源:发表于2019-04-01 14:20 被阅读0次

    简单UDTF的实现

    实现一个简单的 UDTF:先按换行符、再按空白字符切割输入字符串,并生成多行(每行两列)输出数据。

    package com.alipay.mbaprod.spark.udtf;
    
    import java.util.ArrayList;
    import java.util.Iterator;
    import java.util.List;
    
    import com.google.common.annotations.VisibleForTesting;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hive.ql.exec.MapredContext;
    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;
    import org.apache.hadoop.hive.ql.metadata.HiveException;
    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;
    import org.apache.hadoop.hive.serde.serdeConstants;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
    import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;
    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
    
    /**
     * A simple Hive UDTF that expands one string argument into multiple two-column rows.
     *
     * <p>The input is split into lines on {@code "\n"}; each trimmed line is then split on runs of
     * whitespace. A line with exactly two tokens yields one row {@code (t0, t1)}; a line with
     * exactly three tokens yields two rows, {@code (t0, t1)} and {@code (t0, t2)}. Lines with any
     * other token count produce no output.
     */
    public class SimpleUDTF extends GenericUDTF {
      /** Inspector used to decode the single argument; assigned in {@link #initialize}. */
      private PrimitiveObjectInspector stringOI = null;

      /**
       * Delegates to the parent implementation; this UDTF reads nothing from the job configuration.
       *
       * @param context the context handed over by the framework
       */
      @Override
      public void configure(MapredContext context) {
        super.configure(context);
      }

      /**
       * Validates the argument list and declares the output schema.
       *
       * @param args inspectors for the call arguments; exactly one string-typed primitive is accepted
       * @return a struct inspector describing two string columns named {@code col1} and {@code col2}
       * @throws UDFArgumentException if the argument count is not 1 or the argument is not a string
       */
      @Override
      public StructObjectInspector initialize(ObjectInspector[] args) throws UDFArgumentException {
        if (args.length != 1) {
          throw new UDFArgumentException("SimpleUDTF() should have 1 arguments");
        }

        for (ObjectInspector arg : args) {
          boolean isString = arg.getCategory() == ObjectInspector.Category.PRIMITIVE
              && arg.getTypeName().equals(serdeConstants.STRING_TYPE_NAME);
          if (!isString) {
            throw new UDFArgumentException("SimpleUDTF()'s arguments have to be string type");
          }
        }

        stringOI = (PrimitiveObjectInspector) args[0];

        List<String> fieldNames = new ArrayList<String>(2);
        List<ObjectInspector> fieldOIs = new ArrayList<ObjectInspector>(2);

        fieldNames.add("col1");
        fieldNames.add("col2");
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
        return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);
      }

      /**
       * Converts one input string into the rows to emit.
       *
       * @param id the raw input text; may be {@code null} or empty
       * @return the rows to forward, each a two-element array; empty when the input is
       *     {@code null}, empty, or contains no line with two or three tokens
       */
      @VisibleForTesting
      public List<Object[]> processInputRecord(String id) {
        List<Object[]> result = new ArrayList<Object[]>();
        // Null or empty input produces no rows.
        if (id == null || id.isEmpty()) {
          return result;
        }
        for (String line : id.split("\n")) {
          String[] tokens = line.trim().split("\\s+");
          if (tokens.length == 2) {
            result.add(new Object[] {tokens[0], tokens[1]});
          } else if (tokens.length == 3) {
            // Three tokens fan out into two rows that share the first token.
            result.add(new Object[] {tokens[0], tokens[1]});
            result.add(new Object[] {tokens[0], tokens[2]});
          }
          // Any other token count is skipped.
        }
        return result;
      }

      /**
       * Decodes the single argument and forwards every row derived from it.
       *
       * @param args the call arguments; only {@code args[0]} is read
       * @throws HiveException if forwarding a row fails
       */
      @Override
      public void process(Object[] args) throws HiveException {
        // Guard against a null argument or a null decoded value: emit no rows rather than
        // failing with a NullPointerException on toString().
        Object value = args[0] == null ? null : stringOI.getPrimitiveJavaObject(args[0]);
        if (value == null) {
          return;
        }
        for (Object[] row : processInputRecord(value.toString())) {
          forward(row);
        }
      }

      /** Does nothing; this class does not open anything that needs releasing. */
      @Override
      public void close() throws HiveException {
        // Intentionally empty.
      }
    }
    
    

    相关文章

      网友评论

          本文标题:Hive中UDTF的实现方式调研

          本文链接:https://www.haomeiwen.com/subject/gfidbqtx.html