美文网首页
UDAF示例之等距直方图

UDAF示例之等距直方图

作者: duan_yy | 来源:发表于2019-03-18 16:48 被阅读0次

背景介绍

在数据收集、模型验证、人群分析等场景,我们有时需要按区间统计频数,制作直方图,或者折线图,本文提供的示例非常适合借助hive实现超大数据量情况下的数据分布查看。
  hive的UDAF(UserDefindAggregateFunction),用户自定义聚合函数实现了同时操作多行数据的功能,我们常用的SUM、MIN、MAX、COUNT、COLLECT_SET等都是用下文同样的方式实现的UDAF,很是方便。
  然而hive自己实现的直方图UDAF(histogram_numeric)是不等距的,不直观,不便于作图,既然如此,就自己实现一个好啦~

本文亮点

  1. 简明精要说明如何编写UDAF
  2. 提供实际代码,如果只是想做等距直方图,直接复用代码就好
  3. 深入浅出的说明UDAF调用的过程

实现过程

概览

整个过程精炼来说,主要做这么几件事:

  1. 告诉hive,本UDAF需要什么样的参数,输出什么样的值
  2. 各个分节点如何计算
  3. 节点如何合并

这里需要引用另一篇博客的配图,感觉很清晰

udaf流程图
这张图主要的含义是
  1. init在不同时期服务于不同的函数,PARTIAL1时服务于iterate,PARTIAL2和FINAL服务于merge
  2. PARTIAL1时期是在进行主体计算,PARTIAL2和FINAL是在合并mapper

如果感觉有点懵,看看下面的代码和注释相信你就懂了

实际代码

不慌不慌,我们来一步一步来
本UDF输入的三个参数分别
 ①用于画直方图的列的值
 ②直方图的每个柱的宽度
 ③直方图的柱的数量

import org.apache.hadoop.hive.ql.exec.UDFArgumentTypeException;
import org.apache.hadoop.hive.ql.metadata.HiveException;
import org.apache.hadoop.hive.ql.parse.SemanticException;
import org.apache.hadoop.hive.ql.udf.generic.AbstractGenericUDAFResolver;
import org.apache.hadoop.hive.ql.udf.generic.GenericUDAFEvaluator;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.ObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.PrimitiveObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.StandardMapObjectInspector;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorFactory;
import org.apache.hadoop.hive.serde2.objectinspector.primitive.PrimitiveObjectInspectorUtils;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfo;
import org.apache.hadoop.hive.serde2.typeinfo.TypeInfoUtils;
import org.apache.hadoop.io.IntWritable;

import java.util.HashMap;
import java.util.Map;

public class HistGram extends AbstractGenericUDAFResolver {

    @Override
    public GenericUDAFEvaluator getEvaluator(TypeInfo[] parameters) throws SemanticException {
        //1. 检查传入参数类型(其实也可以不检查,只是便于找问题,逻辑上也严格一些而已)
        if (parameters.length != 3) {//检查输入参数的个数
            throw new UDFArgumentTypeException(parameters.length - 1,
                    "Exactly 3 argument is expected.");
        }

        //根据参数的类型 判断传参是否正确
        ObjectInspector oi1 = TypeInfoUtils.getStandardJavaObjectInspectorFromTypeInfo(parameters[0]);

        //第一个参数是double类型,那么首先需要时基本数据类型(与List、Map、Struct想反)
        if (oi1.getCategory() != ObjectInspector.Category.PRIMITIVE) {
            throw new UDFArgumentTypeException(0,
                    "Argument must be PRIMITIVE, but "
                            + oi1.getCategory().name()
                            + " was passed.");
        }

        PrimitiveObjectInspector inputOI = (PrimitiveObjectInspector) oi1;

        //然后判断是基本数据类型里面的double
        if (inputOI.getPrimitiveCategory() != PrimitiveObjectInspector.PrimitiveCategory.DOUBLE) {
            throw new UDFArgumentTypeException(0,
                    "Argument 1 must be DOUBLE, but "
                            + inputOI.getPrimitiveCategory().name()
                            + " was passed.");
        }

        //只有这一句是必须的
        return new HistGramEvaluator();
    }

    public static class HistGramEvaluator extends GenericUDAFEvaluator {

        PrimitiveObjectInspector colValueOI, stepSizeOI, histNumOI;//列的值、直方图跨度、直方图数量

