Getting Started with Hadoop: MapReduce Examples (Part 3)


Author: 文贞武毅 | Published 2019-07-31 20:18

    This time we'll try out the partitioner.
    Many kinds of ID numbers encode information in their first few characters, such as the province they were issued in. Phone numbers work the same way: the prefix tells you whether the carrier is China Mobile, China Telecom, or China Unicom. Collecting numbers into different output files according to their prefix is exactly what a partitioner is for. As in the previous installment, we start with a pom.xml and a FlowBean object that holds the upload and download traffic for each number (a sketch of it follows below). The new part is that this time we define our own partitioner.
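
    For reference, FlowBean from the previous installment looks roughly like the sketch below. This is not the original code: the getters simply match how the bean is used in the mapper and reducer later on, and the toString format is guessed from the sample output at the end of this post. It has to implement Writable because it is shipped from the mappers to the reducers.

    import org.apache.hadoop.io.Writable;

    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;

    /**
     * Holds the upload/download traffic for one phone number.
     */
    public class FlowBean implements Writable {
        private long upFlow;
        private long dFlow;

        // Hadoop needs a no-arg constructor to deserialize the bean
        public FlowBean() {
        }

        public FlowBean(long upFlow, long dFlow) {
            this.upFlow = upFlow;
            this.dFlow = dFlow;
        }

        public long getUpFlow() {
            return upFlow;
        }

        public long getdFlow() {
            return dFlow;
        }

        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(dFlow);
        }

        @Override
        public void readFields(DataInput in) throws IOException {
            // must read the fields in the same order they were written
            upFlow = in.readLong();
            dFlow = in.readLong();
        }

        @Override
        public String toString() {
            // upload, download and total traffic -- format assumed from the sample output below
            return "(" + upFlow + "\t" + dFlow + "\t" + (upFlow + dFlow) + ")";
        }
    }

    And now the partitioner itself: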

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    import java.util.HashMap;
    
    /**
     * Custom partitioner: routes each record to a reduce partition based on
     * the first three digits of the phone number.
     */
    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
        private static final HashMap<String, Integer> provinceDict = new HashMap<String, Integer>();

        static {
            // known prefixes get their own partitions
            provinceDict.put("137", 0);
            provinceDict.put("133", 1);
            provinceDict.put("138", 2);
            provinceDict.put("135", 3);
        }

        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
            // look up the 3-digit prefix; unknown prefixes go to the catch-all partition 4
            String prefix = key.toString().substring(0, 3);
            Integer province = provinceDict.get(prefix);
            return province == null ? 4 : province;
        }
    }
    

    A partitioner like this usually boils down to a HashMap lookup. Note that we end up with 5 partitions rather than 4: numbers with any other prefix all fall into the last one. Before writing the MapReduce driver, let's give the mapping a quick sanity check.
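
    This is only an illustrative check, not part of the job: the class name and phone numbers are made up, and it assumes the ProvincePartitioner above is compiled alongside it.

    import org.apache.hadoop.io.Text;

    public class PartitionerCheck {
        public static void main(String[] args) {
            ProvincePartitioner partitioner = new ProvincePartitioner();
            // known prefixes land in partitions 0-3, everything else in the catch-all partition 4
            String[] phones = {"13712345678", "13512345678", "18273723427"};
            for (String phone : phones) {
                int partition = partitioner.getPartition(new Text(phone), null, 5);
                System.out.println(phone + " -> partition " + partition);
            }
            // prints partitions 0, 3 and 4 for the three numbers above
        }
    }

    With the mapping confirmed, here is the driver: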

    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class FlowCount {
        static class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                // each input line looks like: phone \t upFlow \t downFlow
                String line = value.toString();
                String[] fields = line.split("\t");
                String phone = fields[0];
                long upFlow = Long.parseLong(fields[1]);
                long dFlow = Long.parseLong(fields[2]);

                context.write(new Text(phone), new FlowBean(upFlow, dFlow));
            }
        }

        static class FlowCountReducer extends Reducer<Text, FlowBean, Text, Text> {
            @Override
            protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
                // sum the upload and download traffic of all records for this phone number
                long sumUpFlow = 0;
                long sumDFlow = 0;
                for (FlowBean bean : values) {
                    sumUpFlow += bean.getUpFlow();
                    sumDFlow += bean.getdFlow();
                }

                context.write(key, new Text(new FlowBean(sumUpFlow, sumDFlow).toString()));
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            job.setJarByClass(FlowCount.class);
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
            // register the custom partitioner
            job.setPartitionerClass(ProvincePartitioner.class);
            // one reduce task per partition: 4 known prefixes plus the catch-all
            job.setNumReduceTasks(5);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
            job.setOutputKeyClass(Text.class);
            // the reducer emits Text values, so declare Text here
            job.setOutputValueClass(Text.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean res = job.waitForCompletion(true);
            System.exit(res?0:1);
        }
    }
    

    Note that the driver explicitly registers the partitioner and sets a matching number of reduce tasks. The two have to agree: with more than one reduce task but fewer than the number of partitions, the job fails with an "Illegal partition" error, while with more reduce tasks than partitions the extra output files are simply empty.
    Now package the jar, upload it, and launch the job with the hadoop jar command (the input and output paths come in as args[0] and args[1]). Let's see how it turned out.

    ubuntu@hadoop1:~/text$ hdfs dfs -ls /output/partitioner1
    Found 6 items
    -rw-r--r--   3 ubuntu supergroup          0 2019-07-31 20:06 /output/partitioner1/_SUCCESS
    -rw-r--r--   3 ubuntu supergroup         28 2019-07-31 20:06 /output/partitioner1/part-r-00000
    -rw-r--r--   3 ubuntu supergroup         84 2019-07-31 20:06 /output/partitioner1/part-r-00001
    -rw-r--r--   3 ubuntu supergroup         28 2019-07-31 20:06 /output/partitioner1/part-r-00002
    -rw-r--r--   3 ubuntu supergroup         28 2019-07-31 20:06 /output/partitioner1/part-r-00003
    -rw-r--r--   3 ubuntu supergroup         80 2019-07-31 20:06 /output/partitioner1/part-r-00004
    
    

    The output went from a single file to five part files, so the partitioning worked. Looking at one of them confirms that it only contains the numbers belonging to its partition:

    ubuntu@hadoop1:~/text$ hdfs dfs -cat /output/partitioner1/part-r-00004
    14838244322 (900    500 1400)
    18273723427 (300    800 1100)
    19283413241 (500    200 700)
    
    
