MapReduce中分组组件
一、GroupComparator
定义:分组(GroupingComparator)是MapReduce shuffle阶段中reduce端的一个功能组件,主要作用是决定哪些数据作为一组,即哪些key-value会在同一次reduce()调用中被处理。
我们可以自定义分组规则,让不同的key被视为同一个组。例如在下面的案例中,订单编号相同但金额不同的key会被归为同一组。
二、分组案例
需求
经典案例:求出每一个订单中成交金额最大的一笔交易
示例数据如下(各字段之间用制表符分隔;第一行只是字段说明,不属于输入数据):
订单编号 商品编号 金额
order_001 goods_001 100
order_001 goods_002 200
order_002 goods_003 300
order_002 goods_004 400
order_002 goods_005 500
order_003 goods_001 100
预期结果:
order_001 goods_002 200
order_002 goods_005 500
order_003 goods_001 100
步骤:
1、定义实体类
2、定义Mapper
3、定义分区
4、定义分组
5、定义Reducer
6、定义主类
OrderBean.java
/**
 * Order entity used as the map output key (k2).
 *
 * <p>Sort order (see {@link #compareTo(OrderBean)}): ascending by order id, then descending by
 * price. Within one order id, the most expensive item therefore sorts first.
 *
 * <p>{@link #equals(Object)} and {@link #hashCode()} are kept consistent with
 * {@link #compareTo(OrderBean)}. The hash uses only the order id, so every record of one order
 * hashes to the same value even when Hadoop's default hash partitioner is used.
 */
public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId; // order id
    private Double price;   // price of one item within the order

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return "OrderBean{" +
                "orderId='" + orderId + '\'' +
                ", price=" + price +
                '}';
    }

    /**
     * Defines the sort rule: order id ascending, then price descending.
     *
     * @param o the bean to compare with
     * @return negative, zero or positive as this bean sorts before, equal to, or after {@code o}
     */
    @Override
    public int compareTo(OrderBean o) {
        int byId = this.orderId.compareTo(o.orderId);
        if (byId != 0) {
            return byId;
        }
        // Operands swapped (o first) to get descending price order without negating.
        return o.price.compareTo(this.price);
    }

    /**
     * Two beans are equal when both order id and price are equal, matching
     * {@link #compareTo(OrderBean)} returning 0.
     */
    @Override
    public boolean equals(Object obj) {
        if (this == obj) {
            return true;
        }
        if (!(obj instanceof OrderBean)) {
            return false;
        }
        OrderBean other = (OrderBean) obj;
        return java.util.Objects.equals(orderId, other.orderId)
                && java.util.Objects.equals(price, other.price);
    }

    /**
     * Hashes on the order id only. Equal beans have equal order ids, so the equals/hashCode
     * contract holds, and all records of one order share a hash.
     */
    @Override
    public int hashCode() {
        return java.util.Objects.hashCode(orderId);
    }

    /**
     * Serializes the bean: order id (modified UTF-8) followed by the price as a double.
     * The price must have been set; an unset (null) price cannot be written.
     *
     * @param out destination stream
     * @throws IOException if writing fails
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    /**
     * Deserializes the bean in the same field order used by {@link #write(DataOutput)}.
     *
     * @param in source stream
     * @throws IOException if reading fails
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }
}
OrderMapper.java
/**
 * Parses one line of order data into an {@link OrderBean} key and keeps the original line as the
 * value.
 *
 * <p>Mapper&lt;KEYIN, VALUEIN, KEYOUT, VALUEOUT&gt;:
 * <ul>
 *   <li>KEYIN: byte offset of the line</li>
 *   <li>VALUEIN: one line of text</li>
 *   <li>KEYOUT: k2, an OrderBean (order id + price)</li>
 *   <li>VALUEOUT: v2, the unchanged input line</li>
 * </ul>
 *
 * <p>Expected line format: {@code orderId goodsId price}, with fields separated by tabs or
 * spaces. Some lines are skipped instead of failing the whole map task:
 * <ul>
 *   <li>blank lines;</li>
 *   <li>lines with fewer than three fields;</li>
 *   <li>lines whose third field is not a number (e.g. a header row).</li>
 * </ul>
 * Skipped lines are counted under counter group {@code OrderMapper}.
 */
public class OrderMapper extends Mapper<LongWritable, Text, OrderBean, Text> {
    /** Counter group used to report skipped input lines. */
    private static final String COUNTER_GROUP = "OrderMapper";

    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        // 1. Split the line into fields; accept a tab or any run of whitespace as separator.
        String line = value.toString().trim();
        if (line.isEmpty()) {
            context.getCounter(COUNTER_GROUP, "BLANK_LINES").increment(1);
            return;
        }
        String[] fields = line.split("\\s+");
        if (fields.length < 3) {
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }
        double price;
        try {
            price = Double.parseDouble(fields[2]);
        } catch (NumberFormatException e) {
            // Not a data row (for example a header line); skip it rather than crash the task.
            context.getCounter(COUNTER_GROUP, "MALFORMED_LINES").increment(1);
            return;
        }
        // 2. Build the composite key (order id + price).
        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(fields[0]);
        orderBean.setPrice(price);
        // 3. Emit (k2, v2); the value is the original, untrimmed line.
        context.write(orderBean, value);
    }
}
OrderPartition.java
/**
 * Partitions map output by order id only. All records of one order, whatever their price,
 * therefore reach the same ReduceTask.
 *
 * <p>Partitioner&lt;KEY, VALUE&gt;: KEY is k2 ({@link OrderBean}), VALUE is v2 (the input line).
 */
