美文网首页
离线计算组件篇-MapReduce-自定义排序与分组排序

离线计算组件篇-MapReduce-自定义排序与分组排序

作者: CoderInsight | 来源:发表于2022-11-29 16:24 被阅读0次

6.MapReduce 实现自定义排序 ★★★

  • 排序是MapReduce框架中最重要的操作之一。

  • MapTask和ReduceTask均会对数据按照key进行排序。该操作属于Hadoop的默认行为。任何应用程序中的数据均会被排序,而不管逻辑上是否需要。

  • 默认排序是按照字典顺序排序,且实现该排序的方法是快速排序。

  • 对于MapTask,它会将处理的结果暂时放到环形缓冲区中,当环形缓冲区使用率达到一定阈值后,再对缓冲区中的数据进行一次快速排序,并将这些有序数据溢写到磁盘上,而当数据处理完毕后,它会对磁盘上所有文件进行归并排序。

  • 对于ReduceTask,它从每个MapTask上远程拷贝相应的数据文件,如果文件大小超过一定阈值,则溢写磁盘上,否则存储在内存中。如果磁盘上文件数目达到一定阈值,则进行一次归并排序以生成一个更大文件;如果内存中文件大小或者数目超过一定阈值,则进行一次合并后将数据溢写到磁盘上。当所有数据拷贝完毕后,ReduceTask统一对内存和磁盘上的所有数据进行一次归并排序。

  • 各种排序的分类:

    • 1、部分排序

      MapReduce根据输入记录的键对数据集排序。保证输出的每个文件内部有序

    • 2、全排序

      最终输出结果只有一个文件,且文件内部有序。实现方式是只设置一个ReduceTask。但该方法在处理大型文件时效率极低,因为一台机器处理所有文件,完全丧失了MapReduce所提供的并行架构

    • 3、辅助排序

      在Reduce端对key进行分组。应用于:在接收的key为bean对象时,想让一个或几个字段相同(全部字段比较不相同)的key进入到同一个reduce方法时,可以采用分组排序。

    • 4、二次排序

      在自定义排序过程中,如果compareTo中的判断条件为两个即为二次排序。

(1).Pojo

package top.wangyq.mySort;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

import org.apache.hadoop.io.WritableComparable;

public class Pojo implements WritableComparable<Pojo>{

    String name;
    Integer age;
    Double high;

    public Pojo() {
        super();
    }

    public Pojo(String name, int age, double high) {
        super();
        this.name = name;
        this.age = age;
        this.high = high;
    }


    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(name);
        out.writeInt(age);
        out.writeDouble(high);
    }


    @Override
    public void readFields(DataInput in) throws IOException {
        this.name=in.readUTF();
        this.age=in.readInt();
        this.high=in.readDouble();
    }

    @Override
    public int compareTo(Pojo o) {
        // 先按年龄升序排序,再按身高降序排序
        int i = this.age.compareTo(o.age);

        if (i == 0){
            // 当前判断中指定的就是当age相同的时候,再去比较high,然后对升序取反就是降序
            i = -(this.high.compareTo(o.high));
        }

        return i;
    }


    public String getName() {
        return name;
    }

    public void setName(String name) {
        this.name = name;
    }

    public Integer getAge() {
        return age;
    }

    public void setAge(Integer age) {
        this.age = age;
    }

    public Double getHigh() {
        return high;
    }

    public void setHigh(Double high) {
        this.high = high;
    }

    @Override
    public String toString() {
        return "Pojo [name=" + name + ", age=" + age + ", high=" + high + "]";
    }


}

(2).MyMapper

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

public class MyMapper extends Mapper<Object, Text, Pojo, NullWritable>{
    
    @Override
    protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
        String words = value.toString();
        String []lines = words.split(" ");

        String name = lines[0];
        int age = Integer.parseInt(lines[1]);
        double high = Double.parseDouble(lines[2]);
        
        Pojo pojo = new Pojo(name,age,high);
        
        context.write(pojo,NullWritable.get());
    }


}

