美文网首页
数据清洗例子

数据清洗例子

作者: 博弈史密斯 | 来源:发表于2018-07-21 21:46 被阅读0次
    //所有数据的 key,省略一些值
    public static final String colomns = "touch_type,touch_time,event_type,click_time,customer_user_id...";
    
    /**
     * 正则表达式 截取用户行为埋点值
     */
    public static final String REGUBC = "(.*?)=(.*?)&";
    
    /**
     * 定义hive分隔符
     */
    public static final String HIVE_SEPERATE = "\001";
    

    下面是 Main 方法中要执行的:

    //从 HDFS 中加载文件,转成RDD
    JavaRDD<String> logRdd = javaSparkContext.textFile(scanFiles);
    
    final Broadcast<String[]> broadcast = javaSparkContext.broadcast(colomns.split(","));
    
    //把原始数据 通过正则表达式过滤,最终转换为 JSONObject 类型的 HashSet
    JavaRDD<JSONObject> jsonRdd = logRdd.mapPartitions(new FlatMapFunction<Iterator<String>, JSONObject>() {
        final String[] broadcastValue = broadcast.value();
    
        @Override
        public Iterable<JSONObject> call(Iterator<String> stringIterator) throws Exception {
            Set<JSONObject> jsonObjs = new HashSet<JSONObject>();
    
            while(stringIterator.hasNext()){
                //把 log 数据取出来
                String inputValue = stringIterator.next();
                int startIndex = inputValue.indexOf("_app.gif?");
                int endIndex = inputValue.lastIndexOf("HTTP/");
                String eventLog = inputValue.substring(startIndex + 9 , endIndex-1) + "&";
    
                //通过正则表达式匹配,存到 Map
                Matcher m = CommonUtil.getMatcher(eventLog, REGUBC);
                JSONObject jsonObj = new JSONObject();
                Map<String, String> map = new HashMap<>();
                while (m.find()) {
                    map.put(m.group(1), m.group(2));
                }
    
                //Map <--> colomns,保存到 JSONObject
                for (int i = 0; i<broadcastValue.length; i++) {
                    String columnName = broadcastValue[i];
                    if (map.containsKey(columnName)){
                        //String value = URLDecoder.decode(map.get(columnName) , "UTF-8");
                        jsonObj.put(columnName, map.get(columnName));
                    } else {
                        jsonObj.put(columnName,"");
                    }
                }
    
                //保存到 JSONObject 类型的 HashSet 中
                jsonObjs.add(jsonObj);
    
            }
    
            return jsonObjs;
        }
    });
    
    //从 jsonObject 中 取出 value,并连接起来
    JavaRDD<String> javaJsonRdd = jsonRdd.map(new Function<JSONObject, String>() {
        @Override
        public String call(JSONObject jsonObject) throws Exception {
            List<String> valuesList = new ArrayList<String>(jsonObject.values());
            //把 list 中的数据 连接起来,比如:Joiner.on("; ").join("tom", "jerry", "jack") => "tom; jerry; jack"
            String hiveEvent = Joiner.on(HIVE_SEPERATE).join(valuesList.iterator());
            return hiveEvent;
        }
    });
    
    //保存到 HDFS
    javaJsonRdd.saveAsTextFile(hdfsOutputFilePath);
    

    相关文章

      网友评论

          本文标题:数据清洗例子

          本文链接:https://www.haomeiwen.com/subject/imgtyftx.html