美文网首页
hive-1.2.1自定义函数UDF、UDTF

hive-1.2.1自定义函数UDF、UDTF

作者: 大道至简_6a43 | 来源:发表于2019-11-23 21:48 被阅读0次

    Hive中有三种UDF:

        1、用户定义函数(user-defined function)UDF;

        2、用户定义聚集函数(user-defined aggregate function,UDAF);

        3、用户定义表生成函数(user-defined table-generating function,UDTF)。

    ==========================================================================

    UDF操作作用于单个数据行,并且产生一个数据行作为输出。大多数函数都属于这一类(比如数学函数和字符串函数)。

    UDAF 接受多个输入数据行,并产生一个输出数据行。像COUNT和MAX这样的函数就是聚集函数。

    UDTF 操作作用于单个数据行,并且产生多个数据行-------一个表作为输出。lateral view explore()

    简单来说:

    UDF:返回对应值,一对一

    UDAF:返回聚类值,多对一

    UDTF:返回拆分值,一对多

    ****************************************************************************************************************

    UDTF

    package com.byte.udtf;

    import org.apache.commons.lang.StringUtils;

    import org.apache.hadoop.hive.ql.exec.UDFArgumentException;

    import org.apache.hadoop.hive.ql.metadata.HiveException;

    import org.apache.hadoop.hive.ql.udf.generic.GenericUDTF;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;

    import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;

    import org.apache.hadoop.hive.serde2.objectinspector.StructObjectInspector;

    import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;

    import org.json.JSONArray;

    import org.json.JSONException;

    import java.util.ArrayList;

    public class EventJsonUDTFextends GenericUDTF {

    //该方法中,我们将指定输出参数的名称和参数类型:

        @Override

        public StructObjectInspector initialize(ObjectInspector[] argOIs)throws UDFArgumentException {

    ArrayList fieldNames =new ArrayList();

    ArrayList fieldOIs =new ArrayList();

    fieldNames.add("event_name");

    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

    fieldNames.add("event_json");

    fieldOIs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);

    return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldOIs);

    }

    //输入1条记录,输出若干条结果

        @Override

        public void process(Object[] objects)throws HiveException {

    // 获取传入的et

            String input = objects[0].toString();

    // 如果传进来的数据为空,直接返回过滤掉该数据

            if (StringUtils.isBlank(input)) {

    return;

    }else {

    try {

    // 获取一共有几个事件(ad/facoriters)

                    JSONArray ja =new JSONArray(input);

    if (ja ==null)

    return;

    // 循环遍历每一个事件

                    for (int i =0; i < ja.length(); i++) {

    String[] result =new String[2];

    try {

    // 取出每个的事件名称(ad/facoriters)

                            result[0] = ja.getJSONObject(i).getString("en");

    // 取出每一个事件整体

                            result[1] = ja.getString(i);

    }catch (JSONException e) {

    continue;

    }

    // 将结果返回

                        forward(result);

    }

    }catch (JSONException e) {

    e.printStackTrace();

    }

    }

    }

    //当没有记录处理的时候该方法会被调用,用来清理代码或者产生额外的输出

        @Override

        public void close()throws HiveException {

    }

    }

    ****************************************************************************************************************

    UDF

    package com.byte.udf;

    import org.apache.commons.lang.StringUtils;

    import org.apache.hadoop.hive.ql.exec.UDF;

    import org.json.JSONException;

    import org.json.JSONObject;

    public class BaseFieldUDFextends UDF {

    public String evaluate(String line, String jsonkeysString) {

    // 0 准备一个sb

            StringBuilder sb =new StringBuilder();

    // 1 切割jsonkeys  mid uid vc vn l sr os ar md

            String[] jsonkeys = jsonkeysString.split(",");

    // 2 处理line  服务器时间 | json

            String[] logContents = line.split("\\|");

    // 3 合法性校验

            if (logContents.length !=2 || StringUtils.isBlank(logContents[1])) {

    return "";

    }

    // 4 开始处理json

            try {

    JSONObject jsonObject =new JSONObject(logContents[1]);

    // 获取cm里面的对象

                JSONObject base = jsonObject.getJSONObject("cm");

    // 循环遍历取值

                for (int i =0; i < jsonkeys.length; i++) {

    String filedName = jsonkeys[i].trim();

    if (base.has(filedName)) {

    sb.append(base.getString(filedName)).append("\t");

    }else {

    sb.append("\t");

    }

    }

    sb.append(jsonObject.getString("et")).append("\t");

    sb.append(logContents[0]).append("\t");

    }catch (JSONException e) {

    e.printStackTrace();

    }

    return sb.toString();

    }

    }

    时间有限  晚天补一个UDAF函数                    哈哈!!!

    相关文章

      网友评论

          本文标题:hive-1.2.1自定义函数UDF、UDTF

          本文链接:https://www.haomeiwen.com/subject/tllkwctx.html