接下来我们开始创建 PageView 表。所谓 PageView,就是描述用户的浏览行为:从哪个网页过来、跳转了几个网页、在每个页面停留了多长时间等。下面我们用 MapReduce 来生成这张表。

这里省略了运行作业的 main 程序,可参考上一节 PreCleanData 中的 main 方法;因为都是标准化的代码,就不再展示了。下面开始梳理如何将清洗后的标准化数据转换成点击流模型:
Mapper 阶段没有什么特别的,主要是把切分出来的字段赋值给 bean 的各个属性。由于主要的聚合操作都放在 Reduce 阶段完成,Mapper 阶段就相当简单了。
package cn.leon.transMapper;
import cn.leon.pageViewBean.CleanBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
/**
 * Map phase of the click-stream (PageView) job.
 *
 * <p>Splits one pre-cleaned log line on {@code \001}, loads the first nine fields into a
 * {@link CleanBean}, and, for records whose bean reports {@code isValid()}, emits
 * {@code (remote_addr, bean)} so that all requests from the same IP meet in one reduce call.
 */
public class TransMapper extends Mapper<LongWritable, Text, Text, CleanBean> {
    /** Field separator used in the cleaned input lines. */
    private static final String FIELD_SEPARATOR = "\001";
    /** Minimum number of fields a line must have to be loaded; extra fields are ignored. */
    private static final int EXPECTED_FIELDS = 9;

    // Reused across map() calls instead of allocating per record (the original already
    // reused the bean this way); context.write serializes key and value immediately.
    CleanBean cleanBean = new CleanBean();
    private final Text outKey = new Text();

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split(FIELD_SEPARATOR);
        // Skip truncated lines that do not carry all nine fields.
        if (fields.length < EXPECTED_FIELDS) {
            return;
        }
        boolean valid = "true".equals(fields[0]);
        cleanBean.set(valid, fields[1], fields[2], fields[3], fields[4],
                fields[5], fields[6], fields[7], fields[8]);
        // Key by client IP so the reducer sees one IP's whole request history together.
        if (cleanBean.isValid()) {
            outKey.set(cleanBean.getRemote_addr());
            context.write(outKey, cleanBean);
        }
    }
}
接下来是重头戏 Reduce 阶段。我们要按用户的 IP 地址进行聚合,统计同一 IP 下用户的访问步骤和停留时间,因此以 IP 作为 key、CleanBean 作为 value 进入 Reduce 聚合。然后按访问时间排序,比较相邻两条记录的时间差:如果时间差小于 30 分钟,则这两条记录属于同一个 session(会话);否则属于不同的 session。
package cn.leon.transReducer;
import cn.leon.pageViewBean.CleanBean;
import org.apache.commons.beanutils.BeanUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import org.codehaus.jackson.map.util.BeanUtil;
import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.text.ParseException;
import java.text.SimpleDateFormat;
import java.util.*;
/**
 * Reduce phase of the click-stream (PageView) job.
 *
 * <p>Receives every {@link CleanBean} for one IP address and sorts them by {@code time_local}.
 * It then splits them into visits (sessions): two consecutive requests belong to the same
 * visit when they are less than 30 minutes apart. Each request is emitted as one
 * {@code \001}-separated line:
 *
 * <pre>
 * session, ip, remote_user, time_local, request, step, stay_seconds,
 * http_referer, http_user_agent, body_bytes_sent, status
 * </pre>
 *
 * <p>{@code step} is the 1-based position of the request inside its visit. {@code stay_seconds}
 * is the gap to the next request of the same visit. For the last request of a visit the gap
 * is unknown, so the default of 60 seconds is used instead.
 *
 * <p>Records whose {@code time_local} is missing or not in {@code yyyy-MM-dd HH:mm:ss} form
 * cannot be ordered. They are dropped and counted under counter group {@code TransReducer},
 * name {@code UNPARSEABLE_TIME}.
 */
public class TransReducer extends Reducer<Text, CleanBean, NullWritable, Text> {
    private static final String SEP = "\001";
    /** Two requests at least this far apart start a new visit. */
    private static final long SESSION_GAP_MILLIS = 30L * 60L * 1000L;
    /** Stay time used when there is no following request in the same visit. */
    private static final long DEFAULT_STAY_SECONDS = 60;
    private static final String COUNTER_GROUP = "TransReducer";