(3).MyReducer

package top.wangyq.mySort;

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class MyReducer extends Reducer<Pojo, NullWritable, Pojo, NullWritable> {

    @Override
    protected void reduce(Pojo key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        super.reduce(key, values, context);
    }
}

(4).Main

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

public class Main {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {

        Configuration conf = new Configuration();
        // 设置连接信息
        conf.set("fs.defaultFS","hdfs://master:9000");

        System.setProperty("HADOOP_USER_NAME", "root");

        Job job = Job.getInstance(conf);
        job.setJarByClass(Main.class);
        job.setMapperClass(MyMapper.class);

        job.setMapOutputKeyClass(Pojo.class);
        job.setMapOutputValueClass(NullWritable.class);

        Path inPath = new Path("/sort/in");
        Path outPath = new Path("/sort/out");

        FileSystem fSystem = FileSystem.get(conf);
        if (fSystem.exists(outPath)) {
            fSystem.delete(outPath, true);
        }

        FileInputFormat.addInputPath(job, inPath);
        FileOutputFormat.setOutputPath(job, outPath);

        System.exit(job.waitForCompletion(true) ? 0 : 1);   
    }
    
}

7. mapreduce当中的"分组排序"详解

GroupingComparator(分组排序)是mapreduce当中reduce端的一个功能组件,==主要的作用是决定哪些key对应的数据作为一组,调用一次reduce的逻辑==,默认是每个不同的key,作为多个不同的组,每个组调用一次reduce逻辑,我们可以自定义GroupingComparator实现不同的key作为同一个组,调用一次reduce逻辑.

(1). 分组排序步骤:

  • (1),自定义类(Patitioner)继承 WritableComparable?<T>
    重写compareTo()方法,实现关于排序的逻辑
@Override
public int compareTo(WritableComparable a, WritableComparable b) {
        // 比较的业务逻辑
        return result;
}
  • (2),自定义类(Group)继承 WritableComparator
    重写 compare() 方法,实现关于分组的逻辑,将同一个订单的KV作为一组

    WritableComparator 作为父类,使用构造方法可以通过反射的方式创建两个比较的对象。

protected MyGroup() {
      //分组类:要对OrderBean类型的k进行分组
      // 在父类的构造方法中会 通过反射的方式创建orderBean对象
      super(OrderBean.class, true);
}

@Override
public int compare(WritableComparable a, WritableComparable b) {
        // 比较的业务逻辑
        return result;
}
  • (3),在Main方法中指定自定义分区类 和 自定义分组类

    //第三步:分区
    //此时是有3个分区
    job.setPartitionerClass(GroupPartitioner.class);
    //如果设置reduce任务数为多个,必须打包到集群运行,默认的分区数量是1
    job.setNumReduceTasks(3);
    
    //第四步:排序  已经做了
    
    //第五步:规约  combiner  省掉
    
    //第六步:分组   自定义分组逻辑
    job.setGroupingComparatorClass(MyGroup.class);
    

(2). 需求:现在有订单数据如下

订单id 商品id 成交金额
Order_0000001 Pdt_01 222.8
Order_0000001 Pdt_05 25.8
Order_0000002 Pdt_03 522.8
Order_0000002 Pdt_04 122.4
Order_0000002 Pdt_05 722.4
Order_0000003 Pdt_01 222.8
  • 现在需要求取每个订单当中金额最大的商品

(3). 自定义OrderBean对象

import org.apache.hadoop.io.WritableComparable;

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;

public class OrderBean implements WritableComparable<OrderBean> {
    private String orderId;
    private Double price;

    /**
     * key间的比较规则
     *
     * @param o
     * @return
     */
    @Override
    public int compareTo(OrderBean o) {
        //注意:如果是不同的订单之间,金额不需要排序,没有可比性
        int orderIdCompare = this.orderId.compareTo(o.orderId);
        if (orderIdCompare == 0) {
            //比较金额,按照金额进行倒序排序
            int priceCompare = this.price.compareTo(o.price);
            return -priceCompare;
        } else {
            //如果订单号不同,没有可比性,直接返回订单号的升序排序即可
            return orderIdCompare;
        }
    }

