Hadoop Application Development: MapReduce

Author: Vechace | Published 2018-06-21 16:29

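    Hadoop Application Development

    This article uses weather data analysis as its running example: finding the maximum temperature recorded in each year.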
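    Before looking at any Hadoop code, it can help to see the data flow of this example in plain Java. The sketch below is a hypothetical, Hadoop-free simulation. The class name `MaxTemperatureSimulation`, the simplified `year,temperature` input format, and the sample values are all assumptions made for illustration; real NCDC records are fixed-width, as the parser later in this article shows. The sketch mimics the map phase, the grouping of values by key that the shuffle performs, and the reduce phase.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical plain-Java simulation of the MapReduce data flow (no Hadoop involved).
// Input lines use an assumed simplified "year,temperature" format, not the NCDC format.
public class MaxTemperatureSimulation {

    // Map phase: turn one input line into a (year, temperature) pair
    static Map.Entry<String, Integer> map(String line) {
        String[] fields = line.split(",");
        return Map.entry(fields[0], Integer.parseInt(fields[1]));
    }

    // Reduce phase: pick the maximum temperature among all values for one year
    static int reduce(String year, List<Integer> temperatures) {
        int max = Integer.MIN_VALUE;
        for (int t : temperatures) {
            max = Math.max(max, t);
        }
        return max;
    }

    // Runs map, groups the values by key (like the shuffle), then runs reduce per key
    static Map<String, Integer> run(List<String> lines) {
        Map<String, List<Integer>> grouped = new TreeMap<>(); // keys kept sorted, as after the shuffle
        for (String line : lines) {
            Map.Entry<String, Integer> kv = map(line);
            grouped.computeIfAbsent(kv.getKey(), k -> new ArrayList<>()).add(kv.getValue());
        }
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, List<Integer>> e : grouped.entrySet()) {
            result.put(e.getKey(), reduce(e.getKey(), e.getValue()));
        }
        return result;
    }

    public static void main(String[] args) {
        List<String> lines = List.of("1949,111", "1949,78", "1950,0", "1950,22", "1950,-11");
        System.out.println(run(lines)); // prints {1949=111, 1950=22}
    }
}
```

    In Hadoop, the grouping step is performed by the framework between the map and reduce phases, so the developer only writes the equivalents of `map` and `reduce`.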

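    MapReduce development workflow
    MapReduce development follows a fixed workflow:

    • First, write the map and reduce functions, and use unit tests to check that they behave as expected;
    • Next, write a driver program to run the job. You can run it from your local IDE against a small dataset and inspect the results;
    • If the driver does not run correctly, use the IDE debugger to find the root cause, then use what you learn to extend the unit tests and improve the mapper or reducer;
    • Once the program works as expected on the test dataset, you can consider running it on a cluster.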

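    Development environment setup

    Common Maven dependencies:

    • hadoop-client: for building MapReduce programs
    • mrunit: for running MapReduce unit tests
    • hadoop-minicluster: for running tests against an in-process (mini) Hadoop cluster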

    Unit testing

    Unit tests are written with the MRUnit testing library.

    Example mapper test:

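    import java.io.IOException;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;
    import org.junit.Test;
    
    public class MaxTemperatureMapperTest {
        @Test
        public void processesValidRecord() throws IOException, InterruptedException {
            // Simulated input: a sample NCDC record with year "1950" at offset 15,
            // temperature "-0011" at offset 87 and quality code "1" at offset 92
            Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
                "99999V0203201N00261220001CN9999999N9-00111+99999999999");
            new MapDriver<LongWritable, Text, Text, IntWritable>()
                .withMapper(new MaxTemperatureMapper())
                .withInput(new LongWritable(0), value)
                // Expect the year as key and the temperature (tenths of a degree) as value
                .withOutput(new Text("1950"), new IntWritable(-11))
                .runTest();
        }
    }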
    

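    When the mapper fails a test and you find yourself adding more processing logic to it, consider whether that logic can be pulled out into a separate parser class that encapsulates the parsing, as follows.

    A parser for temperature records in the NCDC format (data preprocessing):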

    import org.apache.hadoop.io.Text;

    public class NcdcRecordParser {
        // Value used in NCDC records when the temperature reading is missing
        private static final int MISSING_TEMPERATURE = 9999;
        
        private String year;
        private int airTemperature;
        private String quality;
        
        public void parse(String record) {
            year = record.substring(15, 19);
            String airTemperatureString;
            // Skip a leading '+' sign (Integer.parseInt in older Java versions rejected it)
            if (record.charAt(87) == '+') {
                airTemperatureString = record.substring(88, 92);
            } else {
                airTemperatureString = record.substring(87, 92);
            }
            airTemperature = Integer.parseInt(airTemperatureString);
            quality = record.substring(92, 93);
        }
        
        public void parse(Text record) {
            parse(record.toString());
        }

        // A reading is valid if it is not the "missing" marker and has an acceptable quality code
        public boolean isValidTemperature() {
            return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
        }
        
        public String getYear() {
            return year;
        }
        
        public int getAirTemperature() {
            return airTemperature;
        }
    }
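    Because the parsing logic now lives in its own class, its fixed-width offsets can be checked without any Hadoop machinery. The following Hadoop-free sketch re-implements the same offsets as `NcdcRecordParser` (year at characters 15 to 18, signed temperature at 87 to 91, quality code at 92) so they can be tested in isolation. The class `NcdcParsingSketch` and its `buildRecord` helper, which pads a synthetic record so each field lands at its offset, are hypothetical and not part of the original code.

```java
// Hypothetical Hadoop-free sketch of the fixed-width parsing rules used by NcdcRecordParser
public class NcdcParsingSketch {
    static final int MISSING_TEMPERATURE = 9999;

    // Hypothetical test helper: pads a synthetic record so each field sits at its offset.
    // signedTemp must be exactly 5 characters, e.g. "-0011" or "+0022".
    static String buildRecord(String year, String signedTemp, String quality) {
        StringBuilder sb = new StringBuilder("0".repeat(15)); // offsets 0..14: filler
        sb.append(year);                                      // offsets 15..18
        sb.append("0".repeat(87 - sb.length()));              // filler up to offset 86
        sb.append(signedTemp);                                // offsets 87..91
        sb.append(quality);                                   // offset 92
        return sb.toString();
    }

    static String year(String record) {
        return record.substring(15, 19);
    }

    static int temperature(String record) {
        // Same rule as NcdcRecordParser: skip a leading '+', keep a leading '-'
        String s = record.charAt(87) == '+'
                ? record.substring(88, 92)
                : record.substring(87, 92);
        return Integer.parseInt(s);
    }

    static boolean isValid(String record) {
        return temperature(record) != MISSING_TEMPERATURE
                && record.substring(92, 93).matches("[01459]");
    }

    public static void main(String[] args) {
        String valid = buildRecord("1950", "-0011", "1");
        String missing = buildRecord("1950", "+9999", "1");
        String badQuality = buildRecord("1950", "+0022", "2");
        System.out.println(year(valid) + " " + temperature(valid) + " " + isValid(valid)); // prints "1950 -11 true"
        System.out.println(isValid(missing) + " " + isValid(badQuality));                  // prints "false false"
    }
}
```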
    

    The improved mapper, which delegates parsing to NcdcRecordParser:

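    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {
        // One parser instance is reused across map() calls
        private NcdcRecordParser parser = new NcdcRecordParser();
        
        @Override
        public void map(LongWritable key, Text value, Context context)
            throws IOException, InterruptedException {
            parser.parse(value);
            // Emit (year, temperature) only for valid readings
            if (parser.isValidTemperature()) {
                context.write(new Text(parser.getYear()),
                    new IntWritable(parser.getAirTemperature()));
            }
        }
    }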
    

    Once the mapper has been tested, write the reducer and its test case.

    The reducer finds the maximum value for a given key:

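    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {
        @Override
        public void reduce(Text key, Iterable<IntWritable> values, Context context)
            throws IOException, InterruptedException {
            // Start from the smallest int so any real reading replaces it
            int maxValue = Integer.MIN_VALUE;
            for (IntWritable value : values) {
                maxValue = Math.max(maxValue, value.get());
            }
            context.write(key, new IntWritable(maxValue));
        }
    }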
    

    A test for the reducer:

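    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
    import org.junit.Test;

    public class MaxTemperatureReducerTest {
        @Test
        public void returnsMaxIntegerInValues() throws IOException, InterruptedException {
            new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                .withReducer(new MaxTemperatureReducer())
                // One key with its list of values, as the reducer would receive after the shuffle
                .withInput(new Text("1950"),
                    Arrays.asList(new IntWritable(10), new IntWritable(5)))
                .withOutput(new Text("1950"), new IntWritable(10))
                .runTest();
        }
    }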
    

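    To run the job on the local job runner, write a driver program for the MapReduce job. Implementing Tool and launching through ToolRunner lets the driver accept Hadoop's generic options (such as -conf or -D):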

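    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class MaxTemperatureDriver extends Configured implements Tool {
        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.printf("Usage: %s [generic options] <input> <output>\n",
                    getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
            
            // Job.getInstance replaces the deprecated new Job(...) constructor
            Job job = Job.getInstance(getConf(), "Max temperature");
            job.setJarByClass(getClass());
            
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            
            job.setMapperClass(MaxTemperatureMapper.class);
            // The combiner pre-aggregates map output on the map side, reducing the data
            // sent to the reducers; reusing the reducer here is safe because max is
            // commutative and associative
            job.setCombinerClass(MaxTemperatureReducer.class);
            job.setReducerClass(MaxTemperatureReducer.class);
            
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            
            return job.waitForCompletion(true) ? 0 : 1;
        }

        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
            System.exit(exitCode);
        }
    }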
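    The driver reuses MaxTemperatureReducer as the combiner. The plain-Java sketch below illustrates why that is safe for a maximum but would not be for a mean. The maximum of per-mapper partial maxima equals the overall maximum, whereas the mean of partial means generally differs from the overall mean. The class name `CombinerSketch` and the split of values into two map outputs are made up for illustration.

```java
// Hypothetical illustration of when a reducer can double as a combiner
public class CombinerSketch {
    // Maximum of the given values (what MaxTemperatureReducer computes)
    static int max(int... xs) {
        int m = Integer.MIN_VALUE;
        for (int x : xs) {
            m = Math.max(m, x);
        }
        return m;
    }

    // Arithmetic mean of the given values
    static double mean(double... xs) {
        double sum = 0;
        for (double x : xs) {
            sum += x;
        }
        return sum / xs.length;
    }

    public static void main(String[] args) {
        // Two hypothetical map outputs for the same key
        int[] out1 = {0, 20, 10};
        int[] out2 = {25, 15};

        // Max: combining first, then reducing, matches reducing all values at once
        int combinedMax = max(max(out1), max(out2));
        int directMax = max(0, 20, 10, 25, 15);
        System.out.println(combinedMax + " " + directMax); // prints "25 25"

        // Mean: the mean of partial means differs from the overall mean
        double meanOfMeans = mean(mean(0, 20, 10), mean(25, 15));
        double directMean = mean(0, 20, 10, 25, 15);
        System.out.println(meanOfMeans + " " + directMean); // prints "15.0 14.0"
    }
}
```

    With these sample values the maximum is 25 either way, while the mean of means is 15.0 against an overall mean of 14.0. A job computing a mean would therefore need a different combiner design (for example, passing partial sums and counts) rather than reusing its reducer as is.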
    

    Reference: Hadoop: The Definitive Guide
