这节课我们一起学习Hadoop的自定义排序,Hadoop是有一套默认的排序规则的,但是这往往不能满足我们多样化的需求,为了让排序更多样化,这就需要用到我们本节课所要学习的自定义排序功能。枯燥的说理论我相信大家都很难记住,我们一起来通过一个例子来更好的理解学习它。
我们所要用到的数据如下所示,有四列,分别代表账户、收入、支出、时间。我们想要实现的功能是按照用户的收入金额排序收入金额越大排序越靠前,如果收入金额一样则比较支出,支出越少排名越靠前。
zhangsan@163.com 6000 0 2014-02-20
lisi@163.com 2000 0 2014-02-20
lisi@163.com 0 100 2014-02-20
zhangsan@163.com 3000 0 2014-02-20
wangwu@126.com 9000 0 2014-02-20
wangwu@126.com 0 200 2014-02-20
我们通过上面的数据可以知道我们最终要得到的结果如下,最后一列的意思是收入减去支出而得到的结余多少钱。
zhangsan@163.com 9000 0 9000
wangwu@126.com 9000 200 8800
lisi@163.com 2000 100 1900
我们使用Maven来管理我们的jar包,为此我们需要创建一个Maven工程,如下图所示
image
点击上图的“Maven Project”之后,我们会看到如下图所示的界面,我们勾选上第一个复选框(Create a simple project),然后点击Next
image
点击上图的"Next"之后,我们会进入到如下图所示的界面,我们在Group Id和Artifact Id中输入组ID和工程ID,然后点击“Finish”
image
点击上图的"Finish"之后我们便创建了一个Maven工程,不过需要说明的是,如果你是第一次在Eclipse中创建Maven工程,它会报错,其实是找不到对应的依赖包,关于这个问题的解决方法是在root根目录下放一个m2.tar.gz本地仓库并解压它,然后在你创建的工程上右键,鼠标放到Maven上,在其子菜单中点击“Update Project”,错误就消失了。如果大家还不是很清楚如何操作,请参考:http://blog.csdn.net/u012453843/article/details/52600313这篇博客进行学习。如果已经创建过Maven工程了,第二次再创建便不会报错了。
image
创建好了Maven工程,接下来我们需要在pom.xml中配置依赖项,由于我们要用到的是MapReduce的功能,因此我们需要把MapReduce所需要依赖的包给导进来,还要说明的是无论用到HDFS也好还是MapReduce也好,都需要依赖hadoop-common的包,因此也需要配置进来,由于我用的Hadoop版本是2.2.0,因此包的版本也写成2.2.0,如下所示:
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>
<groupId>com.myhadoop.mr</groupId>
<artifactId>sortdatacount</artifactId>
<version>0.0.1-SNAPSHOT</version>
<dependencies>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-mapreduce-client-core</artifactId>
<version>2.2.0</version>
</dependency>
</dependencies>
</project>
添加好上面的配置之后我们保存,这时我们会发现Maven会自动给我们引进来我们所需要的jar包,如下图所示
image
接下来我们开始写我们的程序,由于这个小例子涉及到了求和和排序,因此一个MapReduce无法完成我们的需求,我们需要两个MapReduce来实现,第一个MapReduce用来求和,第一个MapReduce的输出可以做为第二个MapReduce的输入,第二个MapReduce用来排序。首先我们来写第一个MapReduce,我们给类起名为SumStep意思是求和阶段。
image
首先我们需要定义一个Bean,因为MapReduce都是以<key,value>的形式存放数据的,因此当我们所需要的数据比较多时,我们就需要用类来封装我们要处理的数据,这样才能满足使用MapReduce的条件,我们首先定义一个InfoBean,在这个Bean中我们定义了四个字段,分别是账户、收入、支出、结余。InfoBean要满足两个条件,第一个是序列化,第二个是可比较,因此我们继承了WritableComparable类,这样我们需要实现它的三个方法,分别是write、readFileds、compareTo,为了结果可读性更好,我们重写一下toString方法,对于类的初始化,我们也采用新的方式,就是定义一个set方法,在这个方法中进行初始化而不再是通过构造方法来实现,这样做的好处是我们可以避免忘了写无参构造器而导致的错误。InfoBean的代码如下所示。
package myhadoop.mr.sort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class InfoBean implements WritableComparable<InfoBean> {
private String account;//帐号
private double income;//收入,这里我们不考虑多么精准,因我我们用到的数据都是整数
private double outcome;//支出
private double surplus;//结余
/**
-
Text类中专门有个set方法让我们给它赋值,我们模仿Text类,也定义一个set方法来初始化InfoBean
-
@param account 帐号
-
@param income 收入
-
@param outcome 支出
/
public void set(String account,double income,double outcome){
this.account=account;
this.income=income;
this.outcome=outcome;
this.surplus=income-outcome;
}
/* -
序列化Bean
*/
public void write(DataOutput out) throws IOException {
out.writeUTF(account);
out.writeDouble(income);
out.writeDouble(outcome);
out.writeDouble(surplus);
}/**
- 反序列化Bean
/
public void readFields(DataInput in) throws IOException {
this.account=in.readUTF();
this.income=in.readDouble();
this.outcome=in.readDouble();
this.surplus=in.readDouble();
}
/* - 自定义比较方法,返回到int值是参数Bean与当前Bean的比较结果
- 如果传过来到Bean的收入值小于当前Bean的收入值,那么应该返回-1;
- 表示传过来的Bean应该排在当前Bean的后面。
- 如果传过来的Bean与当前Bean的收入值一样,那么就再比较支出的值,
- 如果传过来的Bean的支出值小于当前Bean的支出值的话,应该返回1,
- 表示传过来的Bean应该排在当前Bean的前面。
*/
public int compareTo(InfoBean o) {
if(this.income==o.getIncome()){
return this.getOutcome()>o.getOutcome() ? 1:-1;
}else{
return this.getIncome()>o.getIncome() ? -1:1;
}
}
- 反序列化Bean
/**
- 重写toString方法
*/
@Override
public String toString() {
return this.income+"\t"+this.outcome+"\t"+this.surplus;
}
public String getAccount() {
return account;
}
public void setAccount(String account) {
this.account = account;
}
public double getIncome() {
return income;
}
public void setIncome(double income) {
this.income = income;
}
public double getOutcome() {
return outcome;
}
public void setOutcome(double outcome) {
this.outcome = outcome;
}
public double getSurplus() {
return surplus;
}
public void setSurplus(double surplus) {
this.surplus = surplus;
}
}
接下来我们写第一个MapReduce,第一个MapReduce用来求和,SumStep类的代码如下:
package myhadoop.mr.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SumStep {
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(SumStep.class);
job.setMapperClass(SumMapper.class);
job.setMapOutputKeyClass(Text.class);
job.setMapOutputValueClass(InfoBean.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(SumReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SumMapper extends Mapper<LongWritable, Text, Text, InfoBean>{
private Text k=new Text();
private InfoBean v=new InfoBean();
/**
* 重写map方法
*/
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//首先获取一行的数据
String line=value.toString();
//由于我们的数据是以空格分割的,因此我们以空格进行分割
String[] fields=line.split("\t");
//获取帐号
String account=fields[0];
//获取收入的值
double income=Double.parseDouble(fields[1]);
//获取支出的值
double outcome=Double.parseDouble(fields[2]);
//给k赋值
k.set(account);
//给v赋值
v.set(account, income, outcome);
//输出
context.write(k, v);
}
}
public static class SumReducer extends Reducer<Text, InfoBean, Text, InfoBean>{
private InfoBean v=new InfoBean();
/**
* 重写Reduce方法
*/
@Override
protected void reduce(Text key, Iterable<InfoBean> values,Context context)
throws IOException, InterruptedException {
//每个账户总的收入
double sum_income=0;
//每个账户总的支出
double sum_outcome=0;
for(InfoBean bean:values){
sum_income+=bean.getIncome();
sum_outcome+=bean.getOutcome();
}
//由于key就是账户,因此这里不用传account进去也没问题
v.set("", sum_income, sum_outcome);
//输出
context.write(key, v);
}
}
}
接下来我们写第二个MapReduce,第二个MapReduce用来排序,我们起名为SortStep,如下所示:
package myhadoop.mr.sort;
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class SortStep {
public static void main(String[] args) throws Exception {
Configuration conf=new Configuration();
Job job=Job.getInstance(conf);
job.setJarByClass(SortStep.class);
job.setMapperClass(SortMapper.class);
job.setMapOutputKeyClass(InfoBean.class);
job.setMapOutputValueClass(NullWritable.class);
FileInputFormat.setInputPaths(job, new Path(args[0]));
job.setReducerClass(SortReducer.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(InfoBean.class);
FileOutputFormat.setOutputPath(job, new Path(args[1]));
job.waitForCompletion(true);
}
public static class SortMapper extends Mapper<LongWritable, Text, InfoBean, NullWritable>{
private InfoBean k=new InfoBean();
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
//首先获取一行的数据
String line=value.toString();
//由于我们第一个MapReduce处理完之后toString方法是以\t为分割写的,因此我们这里分割数据应该用\t来分割
String[] fields=line.split("\t");
//获取帐号
String account=fields[0];
//获取收入的值
double income=Double.parseDouble(fields[1]);
//获取支出的值
double outcome=Double.parseDouble(fields[2]);
//给v赋值
k.set(account, income, outcome);
//输出,把Bean当作key它自己就具有排序功能,因此我们只需要Bean就可以了,value的值我们可以用NullWritable来代替
context.write(k, NullWritable.get());
}
}
public static class SortReducer extends Reducer<InfoBean, NullWritable, Text, InfoBean>{
private Text k=new Text();
@Override
protected void reduce(InfoBean bean, Iterable<NullWritable> values,Context context)
throws IOException, InterruptedException {
String account=bean.getAccount();
k.set(account);
context.write(k, bean);
}
}
}
接下来我们把工程导出,我们在工程上右键,点击"Export",如下图所示:
image 我们点击上图的"Export"之后我们会进入如下图所示界面,我们点击"JAR file",然后点击"Next"
image
点击上图的"Next"之后我们会进入到如下图所示界面,我们勾选第二个复选框,JAR file这一栏我们输入/root/sortmr.jar,然后点击"Finish",如下图所示。
image
导出成功后,我们到root根目录下看一下是否有我们刚才命名的sortmr.jar这个文件,如下图所示,发现确实有这个文件。
image
我们要想执行MapReduce,首先得有数据,因此我们在root根目录下建一个trade_info文件,并把我们在本篇博客开始的时候所说的数据粘贴进去,**我们以\t来分隔每列的值**,如zhangsan@163.com与6000之间是以制表符(\t)来分隔的,保存并退出。如下图所示。
image
接下来我们把这个文件上传到hdfs系统根目录下,在上传之前我们先检查该起的进程是否都起来了,如下图所示,发现进程确实都启动起来了,如果你的进程还没有启动并且不知道怎么启动的话,请参考:http://blog.csdn.net/u012453843/article/details/52600313这篇博客进行启动。
image
接下来我们把刚才创建的trade_info文件上传到hdfs系统根目录下,上传完之后我们到hdfs根目录下查看文件是否已经上传成功,如下图所示,发现trade_info文件确实已经被上传到hdfs系统根目录下了。
image
接下来我们执行第一个MapReduce程序,先把源数据求和,如下图所示,执行成功之后我们查看一下是否执行成功了,我们可以看到确实已经帮我们把数据根据账户求和了。
image
接下来我们执行第二个MapReduce,进行排序,我们在命令中使用了/sum而没有使用/sum/part-r-00000,这样也是没错的,因为hadoop会去sum文件夹下去扫描,凡是不以下划线开头的都被扫描进来,我们知道在sum文件夹下只有_SUCCESS和part-r-00000两个文件,第一个文件会被忽略,因此也就等同于/sum/part-r-00000了。执行成功之后我们查看sort文件夹下的part-r-00000文件,看看结果如何,我们发现确实已经帮我们排好序了,而且是以我们自定义的规则拍好序了。
image
网友评论