public class OrderPartition extends Partitioner<OrderBean, Text> {

    /**
     * Chooses the reduce partition for one record.
     *
     * @param orderBean     k2; only its order id is used
     * @param text          v2; not used
     * @param numPartitions number of ReduceTasks
     * @return a partition number in {@code [0, numPartitions)}, e.g. 0, 1 or 2 for three ReduceTasks
     */
    @Override
    public int getPartition(OrderBean orderBean, Text text, int numPartitions) {
        String id = orderBean.getOrderId();
        // Clear the sign bit so that a negative hash still yields a non-negative index.
        int nonNegativeHash = id.hashCode() & Integer.MAX_VALUE;
        return nonNegativeHash % numPartitions;
    }
}
OrderGroup.java
/**
 * Reduce-side grouping comparator. Two keys fall into the same group (the same reduce() call)
 * when their order ids are equal, regardless of price.
 *
 * <p>Writing a grouping comparator follows a fixed recipe:
 * <ol>
 *   <li>extend WritableComparator;</li>
 *   <li>call the parent constructor with the key class;</li>
 *   <li>override compare(WritableComparable, WritableComparable) with the grouping rule.</li>
 * </ol>
 */
public class OrderGroup extends WritableComparator {

    /**
     * Registers {@link OrderBean} as the key class. The {@code true} flag allows the parent class
     * to create OrderBean instances; it uses them to deserialize keys before comparing.
     */
    public OrderGroup() {
        super(OrderBean.class, true);
    }

    /**
     * Grouping rule: compare two keys by order id only.
     *
     * @param a a key; WritableComparable is an interface that OrderBean implements
     * @param b the other key, also an OrderBean
     * @return negative, zero or positive as a's order id sorts before, equal to, or after b's
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        String leftId = ((OrderBean) a).getOrderId();
        String rightId = ((OrderBean) b).getOrderId();
        return leftId.compareTo(rightId);
    }
}
OrderReducer.java
/**
 * Emits the top-N input lines of every order group.
 *
 * <p>Reducer&lt;KEYIN, VALUEIN, KEYOUT, VALUEOUT&gt;:
 * <ul>
 *   <li>KEYIN: k2 ({@link OrderBean})</li>
 *   <li>VALUEIN: v2 (the original input line)</li>
 *   <li>KEYOUT: k3 (one line of text)</li>
 *   <li>VALUEOUT: v3 ({@link NullWritable})</li>
 * </ul>
 *
 * <p>OrderBean sorts by order id ascending and price descending. OrderGroup groups by order id
 * only. As a result, the values of one reduce() call arrive most expensive first, and emitting
 * the first N values yields the N largest transactions of that order.
 *
 * <p>N is read from the job configuration key {@value #TOP_N_KEY}. It defaults to 1, which means
 * only the largest transaction is emitted, as in the original behavior.
 */
public class OrderReducer extends Reducer<OrderBean, Text, Text, NullWritable> {
    /** Configuration key holding N; values below 1 are treated as 1. */
    public static final String TOP_N_KEY = "order.top.n";

    /** How many lines to emit per order group. */
    private int topN = 1;

    @Override
    protected void setup(Context context) {
        topN = Math.max(1, context.getConfiguration().getInt(TOP_N_KEY, 1));
    }

    @Override
    protected void reduce(OrderBean key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
        int emitted = 0;
        for (Text value : values) {
            context.write(value, NullWritable.get());
            // Stop right after the N-th write so no further values are pulled from the group.
            if (++emitted >= topN) {
                break;
            }
        }
    }
}
JobMain.java
/**
 * Driver for the job that finds the largest transaction of each order.
 *
 * <p>Usage: {@code JobMain [inputPath [outputPath]]}. Any path not given on the command line
 * falls back to the original hard-coded local path. The output directory must not already exist
 * (Hadoop's FileOutputFormat rejects an existing output directory).
 */
public class JobMain {
    /** Input used when no input path is passed on the command line. */
    private static final String DEFAULT_INPUT = "D://input/orders.txt";
    /** Output directory used when no output path is passed on the command line. */
    private static final String DEFAULT_OUTPUT = "D://mygroup_out";

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        String inputPath = args.length > 0 ? args[0] : DEFAULT_INPUT;
        String outputPath = args.length > 1 ? args[1] : DEFAULT_OUTPUT;

        // I. Create the job.
        Configuration configuration = new Configuration();
        Job job = Job.getInstance(configuration, "mygroup");
        // Lets Hadoop find the jar that contains the job classes when submitted to a cluster.
        job.setJarByClass(JobMain.class);

        // II. Configure the job.
        // 1. Input.
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(inputPath));
        // 2. Mapper: emits (OrderBean, original line).
        job.setMapperClass(OrderMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(Text.class);
        // 3-6. Shuffle: partition by order id; no sort comparator is set, so keys are ordered by
        // the key class's own comparison; group by order id.
        job.setPartitionerClass(OrderPartition.class);
        job.setGroupingComparatorClass(OrderGroup.class);
        // 7. Reducer: emits (Text line, NullWritable), so the final output key class is Text.
        job.setReducerClass(OrderReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(NullWritable.class);
        // 8. Output.
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(outputPath));

        // III. Run the job and wait for it to finish.
        boolean b = job.waitForCompletion(true);
        System.out.println(b);
        System.exit(b ? 0 : 1);
    }
}
网友评论