// Keys for every field; some values are omitted here
public static final String COLUMNS = "touch_type,touch_time,event_type,click_time,customer_user_id...";
/**
 * Regular expression for extracting the key/value pairs of user-behavior tracking events
 */
public static final String REGUBC = "(.*?)=(.*?)&";
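To see what this pattern captures, here is a minimal standalone sketch (the sample payload and field values are made up for illustration). The parsing code further down appends a trailing "&" to the extracted query string, which is what lets the non-greedy groups match the final key/value pair as well:

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class ReguBcDemo {
    public static void main(String[] args) {
        // hypothetical tracking payload; the trailing "&" is appended so the
        // non-greedy pattern also matches the last key/value pair
        String eventLog = "touch_type=1&event_type=click&customer_user_id=42" + "&";
        Matcher m = Pattern.compile("(.*?)=(.*?)&").matcher(eventLog);
        while (m.find()) {
            System.out.println(m.group(1) + " -> " + m.group(2));
        }
        // output:
        // touch_type -> 1
        // event_type -> click
        // customer_user_id -> 42
    }
}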
/**
 * Hive field separator (\001 is Hive's default field delimiter)
 */
public static final String HIVE_SEPARATOR = "\001";
The following is executed inside the main method:
// Load the files from HDFS and turn them into an RDD
JavaRDD<String> logRdd = javaSparkContext.textFile(scanFiles);
final Broadcast<String[]> broadcast = javaSparkContext.broadcast(COLUMNS.split(","));
// Run the raw lines through the regex and collect the result into a HashSet of
// JSONObject (the set also deduplicates identical events within a partition)
JavaRDD<JSONObject> jsonRdd = logRdd.mapPartitions(new FlatMapFunction<Iterator<String>, JSONObject>() {
    @Override
    public Iterable<JSONObject> call(Iterator<String> stringIterator) throws Exception {
        // read the broadcast inside call() so executors use the broadcast copy
        // instead of serializing the array into the task closure
        final String[] broadcastValue = broadcast.value();
        Set<JSONObject> jsonObjs = new HashSet<JSONObject>();
        while (stringIterator.hasNext()) {
            // pull out the next log line
            String inputValue = stringIterator.next();
            int startIndex = inputValue.indexOf("_app.gif?");
            int endIndex = inputValue.lastIndexOf("HTTP/");
            // skip malformed lines instead of letting substring() throw
            if (startIndex < 0 || endIndex < 0) {
                continue;
            }
            // +9 skips over "_app.gif?"; -1 drops the space before "HTTP/";
            // the trailing "&" lets the regex match the last key/value pair
            String eventLog = inputValue.substring(startIndex + 9, endIndex - 1) + "&";
            // match with the regex and collect the pairs into a Map
            Matcher m = CommonUtil.getMatcher(eventLog, REGUBC);
            JSONObject jsonObj = new JSONObject();
            Map<String, String> map = new HashMap<>();
            while (m.find()) {
                map.put(m.group(1), m.group(2));
            }
            // align the Map with COLUMNS and store the result in the JSONObject
            for (int i = 0; i < broadcastValue.length; i++) {
                String columnName = broadcastValue[i];
                if (map.containsKey(columnName)) {
                    //String value = URLDecoder.decode(map.get(columnName), "UTF-8");
                    jsonObj.put(columnName, map.get(columnName));
                } else {
                    jsonObj.put(columnName, "");
                }
            }
            // add to the HashSet of JSONObject
            jsonObjs.add(jsonObj);
        }
        return jsonObjs;
    }
});
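For reference, here is a self-contained sketch of the substring arithmetic on a made-up access-log line (the host, path, and payload are hypothetical; only the shape of the line matters):

public class SliceDemo {
    public static void main(String[] args) {
        // hypothetical nginx-style access-log line
        String inputValue = "192.168.0.1 - - [01/Jan/2024] \"GET /log/_app.gif?"
                + "touch_type=1&event_type=click HTTP/1.1\" 200 43";
        int startIndex = inputValue.indexOf("_app.gif?");
        int endIndex = inputValue.lastIndexOf("HTTP/");
        // +9 == "_app.gif?".length(); -1 drops the space before "HTTP/"
        String eventLog = inputValue.substring(startIndex + 9, endIndex - 1) + "&";
        System.out.println(eventLog);
        // prints: touch_type=1&event_type=click&
    }
}

One portability note: the Iterable-returning call() above is the Spark 1.x Java API. From Spark 2.0 on, FlatMapFunction.call returns an Iterator, so the method would end with return jsonObjs.iterator(); instead.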
// Pull the values out of each JSONObject and join them into one line
JavaRDD<String> javaJsonRdd = jsonRdd.map(new Function<JSONObject, String>() {
    @Override
    public String call(JSONObject jsonObject) throws Exception {
        // caveat: this relies on values() iterating in column insertion order,
        // which depends on the JSON library's backing map (see the variant below)
        List<String> valuesList = new ArrayList<String>(jsonObject.values());
        // join the list elements, e.g. Joiner.on("; ").join("tom", "jerry", "jack") => "tom; jerry; jack"
        String hiveEvent = Joiner.on(HIVE_SEPARATOR).join(valuesList.iterator());
        return hiveEvent;
    }
});
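As the comment above notes, relying on values() ties the output column order to the JSON library's backing map. A more defensive variant (a sketch, reusing the broadcast variable from above) looks each value up by column name, so the field order always matches COLUMNS:

// sketch: build each line from the broadcast column list instead of values()
JavaRDD<String> orderedRdd = jsonRdd.map(new Function<JSONObject, String>() {
    @Override
    public String call(JSONObject jsonObject) throws Exception {
        String[] columns = broadcast.value();
        List<String> valuesList = new ArrayList<String>(columns.length);
        for (String columnName : columns) {
            // get() plus String.valueOf keeps this independent of the
            // JSON library's typed accessors
            valuesList.add(String.valueOf(jsonObject.get(columnName)));
        }
        return Joiner.on(HIVE_SEPARATOR).join(valuesList);
    }
});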
// Save the result to HDFS
javaJsonRdd.saveAsTextFile(hdfsOutputFilePath);
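Because the fields are joined with \001 (Hive's default field delimiter), the output directory can be consumed by Hive directly, for example through an external table located at hdfsOutputFilePath or via LOAD DATA INPATH.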