Writing a WordCount Program, Part 1: The Fixed Structure Explained


Author: 明明德撩码 | Published 2018-02-12 15:32

    (Figure: WordCount data-flow diagram)


    The map and reduce function formats in MapReduce

    In MapReduce, the map and reduce functions follow this general format:
    map: (K1, V1) → list(K2, V2)
    reduce: (K2, list(V2)) → list(K3, V3)
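    For this WordCount program the generic types instantiate as follows (matching the code below):
    map: (LongWritable, Text) → list(Text, IntWritable)
    reduce: (Text, list(IntWritable)) → list(Text, IntWritable)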
    
    The Mapper base class:
    protected void map(KEYIN key, VALUEIN value,
        Context context) throws IOException, InterruptedException {
     }
    
    The Reducer base class:
    protected void reduce(KEYIN key, Iterable<VALUEIN> values,
         Context context) throws IOException, InterruptedException { 
     }
    

    Context is the context object; map and reduce use it to emit <key, value> pairs via context.write and to interact with the framework.

    Code template

    The WordCount code

    The code follows a fixed pattern:
    input --> map --> reduce --> output
    The Java code below implements the same functionality as this shell command: bin/yarn jar share/hadoop/mapreduce/hadoop-mapreduce-examples-2.5.0.jar wordcount input output

    package com.lizh.hadoop.mapreduce;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
    
        private static Log logger = LogFactory.getLog(WordCount.class);
        //step1 Mapper class
        
        public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
            private Text mapoutputKey = new Text();
            private static final IntWritable mapOutputValues =  new IntWritable(1); // a single shared instance of the constant 1
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // split the line into words and emit <word, 1> for each word
                String linevalue = value.toString();
                StringTokenizer stringTokenizer = new StringTokenizer(linevalue);
                while(stringTokenizer.hasMoreTokens()){
                    String workvalue = stringTokenizer.nextToken();
                    mapoutputKey.set(workvalue);
                    context.write(mapoutputKey, mapOutputValues);
                    logger.info("-----WordCountMapper-----"+mapOutputValues.get());
                }
            }
            
        }
        
        //step2 Reduce class
        public static class WordCountReduces extends Reducer<Text, IntWritable, Text, IntWritable>{
    
            private IntWritable reduceOutputValues =  new IntWritable();
            
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable iv : values) {
                    sum += iv.get();
                }
                reduceOutputValues.set(sum);
                context.write(key, reduceOutputValues);
            }
            
        }
        
        //step3 driver component job 
        
        public int run(String[] args) throws Exception{
            //1 get configuration (core-site.xml, hdfs-site.xml)
            Configuration configuration = new Configuration();
            
            //2 create job
            Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
            //3 run jar
            job.setJarByClass(this.getClass());
            
            //4 set job
            //input-->map--->reduce-->output
            //4.1 input
            Path path = new Path(args[0]);
            FileInputFormat.addInputPath(job, path);
            
            //4.2 map
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            //4.3 reduce
            job.setReducerClass(WordCountReduces.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            //4.4 output
            Path outputpath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outputpath);
            
            //5 submit job
            boolean rv = job.waitForCompletion(true);
            
            return rv ? 0:1;
            
        }
        
        public static void main(String[] args) throws Exception{
            
            int rv = new WordCount().run(args);
            System.exit(rv);
        }
    }
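
    To run this class, package it into a jar and submit it with the hadoop jar command. A minimal sketch (the jar name wordcount.jar is just an example, adjust it to your build output):

    bin/hadoop jar wordcount.jar com.lizh.hadoop.mapreduce.WordCount input output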
    
    
    

    Mapper business logic

    The map processing logic:
    --------------input--------
    <0, hadoop yarn>
    --------------processing---------
    hadoop yarn --> split --> hadoop, yarn
    -------------output-------
    <hadoop, 1>
    <yarn, 1>

    public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
            private Text mapoutputKey = new Text();
            // a single shared instance of the constant 1
            private static final IntWritable mapOutputValues =  new IntWritable(1);
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // split the line into words and emit <word, 1> for each word
                String linevalue = value.toString();
                StringTokenizer stringTokenizer = new StringTokenizer(linevalue);
                while(stringTokenizer.hasMoreTokens()){
                    String workvalue = stringTokenizer.nextToken();
                    mapoutputKey.set(workvalue);
                    context.write(mapoutputKey, mapOutputValues);
                    logger.info("-----WordCountMapper-----"+mapOutputValues.get());
                }
            }
            
        }
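
    As a side note, the same tokenization can be written with String.split instead of StringTokenizer. A sketch of this alternative (not part of the original program):

    // alternative: split the line on runs of whitespace
    String[] words = value.toString().split("\\s+");
    for (String word : words) {
        if (word.isEmpty()) continue; // a line with leading whitespace yields an empty first token
        mapoutputKey.set(word);
        context.write(mapoutputKey, mapOutputValues);
    }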
    

    Reducer business logic

    The reduce stage processes data at the end of map --> shuffle --> reduce:

    ------------input (the output of map)-----------------
    <hadoop,1>
    <hadoop,1>
    <hadoop,1>
    ----------------grouping----------------
    Values with the same key are merged together into one collection:
    <hadoop,1>
    <hadoop,1>    ->  <hadoop,list(1,1,1)>
    <hadoop,1>
    
        //step2 Reduce class
        public static class WordCountReduces extends Reducer<Text, IntWritable, Text, IntWritable>{
    
            private IntWritable reduceOutputValues =  new IntWritable();
            
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable iv : values) {
                    sum += iv.get();
                }
                reduceOutputValues.set(sum);
                context.write(key, reduceOutputValues);
            }
            
        }
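
    Because this summation is associative and commutative, the same reducer class can also serve as a combiner, pre-aggregating <word, 1> pairs on the map side before the shuffle. The original driver does not set one; a single extra line in run would enable this optional optimization:

    // optional optimization: pre-aggregate counts on the map side before the shuffle
    job.setCombinerClass(WordCountReduces.class);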
    

    An improved way to write MapReduce

    The driver class extends Configured and implements the Tool interface.
    The run method declared by the Tool interface is overridden.
    Configured takes care of the configuration initialization.

    package com.lizh.hadoop.mapreduce;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class WordCountMapReduce extends Configured implements Tool {
    
        private static Log logger = LogFactory.getLog(WordCountMapReduce.class);
        //step1 Mapper class
        
        public static class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
            private Text mapoutputKey = new Text();
            private static final IntWritable mapOutputValues =  new IntWritable(1); // a single shared instance of the constant 1
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // split the line into words and emit <word, 1> for each word
                String linevalue = value.toString();
                StringTokenizer stringTokenizer = new StringTokenizer(linevalue);
                while(stringTokenizer.hasMoreTokens()){
                    String workvalue = stringTokenizer.nextToken();
                    mapoutputKey.set(workvalue);
                    context.write(mapoutputKey, mapOutputValues);
                    logger.info("-----WordCountMapper-----"+mapOutputValues.get());
                }
            }
            
        }
        
        //step2 Reduce class
        public static class WordCountReduces extends Reducer<Text, IntWritable, Text, IntWritable>{
    
            private IntWritable reduceOutputValues =  new IntWritable();
            
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                    throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable iv : values) {
                    sum += iv.get();
                }
                reduceOutputValues.set(sum);
                context.write(key, reduceOutputValues);
            }
            
        }
        
        //step3 driver component job 
        
        public int run(String[] args) throws Exception{
            //1 get configuration (core-site.xml, hdfs-site.xml)
            Configuration configuration = super.getConf(); // improvement: use the Configuration injected by ToolRunner
            
            //2 create job
            Job job = Job.getInstance(configuration, this.getClass().getSimpleName());
            //3 run jar
            job.setJarByClass(this.getClass());
            
            //4 set job
            //input-->map--->reduce-->output
            //4.1 input
            Path path = new Path(args[0]);
            FileInputFormat.addInputPath(job, path);
            
            //4.2 map
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            
            //4.3 reduce
            job.setReducerClass(WordCountReduces.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            //4.4 output
            Path outputpath = new Path(args[1]);
            FileOutputFormat.setOutputPath(job, outputpath);
            
            //5 submit job
            boolean rv = job.waitForCompletion(true); // true: print job progress to the console
            
            return rv ? 0:1;
            
        }
        
        public static void main(String[] args) throws Exception{
            
            //int rv = new WordCountMapReduce().run(args);
            Configuration configuration = new Configuration();
            // run via the ToolRunner utility
            int rv  = ToolRunner.run(configuration, new WordCountMapReduce(), args);
            System.exit(rv);
        }
    }
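
    Going through ToolRunner means GenericOptionsParser consumes the standard Hadoop options before run is called, so configuration can be overridden from the command line without recompiling. For example (the jar name is again hypothetical):

    bin/hadoop jar wordcount.jar com.lizh.hadoop.mapreduce.WordCountMapReduce -D mapreduce.job.reduces=2 input output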
    
    

    Abstracting a template

    package com.lizh.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class WordCountMapReduce extends Configured implements Tool {
    
        /**
         * Mapper Class : public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
         */
        public static class WordCountMapper extends //
                Mapper<LongWritable, Text, Text, LongWritable> {
    
            private Text mapOutputKey = new Text();
            private LongWritable mapOutputValue = new LongWritable(1);
    
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                // job-specific map logic goes here
                // (for WordCount: tokenize the line and emit <word, 1>, as shown earlier)
            }
        }
    
        /**
         * Reducer Class : public class Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>
         */
        public static class WordCountReducer extends //
                Reducer<Text, LongWritable, Text, LongWritable> {
    
            private LongWritable outputValue = new LongWritable();
    
            @Override
            protected void reduce(Text key, Iterable<LongWritable> values,
                    Context context) throws IOException, InterruptedException {
                // job-specific reduce logic goes here
                // (for WordCount: sum the values and emit <word, total>, as shown earlier)
            }
        }
    
        /**
         * Driver: create, set, and submit the Job
         * 
         * @param args
         * @throws Exception
         */
        public int run(String[] args) throws Exception {
            // 1.Get Configuration
            Configuration conf = super.getConf();
    
            // 2.Create Job
            Job job = Job.getInstance(conf);
            job.setJarByClass(getClass());
    
            // 3.Set Job
            // Input --> map --> reduce --> output
            // 3.1 Input
            Path inPath = new Path(args[0]);
            FileInputFormat.addInputPath(job, inPath);
    
            // 3.2 Map class
            job.setMapperClass(WordCountMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
    
            // 3.3 Reduce class
            job.setReducerClass(WordCountReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
    
            // 3.4 Output
            Path outPath = new Path(args[1]);
    
            // delete the output path if it already exists, so the job can be re-run
            FileSystem dfs = FileSystem.get(conf);
            if (dfs.exists(outPath)) {
                dfs.delete(outPath, true);
            }
    
            FileOutputFormat.setOutputPath(job, outPath);
    
            // 4.Submit Job
            boolean isSuccess = job.waitForCompletion(true);
            return isSuccess ? 0 : 1;
        }
    
        public static void main(String[] args) throws Exception {
    
            Configuration conf = new Configuration();
    
            // run job
            int status = ToolRunner.run(//
                    conf,//
                    new WordCountMapReduce(),//
                    args);
    
            // exit program
            System.exit(status);
        }
    }
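
    To reuse this template for a new job, fill in the map and reduce bodies and adjust the key/value types as needed. For WordCount they would be the tokenize-and-emit loop and the summing loop shown earlier, with IntWritable widened to LongWritable to match this template's signatures.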
    
