UDTF函数,表生成函数,他可以把一行打成多行多列,也可以打成一行多列,一列多行。
比起UDAF,UDTF更好理解一些。
大致分为三个过程:initialize() -- process() -- close()
initialize():会校验用户输入,并定义输出的列
process():将数据打散的过程,其中会调用forward(),每调用一次,就会生成一行
close():方法调用完毕时关闭方法
源码:
打散的字段应该是a,b;c,d;e,f这样的形式,按照;分为多行,按照,分为多列
public class Tex extends GenericUDTF {
/**
* 对传入的参数进行初始化
* 判断参数个数/类型
* 初始化表结构
*/
@Override
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
if (argOIs.length != 1) {
throw new UDFArgumentLengthException("actuly only one argument is expected");
}
if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0, "type of String is expected but " + argOIs[0].getTypeName() + "is passed");
}
//初始化表结构
//创建数组列表存储表字段
List<String> fieldNames = new LinkedList<String>();
List<ObjectInspector> fieldIOs = new LinkedList<ObjectInspector>();
//表字段
fieldNames.add("name");
fieldNames.add("age");
//表字段数据类型
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
//将表结构两部分聚合在一起
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldIOs);
}
/**
* 对数据处理的代码
* 如果是多列的话,可以将每一行的数据存入数组中,然后将数组传入forward,
* forward每调用一次都会产生一行数据
*/
@Override
public void process(Object[] args) throws HiveException {
String str = args[0].toString();
String[] splited = str.split(";");
for (int i = 0; i < splited.length; i++) {
try {
String[] res = splited[i].split(",");
forward(res);
} catch (Exception e) {
continue;
}
}
}
//方法调用完毕时关闭方法
@Override
public void close() throws HiveException {
}
}
一步一步拆解
initialize()
一方面在进行用户输入内容的校验,
另外这里还在定义拆解后的列的名称、类型
public StructObjectInspector initialize(ObjectInspector[] argOIs) throws UDFArgumentException {
if (argOIs.length != 1) {
throw new UDFArgumentLengthException("actuly only one argument is expected");
}
if (argOIs[0].getCategory() != ObjectInspector.Category.PRIMITIVE) {
throw new UDFArgumentTypeException(0, "type of String is expected but " + argOIs[0].getTypeName() + "is passed");
}
List<String> fieldNames = new LinkedList<String>();
List<ObjectInspector> fieldIOs = new LinkedList<ObjectInspector>();
//表字段
fieldNames.add("name");
fieldNames.add("age");
//表字段数据类型
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
fieldIOs.add(PrimitiveObjectInspectorFactory.javaStringObjectInspector);
//将表结构两部分聚合在一起
return ObjectInspectorFactory.getStandardStructObjectInspector(fieldNames, fieldIOs);
process()
实现数据打散的逻辑,可以根据需求进行多次循环,也可以不做循环。
多次循环就会生成多行
不做循环就只打散成为多列
在这个process()里,a,b;c,d会先打成两行
a,b
c,d
然后,再逗号分割,打散成两列
a b
c d
forward()会返回每一行结果
public void process(Object[] args) throws HiveException {
String str = args[0].toString();
String[] splited = str.split(";");
for (int i = 0; i < splited.length; i++) {
try {
String[] res = splited[i].split(",");
forward(res);
} catch (Exception e) {
continue;
}
}
}
网友评论