Finding the Maximum Temperature
Writing unit tests with MRUnit
(1) Version 1 of the mapper: MaxTemperatureMapper
package v1;
// cc MaxTemperatureMapperV1 First version of a Mapper that passes MaxTemperatureMapperTest
import java.io.IOException;

import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.*;

// vv MaxTemperatureMapperV1
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {
  // LongWritable: the map function's input key type
  // Text: the map function's input value type
  // Text, IntWritable: the output key and value types
  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    String line = value.toString();
    String year = line.substring(15, 19);
    int airTemperature = Integer.parseInt(line.substring(87, 92));
    // The Context carries data and other runtime state between stages: the
    // key/value pairs written here are passed to the Reducer, and the reduce
    // output written to its Context is handed to Hadoop to write to HDFS.
    context.write(new Text(year), new IntWritable(airTemperature));
  }
}
// ^^ MaxTemperatureMapperV1
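The cc comment above refers to MaxTemperatureMapperTest, which is not shown in these notes. A minimal sketch of what such a test could look like with MRUnit's MapDriver (assuming MRUnit is on the classpath; the sample line is an NCDC record for 1950 whose temperature field at offsets 87-92 holds -11, i.e. -1.1°C in tenths of a degree):

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.MapDriver;
import org.junit.Test;

public class MaxTemperatureMapperTest {
  @Test
  public void processesValidRecord() throws IOException {
    // Year at offsets 15-19, temperature at 87-92 of the NCDC line
    Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
        "99999V0203201N00261220001CN9999999N9-00111+99999999999");
    new MapDriver<LongWritable, Text, Text, IntWritable>()
      .withMapper(new MaxTemperatureMapper())
      .withInput(new LongWritable(0), value)
      .withOutput(new Text("1950"), new IntWritable(-11))
      .runTest();
  }
}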
(2) Parsing temperature records in NCDC format
// cc NcdcRecordParserV2 A class for parsing weather records in NCDC format
package v2;

import org.apache.hadoop.io.Text;

// vv NcdcRecordParserV2
public class NcdcRecordParser {

  private static final int MISSING_TEMPERATURE = 9999;

  private String year;
  private int airTemperature;
  private String quality;

  // Parse one raw NCDC record
  public void parse(String record) {
    year = record.substring(15, 19);
    String airTemperatureString;
    // Remove leading plus sign as parseInt doesn't like them (pre-Java 7)
    if (record.charAt(87) == '+') {
      airTemperatureString = record.substring(88, 92);
    } else {
      airTemperatureString = record.substring(87, 92);
    }
    airTemperature = Integer.parseInt(airTemperatureString);
    quality = record.substring(92, 93);
  }

  public void parse(Text record) {
    parse(record.toString());
  }

  public boolean isValidTemperature() {
    return airTemperature != MISSING_TEMPERATURE && quality.matches("[01459]");
  }

  public String getYear() {
    return year;
  }

  public int getAirTemperature() {
    return airTemperature;
  }
}
// ^^ NcdcRecordParserV2
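Since the parser has no Hadoop dependencies beyond Text, it can be exercised directly with plain JUnit, without any MapReduce machinery. A minimal sketch (a hypothetical test class, reusing the 1950 sample record from the mapper test above):

import static org.hamcrest.CoreMatchers.is;
import static org.junit.Assert.assertThat;

import org.junit.Test;

public class NcdcRecordParserTest {
  @Test
  public void parsesValidRecord() {
    NcdcRecordParser parser = new NcdcRecordParser();
    parser.parse("0043011990999991950051518004+68750+023550FM-12+0382" +
        "99999V0203201N00261220001CN9999999N9-00111+99999999999");
    assertThat(parser.getYear(), is("1950"));
    assertThat(parser.getAirTemperature(), is(-11)); // tenths of a degree Celsius
    assertThat(parser.isValidTemperature(), is(true));
  }
}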
(3) Version 2 of the mapper, using the utility class to parse records
package v2;
// cc MaxTemperatureMapperV2 A Mapper that uses a utility class to parse records
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// vv MaxTemperatureMapperV2
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  /*[*/private NcdcRecordParser parser = new NcdcRecordParser();/*]*/

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    /*[*/parser.parse(value);/*]*/
    if (/*[*/parser.isValidTemperature()/*]*/) {
      context.write(new Text(/*[*/parser.getYear()/*]*/),
          new IntWritable(/*[*/parser.getAirTemperature()/*]*/));
    }
  }
}
// ^^ MaxTemperatureMapperV2
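With the parser in place, the mapper now silently drops records whose temperature is missing (+9999) or fails the quality check. A sketch of an MRUnit test method for that behavior, which could be added to the MaxTemperatureMapperTest class sketched earlier (no withOutput call, so the driver asserts that the mapper emits nothing):

@Test
public void ignoresMissingTemperatureRecord() throws IOException {
  // Same record as before, but with the missing-temperature value +9999
  Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" +
      "99999V0203201N00261220001CN9999999N9+99991+99999999999");
  new MapDriver<LongWritable, Text, Text, IntWritable>()
    .withMapper(new MaxTemperatureMapper())
    .withInput(new LongWritable(0), value)
    .runTest();
}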
(4) Reducer that computes the maximum temperature
package v1;
// cc MaxTemperatureReducerV1 Reducer for maximum temperature example
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;

// vv MaxTemperatureReducerV1
public class MaxTemperatureReducer
    extends Reducer<Text, IntWritable, Text, IntWritable> {
  // The four type parameters of the reduce function specify its input
  // and output key/value types.
  @Override
  public void reduce(Text key, Iterable<IntWritable> values, Context context)
      throws IOException, InterruptedException {
    int maxValue = Integer.MIN_VALUE;
    for (IntWritable value : values) {
      maxValue = Math.max(maxValue, value.get());
    }
    context.write(key, new IntWritable(maxValue));
  }
}
// ^^ MaxTemperatureReducerV1
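The reducer can likewise be tested in isolation with MRUnit's ReduceDriver. A minimal sketch (a hypothetical test class, same MRUnit assumption as above):

import java.io.IOException;
import java.util.Arrays;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
import org.junit.Test;

public class MaxTemperatureReducerTest {
  @Test
  public void returnsMaximumIntegerInValues() throws IOException {
    new ReduceDriver<Text, IntWritable, Text, IntWritable>()
      .withReducer(new MaxTemperatureReducer())
      .withInput(new Text("1950"),
          Arrays.asList(new IntWritable(10), new IntWritable(5)))
      .withOutput(new Text("1950"), new IntWritable(10))
      .runTest();
  }
}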
(5) Running locally on test data
The driver application that finds the maximum temperature:
package v2;
// cc MaxTemperatureDriverV2 Application to find the maximum temperature
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;

import v1.MaxTemperatureReducer;

// vv MaxTemperatureDriverV2
public class MaxTemperatureDriver extends Configured implements Tool {

  @Override
  public int run(String[] args) throws Exception {
    if (args.length != 2) {
      System.err.printf("Usage: %s [generic options] <input> <output>\n",
          getClass().getSimpleName());
      ToolRunner.printGenericCommandUsage(System.err);
      return -1;
    }

    // Job.getInstance replaces the deprecated Job constructor
    Job job = Job.getInstance(getConf(), "Max temperature");
    job.setJarByClass(getClass());

    // Input and output paths
    FileInputFormat.addInputPath(job, new Path(args[0]));
    FileOutputFormat.setOutputPath(job, new Path(args[1]));

    // Map, combine, and reduce classes (the reducer doubles as the combiner)
    job.setMapperClass(MaxTemperatureMapper.class);
    job.setCombinerClass(MaxTemperatureReducer.class);
    job.setReducerClass(MaxTemperatureReducer.class);

    // Output types: <Text, IntWritable>
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(IntWritable.class);

    // waitForCompletion submits the job and waits for it to finish
    return job.waitForCompletion(true) ? 0 : 1;
  }

  public static void main(String[] args) throws Exception {
    int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
    System.exit(exitCode);
  }
}
// ^^ MaxTemperatureDriverV2
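Because the driver uses ToolRunner, it accepts the generic Hadoop options. For example, it could be run against a small local dataset like this (assuming a configuration file such as conf/hadoop-local.xml that selects the local job runner and the local filesystem):

% hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml input/ncdc/micro output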
Testing the driver:
package v2;
// cc MaxTemperatureDriverTestV2 A test for MaxTemperatureDriver that uses a local,
// in-process job runner
import static org.hamcrest.CoreMatchers.is;
import static org.hamcrest.CoreMatchers.nullValue;
import static org.junit.Assert.assertThat;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;
import org.junit.Test;

public class MaxTemperatureDriverTest {

  // Skip "hidden" output files such as _SUCCESS and _logs
  public static class OutputLogFilter implements PathFilter {
    public boolean accept(Path path) {
      return !path.getName().startsWith("_");
    }
  }

  // vv MaxTemperatureDriverTestV2
  @Test
  public void test() throws Exception {
    Configuration conf = new Configuration();
    conf.set("fs.defaultFS", "file:///");
    conf.set("mapreduce.framework.name", "local");
    conf.setInt("mapreduce.task.io.sort.mb", 1);

    Path input = new Path("input/ncdc/micro");
    Path output = new Path("output");

    FileSystem fs = FileSystem.getLocal(conf);
    fs.delete(output, true); // delete old output

    MaxTemperatureDriver driver = new MaxTemperatureDriver();
    driver.setConf(conf);

    int exitCode = driver.run(new String[] {
        input.toString(), output.toString() });
    assertThat(exitCode, is(0));

    checkOutput(conf, output);
  }
  // ^^ MaxTemperatureDriverTestV2

  private void checkOutput(Configuration conf, Path output) throws IOException {
    FileSystem fs = FileSystem.getLocal(conf);
    Path[] outputFiles = FileUtil.stat2Paths(
        fs.listStatus(output, new OutputLogFilter()));
    assertThat(outputFiles.length, is(1));

    BufferedReader actual = asBufferedReader(fs.open(outputFiles[0]));
    BufferedReader expected = asBufferedReader(
        getClass().getResourceAsStream("/expected.txt"));
    String expectedLine;
    while ((expectedLine = expected.readLine()) != null) {
      assertThat(actual.readLine(), is(expectedLine));
    }
    assertThat(actual.readLine(), nullValue());
    actual.close();
    expected.close();
  }

  private BufferedReader asBufferedReader(InputStream in) throws IOException {
    return new BufferedReader(new InputStreamReader(in));
  }
}
(6) Version 3 of the mapper: locating anomalous input records
// == MaxTemperatureMapperV3
package v3;

import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

import v2.NcdcRecordParser;

// vv MaxTemperatureMapperV3
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  /*[*/enum Temperature {
    OVER_100
  }/*]*/

  private NcdcRecordParser parser = new NcdcRecordParser();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    parser.parse(value);
    if (parser.isValidTemperature()) {
      int airTemperature = parser.getAirTemperature();
      // Temperatures are stored in tenths of a degree, so 1000 means 100°C
      /*[*/if (airTemperature > 1000) {
        System.err.println("Temperature over 100 degrees for input: " + value);
        context.setStatus("Detected possibly corrupt record: see logs.");
        context.getCounter(Temperature.OVER_100).increment(1);
      }/*]*/
      context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
    }
  }
}
// ^^ MaxTemperatureMapperV3
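After the job finishes, the counter can be read back in the driver. A minimal sketch, assuming job is the completed Job instance from the driver's run method (getCounters and findCounter are the standard Hadoop counter APIs):

// Read the custom counter after the job has completed (sketch)
long over100 = job.getCounters()
    .findCounter(MaxTemperatureMapper.Temperature.OVER_100).getValue();
System.out.println("Records with temperature over 100 degrees: " + over100);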
(7) Handling malformed data
The maximum-temperature mapper:
package v4;
// cc MaxTemperatureMapperV4 Mapper for maximum temperature example
import java.io.IOException;

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;

// vv MaxTemperatureMapperV4
public class MaxTemperatureMapper
    extends Mapper<LongWritable, Text, Text, IntWritable> {

  enum Temperature {
    MALFORMED
  }

  private NcdcRecordParser parser = new NcdcRecordParser();

  @Override
  public void map(LongWritable key, Text value, Context context)
      throws IOException, InterruptedException {
    parser.parse(value);
    if (parser.isValidTemperature()) {
      int airTemperature = parser.getAirTemperature();
      context.write(new Text(parser.getYear()), new IntWritable(airTemperature));
    } else if (parser.isMalformedTemperature()) {
      // Count and log the bad record rather than failing the task
      System.err.println("Ignoring possibly corrupt input: " + value);
      context.getCounter(Temperature.MALFORMED).increment(1);
    }
  }
}
// ^^ MaxTemperatureMapperV4
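Note that isMalformedTemperature() is not part of the v2 parser shown earlier; this mapper relies on a v4 NcdcRecordParser in the same package (hence no import) that flags an unparseable temperature field instead of throwing an exception. A sketch of the parts that would differ from the v2 parser:

// Additions to a v4 NcdcRecordParser (sketch; the rest matches the v2 parser)
private boolean airTemperatureMalformed;

public void parse(String record) {
  year = record.substring(15, 19);
  airTemperatureMalformed = false;
  String airTemperatureString = null;
  // A well-formed temperature field starts with an explicit sign
  if (record.charAt(87) == '+') {
    airTemperatureString = record.substring(88, 92);
  } else if (record.charAt(87) == '-') {
    airTemperatureString = record.substring(87, 92);
  } else {
    airTemperatureMalformed = true;
  }
  if (!airTemperatureMalformed) {
    airTemperature = Integer.parseInt(airTemperatureString);
  }
  quality = record.substring(92, 93);
}

public boolean isValidTemperature() {
  return !airTemperatureMalformed && airTemperature != MISSING_TEMPERATURE
      && quality.matches("[01459]");
}

public boolean isMalformedTemperature() {
  return airTemperatureMalformed;
}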
MapReduce workflows
(1) Decomposing a problem into MapReduce jobs
(2) JobControl, for running dependent jobs in-process (see the sketch after this list)
(3) Apache Oozie, a system for running workflows of dependent jobs
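A minimal sketch of chaining two dependent jobs with JobControl (the wrapper class, method, and the two Job arguments are hypothetical; ControlledJob and JobControl live in org.apache.hadoop.mapreduce.lib.jobcontrol):

import java.util.Collections;

import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.jobcontrol.ControlledJob;
import org.apache.hadoop.mapreduce.lib.jobcontrol.JobControl;

public class WorkflowSketch {
  public static void runWorkflow(Job first, Job second) throws Exception {
    ControlledJob cjFirst = new ControlledJob(first, null);
    // second runs only after first completes successfully
    ControlledJob cjSecond =
        new ControlledJob(second, Collections.singletonList(cjFirst));

    JobControl control = new JobControl("max-temp-workflow");
    control.addJob(cjFirst);
    control.addJob(cjSecond);

    // JobControl implements Runnable; run it in a thread and poll for completion
    new Thread(control).start();
    while (!control.allFinished()) {
      Thread.sleep(500);
    }
    control.stop();
  }
}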