6.MapReduce 实现自定义排序 ★★★
排序是MapReduce框架中最重要的操作之一。
MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。
默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。
对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。
对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。
各种排序的分类:
1、部分排序
MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序
2、全排序
最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构
3、辅助排序
在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。
4、二次排序
在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。
(1).Pojo
package top.wangyq.mySort;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import org.apache.hadoop.io.WritableComparable;
public class Pojo implements WritableComparable<Pojo>{
String name;
Integer age;
Double high;
public Pojo() {
super();
}
public Pojo(String name, int age, double high) {
super();
this.name = name;
this.age = age;
this.high = high;
}
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeInt(age);
out.writeDouble(high);
}
@Override
public void readFields(DataInput in) throws IOException {
this.name=in.readUTF();
this.age=in.readInt();
this.high=in.readDouble();
}
@Override
public int compareTo(Pojo o) {
// 先按年龄升序排序,再按身高降序排序
int i = this.age.compareTo(o.age);
if (i == 0){
// 当前判断中指定的就是当age相同的时候,再去比较high,然后对升序取反就是降序
i = -(this.high.compareTo(o.high));
}
return i;
}
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public Integer getAge() {
return age;
}
public void setAge(Integer age) {
this.age = age;
}
public Double getHigh() {
return high;
}
public void setHigh(Double high) {
this.high = high;
}
@Override
public String toString() {
return "Pojo [name=" + name + ", age=" + age + ", high=" + high + "]";
}
}
(2).MyMapper
import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
public class MyMapper extends Mapper<Object, Text, Pojo, NullWritable>{
@Override
protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
String words = value.toString();
String []lines = words.split(" ");
String name = lines[0];
int age = Integer.parseInt(lines[1]);
double high = Double.parseDouble(lines[2]);
Pojo pojo = new Pojo(name,age,high);
context.write(pojo,NullWritable.get());
}
}
(3).MyReducer
package top.wangyq.mySort;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class MyReducer extends Reducer<Pojo, NullWritable, Pojo, NullWritable> {
@Override
protected void reduce(Pojo key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
super.reduce(key, values, context);
}
}
(4).Main
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
public class Main {
public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
Configuration conf = new Configuration();
// 设置连接信息
conf.set("fs.defaultFS","hdfs://master:9000");
System.setProperty("HADOOP_USER_NAME", "root");
Job job = Job.getInstance(conf);
job.setJarByClass(Main.class);
job.setMapperClass(MyMapper.class);
job.setMapOutputKeyClass(Pojo.class);
job.setMapOutputValueClass(NullWritable.class);
Path inPath = new Path("/sort/in");
Path outPath = new Path("/sort/out");
FileSystem fSystem = FileSystem.get(conf);
if (fSystem.exists(outPath)) {
fSystem.delete(outPath, true);
}
FileInputFormat.addInputPath(job, inPath);
FileOutputFormat.setOutputPath(job, outPath);
System.exit(job.waitForCompletion(true) ? 0 : 1);
}
}
7. mapreduce当中的"分组排序"详解
GroupingComparator(分组排序)是mapreduce当中reduce端的一个功能组件,==主要的作用是决定哪些key对应的数据作为一组,调用一次reduce的逻辑==,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑.
(1). 分组排序步骤:
- (1),自定义类(Patitioner)继承 WritableComparable?<T>
重写compareTo()方法,实现关于排序的逻辑
@Override
public int compareTo(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
-
(2),自定义类(Group)继承 WritableComparator
重写 compare() 方法,实现关于分组的逻辑,将同一个订单的KV作为一组WritableComparator 作为父类,使用构造方法可以通过反射的方式创建两个比较的对象。
protected MyGroup() {
//分组类:要对OrderBean类型的k进行分组
// 在父类的构造方法中会 通过反射的方式创建orderBean对象
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
// 比较的业务逻辑
return result;
}
-
(3),在Main方法中指定自定义分区类 和 自定义分组类
//第三步:分区 //此时是有3个分区 job.setPartitionerClass(GroupPartitioner.class); //如果设置reduce任务数为多个,必须打包到集群运行,默认的分区数量是1 job.setNumReduceTasks(3); //第四步:排序 已经做了 //第五步:规约 combiner 省掉 //第六步:分组 自定义分组逻辑 job.setGroupingComparatorClass(MyGroup.class);
(2). 需求:现在有订单数据如下
订单id | 商品id | 成交金额 |
---|---|---|
Order_0000001 | Pdt_01 | 222.8 |
Order_0000001 | Pdt_05 | 25.8 |
Order_0000002 | Pdt_03 | 522.8 |
Order_0000002 | Pdt_04 | 122.4 |
Order_0000002 | Pdt_05 | 722.4 |
Order_0000003 | Pdt_01 | 222.8 |
- 现在需要求取每个订单当中金额最大的商品
(3). 自定义OrderBean对象
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class OrderBean implements WritableComparable<OrderBean> {
private String orderId;
private Double price;
/**
* key间的比较规则
*
* @param o
* @return
*/
@Override
public int compareTo(OrderBean o) {
//注意:如果是不同的订单之间,金额不需要排序,没有可比性
int orderIdCompare = this.orderId.compareTo(o.orderId);
if (orderIdCompare == 0) {
//比较金额,按照金额进行倒序排序
int priceCompare = this.price.compareTo(o.price);
return -priceCompare;
} else {
//如果订单号不同,没有可比性,直接返回订单号的升序排序即可
return orderIdCompare;
}
}
/**
* 序列化方法
*
* @param out
* @throws IOException
*/
@Override
public void write(DataOutput out) throws IOException {
out.writeUTF(orderId);
out.writeDouble(price);
}
/**
* 反序列化方法
*
* @param in
* @throws IOException
*/
@Override
public void readFields(DataInput in) throws IOException {
this.orderId = in.readUTF();
this.price = in.readDouble();
}
public String getOrderId() {
return orderId;
}
public void setOrderId(String orderId) {
this.orderId = orderId;
}
public Double getPrice() {
return price;
}
public void setPrice(Double price) {
this.price = price;
}
@Override
public String toString() {
return orderId + "\t" + price;
}
}
(4). 自定义mapper类:
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {
/**
* Order_0000001 Pdt_01 222.8
* Order_0000001 Pdt_05 25.8
* Order_0000002 Pdt_03 322.8
* Order_0000002 Pdt_04 522.4
* Order_0000002 Pdt_05 822.4
* Order_0000003 Pdt_01 222.8
* Order_0000003 Pdt_03 322.8
* Order_0000003 Pdt_04 522.4
*
* @param key
* @param value
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
String[] fields = value.toString().split("\t");
OrderBean orderBean = new OrderBean();
orderBean.setOrderId(fields[0]);
orderBean.setPrice(Double.valueOf(fields[2]));
//输出orderBean
context.write(orderBean, NullWritable.get());
}
}
(5). 自定义分区(Partitioner)类:
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;
public class GroupPartitioner extends Partitioner<OrderBean, NullWritable> {
@Override
public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
// 将每个订单的所有的记录,传入到一个reduce当中
return orderBean.getOrderId().hashCode() % numPartitions;
}
}
(6). 自定义分组(Group)类:
import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;
public class MyGroup extends WritableComparator {
public MyGroup() {
//分组类:要对OrderBean类型的k进行分组
// 在父类的构造方法中会 通过反射的方式创建orderBean对象
super(OrderBean.class, true);
}
@Override
public int compare(WritableComparable a, WritableComparable b) {
OrderBean a1 = (OrderBean) a;
OrderBean b1 = (OrderBean) b;
// 需要将同一订单的kv作为一组
return a1.getOrderId().compareTo(b1.getOrderId());
}
}
(7). 自定义reduce类
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {
/**
* Order_0000002 Pdt_03 322.8
* Order_0000002 Pdt_04 522.4
* Order_0000002 Pdt_05 822.4
* => 这一组中有3个kv
* 并且是排序的
* Order_0000002 Pdt_05 822.4
* Order_0000002 Pdt_04 522.4
* Order_0000002 Pdt_03 322.8
*
* @param key
* @param values
* @param context
* @throws IOException
* @throws InterruptedException
*/
@Override
protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
//Order_0000002 Pdt_05 822.4 获得了当前订单中进而最高的商品
// 这里没有遍历,只是输出的第一个,又因为前边排序了,所以输出的就是第一个值,恰好是最大值
context.write(key, NullWritable.get());
// 如果要保存TopN的数据(此时 N = 3),只需要采用遍历的方式实现即可
// for(int i = 0; i<3; i++){
// context.write(key, NullWritable.get());
// }
}
}
(8). 定义程序入口(Main)类
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;
/**
分组求:top 1;
在本地运行的时候,可以直接在右上角的application中添加新的Application,然后在其中动态指定输入输出路径;
如果想求:top 2,那么在reduce 遍历数据的时候,遍历前两个,然后写入到文件中即可。
*/
public class GroupMain extends Configured implements Tool {
@Override
public int run(String[] args) throws Exception {
//获取job对象
Job job = Job.getInstance(super.getConf(), "group");
job.setJarByClass(GroupMain.class);
//第一步:读取文件,解析成为key,value对
job.setInputFormatClass(TextInputFormat.class);
TextInputFormat.addInputPath(job, new Path(args[0]));
//第二步:自定义map逻辑
job.setMapperClass(GroupMapper.class);
job.setMapOutputKeyClass(OrderBean.class);
job.setMapOutputValueClass(NullWritable.class);
//第三步:分区
job.setPartitionerClass(GroupPartitioner.class);
//第四步:排序 已经做了
//第五步:规约 combiner 省掉
//第六步:分组 自定义分组逻辑
job.setGroupingComparatorClass(MyGroup.class);
//第七步:设置reduce逻辑
job.setReducerClass(GroupReducer.class);
job.setOutputKeyClass(OrderBean.class);
job.setOutputValueClass(NullWritable.class);
//第八步:设置输出路径
job.setOutputFormatClass(TextOutputFormat.class);
TextOutputFormat.setOutputPath(job, new Path(args[1]));
//如果设置reduce任务数为多个,必须打包到集群运行
job.setNumReduceTasks(3);
boolean b = job.waitForCompletion(true);
return b ? 0 : 1;
}
public static void main(String[] args) throws Exception {
int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
System.exit(run);
}
}
网友评论