        StandardMapObjectInspector result;

        @Override
        public ObjectInspector init(Mode m, ObjectInspector[] parameters)
                throws HiveException {
            //2. 初始化变量
            assert (parameters.length == 3);
            super.init(m, parameters);

            // 指定各个阶段输出数据格式都为Map<Int,Int>类型
            ObjectInspector returnKey = PrimitiveObjectInspectorFactory
                    .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.INT);
            ObjectInspector returnValue = PrimitiveObjectInspectorFactory
                    .getPrimitiveJavaObjectInspector(PrimitiveObjectInspector.PrimitiveCategory.INT);

            // 指定各个阶段输出数据格式都为Map<Int,Int>类型  注意不同阶段的输入值不同
            /**
             * 这里是整个UDAF最精华的部分
             * 从博客的图片我们可以看到在PARTIAL1阶段,调用init主要用来初始化iterate方法,也就是我们的计算过程
             * 在PARTIAL2和FINAL阶段,分别是中途合并mapper和结束mapper过程,都是调用merge函数,对中间的输出结果进行合并
             * 在COMPLETE阶段是最终输出结果
             * 因此,各个阶段要初始化的变量不同,尤其是parameters里面放的变量完全不同,PARTIAL1阶段,parameters就是输入值
             * PARTIAL2和FINAL阶段,parameters是输出值
             */
            if (m == Mode.PARTIAL1) {
                colValueOI = (PrimitiveObjectInspector) parameters[0];
                stepSizeOI = (PrimitiveObjectInspector) parameters[1];
                histNumOI = (PrimitiveObjectInspector) parameters[2];
                return ObjectInspectorFactory.getStandardMapObjectInspector(returnKey, returnValue);
            } else if (m == Mode.PARTIAL2) {
                result = (StandardMapObjectInspector) parameters[0];
                return ObjectInspectorFactory.getStandardMapObjectInspector(returnKey, returnValue);
            } else if (m == Mode.FINAL) {
                result = (StandardMapObjectInspector) parameters[0];
                return ObjectInspectorFactory.getStandardMapObjectInspector(returnKey, returnValue);
            } else if (m == Mode.COMPLETE) {
                return ObjectInspectorFactory.getStandardMapObjectInspector(returnKey, returnValue);
            } else {
                throw new RuntimeException("no such mode Exception");
            }
        }

        /**
         * 存储当前中间变量的类
         */
        static class HistGramAgg implements AggregationBuffer {
            //3. 指明mapper计算的中间缓存值
            //此UDAF的中间缓存值就是记录频数的countMap
            Map<Integer, Integer> countMap = new HashMap<Integer, Integer>();

            //两个map相加
            void add(Map<?, ?> otherMap) {
                if (otherMap == null || otherMap.isEmpty()) return;

                //循环累加值
                for (Map.Entry<?, ?> entry : otherMap.entrySet()) {
                    Object key = (Object) entry.getKey();
                    Object value = (Object) entry.getValue();
                    int index = key instanceof Integer ? (Integer) key : ((IntWritable) key).get();
                    Integer otherCount = value instanceof Integer ? (Integer) value : ((IntWritable) value).get();
                    Integer beforeValue = countMap.get(index);//这个区间之前的数量
                    if (beforeValue == null) {
                        countMap.put(index, otherCount);
                    } else {
                        countMap.put(index, beforeValue + otherCount);
                    }
                }
            }
        }

        //创建新的聚合计算的需要的内存,用来存储mapper,combiner,reducer运算过程中的中间变量聚合值
        @Override
        public AggregationBuffer getNewAggregationBuffer() throws HiveException {
            //4. 新建mapper时,初始化中间缓存变量
            //本UDAF中,就是初始化countMap
            HistGramAgg myagg = new HistGramAgg();
            reset(myagg);
            return myagg;
        }

        //mapreduce支持mapper和reducer的重用,所以为了兼容,也需要做内存的重用。
        @Override
        public void reset(AggregationBuffer agg) throws HiveException {
            //5. 重置中间缓存变量
            //本UDAF中,就是重置countMap
            HistGramAgg myagg = (HistGramAgg) agg;
            myagg.countMap = new HashMap<Integer, Integer>();
        }

        private boolean warned = false;

