美文网首页
hadoop(8)Shuffle之Partitioner

hadoop(8)Shuffle之Partitioner

作者: Tomy_Jx_Li | 来源:发表于2018-10-20 14:25 被阅读11次

1 什么是Partitioner

前面讲过在map数据传入reduce之前需要进行数据的排序、分组。而分组就是由Partitioner。用户可以继承org.apache.hadoop.mapreduce.Partitioner基类,重写getPartition方法。方法中按照自定义逻辑返回不同的int值。

2 实战

自定义的Partitioner

package com.jiyx.test.mapred.flowStatistics;

import com.jiyx.test.mapred.flowStatistics.bo.DataBean;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Partitioner;

import java.util.HashMap;
import java.util.Map;

/**
 * 自定义Partitioner,根据不同的前缀
 * @author jiyx
 * @create 2018-10-17-21:33
 */
public class FlowStatisticsPartitioner extends Partitioner<LongWritable, DataBean> {

    private static Map<Integer, Integer> providerMap = new HashMap<>();

    static {
        // 这只分区映射
        providerMap.put(135, 1);
        providerMap.put(136, 1);
        providerMap.put(137, 1);
        providerMap.put(138, 1);
        providerMap.put(139, 1);
        providerMap.put(150, 2);
        providerMap.put(159, 2);
        providerMap.put(182, 3);
        providerMap.put(183, 3);
    }

    /**
     * 总共分了4个分区
     * @param longWritable map之后的出参key
     * @param dataBean map之后的出参value
     * @param i 共多少个分区,设置的NumReduceTasks
     * @return
     */
    @Override
    public int getPartition(LongWritable longWritable, DataBean dataBean, int i) {
        int key = Integer.parseInt(String.valueOf(longWritable.get()).substring(0, 3));
        Integer code = providerMap.get(key);
        if (code == null) {
            code = 0;
        }
        return code;
    }
}

Job主函数修改:

package com.jiyx.test.mapred.flowStatistics;

import com.jiyx.test.mapred.flowStatistics.bo.DataBean;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

/**
 * Job
 * @author jiyx
 * @create 2018-10-15-19:21
 */
public class FlowStatistics {
    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Job job = Job.getInstance();

        job.setJarByClass(FlowStatistics.class);

        job.setMapperClass(FlowStatisticsMapper.class);
        FileInputFormat.setInputPaths(job, new Path(args[0]));

        job.setReducerClass(FlowStatisticsReducer.class);
        // 这块需要注意的是自己踩了一个坑,就是将key和value整反了
        // 然后就会出现异常java.io.IOException: Initialization of all the collectors failed. Error in last collector was:java.lang.ClassCastException: class com.jiyx.test.mapred.flowStatistics.bo.DataBean
        // 所以这里最好注意下
        job.setOutputKeyClass(LongWritable.class);
        job.setOutputValueClass(DataBean.class);
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setPartitionerClass(FlowStatisticsPartitioner.class);
        // 这个指定的数值必须大于等于分区数,1除外。指定1的话就是默认的伪集群分区。
        // 除了1以外,不能比FlowStatisticsPartitioner中划分的分区数量少
        // 如本例中分了4个分区(从0到3共4个),那么这里指定为2的话,只有前两个分区的数据知道去哪个reduce处理数据
        // 后两个不知道去哪个reduce处理,会报错,但是这里测试了下,有中情况是不会报错的
        // 就是当我的数据只有前两个的分区数据时,也就是虽然我分了4个分区,
        // 但是实际只会产生两个分区的数据,就不会报错,不知道算不算bug
        job.setNumReduceTasks(Integer.parseInt(args[2]));

        job.waitForCompletion(true);
    }
}

然后还是以前的流程,打jar包,上传到hadoop服务器,执行hadoop jar ......

相关文章

网友评论

      本文标题:hadoop(8)Shuffle之Partitioner

      本文链接:https://www.haomeiwen.com/subject/unxmzftx.html