美文网首页
【2019-05-05】map reduce应用开发

【2019-05-05】map reduce应用开发

作者: BigBigFlower | 来源:发表于2019-05-06 10:22 被阅读0次

    找最高气温
    用MRUnit编写单元测试
    (1)版本一mapper 函数
    MaxTemperatureMapper

    package v1;
    // cc MaxTemperatureMapperV1 First version of a Mapper that passes MaxTemperatureMapperTest
    import java.io.IOException;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    //vv MaxTemperatureMapperV1
    public class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {
      // Input:  LongWritable key  — byte offset of the line in the input split
      //         Text value        — one fixed-width NCDC weather record
      // Output: Text key          — four-digit year
      //         IntWritable value — air temperature in tenths of a degree

      // NCDC sentinel meaning "no reading"; must not reach the max aggregation.
      private static final int MISSING = 9999;

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

        String line = value.toString();
        String year = line.substring(15, 19);
        // Skip a leading '+' sign: Integer.parseInt rejects it before Java 7
        // (the sign occupies column 87 of the fixed-width record).
        int start = line.charAt(87) == '+' ? 88 : 87;
        int airTemperature = Integer.parseInt(line.substring(start, 92));
        // Drop missing readings so the 9999 sentinel cannot win the maximum.
        if (airTemperature != MISSING) {
          // context carries (key, value) pairs to the shuffle/reduce phase.
          context.write(new Text(year), new IntWritable(airTemperature));
        }
      }
    }
    //^^ MaxTemperatureMapperV1
    
    

    (2)解析NCDC格式的气温记录

    // cc NcdcRecordParserV2 A class for parsing weather records in NCDC format
    package v2;
    import org.apache.hadoop.io.Text;
    
    // vv NcdcRecordParserV2
    public class NcdcRecordParser {

      // NCDC sentinel value meaning "no temperature reading".
      private static final int MISSING_TEMPERATURE = 9999;

      // Quality codes considered trustworthy. quality is always exactly one
      // character (substring(92, 93)), so a constant-set membership check is
      // equivalent to matches("[01459]") without recompiling a regex per record.
      private static final String GOOD_QUALITY_CODES = "01459";

      private String year;           // four-digit year, columns 15-19
      private int airTemperature;    // tenths of a degree, columns 87-92
      private String quality;        // one-character quality code, column 92

      /**
       * Parses one fixed-width NCDC record, populating year, temperature and
       * quality for the accessor methods below.
       */
      public void parse(String record) {
        year = record.substring(15, 19);
        String airTemperatureString;
        // Remove leading plus sign as parseInt doesn't like them (pre-Java 7)
        if (record.charAt(87) == '+') { 
          airTemperatureString = record.substring(88, 92);
        } else {
          airTemperatureString = record.substring(87, 92);
        }
        airTemperature = Integer.parseInt(airTemperatureString);
        quality = record.substring(92, 93);
      }

      /** Convenience overload for Hadoop {@code Text} values. */
      public void parse(Text record) {
        parse(record.toString());
      }

      /**
       * Returns true when the reading is present (not the 9999 sentinel) and
       * its quality code is one of the trusted values.
       */
      public boolean isValidTemperature() {
        return airTemperature != MISSING_TEMPERATURE
            && GOOD_QUALITY_CODES.contains(quality);
      }

      /** Year of the last parsed record. */
      public String getYear() {
        return year;
      }

      /** Temperature of the last parsed record, in tenths of a degree. */
      public int getAirTemperature() {
        return airTemperature;
      }
    }
    // ^^ NcdcRecordParserV2
    
    

    (3)版本二mapper函数,使用utility类解析函数

    package v2;
    // cc MaxTemperatureMapperV2 A Mapper that uses a utility class to parse records
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import v2.NcdcRecordParser;
    
    // vv MaxTemperatureMapperV2
    public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      // Single parser instance shared across all map() calls of this task.
      private NcdcRecordParser parser = new NcdcRecordParser();

      /**
       * Delegates record parsing to NcdcRecordParser and emits
       * (year, temperature) only for readings the parser deems valid;
       * invalid or missing readings contribute nothing.
       */
      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

        parser.parse(value);
        if (!parser.isValidTemperature()) {
          return; // missing or poor-quality reading — skip this record
        }
        Text year = new Text(parser.getYear());
        IntWritable temperature = new IntWritable(parser.getAirTemperature());
        context.write(year, temperature);
      }
    }
    // ^^ MaxTemperatureMapperV2
    
    

    (4)计算最高气温reducer

    package v1;
    //cc MaxTemperatureReducerV1 Reducer for maximum temperature example
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    // vv MaxTemperatureReducerV1
    public class MaxTemperatureReducer
      extends Reducer<Text, IntWritable, Text, IntWritable> {

      /**
       * Collapses every temperature observed for one year into the single
       * maximum. Input: year -> all temperatures for that year;
       * output: year -> highest temperature.
       */
      @Override
      public void reduce(Text key, Iterable<IntWritable> values,
          Context context)
          throws IOException, InterruptedException {

        int max = Integer.MIN_VALUE; // identity element for max
        for (IntWritable candidate : values) {
          int temperature = candidate.get();
          if (temperature > max) {
            max = temperature;
          }
        }
        context.write(key, new IntWritable(max));
      }
    }
    // ^^ MaxTemperatureReducerV1
    
    

    (5)本地运行测试数据
    查找最高气温

    package v2;
    
    // cc MaxTemperatureDriverV2 Application to find the maximum temperature
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    import v1.MaxTemperatureReducer;
    
    // vv MaxTemperatureDriverV2
    public class MaxTemperatureDriver extends Configured implements Tool {

      /**
       * Configures and submits the max-temperature job.
       * args: &lt;input path&gt; &lt;output path&gt;.
       * Returns 0 on success, 1 on job failure, -1 on bad usage.
       */
      @Override
      public int run(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.printf("Usage: %s [generic options] <input> <output>\n",
              getClass().getSimpleName());
          ToolRunner.printGenericCommandUsage(System.err);
          return -1;
        }

        // Job.getInstance replaces the deprecated new Job(Configuration, String)
        // constructor used by older examples.
        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(getClass());

        // Input / output paths from the command line.
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        // max is associative and commutative, so the reducer doubles as combiner.
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        // Final output pairs: <Text year, IntWritable temperature>.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        // Block until the job finishes; true enables progress logging.
        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
      }
    }
    // ^^ MaxTemperatureDriverV2
    
    

    测试驱动程序

    package v2;
    // cc MaxTemperatureDriverTestV2 A test for MaxTemperatureDriver that uses a local,
    // in-process job runner
    import static org.hamcrest.CoreMatchers.is;
    import static org.hamcrest.CoreMatchers.nullValue;
    import static org.junit.Assert.assertThat;
    
    import java.io.BufferedReader;
    import java.io.IOException;
    import java.io.InputStream;
    import java.io.InputStreamReader;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.FileUtil;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.fs.PathFilter;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.junit.Test;
    
    public class MaxTemperatureDriverTest {
      
      // Filters out bookkeeping files (e.g. _SUCCESS, _logs) so only real
      // part files are compared against the expected output.
      public static class OutputLogFilter implements PathFilter {
        public boolean accept(Path path) {
          return !path.getName().startsWith("_");
        }
      }
      
    //vv MaxTemperatureDriverTestV2
      // End-to-end test: runs the driver with the local (in-process) job runner
      // against the local filesystem, then diffs the output with expected.txt.
      @Test
      public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        conf.set("mapreduce.framework.name", "local");
        conf.setInt("mapreduce.task.io.sort.mb", 1);
        
        // NOTE(review): assumes input/ncdc/micro exists relative to the working
        // directory — confirm the test fixture is checked in.
        Path input = new Path("input/ncdc/micro");
        Path output = new Path("output");
        
        FileSystem fs = FileSystem.getLocal(conf);
        fs.delete(output, true); // delete old output
        
        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);
        
        // run() returns 0 on success (see MaxTemperatureDriver).
        int exitCode = driver.run(new String[] {
            input.toString(), output.toString() });
        assertThat(exitCode, is(0));
        
        checkOutput(conf, output);
      }
    //^^ MaxTemperatureDriverTestV2
    
      // Compares the single part file under output, line by line, with the
      // /expected.txt classpath resource; also asserts there are no extra lines.
      private void checkOutput(Configuration conf, Path output) throws IOException {
        FileSystem fs = FileSystem.getLocal(conf);
        Path[] outputFiles = FileUtil.stat2Paths(
            fs.listStatus(output, new OutputLogFilter()));
        assertThat(outputFiles.length, is(1));
        
        BufferedReader actual = asBufferedReader(fs.open(outputFiles[0]));
        BufferedReader expected = asBufferedReader(
            getClass().getResourceAsStream("/expected.txt"));
        String expectedLine;
        while ((expectedLine = expected.readLine()) != null) {
          assertThat(actual.readLine(), is(expectedLine));
        }
        // Actual output must not contain lines beyond the expected ones.
        assertThat(actual.readLine(), nullValue());
        actual.close();
        expected.close();
      }
      
      private BufferedReader asBufferedReader(InputStream in) throws IOException {
        return new BufferedReader(new InputStreamReader(in));
      }
    }
    
    

    (6)mapper版本三,找到异常的输入数据源

    // == MaxTemperatureMapperV3
    package v3;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import v2.NcdcRecordParser;
    
    //vv MaxTemperatureMapperV3
    public class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

      // User-defined counter used to flag suspiciously hot readings for
      // later inspection in the job's counter report.
      enum Temperature {
        OVER_100
      }

      private NcdcRecordParser parser = new NcdcRecordParser();

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

        parser.parse(value);
        if (!parser.isValidTemperature()) {
          return; // invalid or missing reading — emit nothing
        }
        int airTemperature = parser.getAirTemperature();
        // Temperatures are in tenths of a degree, so a value above 1000 is over
        // 100 degrees — likely a corrupt record. Log it, surface it in the task
        // status, and bump the counter, but still emit the record unchanged.
        if (airTemperature > 1000) {
          System.err.println("Temperature over 100 degrees for input: " + value);
          context.setStatus("Detected possibly corrupt record: see logs.");
          context.getCounter(Temperature.OVER_100).increment(1);
        }
        context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
      }
    }
    //^^ MaxTemperatureMapperV3
    

    (7)处理不合理的数据
    mapper查找最高气温

    package v4;
    // cc MaxTemperatureMapperV4 Mapper for maximum temperature example
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    // vv MaxTemperatureMapperV4
    public class MaxTemperatureMapper
      extends Mapper<LongWritable, Text, Text, IntWritable> {

      // Counter for records the parser classifies as malformed.
      enum Temperature {
        MALFORMED
      }

      // NOTE(review): no import, so this resolves to a parser in package v4;
      // assumes a v4 NcdcRecordParser offering isMalformedTemperature() exists
      // alongside this class — confirm (the v2 parser lacks that method).
      private NcdcRecordParser parser = new NcdcRecordParser();

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

        parser.parse(value);
        if (parser.isValidTemperature()) {
          Text year = new Text(parser.getYear());
          context.write(year, new IntWritable(parser.getAirTemperature()));
        } else if (parser.isMalformedTemperature()) {
          // Count and log malformed input instead of failing the whole task.
          System.err.println("Ignoring possibly corrupt input: " + value);
          context.getCounter(Temperature.MALFORMED).increment(1);
        }
      }
    }
    // ^^ MaxTemperatureMapperV4
    
    

    Map reduce工作流
    (1)将问题分解成map reduce作业
    (2)关于JobControl
    (3)关于Apache Oozie——运行工作流的系统

    相关文章

      网友评论

          本文标题:【2019-05-05】map reduce应用开发

          本文链接:https://www.haomeiwen.com/subject/lskcoqtx.html