        //map阶段调用,实现计算逻辑
        @Override
        public void iterate(AggregationBuffer agg, Object[] parameters)
                throws HiveException {
            //6. 主体计算过程
            //在我看来,这反而是最简单的部分
            assert (parameters.length == 3);
            if (parameters[0] != null) {
                HistGramAgg myagg = (HistGramAgg) agg;
                //获取相应输入变量
                double colValue = PrimitiveObjectInspectorUtils.getDouble(parameters[0], colValueOI);
                double stepSize = PrimitiveObjectInspectorUtils.getDouble(parameters[1], stepSizeOI);
                int histNum = PrimitiveObjectInspectorUtils.getInt(parameters[2], histNumOI);

                Map<Integer, Integer> theCountMap = new HashMap<Integer, Integer>();
                //如果countMap为空先进行初始化
                if (myagg.countMap.isEmpty()) {
                    for (int i = 0; i < histNum; i++) {
                        theCountMap.put(i, 0);
                    }
                }
                //在对应的区间累加
                int index = (int) (colValue / stepSize);
                if (index >= histNum) index = histNum - 1;
                theCountMap.put(index, 1);
                myagg.add(theCountMap);
            }
        }

        //mapper结束要返回的结果,还有combiner结束返回的结果
        @Override
        public Object terminatePartial(AggregationBuffer agg) throws HiveException {
            return terminate(agg);
        }

        @Override
        public void merge(AggregationBuffer agg, Object partial)
                throws HiveException {
            //7. 合并mapper
            //调用add方法,目的在于合并两个mapper的中间缓存变量,此UDAF指的是countMap
            if (partial != null) {
                HistGramAgg myagg = (HistGramAgg) agg;
                Map<?, ?> countMap = result.getMap(partial);
                myagg.add(countMap);
            }
        }

        //reducer返回结果,或者是只有mapper,没有reducer时,在mapper端返回结果。
        @Override
        public Object terminate(AggregationBuffer agg) throws HiveException {
            //8. 输出结果
            //返回最终的结果 countMap
            HistGramAgg myagg = (HistGramAgg) agg;
            return myagg.countMap;
        }
    }
}

同时代码我也放在了github上,有需要的直接下载下来打包就能用,传送门 UDAF示例之等距直方图

起飞!!! ->_<-

参考链接

[1]UDAF示例之等距直方图
[2]插图来源博客
[3]hive udaf开发入门和运行过程详解

相关文章

  • UDAF示例之等距直方图

    背景介绍 在数据收集、模型验证、人群分析等场景,我们有时需要按区间统计频数,制作直方图,或者折线图,本文提供的示例...

  • hist函数

    hist 用于绘制直方图,下面介绍每个参数的作用; 1)x: 用于绘制直方图的数据,该参数的值为一个向量 代码示例...

  • Hive 之 UDAF

    1. Background 一句话概括 UDAF 的背景就是系统自带的聚合函数无法满足用户需求。 2. Basic...

  • Android自定义柱状图表

    本文通过示例代码介绍如何自定义简单的直方图表,此图表并非常见的直方图表,而是可以分组的。此文不会过多涉及原理,比较...

  • spark的UDAF使用

    什么是UDAF? UDAF(User Defined Aggregate Function),即用户定义的聚合函数...

  • OpenCV 之ios 直方图均衡化

    OpenCV 之ios 直方图均衡化 目标 在这个教程中你将学到: 什么是图像的直方图和为什么图像的直方图很有用 ...

  • 大数据面试题搜集--持续更新

    #Hadoop HIVEUDF UDTF UDAF UDF:单行进入,单行输出UDAF:多行进入,单行输出UDTF...

  • Python代码库OpenCV之04读取图片绘制直方图(含代码)

    Python代码库OpenCV之04读取图片绘制直方图(含代码) 展开成直方图 代码 效果 分离颜色通道 代码 运...

  • Hive的Transform和UDF

    UDTF Hive中UDTF编写和使用 UDAF Hive udaf开发入门和运行过程详解 Hive通用型自定义聚...

  • Lynda中级教程-2色阶(亮度调整)

    色阶其实就是巧妙的运用直方图 更改之后,色阶的属性面板的直方图和图片的直方图完全不同了,为什么呢》因为一个是修改之...

网友评论

      本文标题:UDAF示例之等距直方图

      本文链接:https://www.haomeiwen.com/subject/gddbmqtx.html