美文网首页
4.MapReduce框架原理2 - shuffle combi

4.MapReduce框架原理2 - shuffle combi

作者: 我爱阿桑 | 来源:发表于2021-10-11 21:54 被阅读0次

1.Shuffle机制

Map方法之后,Reduce方法之前的数据处理过程称之为Shuffle。


image.png

2.Shuffle排序机制

  • hadoop排序是使用WritableComparator对象
  • 实现排序的方法:
  • 1.直接让参与对比的对象实现WritableComparable 接口,并指定泛型,实现compareTo方法,实现比较规则
  • 2.自定义一个比较器对象,需要继承WritableComparator类,重写compare的方法。注意在构造器中调用父类对当前要参与比较的对象进行实例化。当前要参与比较的对象必须要实现WritableComparable接口,最后在driver类中指定自定义的比较器对象
//自定义的比较器对象
public class FlowBeanWritableComparator extends WritableComparator {

    // 指定当前自定义的比较器对象为谁服务
   // 注意在构造器中调用父类对当前要参与比较的对象进行实例化。
    public FlowBeanWritableComparator() {
        super(FlowBean.class, true);
    }

    /**
     * 自定义比较规则
     * @param a
     * @param b
     * @return
     */
    @Override
    public int compare(WritableComparable a, WritableComparable b) {
        FlowBean abean = (FlowBean) a;
        FlowBean bbean = (FlowBean) b;
        System.out.println("aa"+abean);
        System.out.println("bb"+bbean);
        return -abean.getSumFlow().compareTo(((FlowBean) b).getSumFlow());
    }
}


// 指定自定义的比较器对象
 job.setSortComparatorClass(FlowBeanWritableComparator.class);

3.Shuffle排序源码分析

 // 为当前Job中的对象获取比较器对象
    comparator = job.getOutputKeyComparator();
    // 获取比较器对象的核心逻辑
    public RawComparator getOutputKeyComparator() {
    // 在当前Job中获取比较器对象的class文件--> mapreduce.job.output.key.comparator.class
    Class<? extends RawComparator> theClass = getClass(
    JobContext.KEY_COMPARATOR, null, RawComparator.class);
    // 如果通过JobContext.KEY_COMPARATOR 获取到比较器对象
    if (theClass != null){
         return ReflectionUtils.newInstance(theClass, this);
    }
     // 如果通过JobContext.KEY_COMPARATOR 获取不到比较器对象
     // Hadoop 会默认获取比较器对象 通过调用WritableComparator对象的get方法获取,
     // 在获取之前有个前提 判断当前job的MapOutputKeyClass 是否实现了WritableComparable接口
    return WritableComparator.get(getMapOutputKeyClass().asSubclass(WritableComparable.class), this);
            }
            

4.hadoop如何给自身的数据类型获取比较器

1). 自身的数据类型已经实现WritableComparable接口
2). 自身的数据类型对象中 已经通过构造函数创建比较器对象

 // 以Text为例
   public static class Comparator extends WritableComparator {
    public Comparator() {
    super(Text.class);
    }

3). 自身的数据类型对象中 通过静态代码块把 当前对象的class 和 它的比较器对象
放入一个Map进行了维护。

static {
    // register this comparator
    WritableComparator.define(Text.class, new Comparator());
}
                  
 public static void define(Class c, WritableComparator comparator) {
    comparators.put(c, comparator);
}

5.Shuffle的combiner流程使用和注意事项

概念:是Shuffle过程中的一个可选流程(优化手段)
可以为Map阶段计算完的数据进行提前汇总,主要考虑到 减少 从Map阶段到
Reduce阶段的数据传输的大小控制以及减少Reduce端的计算压力。
使用场景:当不考虑多个MapTask的整体数据关联关系的时候才使用。

相关文章

网友评论

      本文标题:4.MapReduce框架原理2 - shuffle combi

      本文链接:https://www.haomeiwen.com/subject/sunholtx.html