美文网首页
MapReduce八股文

MapReduce八股文

作者: _helloliang | 来源:发表于2016-12-11 08:52 被阅读55次

    Mapper

    
    // Skeleton Mapper: consumes <LongWritable offset, Text line> input records
    // and emits <Text, IntWritable> intermediate pairs. All bodies are
    // intentionally left as TODO stubs (this is a fill-in-the-blanks template).
    class MapperModule extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        // Called once per map task before any map() call; open resources /
        // read configuration here.
        @Override
        protected void setup(Context context) 
                throws IOException, InterruptedException {
            // TODO
    
        }
    
    
        // Called once per input record; emit (key, value) via context.write().
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
    
        }
    
        // Called once per map task after the last map() call; release
        // resources opened in setup().
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // TODO
    
        }
    }
    

    Reducer

    // Skeleton Reducer: receives each distinct Text key together with the
    // Iterable of all IntWritable values grouped under it, and emits
    // <Text, IntWritable> output pairs. Bodies are TODO template stubs.
    class ReducerModule extends Reducer<Text, IntWritable, Text, IntWritable> {
    
    
        // Called once per reduce task before any reduce() call; open
        // resources / read configuration here.
        @Override
        protected void setup(Context context) 
                throws IOException, InterruptedException {
            // TODO
    
        }
    
        // Called once per key group; aggregate `values` and emit the result
        // via context.write().
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values,
                           Context context) 
                throws IOException, InterruptedException {
            // TODO Auto-generated method stub
    
        }
    
        // Called once per reduce task after the last reduce() call; release
        // resources opened in setup().
        @Override
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
            // TODO
    
        }
    
    }
    

    Runner

    /**
     * Job driver implementing Hadoop's {@code Tool} interface so it can be
     * launched via {@code ToolRunner}. Configures the cluster connection,
     * wires the Mapper/Reducer defined above, and submits the job.
     */
    public class RunnerModule implements Tool {
        private Configuration conf = null;
    
        /** Called by ToolRunner; applies cluster- and job-level settings. */
        public void setConf(Configuration conf) {
            conf.set("fs.defaultFS", "hdfs://hive.liangxw.CentOS.com:9000");
            conf.setInt("RUN_TIMES", 1);
            // System property works around HDFS permission checks by
            // impersonating the given user.
            System.setProperty("HADOOP_USER_NAME", "liangxw");
            
            // Key/value separator consumed by KeyValueTextInputFormat.
            conf.set("mapreduce.input.keyvaluelinerecordreader.key.value.separator", "#");
            // Compress intermediate (map-side) output with Snappy.
            conf.set("mapreduce.map.output.compress", "true");
            conf.set("mapreduce.map.output.compress.codec", "org.apache.hadoop.io.compress.SnappyCodec");
            
            this.conf = conf;
        }
    
        /** @return the Configuration previously installed by setConf(). */
        public Configuration getConf() {
            return this.conf;
        }
    
        /**
         * Builds and submits the job.
         *
         * @param args args[0] = input path, args[1] = output path
         * @return 0 on success, 1 on failure
         */
        public int run(String[] args) throws Exception {
            Configuration conf = this.getConf();
            FileSystem fs = FileSystem.get(conf);
    
            // set job
            Job job = Job.getInstance(conf, this.getClass().getSimpleName());
            // FIX: was WCRunner.class (a class from a different example).
            job.setJarByClass(RunnerModule.class);
            
            // Input/output paths.
            // FIX: outpath was used below without ever being declared.
            Path inpath = new Path(args[0]);
            Path outpath = new Path(args[1]);
            FileInputFormat.addInputPath(job, inpath);
            FileOutputFormat.setOutputPath(job, outpath);
            // Delete the output path if it already exists, otherwise the
            // job would fail at submission time.
            if (fs.exists(outpath)) {
                fs.delete(outpath, true);
            }
            
            // Input/output file formats.
            job.setInputFormatClass(KeyValueTextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);  
                  
            // mapper
            // FIX: was WCMapper.class; wire the classes defined in this file.
            job.setMapperClass(MapperModule.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // ======================shuffle====================
            // NOTE(review): partitioner/sorter/grouper are placeholder class
            // names from the template — replace with real implementations.
            // 1: partitioner
            job.setPartitionerClass(partitioner.class);
            // 2: sort
            job.setSortComparatorClass(sorter.class);
            // 3: combiner (reuses the Reducer; only valid because the reduce
            // operation is assumed associative/commutative)
            job.setCombinerClass(ReducerModule.class);
            // 4: compress — configured in setConf()
            // 5: group
            job.setGroupingComparatorClass(grouper.class);
            // ======================shuffle====================
    
            // reducer
            // FIX: was WCReducer.class.
            job.setReducerClass(ReducerModule.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            // Reduce-side output compression.
            // FIX: the API method is setCompressOutput, not setCompressionOutput.
            FileOutputFormat.setCompressOutput(job, true);
            FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            
            // SequenceFile output compression. NOTE(review): only one of
            // TextOutputFormat / SequenceFileOutputFormat applies per job;
            // these settings are harmless but redundant with TextOutputFormat.
            SequenceFileOutputFormat.setCompressOutput(job, true);
            SequenceFileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
            SequenceFileOutputFormat.setOutputCompressionType(job, SequenceFile.CompressionType.BLOCK);        
            
            // Submit and block until completion (verbose progress logging).
            boolean isSuccess = job.waitForCompletion(true);
            return isSuccess ? 0 : 1;
        }
    
        // main
        public static void main(String[] args) throws Exception {
    
            // FIX: previously the hard-coded paths unconditionally clobbered
            // any CLI arguments; now they are only a fallback default.
            if (args == null || args.length < 2) {
                args = new String[]{
                            "/user/liangxw/wcinput",
                            "/user/liangxw/wcoutput"
                };
            }
    
            // run job
            // FIX: was new WCRunner() — instantiate this class.
            int status = ToolRunner.run(new RunnerModule(), args);
    
            System.exit(status);
        }
    
    }
    

    相关文章

      网友评论

          本文标题:MapReduce八股文

          本文链接:https://www.haomeiwen.com/subject/vhzumttx.html