美文网首页
第6章 MapReduce应用开发-MRUnit编写

第6章 MapReduce应用开发-MRUnit编写

作者: 主君_05c4 | 来源:发表于2019-05-29 11:09 被阅读0次

    MRUnit是一个测试库，可以方便地测试Mapper与Reducer的运行逻辑是否符合预期。

    1、Mapper单元测试

    如下为V1版本Mapper:

    /**
     * First-cut (V1) mapper: for every non-null line it emits (year, temperature), reading the
     * year from chars 15-18 and the temperature from chars 87-91. It applies no filtering, so
     * missing ("+9999") or low-quality readings are emitted as-is.
     */
    public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            if (value != null) {
                final String record = value.toString();
                final Text yearKey = new Text(record.substring(15, 19));
                final IntWritable reading =
                        new IntWritable(Integer.parseInt(record.substring(87, 92)));
                context.write(yearKey, reading);
            }
        }
    }
    
    /**
     * MRUnit tests for the first (V1) MaxTemperatureMapper. Each case pushes one fixed-width NCDC
     * line through a MapDriver and declares the exact output expected; a case without any
     * withOutput() call expects the mapper to emit nothing.
     */
    public class MaxTemperatureMapperTest {
    
        /** Returns a driver wrapping a fresh mapper, ready for input and expectations. */
        private static MapDriver<LongWritable, Text, Text, IntWritable> newDriver() {
            MapDriver<LongWritable, Text, Text, IntWritable> driver = new MapDriver<>();
            driver.withMapper(new MaxTemperatureMapper());
            return driver;
        }
    
        @Test
        public void processValidRecord() throws IOException {
            // Year = chars 15-18 ("1950"); temperature = chars 87-91 ("-0011").
            Text record = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                    + "99999V0203201N00261220001CN9999999N9-00111+99999999999");
            newDriver()
                    .withInput(new LongWritable(0), record)
                    .withOutput(new Text("1950"), new IntWritable(-11))
                    .runTest();
        }
    
        @Test
        public void ignoreMissingTemperatureRecord() throws IOException {
            // Temperature field (chars 87-91) is "+9999", the missing-value marker. V1 does not
            // recognise it and emits (1950, 9999), so this case fails until the mapper filters it.
            Text record = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                    + "99999V0203201N00261220001CN9999999N9+99991+99999999999");
            newDriver()
                    .withInput(new LongWritable(0), record)
                    .runTest();
        }
    
        @Test
        public void processMalformedTemperatureRecord() throws IOException {
            // Non-standard layout: chars 15-18 are "1957" and chars 87-91 happen to be "01957",
            // which V1 blindly parses as a temperature of 1957.
            Text record = new Text("0335999999433181957042302005+37950+139117SAO  +0004"
                    + "RJSN V02011359003150070356999999433201957010100005+353");
            newDriver()
                    .withInput(new LongWritable(0), record)
                    .withOutput(new Text("1957"), new IntWritable(1957))
                    .runTest();
        }
    }
    

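    MapDriver根据withOutput()的调用次数，可以检查0条、1条或多条输出记录。气温字段为+9999表示该记录的气温值缺失；由于V1版本的mapper没有处理这种情况，ignoreMissingTemperatureRecord测试将不通过。另外，processMalformedTemperatureRecord中的期望值1957只是V1 mapper用substring(87, 92)截取到"01957"后解析出的结果，并不是真实的气温。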

    可以将Mapper中的记录解析逻辑抽取为一个解析类,如下为V2版本Mapper:

    /**
     * Parser for one fixed-width NCDC weather record. It extracts the year (chars 15-18), the
     * temperature (chars 87-91, with a leading '+' dropped) and the quality code (char 92).
     *
     * <p>Instances are mutable and meant to be reused: every {@code parse} call overwrites the state
     * left by the previous one. There is no synchronization, so confine an instance to one thread.
     */
    public class NcdcRecordParse {
    
        /** Value parsed from a "+9999" temperature field, which marks the reading as missing. */
        private static final int MISSING_TEMPERATURE = 9999;
    
        /** Quality codes accepted by isValidTemperature; compiled once because parse runs per record. */
        private static final java.util.regex.Pattern ACCEPTED_QUALITY =
                java.util.regex.Pattern.compile("[01459]");
    
        private String year;
        private int temperature;
        private String quality;
        // True only after a non-blank record was fully parsed; a blank record or a parse that
        // threw midway leaves it false, so isValidTemperature() never trusts stale fields.
        private boolean hasRecord;
    
        /** Creates an empty parser; isValidTemperature() is false until a record is parsed. */
        public NcdcRecordParse() {
            super();
        }
    
        /** Creates a parser and immediately parses {@code record}. */
        public NcdcRecordParse(String record) {
            parse(record);
        }
    
        /** Creates a parser and immediately parses {@code record}. */
        public NcdcRecordParse(Text record) {
            parse(record);
        }
    
        /**
         * Parses a Hadoop {@code Text} record. A {@code null} record is treated like a blank line
         * instead of throwing NullPointerException.
         */
        public void parse(Text record) {
            parse(record == null ? null : record.toString());
        }
    
    
        /**
         * Parses one record, replacing all previously parsed state.
         *
         * <p>A {@code null} or blank record yields year {@code ""}, temperature 0 and an invalid
         * reading. Previously it fell through to {@code substring} and threw.
         *
         * @param record the raw NCDC line
         * @throws StringIndexOutOfBoundsException if a non-blank record is shorter than 93 chars
         * @throws NumberFormatException if the temperature field is not an integer
         */
        public void parse(String record) {
            hasRecord = false;
            if (record == null || record.trim().equals("")) {
                this.year = "";
                this.temperature = 0;
                this.quality = "0";
                // Stop here: the substring calls below would throw on null/blank input.
                return;
            }
            this.year = record.substring(15, 19);
    
            // Integer.parseInt rejects a leading '+' before Java 7, so drop it explicitly.
            if (record.charAt(87) == '+') {
                this.temperature = Integer.parseInt(record.substring(88, 92));
            } else {
                this.temperature = Integer.parseInt(record.substring(87, 92));
            }
    
            this.quality = record.substring(92, 93);
            hasRecord = true;
        }
    
        /**
         * Reports whether the last parsed record carries a usable temperature. That requires a
         * non-blank record, a temperature other than the missing marker, and a quality code that
         * is one of 0, 1, 4, 5, 9. A temperature of 0 is a genuine reading and is accepted; the
         * previous {@code temperature != 0} check wrongly discarded it.
         */
        public boolean isValidTemperature() {
            return hasRecord
                    && this.temperature != MISSING_TEMPERATURE
                    && ACCEPTED_QUALITY.matcher(quality).matches();
        }
    
        /** Returns the year of the last parsed record, or {@code ""} for a blank record. */
        public String getYear() {
            return year;
        }
    
        /** Returns the temperature of the last parsed record; check isValidTemperature() first. */
        public int getTemperature() {
            return temperature;
        }
    }
    
    /**
     * V2 mapper: delegates line parsing to NcdcRecordParse and emits (year, temperature) only for
     * records whose temperature the parser reports as valid.
     */
    public class MaxTemperatureMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
        // One parser per mapper instance; parse() overwrites its fields for each record.
        private final NcdcRecordParse parser = new NcdcRecordParse();
    
        @Override
        protected void map(LongWritable key, Text value, Context context)
                throws IOException, InterruptedException {
            parser.parse(value);
            if (!parser.isValidTemperature()) {
                return;
            }
            context.write(new Text(parser.getYear()), new IntWritable(parser.getTemperature()));
        }
    }
    
    @Test
        public void processingValidRecord() throws IOException {
            // Body elided in this excerpt ("..." is a placeholder, not compilable Java);
            // presumably the same record and expectation as the V1 processValidRecord test.
            ...
        }
    
        @Test
        public void processPositiveTemperatureRecord() throws IOException {
            // Temperature field (chars 87-91) is "+0011" with quality '1': the parser drops the
            // '+' and the mapper must emit (1950, 11).
            MapDriver<LongWritable, Text, Text, IntWritable> driver = new MapDriver<>();
            driver.withMapper(new MaxTemperatureMapper())
                    .withInput(new LongWritable(0), new Text(
                            "0043011990999991950051518004+68750+023550FM-12+0382"
                            + "99999V0203201N00261220001CN9999999N9+00111+99999999999"))
                    .withOutput(new Text("1950"), new IntWritable(11))
                    .runTest();
        }
    
        @Test
        public void ignoreMissingTemperatureRecord() throws IOException {
            // Body elided in this excerpt ("..." is a placeholder, not compilable Java);
            // presumably the "+9999" missing-temperature case, which the V2 mapper should
            // now filter out (no withOutput expected) — confirm against the full test.
            ...
        }
    
        @Test
        public void ignoreSuspectRecord() throws IOException {
            // Temperature "+1230" carries quality code '2' (char 92), which is not an accepted
            // code, so the mapper must emit nothing for this line.
            Text suspect = new Text("0043011990999991950051518004+68750+023550FM-12+0382"
                    + "99999V0203201N00261220001CN9999999N9+12302+99999999999");
            MapDriver<LongWritable, Text, Text, IntWritable> driver = new MapDriver<>();
            driver.withMapper(new MaxTemperatureMapper())
                    .withInput(new LongWritable(0), suspect)
                    .runTest();
        }
    
    2、Reducer单元测试
    /**
     * Collapses all temperatures seen for one year into the highest of them. With no values at
     * all it would emit Integer.MIN_VALUE.
     */
    public class MaxTemperatureReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context)
                throws IOException, InterruptedException {
            int highest = Integer.MIN_VALUE;
            for (IntWritable reading : values) {
                highest = Math.max(highest, reading.get());
            }
            context.write(key, new IntWritable(highest));
        }
    }
    
        /**
         * Feeds three temperatures for one year into the reducer and expects the largest one.
         * The expectation was 30, which is not the maximum of {30, 25, 33}, so the test failed
         * against a correct reducer.
         */
        @Test
        public void returnsMaximumIntegerInValues() throws IOException {
            MaxTemperatureReducer reducer = new MaxTemperatureReducer();
    
            Text key = new Text("1950");
            // Plain typed list instead of a raw, double-brace-initialized anonymous ArrayList.
            List<IntWritable> values = new ArrayList<>();
            values.add(new IntWritable(30));
            values.add(new IntWritable(25));
            values.add(new IntWritable(33));
    
            new ReduceDriver<Text, IntWritable, Text, IntWritable>()
                    .withReducer(reducer)
                    .withInput(key, values)
                    .withOutput(key, new IntWritable(33))
                    .runTest();
        }
    
    3、Driver测试

    任务启动入口:

    /**
     * Command-line entry point that configures and runs the MaxTemperature job.
     *
     * <p>Usage: {@code MaxTemperatureDriver [generic options] <input> <output>}. ToolRunner parses
     * the generic Hadoop options; run() receives the remaining arguments.
     */
    public class MaxTemperatureDriver extends Configured implements Tool {
    
        /**
         * Configures the job and blocks until it finishes.
         *
         * @param args exactly two arguments: the input path and the output path
         * @return 0 if the job succeeded, 1 if it failed, -1 on a usage error
         */
        @Override
        public int run(String[] args) throws Exception {
            if (args.length != 2) {
                System.err.printf("Usage: %s [generic options] <input> <output>%n",
                        getClass().getSimpleName());
                ToolRunner.printGenericCommandUsage(System.err);
                return -1;
            }
            Job job = Job.getInstance(getConf());
    
            job.setJobName("MaxTemperature");
            job.setJarByClass(MaxTemperatureDriver.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            job.setMapperClass(MaxTemperatureMapper.class);
            // max() is associative and commutative, so the reducer can double as the combiner.
            job.setCombinerClass(MaxTemperatureReducer.class);
            job.setReducerClass(MaxTemperatureReducer.class);
    
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            // The previous code called setOutputKeyClass(IntWritable.class) here, which clobbered
            // the Text key class and never declared the value class.
            job.setOutputValueClass(IntWritable.class);
    
            return job.waitForCompletion(true) ? 0 : 1;
        }
    
        /** Runs the tool through ToolRunner and exits the JVM with its return code. */
        public static void main(String[] args) throws Exception {
            int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
            System.exit(exitCode);
        }
    }
    

    1)使用本地作业运行器

    @Test
        public void test() throws Exception {
            // Run the whole job in-process: local filesystem, local framework, and a 1 MB sort
            // buffer (mapreduce.task.io.sort.mb).
            Configuration conf = new Configuration();
            conf.set("fs.defaultFS", "file:///");
            conf.set("mapreduce.framework.name", "local");
            conf.setInt("mapreduce.task.io.sort.mb", 1);
    
            Path input = new Path("input/ncdc");
            Path output = new Path("output");
    
            // Remove the output of a previous run so the job starts from a clean directory.
            FileSystem fs = FileSystem.get(conf);
            if (fs.exists(output)) {
                fs.delete(output, true);
            }
    
            MaxTemperatureDriver driver = new MaxTemperatureDriver();
            driver.setConf(conf);
    
            String[] jobArgs = {input.toString(), output.toString()};
            Assert.assertThat(driver.run(jobArgs), CoreMatchers.is(0));
    
            checkOutput(conf, output);
        }
    
        /**
         * Path filter that accepts only names not starting with an underscore, so
         * underscore-prefixed entries in a job output directory are skipped when listing it.
         */
        static class OutputLogFilter implements PathFilter {
    
            @Override
            public boolean accept(Path path) {
                boolean underscorePrefixed = path.getName().startsWith("_");
                return !underscorePrefixed;
            }
        }
    
        /**
         * Asserts that the job wrote exactly one non-underscore file under {@code output} and that
         * its lines equal those of the classpath resource {@code /expected.txt}, with no extras.
         *
         * @param conf   configuration used to resolve the output filesystem
         * @param output job output directory
         * @throws IOException if the output or the expected file cannot be read
         */
        public void checkOutput(Configuration conf, Path output) throws IOException {
            FileSystem fileSystem = FileSystem.get(conf);
            FileStatus[] status = fileSystem.listStatus(output, new OutputLogFilter());
            Assert.assertThat(status.length, CoreMatchers.is(1));
    
            Path[] paths = FileUtil.stat2Paths(status);
            InputStream expectedStream = getClass().getResourceAsStream("/expected.txt");
            // getResourceAsStream returns null for a missing resource: fail with a clear message
            // instead of a NullPointerException deep inside the reader.
            Assert.assertNotNull("missing test resource /expected.txt", expectedStream);
    
            // try-with-resources closes both readers even when an assertion below fails.
            try (BufferedReader expectedBufferedReader = asBufferedReader(expectedStream);
                 BufferedReader outputBufferedReader = asBufferedReader(fileSystem.open(paths[0]))) {
                String line;
                while ((line = expectedBufferedReader.readLine()) != null) {
                    Assert.assertEquals(line, outputBufferedReader.readLine());
                }
                // The job output must not contain lines beyond the expected ones.
                Assert.assertNull(outputBufferedReader.readLine());
            }
        }
    
        /**
         * Wraps a byte stream in a line reader that decodes UTF-8 explicitly. Hadoop's Text-based
         * output is UTF-8, whereas the platform default charset varies by OS and locale (for
         * example GBK on a Chinese Windows install).
         */
        private BufferedReader asBufferedReader(InputStream is) {
            return new BufferedReader(
                    new InputStreamReader(is, java.nio.charset.StandardCharsets.UTF_8));
        }
    

    2)使用mini集群运行
    在mini集群上，节点管理器会启动独立的JVM来执行任务，因此调试会更困难。

    /**
     * Runs MaxTemperatureDriver end-to-end on Hadoop's in-process mini cluster
     * (ClusterMapReduceTestCase) and checks the single output file line by line.
     */
    public class MaxTemperatureDriverMiniTest extends ClusterMapReduceTestCase {
        @Override
        protected void setUp() throws Exception {
            // Only supply defaults; values already set (e.g. via -D) are left untouched.
            if (System.getProperty("test.build.data") == null) {
                System.setProperty("test.build.data", "D:\\temp\\hadoop-testdata");
            }
            if (System.getProperty("hadoop.log.dir") == null) {
                System.setProperty("hadoop.log.dir", "D:\\temp\\hadoop-testdata");
            }
            // Set the properties before super.setUp(), presumably because the cluster reads them
            // while starting up.
            super.setUp();
        }
    
        public void test() throws Exception {
            FileSystem fileSystem = getFileSystem();
            Path local = new Path("input/ncdc");
            Path input = getInputDir();
            Path output = getOutputDir();
            fileSystem.copyFromLocalFile(local, input);
    
            JobConf conf = createJobConf();
            MaxTemperatureDriver driver = new MaxTemperatureDriver();
            driver.setConf(conf);
            int exitCode = ToolRunner.run(driver, new String[]{input.toString(), output.toString()});
            // assertEquals takes (expected, actual); the arguments were previously swapped,
            // which made failure messages report the values the wrong way round.
            Assert.assertEquals(0, exitCode);
    
            FileStatus[] status = fileSystem.listStatus(output, new OutputLogFilter());
            Assert.assertEquals(1, status.length);
    
            Path[] paths = FileUtil.stat2Paths(status);
            // try-with-resources closes the reader even if an assertion fails; decode as UTF-8
            // rather than the platform default charset.
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(
                    fileSystem.open(paths[0]), java.nio.charset.StandardCharsets.UTF_8))) {
                Assert.assertEquals("1949\t111", reader.readLine());
                Assert.assertEquals("1950\t22", reader.readLine());
                Assert.assertNull(reader.readLine());
            }
        }
    
        /** Skips entries whose names start with an underscore when listing the output directory. */
        static class OutputLogFilter implements PathFilter {
    
            @Override
            public boolean accept(Path path) {
                return !path.getName().startsWith("_");
            }
        }
    }
    

    注意：在Windows环境下运行需要winutils的支持，可从GitHub下载，并用其中的bin目录替换本地的hadoop/bin目录。winutils用于在Windows环境下执行类Linux命令，例如：
    winutils.exe ls -F D:\...\dfs\data\data1。在Win7下，MaxTemperatureDriverMiniTest运行失败：执行创建容器的命令winutils.exe task create -m -1 -c -1 container_1559097916661_0001_01_000001 "cmd /c C:/.../default_container_executor.cmd"时会报CreateTask error (5)，搜索后暂未找到解决方案（错误码5对应Windows的ERROR_ACCESS_DENIED，即拒绝访问，可能与运行权限有关）。default_container_executor.cmd会调用同目录下的launch_container.cmd脚本，后者启动JVM运行org.apache.hadoop.mapreduce.v2.app.MRAppMaster。

    相关文章

      网友评论

          本文标题:第6章 MapReduce应用开发-MRUnit编写

          本文链接:https://www.haomeiwen.com/subject/chxetctx.html