package com.vanke.udfs.cust;
import org.apache.hadoop.hive.ql.exec.UDAF;
import org.apache.hadoop.hive.ql.exec.UDAFEvaluator;
public class AccumulatedTransactions extends UDAF {
public static class MyEvaluator implements UDAFEvaluator {
//最终结果
private StringBuilder result;
private StringBuilder content;
//负责初始化计算函数并设置它的内部状态,result是存放最终结果的
//初始化
public MyEvaluator()
{
init();
}
@Override
public void init() {
result = new StringBuilder();
content = new StringBuilder();
}
//每次对一个新值进行聚集计算都会调用iterate方法
public boolean iterate(String roomName,String buy_time,String payment_method)
{
content.append("{").append("\"room_name\":\"").append(roomName).append("\"").append(",")
.append("\"buy_time\":\"").append(buy_time).append("\"").append(",")
.append("\"payment_method\":\"").append(payment_method).append("\"").append("},");
return true;
}
//Hive需要部分聚集结果的时候会调用该方法
//会返回一个封装了聚集计算当前状态的对象
public String terminatePartial()
{
return content.toString();
}
//合并两个部分聚集值会调用这个方法
public boolean merge(String other)
{ content.append(other);
return true;
}
//Hive需要最终聚集结果时候会调用该方法
public String terminate()
{
return result.append("[").append(content.toString()).append("]").toString();
}
}
}
网友评论