MapReduce

作者: 歌哥居士 | 来源:发表于2019-03-29 16:02 被阅读0次

词频示例

文件 wcFile

baozi hello
baozi hi
baozi chi
baozi roubaozi

代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class WordCount {

    /**
     * Map
     */
    public static class MapClass extends Mapper<LongWritable, Text, Text, LongWritable> {
        LongWritable one = new LongWritable(1);
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            // 切分
            String[] worlds = value.toString().split(" ");
            for (String world : worlds) {
                // (world,1)
                context.write(new Text(world), one);
            }
        }
    }

    /**
     * Reduce
     */
    public static class ReduceClass extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // 累加 world,{1,1,1,1,1...}
            long sum = 0;
            for (LongWritable v : values) {
                sum += v.get();
            }
            // (world,N)
            context.write(key, new LongWritable(sum));
        }
    }


    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // 输出文件路径已存在删除
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Job处理类
        Job job = Job.getInstance(conf,"wordCount");
        job.setJarByClass(WordCount.class);

        // 输入输出文件路径
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // 设置map相关参数
        job.setMapperClass(MapClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 设置reduce相关参数
        job.setReducerClass(ReduceClass.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }

}

运行

$ mvn clean package -DskipTests
$ scp target/hadoop-spring-learning-1.0-SNAPSHOT.jar  user000@host000:~/doc
$ hadoop jar ~/doc/hadoop-spring-learning-1.0-SNAPSHOT.jar  \
       WordCount \
       hdfs://host000:8020/wcFile \
       hdfs://host000:8020/output/
$ hdfs dfs -cat /output/part-r-00000

Partitioner

Partitioner:结果一样的统一输出到相同地方。
文件 salesFile

xiaomi 200
huawei 100
xiaomi 300
iphone7 200
huawei 200
xiaomi 300
others 100

代码

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.Mapper;
import org.apache.hadoop.mapreduce.Partitioner;
import org.apache.hadoop.mapreduce.Reducer;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

import java.io.IOException;

public class PartitionerApp {

    public static class MapClass extends Mapper<LongWritable, Text, Text, LongWritable> {
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            String[] words = value.toString().split(" ");
            // {xiaomi,200}
            context.write(
                    new Text(words[0]),
                    new LongWritable(Long.parseLong(words[1]))
            );
        }
    }

    public static class ReduceClass extends Reducer<Text, LongWritable, Text, LongWritable> {
        @Override
        protected void reduce(Text key, Iterable<LongWritable> values, Context context) throws IOException, InterruptedException {
            // xiaomi,{200,300,400...}
            long sum = 0;
            for (LongWritable v : values) {
                sum += v.get();
            }
            // (xiaomi, N)
            context.write(key, new LongWritable(sum));
        }
    }

    public static class PartitionerClass extends Partitioner<Text, LongWritable> {
        @Override
        public int getPartition(Text text, LongWritable longWritable, int numPartitions) {
            if ("xiaomi".equals(text.toString())) {
                return 0;
            } else if ("huawei".equals(text.toString())) {
                return 1;
            } else if ("iphone7".equals(text.toString())) {
                return 2;
            }
            return 3;
        }
    }

    public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
        Configuration conf = new Configuration();
        // 输出文件路径已存在删除
        Path inputPath = new Path(args[0]);
        Path outputPath = new Path(args[1]);
        FileSystem fileSystem = FileSystem.get(conf);
        if (fileSystem.exists(outputPath)) {
            fileSystem.delete(outputPath, true);
        }

        // Job处理类
        Job job = Job.getInstance(conf,"wordCount");
        job.setJarByClass(WordCount.class);

        // 输入输出文件路径
        FileInputFormat.addInputPath(job, inputPath);
        FileOutputFormat.setOutputPath(job, outputPath);

        // 设置map相关参数
        job.setMapperClass(MapClass.class);
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(LongWritable.class);

        // 设置reduce相关参数
        job.setReducerClass(ReduceClass.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(LongWritable.class);

        // 设置Partitioner
        job.setPartitionerClass(PartitionerClass.class);
        //设置4个reducer,每个分区一个,不加体现不出Partitioner
        job.setNumReduceTasks(4);

        // 退出
        System.exit(job.waitForCompletion(true) ? 0 : 1);
    }


}

Combiner

combiner在求和、次数的等汇总统计可以用。combiner设置了也未必会执行。
例如,求平均数,一边是2、7,一边是3、5。 左边使用了combiner,右边没有使用,得到的平均数与本应该得到的不符。
例如,求合汇总,一边是2、7,一边是3、5。 左边使用了combiner,右边没有使用,得到的和都不会改变。

job.setCombinerClass(Reducer.class); // combiner逻辑上和reduce一样

相关文章

网友评论

      本文标题:MapReduce

      本文链接:https://www.haomeiwen.com/subject/aofxvqtx.html