    /**
     * 序列化方法
     *
     * @param out
     * @throws IOException
     */
    @Override
    public void write(DataOutput out) throws IOException {
        out.writeUTF(orderId);
        out.writeDouble(price);
    }

    /**
     * 反序列化方法
     *
     * @param in
     * @throws IOException
     */
    @Override
    public void readFields(DataInput in) throws IOException {
        this.orderId = in.readUTF();
        this.price = in.readDouble();
    }

    public String getOrderId() {
        return orderId;
    }

    public void setOrderId(String orderId) {
        this.orderId = orderId;
    }

    public Double getPrice() {
        return price;
    }

    public void setPrice(Double price) {
        this.price = price;
    }

    @Override
    public String toString() {
        return orderId + "\t" + price;
    }
}

(4). 自定义mapper类:

import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import java.io.IOException;

public class GroupMapper extends Mapper<LongWritable, Text, OrderBean, NullWritable> {

    /**
     * Order_0000001    Pdt_01  222.8
     * Order_0000001    Pdt_05  25.8
     * Order_0000002    Pdt_03  322.8
     * Order_0000002    Pdt_04  522.4
     * Order_0000002    Pdt_05  822.4
     * Order_0000003    Pdt_01  222.8
     * Order_0000003    Pdt_03  322.8
     * Order_0000003    Pdt_04  522.4
     *
     * @param key
     * @param value
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
        String[] fields = value.toString().split("\t");

        OrderBean orderBean = new OrderBean();
        orderBean.setOrderId(fields[0]);
        orderBean.setPrice(Double.valueOf(fields[2]));

        //输出orderBean
        context.write(orderBean, NullWritable.get());
    }
}

(5). 自定义分区(Partitioner)类:

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Partitioner;

public class GroupPartitioner extends Partitioner<OrderBean, NullWritable> {
    @Override
    public int getPartition(OrderBean orderBean, NullWritable nullWritable, int numPartitions) {
        // 将每个订单的所有的记录,传入到一个reduce当中
        return orderBean.getOrderId().hashCode() % numPartitions;
    }
}

(6). 自定义分组(Group)类:

import org.apache.hadoop.io.WritableComparable;
import org.apache.hadoop.io.WritableComparator;

public class MyGroup extends WritableComparator {
    public MyGroup() {
        //分组类:要对OrderBean类型的k进行分组
        // 在父类的构造方法中会 通过反射的方式创建orderBean对象
        super(OrderBean.class, true);
    }

    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        OrderBean a1 = (OrderBean) a;
        OrderBean b1 = (OrderBean) b;
        // 需要将同一订单的kv作为一组
        return a1.getOrderId().compareTo(b1.getOrderId());
    }
}

(7). 自定义reduce类

import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Reducer;

import java.io.IOException;

public class GroupReducer extends Reducer<OrderBean, NullWritable, OrderBean, NullWritable> {

    /**
     * Order_0000002    Pdt_03  322.8
     * Order_0000002    Pdt_04  522.4
     * Order_0000002    Pdt_05  822.4
     * => 这一组中有3个kv
     * 并且是排序的
     * Order_0000002    Pdt_05  822.4
     * Order_0000002    Pdt_04  522.4
     * Order_0000002    Pdt_03  322.8
     *
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    @Override
    protected void reduce(OrderBean key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
        //Order_0000002 Pdt_05  822.4 获得了当前订单中进而最高的商品
        // 这里没有遍历,只是输出的第一个,又因为前边排序了,所以输出的就是第一个值,恰好是最大值
        context.write(key, NullWritable.get());
        
//        如果要保存TopN的数据(此时 N = 3),只需要采用遍历的方式实现即可        
//        for(int i = 0; i<3; i++){
//            context.write(key, NullWritable.get());
//        }
        
    }
}

(8). 定义程序入口(Main)类

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

/**

 分组求:top 1;
 在本地运行的时候,可以直接在右上角的application中添加新的Application,然后在其中动态指定输入输出路径;
 如果想求:top 2,那么在reduce 遍历数据的时候,遍历前两个,然后写入到文件中即可。
 */
public class GroupMain extends Configured implements Tool {
    @Override
    public int run(String[] args) throws Exception {
        //获取job对象
        Job job = Job.getInstance(super.getConf(), "group");
        job.setJarByClass(GroupMain.class);

        //第一步:读取文件,解析成为key,value对
        job.setInputFormatClass(TextInputFormat.class);
        TextInputFormat.addInputPath(job, new Path(args[0]));

        //第二步:自定义map逻辑
        job.setMapperClass(GroupMapper.class);
        job.setMapOutputKeyClass(OrderBean.class);
        job.setMapOutputValueClass(NullWritable.class);

        //第三步:分区
        job.setPartitionerClass(GroupPartitioner.class);

        //第四步:排序  已经做了

        //第五步:规约  combiner  省掉

        //第六步:分组   自定义分组逻辑
        job.setGroupingComparatorClass(MyGroup.class);

        //第七步:设置reduce逻辑
        job.setReducerClass(GroupReducer.class);
        job.setOutputKeyClass(OrderBean.class);
        job.setOutputValueClass(NullWritable.class);

        //第八步:设置输出路径
        job.setOutputFormatClass(TextOutputFormat.class);
        TextOutputFormat.setOutputPath(job, new Path(args[1]));

        //如果设置reduce任务数为多个,必须打包到集群运行
        job.setNumReduceTasks(3);

        boolean b = job.waitForCompletion(true);
        return b ? 0 : 1;
    }

