Big Data (5): MapReduce Programming

Author: 小道萧兮 | Published 2020-06-25 16:17

    The previous post, Big Data (4): An Introduction to MapReduce, briefly covered the components of MapReduce and how it works. This post looks at how to actually program against it.

    I. Word Count

    Word count is the Hello World of MapReduce. The goal is to count how many times each word appears in a file. For example, given a file with the following contents:

    hello world tom
    tom hello world
    how are you tom
    tom how old are you
    

    Count how many times each word appears in this file.
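
    For the sample above, the expected result is:

    are   2
    hello 2
    how   2
    old   1
    tom   4
    world 2
    you   2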

    1. Add the Maven dependency.

    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-client</artifactId>
        <version>3.2.1</version>
    </dependency>
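
    Note: if the job is submitted to a cluster where Hadoop's jars are already on the classpath, this dependency is commonly given "provided" scope so it is not bundled into the job jar.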
    

    2. The Map side
    Split each line on spaces and send each word to the reduce phase as a (word, 1) pair.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class WCMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
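            // value holds one line of input; emit (word, 1) for every space-separated token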
            for (String str : value.toString().split(" ")) {
                context.write(new Text(str), new IntWritable(1));
            }
        }
    }
    

    3. The Reduce side
    By the time reduce runs, records have already been grouped by key during the shuffle phase, so summing the values in each group yields each word's count.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class WCReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
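            // values holds every 1 emitted for this word; sum them for the total count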
    
            int count = 0;
            for (IntWritable i : values) {
                count += i.get();
            }
            context.write(key, new IntWritable(count));
        }
    }
    

    4. The driver class

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
    
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
    
            job.setJobName("WCApp");
            job.setJarByClass(WordCount.class);
            job.setInputFormatClass(TextInputFormat.class);
    
            // input/output paths
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // combiner: run WCReducer on each mapper's local output to shrink shuffle traffic;
            // safe here because summing counts is associative and commutative
            job.setCombinerClass(WCReducer.class);
    
            // map settings
            job.setMapperClass(WCMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // reduce settings
            job.setNumReduceTasks(1);
            job.setReducerClass(WCReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.waitForCompletion(true);
        }
    }
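
    A sketch of how the job might be submitted, assuming the classes above are packaged into a jar named wc.jar (the jar name and HDFS paths here are hypothetical):

    hadoop jar wc.jar WordCount /input/words.txt /output/wc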
    

    II. Maximum Temperature by Year

    Given monthly temperature readings for a number of years, find each year's maximum temperature. For example:

    1900 30°C
    1900 34°C
    1900 35°C
    1900 35°C
    ...
    1901 32°C
    1901 34°C
    1901 35°C
    1901 36°C
    1901 28°C
    

    Expected output: (1900, 35), (1901, 36)

    We solve this with a secondary sort. The trick is to make the key composite: a combination of year and temperature. The keys are then ordered by year ascending and, within the same year, by temperature descending:

    1900 35°C
    1900 35°C
    1900 34°C
    ...
    1901 36°C
    1901 35°C
    

    With this ordering, the first record for each year is that year's maximum temperature.

    To build such a key, we define a ComboKey that combines the year and the temperature, implements the WritableComparable interface, and overrides compareTo.

    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    public class ComboKey implements WritableComparable<ComboKey> {
    
        private int year;
        private int temp;
    
        // Hadoop instantiates keys reflectively during deserialization,
        // so a no-arg constructor is required
        public ComboKey() {
        }
    
        public ComboKey(int year, int temp) {
            this.year = year;
            this.temp = temp;
        }
    
        public int getYear() {
            return year;
        }
    
        public int getTemp() {
            return temp;
        }
    
        @Override
        public int compareTo(ComboKey o) {
            int y0 = o.getYear();
            int t0 = o.getTemp();
            if (this.year == y0) {
                // same year: sort by temperature, descending
                return -(temp - t0);
            } else {
                // different years: sort by year, ascending
                return year - y0;
            }
        }
    
        @Override
        public void write(DataOutput dataOutput) throws IOException {
            dataOutput.writeInt(year);
            dataOutput.writeInt(temp);
        }
    
        @Override
        public void readFields(DataInput dataInput) throws IOException {
            year = dataInput.readInt();
            temp = dataInput.readInt();
        }
    }
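
    Note that subtraction-based comparison is fine here because years and temperatures are small; for arbitrary int values, Integer.compare(a, b) is the overflow-safe choice.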
    

    A ComboKey alone is not enough, though: records from the same year carry different keys, so in general they will not reach the same reducer. For example, key1 = (1900, 35°C) and key2 = (1900, 34°C) may be sent to different reducers.

    We therefore need a custom partitioner that partitions by the year part of the key, which guarantees that all of a year's records land on the same reducer.

    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class YearPartitioner extends Partitioner<ComboKey, NullWritable> {
    
        @Override
        public int getPartition(ComboKey key, NullWritable nullWritable, int numPartitions) {
            // all records of a given year map to the same partition
            return key.getYear() % numPartitions;
        }
    }
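
    For instance, with three reduce tasks (numPartitions = 3, as set in the driver below), year 1900 goes to partition 1900 % 3 = 1 and year 1901 to partition 1901 % 3 = 2.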
    

    However, this is still not enough. The partitioner only ensures that a single reducer receives all of a year's records; it does not change the fact that, within a partition, reduce groups records by the full key.

    The fix is a custom grouping comparator. If values in the reducer are grouped by the year part of the key, all of a year's records fall into one reduce group; and because they are sorted by temperature in descending order, the first key is the maximum temperature.

    The year-grouping comparator looks like this:

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class YearGroupComparator extends WritableComparator {
    
        protected YearGroupComparator() {
            super(ComboKey.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
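            // keys belong to the same reduce group iff their years are equal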
            ComboKey k1 = (ComboKey) a;
            ComboKey k2 = (ComboKey) b;
            return k1.getYear() - k2.getYear();
        }
    }
    

    We also register a sort comparator for the full ComboKey ordering (year ascending, temperature descending); it simply delegates to ComboKey.compareTo.

    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    public class ComboKeyComparator extends WritableComparator {
    
        protected ComboKeyComparator() {
            super(ComboKey.class, true);
        }
    
        @Override
        public int compare(WritableComparable a, WritableComparable b) {
            ComboKey k1 = (ComboKey) a;
            ComboKey k2 = (ComboKey) b;
            return k1.compareTo(k2);
        }
    }
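
    Strictly speaking, since ComboKey implements WritableComparable, Hadoop would fall back to its compareTo for sorting even without this class; registering ComboKeyComparator explicitly just makes the sort order visible in the job setup.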
    

    In the Mapper, each input line is split on spaces: field 0 of the composite key is the year, field 1 is the temperature, and the value is empty (NullWritable).

    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class MaxTempMapper extends Mapper<LongWritable, Text, ComboKey, NullWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] arr = value.toString().split(" ");
            int year = Integer.parseInt(arr[0]);
            // strip the °C unit before parsing the temperature
            int temp = Integer.parseInt(arr[1].replace("°C", ""));
            context.write(new ComboKey(year, temp), NullWritable.get());
        }
    }
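
    For example, the input line 1900 30°C becomes the key (1900, 30) with an empty value.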
    

    The Reducer simply takes the first record of each group: after grouping by year and sorting temperatures in descending order, the key handed to reduce already carries the year's maximum.

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class MaxTempReducer extends Reducer<ComboKey, NullWritable, IntWritable, IntWritable> {
    
        @Override
        protected void reduce(ComboKey key, Iterable<NullWritable> values, Context context) throws IOException, InterruptedException {
            // the representative key of the group is the first in sort order,
            // i.e. the record with this year's highest temperature
            context.write(new IntWritable(key.getYear()), new IntWritable(key.getTemp()));
        }
    }
    

    The main function sets the input/output formats and wires up the mapper, reducer, partitioner, grouping comparator, and sort comparator.

    package com.hezongjiang.maxtemp;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class MaxTempApp {
    
        public static void main(String[] args) throws Exception {
    
            Job job = Job.getInstance();
    
            job.setJobName("MaxTempApp");
            job.setJarByClass(MaxTempApp.class);
            job.setInputFormatClass(TextInputFormat.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setMapperClass(MaxTempMapper.class);
            job.setMapOutputKeyClass(ComboKey.class);
            job.setMapOutputValueClass(NullWritable.class);
    
            // partitioner: route records by year
            job.setPartitionerClass(YearPartitioner.class);
            // grouping comparator: group keys by year within a reducer
            job.setGroupingComparatorClass(YearGroupComparator.class);
            // sort comparator: year ascending, temperature descending
            job.setSortComparatorClass(ComboKeyComparator.class);
    
            job.setReducerClass(MaxTempReducer.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.setNumReduceTasks(3);
    
            job.waitForCompletion(true);
        }
    }
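
    As with the word-count job, submission might look like the following (the jar name and paths are hypothetical):

    hadoop jar maxtemp.jar com.hezongjiang.maxtemp.MaxTempApp /input/temps.txt /output/maxtemp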
    
