
[Website Clickstream Data Analysis] 04 - Data Preprocessing


    1. Main objectives

    1. Filter out "non-compliant" records
    2. Convert and normalize data formats
    3. Based on the downstream statistical requirements, filter and separate out the base data for the different topics (different section paths)

    2. Implementation

    Develop a MapReduce program, WeblogPreProcess.

    package com.learn.bigdata.hive.mr.pre;
    import java.io.IOException;
    import java.util.HashSet;
    import java.util.Set;
    import com.learn.bigdata.hive.mrbean.WebLogBean;
    import com.learn.bigdata.hive.mrbean.WebLogParser;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * Processes the raw logs: keeps only real page-view (pv) requests,
     * converts the time format, fills default values for missing fields,
     * and marks each record as valid or invalid.
     */
    public class WeblogPreProcess {

        static class WeblogPreProcessMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
            /** Holds the site's url classification data (the set of real content pages) */
            Set<String> pages = new HashSet<>();
            Text k = new Text();
            NullWritable v = NullWritable.get();

            /**
             * Load the url classification data from an external source
             */
            @Override
            protected void setup(Context context) {
                pages.add("/about");
                pages.add("/black-ip-list/");
                pages.add("/cassandra-clustor/");
                pages.add("/finance-rhive-repurchase/");
                pages.add("/hadoop-family-roadmap/");
                pages.add("/hadoop-hive-intro/");
                pages.add("/hadoop-zookeeper-intro/");
                pages.add("/hadoop-mahout-roadmap/");
            }

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                WebLogBean webLogBean = WebLogParser.parser(line);
                // Filter out static resources such as js/images/css
                WebLogParser.filtStaticResource(webLogBean, pages);
                /* if (!webLogBean.isValid()) return; */
                k.set(webLogBean.toString());
                context.write(k, v);
            }
        }

        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(WeblogPreProcess.class);
            job.setMapperClass(WeblogPreProcessMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(NullWritable.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.setNumReduceTasks(0);
            job.waitForCompletion(true);
        }
    }
    
    
    package com.learn.bigdata.hive.mrbean;

    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.Locale;
    import java.util.Set;

    public class WebLogParser {

        public static SimpleDateFormat df1 = new SimpleDateFormat("dd/MMM/yyyy:HH:mm:ss", Locale.US);
        public static SimpleDateFormat df2 = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);

        public static WebLogBean parser(String line) {
            WebLogBean webLogBean = new WebLogBean();
            String[] arr = line.split(" ");
            if (arr.length > 11) {
                webLogBean.setRemote_addr(arr[0]);
                webLogBean.setRemote_user(arr[1]);
                String time_local = formatDate(arr[3].substring(1));
                if (null == time_local) time_local = "-invalid_time-";
                webLogBean.setTime_local(time_local);
                webLogBean.setRequest(arr[6]);
                webLogBean.setStatus(arr[8]);
                webLogBean.setBody_bytes_sent(arr[9]);
                webLogBean.setHttp_referer(arr[10]);

                // If the user agent was split into several tokens, re-join them
                if (arr.length > 12) {
                    StringBuilder sb = new StringBuilder();
                    for (int i = 11; i < arr.length; i++) {
                        sb.append(arr[i]);
                    }
                    webLogBean.setHttp_user_agent(sb.toString());
                } else {
                    webLogBean.setHttp_user_agent(arr[11]);
                }
                // A status of 400 or above indicates an HTTP error
                if (Integer.parseInt(webLogBean.getStatus()) >= 400) {
                    webLogBean.setValid(false);
                }
                if ("-invalid_time-".equals(webLogBean.getTime_local())) {
                    webLogBean.setValid(false);
                }
            } else {
                webLogBean.setValid(false);
            }
            return webLogBean;
        }

        public static void filtStaticResource(WebLogBean bean, Set<String> pages) {
            if (!pages.contains(bean.getRequest())) {
                bean.setValid(false);
            }
        }

        public static String formatDate(String time_local) {
            try {
                return df2.format(df1.parse(time_local));
            } catch (ParseException e) {
                return null;
            }
        }
    }
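
    For reference, here is a minimal sketch (not part of the original post) of how WebLogParser maps one line, assuming the input is in the common Apache/Nginx "combined" log format; the sample line and the demo class are made up purely for illustration:

    package com.learn.bigdata.hive.mrbean;

    // Hypothetical local test driver; the sample line below is illustrative only.
    public class WebLogParserDemo {
        public static void main(String[] args) {
            String sample = "194.237.142.21 - - [18/Sep/2013:06:49:18 +0000] "
                    + "\"GET /hadoop-hive-intro/ HTTP/1.1\" 200 14722 "
                    + "\"https://www.google.com/\" \"Mozilla/5.0\"";
            WebLogBean bean = WebLogParser.parser(sample);
            // Expected: valid=true, time_local converted to "2013-09-18 06:49:18",
            // request="/hadoop-hive-intro/", status="200"
            System.out.println(bean);
        }
    }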
    
    
    
    package com.learn.bigdata.hive.mrbean;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    /**
     * Layer that maps directly onto the external data source; its structure
     * should stay consistent with that source (a "source-aligned" / staging table).
     */
    public class WebLogBean implements Writable {

        private boolean valid = true;      // whether the record is valid
        private String remote_addr;        // client ip address
        private String remote_user;        // client user name, "-" when absent
        private String time_local;         // access time and time zone
        private String request;            // requested url and http protocol
        private String status;             // response status; 200 means success
        private String body_bytes_sent;    // size of the response body sent to the client
        private String http_referer;       // the page the request was linked from
        private String http_user_agent;    // client browser information

        public void set(boolean valid, String remote_addr, String remote_user, String time_local, String request, String status, String body_bytes_sent, String http_referer, String http_user_agent) {
            this.valid = valid;
            this.remote_addr = remote_addr;
            this.remote_user = remote_user;
            this.time_local = time_local;
            this.request = request;
            this.status = status;
            this.body_bytes_sent = body_bytes_sent;
            this.http_referer = http_referer;
            this.http_user_agent = http_user_agent;
        }

        // Getters and setters for every field (including isValid()/setValid())
        // are omitted here for brevity; they are plain accessors.

        @Override
        public String toString() {
            StringBuilder sb = new StringBuilder();
            sb.append(this.valid);
            sb.append("\001").append(this.getRemote_addr());
            sb.append("\001").append(this.getRemote_user());
            sb.append("\001").append(this.getTime_local());
            sb.append("\001").append(this.getRequest());
            sb.append("\001").append(this.getStatus());
            sb.append("\001").append(this.getBody_bytes_sent());
            sb.append("\001").append(this.getHttp_referer());
            sb.append("\001").append(this.getHttp_user_agent());
            return sb.toString();
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.valid = in.readBoolean();
            this.remote_addr = in.readUTF();
            this.remote_user = in.readUTF();
            this.time_local = in.readUTF();
            this.request = in.readUTF();
            this.status = in.readUTF();
            this.body_bytes_sent = in.readUTF();
            this.http_referer = in.readUTF();
            this.http_user_agent = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeBoolean(this.valid);
            out.writeUTF(null == remote_addr ? "" : remote_addr);
            out.writeUTF(null == remote_user ? "" : remote_user);
            out.writeUTF(null == time_local ? "" : time_local);
            out.writeUTF(null == request ? "" : request);
            out.writeUTF(null == status ? "" : status);
            out.writeUTF(null == body_bytes_sent ? "" : body_bytes_sent);
            out.writeUTF(null == http_referer ? "" : http_referer);
            out.writeUTF(null == http_user_agent ? "" : http_user_agent);
        }
    }
    
    

    Run the MR program to preprocess the data (the jar is submitted with hadoop jar):

    hadoop jar weblog.jar  com.learn.bigdata.hive.mr.pre.WeblogPreProcess /weblog/input /weblog/preout
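
    After the job finishes, the result can be spot-checked directly on HDFS. A minimal check, assuming the paths used above (the part-file name below is the usual name for map-only output and may differ):

    hdfs dfs -ls /weblog/preout
    hdfs dfs -cat /weblog/preout/part-m-00000 | head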
    

    3. Organizing the clickstream model data

    Since many of the required metrics are easier to derive from a clickstream model, MR programs can be used during the preprocessing stage to generate the clickstream model data.

    3.1 The clickstream pageviews model table

    
    package com.learn.bigdata.hive.mr;

    import java.io.IOException;
    import java.text.ParseException;
    import java.text.SimpleDateFormat;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.Date;
    import java.util.Locale;
    import java.util.UUID;

    import com.learn.bigdata.hive.mrbean.WebLogBean;
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Builds the clickstream pageviews model from the cleansed logs.
     *
     * Input: the result data of the cleansing step above.
     *
     * - Distinguishes the individual visits (sessions) and assigns each visit a session id (random UUID)
     * - For every page viewed within a session, records the request time, url, dwell time and the page's step number within that session
     * - Keeps referral_url, body_bytes_send and useragent
     */
    
    public class ClickStreamThree {

        static class ClickStreamMapper extends Mapper<LongWritable, Text, Text, WebLogBean> {
            Text k = new Text();
            WebLogBean v = new WebLogBean();

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] fields = line.split("\001");
                if (fields.length < 9) return;
                // Set the split fields into the WebLogBean
                v.set("true".equals(fields[0]) ? true : false, fields[1], fields[2], fields[3], fields[4], fields[5], fields[6], fields[7], fields[8]);
                // Only valid records move on to the next stage
                if (v.isValid()) {
                    k.set(v.getRemote_addr());
                    context.write(k, v);
                }
            }
        }
    
     
    
        static class ClickStreamReducer extends Reducer<Text, WebLogBean, NullWritable, Text> {
            Text v = new Text();

            @Override
            protected void reduce(Text key, Iterable<WebLogBean> values, Context context) throws IOException, InterruptedException {
                ArrayList<WebLogBean> beans = new ArrayList<WebLogBean>();
                // First collect all of this user's records so they can be sorted by time
                try {
                    for (WebLogBean bean : values) {
                        WebLogBean webLogBean = new WebLogBean();
                        try {
                            BeanUtils.copyProperties(webLogBean, bean);
                        } catch (Exception e) {
                            e.printStackTrace();
                        }
                        beans.add(webLogBean);
                    }
                    // Sort the beans in chronological order
                    Collections.sort(beans, new Comparator<WebLogBean>() {
                        @Override
                        public int compare(WebLogBean o1, WebLogBean o2) {
                            try {
                                Date d1 = toDate(o1.getTime_local());
                                Date d2 = toDate(o2.getTime_local());
                                if (d1 == null || d2 == null)
                                    return 0;
                                return d1.compareTo(d2);
                            } catch (Exception e) {
                                e.printStackTrace();
                                return 0;
                            }
                        }
                    });

                    /**
                     * The logic below splits the ordered beans into individual visits and
                     * numbers the pages of each visit with a sequential step counter.
                     */
                    int step = 1;
                    String session = UUID.randomUUID().toString();
                    for (int i = 0; i < beans.size(); i++) {
                        WebLogBean bean = beans.get(i);
                        // If there is only one record, output it directly
                        if (1 == beans.size()) {
                            // Use a default dwell time of 60s
                            v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001"
                                    + bean.getStatus());
                            context.write(NullWritable.get(), v);
                            session = UUID.randomUUID().toString();
                            break;
                        }
                        // With more than one record, skip the first one; it is emitted while handling the second
                        if (i == 0) {
                            continue;
                        }
                        // Time gap between this record and the previous one
                        long timeDiff = timeDiff(toDate(bean.getTime_local()), toDate(beans.get(i - 1).getTime_local()));
                        // If the gap is less than 30 minutes, emit the previous page view within the current visit
                        if (timeDiff < 30 * 60 * 1000) {
                            v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + step + "\001" + (timeDiff / 1000) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                                    + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                            context.write(NullWritable.get(), v);
                            step++;
                        } else {
                            // If the gap is 30 minutes or more, emit the previous page view and reset step so a new visit begins
                            v.set(session + "\001" + key.toString() + "\001" + beans.get(i - 1).getRemote_user() + "\001" + beans.get(i - 1).getTime_local() + "\001" + beans.get(i - 1).getRequest() + "\001" + (step) + "\001" + (60) + "\001" + beans.get(i - 1).getHttp_referer() + "\001"
                                    + beans.get(i - 1).getHttp_user_agent() + "\001" + beans.get(i - 1).getBody_bytes_sent() + "\001" + beans.get(i - 1).getStatus());
                            context.write(NullWritable.get(), v);
                            // After emitting the previous record, reset the step counter and start a new session
                            step = 1;
                            session = UUID.randomUUID().toString();
                        }
                        // If this is the last record, emit it directly
                        if (i == beans.size() - 1) {
                            // Use a default dwell time of 60s
                            v.set(session + "\001" + key.toString() + "\001" + bean.getRemote_user() + "\001" + bean.getTime_local() + "\001" + bean.getRequest() + "\001" + step + "\001" + (60) + "\001" + bean.getHttp_referer() + "\001" + bean.getHttp_user_agent() + "\001" + bean.getBody_bytes_sent() + "\001" + bean.getStatus());
                            context.write(NullWritable.get(), v);
                        }
                    }
                } catch (ParseException e) {
                    e.printStackTrace();
                }
            }
    
     
    
            private String toStr(Date date) {
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
                return df.format(date);
            }

            private Date toDate(String timeStr) throws ParseException {
                SimpleDateFormat df = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
                return df.parse(timeStr);
            }

            private long timeDiff(String time1, String time2) throws ParseException {
                Date d1 = toDate(time1);
                Date d2 = toDate(time2);
                return d1.getTime() - d2.getTime();
            }

            private long timeDiff(Date time1, Date time2) {
                return time1.getTime() - time2.getTime();
            }
        }
    
     
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(ClickStreamThree.class);
            job.setMapperClass(ClickStreamMapper.class);
            job.setReducerClass(ClickStreamReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(WebLogBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            job.waitForCompletion(true);
        }
    }
    
    
    
    package com.learn.bigdata.hive.mrbean;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    /**
     * One record of the pageviews model: a single page view within a session.
     */
    public class PageViewsBean implements Writable {

        private String session;
        private String remote_addr;
        private String timestr;
        private String request;
        private int step;
        private String staylong;
        private String referal;
        private String useragent;
        private String bytes_send;
        private String status;

        public void set(String session, String remote_addr, String useragent, String timestr, String request, int step, String staylong, String referal, String bytes_send, String status) {
            this.session = session;
            this.remote_addr = remote_addr;
            this.useragent = useragent;
            this.timestr = timestr;
            this.request = request;
            this.step = step;
            this.staylong = staylong;
            this.referal = referal;
            this.bytes_send = bytes_send;
            this.status = status;
        }

        // Getters and setters for every field are omitted here for brevity;
        // they are plain accessors (getSession(), getStep(), getRequest(), ...).

        @Override
        public void readFields(DataInput in) throws IOException {
            this.session = in.readUTF();
            this.remote_addr = in.readUTF();
            this.timestr = in.readUTF();
            this.request = in.readUTF();
            this.step = in.readInt();
            this.staylong = in.readUTF();
            this.referal = in.readUTF();
            this.useragent = in.readUTF();
            this.bytes_send = in.readUTF();
            this.status = in.readUTF();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(session);
            out.writeUTF(remote_addr);
            out.writeUTF(timestr);
            out.writeUTF(request);
            out.writeInt(step);
            out.writeUTF(staylong);
            out.writeUTF(referal);
            out.writeUTF(useragent);
            out.writeUTF(bytes_send);
            out.writeUTF(status);
        }
    }
    
    

    Generating the pageviews model data:

    hadoop jar weblogpreprocess.jar  \
    com.learn.bigdata.hive.mr.ClickStreamThree   \
    /user/hive/warehouse/dw_click.db/test_ods_weblog_origin/datestr=2013-09-20/ /test-click/pageviews/
    
    

    Table structure:


    (The original post shows the pageviews table structure as an image at this point.)
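
    Since that figure is not reproduced here, the following Hive DDL is a plausible reconstruction based on the field order that ClickStreamThree's reducer writes out; the table name, column names and the explicit delimiter clause are assumptions, not taken from the original post:

    create table ods_click_pageviews(
    session           string,
    remote_addr       string,
    remote_user       string,
    time_local        string,
    request           string,
    visit_step        int,
    page_staylong     string,
    http_referer      string,
    http_user_agent   string,
    body_bytes_sent   string,
    http_status       string)
    partitioned by (datestr string)
    row format delimited fields terminated by '\001';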

    3.2 The clickstream visit model table

    Note: one "visit" = N consecutive requests.
    Deriving each visitor's "visit"-level information directly from the raw data with HQL alone is difficult, so a MapReduce program is first used to analyze the raw data and produce the "visit"-level records; HQL can then be used for further multi-dimensional statistics.
    The MR program takes the pageviews data and works out, for each visit, its start and end time and the corresponding pages.

    
    package com.learn.bigdata.hive.mr;

    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collections;
    import java.util.Comparator;

    import com.learn.bigdata.hive.mrbean.PageViewsBean;
    import com.learn.bigdata.hive.mrbean.VisitBean;
    import org.apache.commons.beanutils.BeanUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    /**
     * Derives the visit model from the pageviews model results:
     * sessionid  start-time   out-time   start-page   out-page   pagecounts  ......
     */
    
    public class ClickStreamVisit {

        // Use the session id as the key so each visit's page views arrive at the same reducer
        static class ClickStreamVisitMapper extends Mapper<LongWritable, Text, Text, PageViewsBean> {
            PageViewsBean pvBean = new PageViewsBean();
            Text k = new Text();

            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] fields = line.split("\001");
                int step = Integer.parseInt(fields[5]);
                // (String session, String remote_addr, String timestr, String request, int step, String staylong, String referal, String useragent, String bytes_send, String status)
                // Sample input record (the \001 field separators are not visible here):
                // 299d6b78-9571-4fa9-bcc2-f2567c46df3472.46.128.140-2013-09-18 07:58:50/hadoop-zookeeper-intro/160"https://www.google.com/""Mozilla/5.0"14722200
                pvBean.set(fields[0], fields[1], fields[2], fields[3], fields[4], step, fields[6], fields[7], fields[8], fields[9]);
                k.set(pvBean.getSession());
                context.write(k, pvBean);
            }
        }
    
     
    
        static class ClickStreamVisitReducer extends Reducer<Text, PageViewsBean, NullWritable, VisitBean> {
            @Override
            protected void reduce(Text session, Iterable<PageViewsBean> pvBeans, Context context) throws IOException, InterruptedException {
                // Copy the pvBeans into a list so they can be sorted by step
                ArrayList<PageViewsBean> pvBeansList = new ArrayList<PageViewsBean>();
                for (PageViewsBean pvBean : pvBeans) {
                    PageViewsBean bean = new PageViewsBean();
                    try {
                        BeanUtils.copyProperties(bean, pvBean);
                        pvBeansList.add(bean);
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
                Collections.sort(pvBeansList, new Comparator<PageViewsBean>() {
                    @Override
                    public int compare(PageViewsBean o1, PageViewsBean o2) {
                        return o1.getStep() > o2.getStep() ? 1 : -1;
                    }
                });
                // Take the first and last pageview of this visit and fill the VisitBean
                VisitBean visitBean = new VisitBean();
                // First record of the visit
                visitBean.setInPage(pvBeansList.get(0).getRequest());
                visitBean.setInTime(pvBeansList.get(0).getTimestr());
                // Last record of the visit
                visitBean.setOutPage(pvBeansList.get(pvBeansList.size() - 1).getRequest());
                visitBean.setOutTime(pvBeansList.get(pvBeansList.size() - 1).getTimestr());
                // Number of pages viewed during the visit
                visitBean.setPageVisits(pvBeansList.size());
                // Visitor ip
                visitBean.setRemote_addr(pvBeansList.get(0).getRemote_addr());
                // Referral of this visit
                visitBean.setReferal(pvBeansList.get(0).getReferal());
                visitBean.setSession(session.toString());
                context.write(NullWritable.get(), visitBean);
            }
        }
    
     
    
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
            job.setJarByClass(ClickStreamVisit.class);
            job.setMapperClass(ClickStreamVisitMapper.class);
            job.setReducerClass(ClickStreamVisitReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(PageViewsBean.class);
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(VisitBean.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }
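
    The VisitBean class referenced above is not included in the original post. The sketch below is a minimal reconstruction based on the setters called in the reducer and on the click_stream_visit table defined further down; the exact field order in toString() is an assumption chosen to match that table:

    package com.learn.bigdata.hive.mrbean;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    import org.apache.hadoop.io.Writable;

    /**
     * Visit-level bean: one record per session, holding the entry/exit page
     * and time plus the number of pages viewed (reconstructed from usage).
     */
    public class VisitBean implements Writable {
        private String session;
        private String remote_addr;
        private String inTime;
        private String outTime;
        private String inPage;
        private String outPage;
        private String referal;
        private int pageVisits;

        public void setSession(String session) { this.session = session; }
        public void setRemote_addr(String remote_addr) { this.remote_addr = remote_addr; }
        public void setInTime(String inTime) { this.inTime = inTime; }
        public void setOutTime(String outTime) { this.outTime = outTime; }
        public void setInPage(String inPage) { this.inPage = inPage; }
        public void setOutPage(String outPage) { this.outPage = outPage; }
        public void setReferal(String referal) { this.referal = referal; }
        public void setPageVisits(int pageVisits) { this.pageVisits = pageVisits; }

        @Override
        public void readFields(DataInput in) throws IOException {
            this.session = in.readUTF();
            this.remote_addr = in.readUTF();
            this.inTime = in.readUTF();
            this.outTime = in.readUTF();
            this.inPage = in.readUTF();
            this.outPage = in.readUTF();
            this.referal = in.readUTF();
            this.pageVisits = in.readInt();
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(session);
            out.writeUTF(remote_addr);
            out.writeUTF(inTime);
            out.writeUTF(outTime);
            out.writeUTF(inPage);
            out.writeUTF(outPage);
            out.writeUTF(referal);
            out.writeInt(pageVisits);
        }

        // TextOutputFormat writes the reducer output by calling toString(),
        // so the field order here must match the click_stream_visit table below.
        @Override
        public String toString() {
            return session + "\001" + remote_addr + "\001" + inTime + "\001" + outTime
                    + "\001" + inPage + "\001" + outPage + "\001" + referal + "\001" + pageVisits;
        }
    }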
    
    
    hadoop jar weblogpreprocess.jar com.learn.bigdata.hive.mr.ClickStreamVisit /weblog/sessionout /weblog/visitout
    

    Next, create the clickstream visit model table in the Hive warehouse:

    
    drop table if exists click_stream_visit;
    create table click_stream_visit(
    session     string,
    remote_addr string,
    inTime      string,
    outTime     string,
    inPage      string,
    outPage     string,
    referal     string,
    pageVisits  int)
    partitioned by (datestr string);
    
    

    Then load the visit data produced by the MR job into the visit model table:

    load data inpath '/weblog/visitout' into table click_stream_visit partition(datestr='2013-09-18');
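
    With the data loaded, the further multi-dimensional statistics mentioned above can be computed in HQL. For example, an illustrative query for the number of visits and the average pages per visit on that day:

    select datestr,
           count(1)        as visits,
           avg(pageVisits) as avg_pages_per_visit
    from click_stream_visit
    where datestr = '2013-09-18'
    group by datestr;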
    

    ————————————————
    Copyright notice: this article was originally written by CSDN blogger 「一直不懂」 and is licensed under CC 4.0 BY-SA; when reposting, please include the original source link and this notice.
    Original link: https://blog.csdn.net/shenchaohao12321/article/details/82959444