    // SimpleDateFormat is not thread-safe, so it is an instance field, not a static one;
    // reduce() is invoked sequentially on a Reducer instance.
    private final SimpleDateFormat timeFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.US);
    private final Text out = new Text();

    /** A request paired with its parsed timestamp, so each timestamp is parsed only once. */
    private static final class TimedBean {
        final CleanBean bean;
        final Date time;

        TimedBean(CleanBean bean, Date time) {
            this.bean = bean;
            this.time = time;
        }
    }

    @Override
    protected void reduce(Text key, Iterable<CleanBean> values, Context context) throws IOException, InterruptedException {
        List<TimedBean> requests = collect(key, values, context);
        if (requests.isEmpty()) {
            return;
        }
        // Stable sort by request time; every element has a non-null parsed time.
        Collections.sort(requests, new Comparator<TimedBean>() {
            @Override
            public int compare(TimedBean a, TimedBean b) {
                return a.time.compareTo(b.time);
            }
        });

        String ip = key.toString();
        String session = UUID.randomUUID().toString();
        int step = 1;
        for (int i = 0; i < requests.size(); i++) {
            CleanBean bean = requests.get(i).bean;
            if (i == requests.size() - 1) {
                // Last request for this IP (also the single-request case): nothing follows.
                emit(context, session, ip, bean, step, DEFAULT_STAY_SECONDS);
                break;
            }
            long gap = timeDiff(requests.get(i + 1).time, requests.get(i).time);
            if (gap < SESSION_GAP_MILLIS) {
                // Same visit: the stay time is the gap to the next request.
                emit(context, session, ip, bean, step, gap / 1000);
                step++;
            } else {
                // This request closes the visit; the next one starts a new session at step 1.
                emit(context, session, ip, bean, step, DEFAULT_STAY_SECONDS);
                step = 1;
                session = UUID.randomUUID().toString();
            }
        }
    }

    /**
     * Copies each incoming bean and pairs it with its parsed time. The copy is needed because
     * the values iterable hands back the same object with overwritten fields. Keeping the
     * reference itself would fill the list with one instance.
     *
     * @throws IOException if the bean cannot be copied reflectively; this is a programming
     *     error that would otherwise silently corrupt every record
     */
    private List<TimedBean> collect(Text key, Iterable<CleanBean> values, Context context) throws IOException {
        List<TimedBean> result = new ArrayList<TimedBean>();
        for (CleanBean value : values) {
            CleanBean copy = new CleanBean();
            try {
                BeanUtils.copyProperties(copy, value);
            } catch (IllegalAccessException e) {
                throw new IOException("Failed to copy CleanBean for key " + key, e);
            } catch (InvocationTargetException e) {
                throw new IOException("Failed to copy CleanBean for key " + key, e);
            }
            Date time = parseTime(copy.getTime_local());
            if (time == null) {
                // Cannot be placed in time order; drop it and count it instead.
                context.getCounter(COUNTER_GROUP, "UNPARSEABLE_TIME").increment(1);
                continue;
            }
            result.add(new TimedBean(copy, time));
        }
        return result;
    }

    /** Writes one {@code \001}-separated PageView line for {@code bean}. */
    private void emit(Context context, String session, String ip, CleanBean bean, int step, long staySeconds) throws IOException, InterruptedException {
        StringBuilder line = new StringBuilder();
        line.append(session).append(SEP)
                .append(ip).append(SEP)
                .append(bean.getRemote_user()).append(SEP)
                .append(bean.getTime_local()).append(SEP)
                .append(bean.getRequest()).append(SEP)
                .append(step).append(SEP)
                .append(staySeconds).append(SEP)
                .append(bean.getHttp_referer()).append(SEP)
                .append(bean.getHttp_user_agent()).append(SEP)
                .append(bean.getBody_bytes_sent()).append(SEP)
                .append(bean.getStatus());
        out.set(line.toString());
        context.write(NullWritable.get(), out);
    }

    /** Parses a {@code yyyy-MM-dd HH:mm:ss} timestamp; returns null if absent or malformed. */
    private Date parseTime(String timeStr) {
        if (timeStr == null) {
            return null;
        }
        try {
            return timeFormat.parse(timeStr);
        } catch (ParseException e) {
            return null;
        }
    }

    /** Returns {@code later - earlier} in milliseconds. */
    private long timeDiff(Date later, Date earlier) {
        return later.getTime() - earlier.getTime();
    }
}
PageViewBean 对象:
package cn.leon.pageViewBean;
import org.apache.hadoop.io.Writable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
/**
 * Hadoop {@link Writable} holding the fields of one page-view record: session, remote_addr,
 * timestr, request, step, staylong, referal, useragent, bytes_send and status.
 * NOTE(review): presumably one row of the PageView table; confirm against the job that
 * produces it.
 *
 * <p>{@link #write} serializes the fields in declaration order and {@link #readFields} reads
 * them back in the same order. {@link DataOutput#writeUTF} throws NullPointerException on
 * null, so unset string fields are written as empty strings. As a result, an unset field
 * reads back as {@code ""} rather than {@code null}.
 */
