WordCount

作者: hipeer | 来源:发表于2018-09-20 20:08 被阅读0次

WordCountMapper.java


import java.io.IOException;
import java.util.StringTokenizer;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

/**
 * LongWritable   Text   --input   value/key
 * Text  IntWritable       --output  value/key
 */
public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
        
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
        
        /**
         * @throws InterruptedException 
         * @throws IOException 
         * 
         */
        public void map(LongWritable key, Text lineText, Context context) throws IOException, InterruptedException {
            StringTokenizer iter = new StringTokenizer(lineText.toString());
            while(iter.hasMoreTokens()) {
                word.set(iter.nextToken());
                context.write(word, one);
            }
        }
}

WordCountReducer.java


import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;


public class WordCountReducer extends Reducer<Text, IntWritable, Text, IntWritable> {

    //
    private IntWritable res = new IntWritable();
    
    /**
     * 
     * @param key
     * @param values
     * @param context
     * @throws IOException
     * @throws InterruptedException
     */
    public void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
        int sum = 0;
        for(IntWritable value : values) {
            sum +=  value.get();
        }
        res.set(sum);
        context.write(key, res);
    }
}

WordCountRunner.java


import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

public class WordCountRunner extends Configured implements Tool {

    public static void main(String[] args) {
        try {
            System.exit(ToolRunner.run(new WordCountRunner(), args));
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

    @Override
    public int run(String[] args) throws Exception {
        Job job = Job.getInstance(getConf(), "WordCountMR");
        job.setJarByClass(getClass());

        // input path
        FileInputFormat.addInputPath(job, new Path(args[0]));
        // input format class
        job.setInputFormatClass(TextInputFormat.class);

        // output path
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        // output format class
        job.setOutputFormatClass(TextOutputFormat.class);

        // set mapper class
        job.setMapperClass(WordCountMapper.class);
        // set mapper output key class
        job.setMapOutputKeyClass(Text.class);
        // set mapper output value class
        job.setMapOutputValueClass(IntWritable.class);
        
        job.setCombinerClass(WordCountReducer.class );
        
        // set reducer class
        job.setReducerClass(WordCountReducer.class);
        // set reducer output key class
        job.setOutputKeyClass(Text.class);
        // set reducer output value class
        job.setOutputValueClass(IntWritable.class);

        return job.waitForCompletion(true) ? 0 : 1;
    }

}


写完上面几个类之后,需要把他们打成jar包(我的是WordCountMR.jar),上传到hadoop服务器。打包时不要忘了添加Main class (就是 WordCountRunner),否则无法执行job。提交作业的命令为(其中的路径都为HDFS中的):

hadoop jar WordCountMR.jar /temp/usr/install.log /temp/out/res1

设置多个reducer

hadoop jar WordCountMR.jar /temp/usr/install.log /temp/out/res1 -D mapred.reduce.tasks=3

总结

在我第一次写完这些文件,打包到hadoop中运行的时候,出现了一个警告,但并没有终止计算程序,最后查看结果时发现,统计结果并没有合并。出现了下面的情况:

*** 1
*** 1
/usr/share/info/gnupg.info  1
/usr/share/info/grep.info.gz    1
/usr/share/info/groff.gz    1
/usr/share/info/grub.info.gz    1
/usr/share/info/ipc.info    1
/usr/share/info/libidn.info.gz  1
/usr/share/info/multiboot.info.gz   1
/usr/share/info/sed.info.gz 1
/usr/share/info/which.info.gz   1
FINISHED    1
Header  1
ID  1
INSTALLING  1
Installing  1
Installing  1
Installing  1
...

很明显这是一个bug,不过是什么原因造成的呢?我第一反应就是reduce阶段并没有发生,最后发现在WordCountReducer.java文件中我把reduce方法写成了reducer导致reduce并没有执行。修改之后的最终结果为:

*** 2
/usr/share/info/gnupg.info  1
/usr/share/info/grep.info.gz    1
/usr/share/info/groff.gz    1
/usr/share/info/grub.info.gz    1
/usr/share/info/ipc.info    1
/usr/share/info/libidn.info.gz  1
/usr/share/info/multiboot.info.gz   1
/usr/share/info/sed.info.gz 1
/usr/share/info/which.info.gz   1
FINISHED    1
Header  1
ID  1
INSTALLING  1
Installing  155

相关文章

网友评论

      本文标题:WordCount

      本文链接:https://www.haomeiwen.com/subject/ifznnftx.html