    public static void main(String[] args) throws Exception {
        int run = ToolRunner.run(new Configuration(), new GroupMain(), args);
        System.exit(run);
    }
}

相关文章

  • 离线计算组件篇-MapReduce-自定义InputFormat

    5.自定义 inputFotmat 通过自定义 inputFormat 来实现“将小文件批量合并成 Sequenc...

  • 排序算法 希尔、归并排序

    希尔排序与归并排序都是分组进行比较,但是希尔排序是组间比较,归并排序是组内排序 希尔排序 设置一个增量gap,从第...

  • 13、排序和筛选

    1、排序和自定义排序 数值排序: 点击需要排序的一列一个格子,排序 自定义排序,比如:添加关键...

  • 经典排序算法-希尔排序Shell sort

    一、希尔排序思想 希尔排序是基于插入排序的快速的排序算法,先分组后对每组进行直接插入排序,再分组再直接执行插入排序...

  • Sql 分组自定义排序

    1、实现效果 使用去重或分组,数据库默认按字符排序,特定场景下需自定义排序原本思路,先去重,再联表保证自定义顺序改...

  • Excel 2016 For Mac 数据透视表基础应用四

    Excel 2016 For Mac 数据透视表基础应用四——排序、筛选与切片器一、使用排序筛选1、自定义排序顺序...

  • 推荐Rerank二次重排序算法

    前言 推荐的Rerank排序有两种情况,一个是离线计算的时候为每个用户提前用Rerank排序算法算好推荐结果,另一...

  • C# Lambd表达式

    分组排序-按X分组,将总结果数量少的排在前面 运行结果 多级排序,优先第一列排序,其次第二列排序 运行结果

  • 学习office——Excel 排序与筛选

    一、排序 1、多条件排序使用自定义排序 2、按颜色排序 使用自定义排序 3、工资条 复制表头——下拉复制更多表头—...

  • 2021-08-05

    1、排序1.1 排序可以嵌套,开始栏中选排序,选自定义排序,选择主要排序和次要排序,实现排序的嵌套1.2 排序中可...

网友评论

      本文标题:离线计算组件篇-MapReduce-自定义排序与分组排序

      本文链接:https://www.haomeiwen.com/subject/banffdtx.html