public class PageViewBean implements Writable {
    private String session; // session (visit) id
    private String remote_addr;
    private String timestr;
    private String request;
    private int step;
    private String staylong;
    private String referal;
    private String useragent;
    private String bytes_send;
    private String status;
    public String getSession() {
        return session;
    }
    public void setSession(String session) {
        this.session = session;
    }
    public String getRemote_addr() {
        return remote_addr;
    }
    public void setRemote_addr(String remote_addr) {
        this.remote_addr = remote_addr;
    }
    public String getTimestr() {
        return timestr;
    }
    public void setTimestr(String timestr) {
        this.timestr = timestr;
    }
    public String getRequest() {
        return request;
    }
    public void setRequest(String request) {
        this.request = request;
    }
    public int getStep() {
        return step;
    }
    public void setStep(int step) {
        this.step = step;
    }
    public String getStaylong() {
        return staylong;
    }
    public void setStaylong(String staylong) {
        this.staylong = staylong;
    }
    public String getReferal() {
        return referal;
    }
    public void setReferal(String referal) {
        this.referal = referal;
    }
    public String getUseragent() {
        return useragent;
    }
    public void setUseragent(String useragent) {
        this.useragent = useragent;
    }
    public String getBytes_send() {
        return bytes_send;
    }
    public void setBytes_send(String bytes_send) {
        this.bytes_send = bytes_send;
    }
    public String getStatus() {
        return status;
    }
    public void setStatus(String status) {
        this.status = status;
    }

    /** Serializes all fields in declaration order; null strings are written as "". */
    @Override
    public void write(DataOutput dataOutput) throws IOException {
        dataOutput.writeUTF(nullToEmpty(this.session));
        dataOutput.writeUTF(nullToEmpty(this.remote_addr));
        dataOutput.writeUTF(nullToEmpty(this.timestr));
        dataOutput.writeUTF(nullToEmpty(this.request));
        dataOutput.writeInt(this.step);
        dataOutput.writeUTF(nullToEmpty(this.staylong));
        dataOutput.writeUTF(nullToEmpty(this.referal));
        dataOutput.writeUTF(nullToEmpty(this.useragent));
        dataOutput.writeUTF(nullToEmpty(this.bytes_send));
        dataOutput.writeUTF(nullToEmpty(this.status));
    }

    /** Reads the fields back in exactly the order {@link #write} wrote them. */
    @Override
    public void readFields(DataInput dataInput) throws IOException {
        this.session = dataInput.readUTF();
        this.remote_addr = dataInput.readUTF();
        this.timestr = dataInput.readUTF();
        this.request = dataInput.readUTF();
        this.step = dataInput.readInt();
        this.staylong = dataInput.readUTF();
        this.referal = dataInput.readUTF();
        this.useragent = dataInput.readUTF();
        this.bytes_send = dataInput.readUTF();
        this.status = dataInput.readUTF();
    }

    /** writeUTF rejects null, so unset fields are serialized as the empty string. */
    private static String nullToEmpty(String s) {
        return s == null ? "" : s;
    }
}
网友评论