Hadoop introduction

作者: SeanC52111 | Published 2017-09-23 19:33 | Source: https://www.haomeiwen.com/subject/pgthextx.html

    Example 2-3. Mapper for the maximum temperature example

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;

    public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final int MISSING = 9999;

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {

        String line = value.toString();
        String year = line.substring(15, 19);
        int airTemperature;
        if (line.charAt(87) == '+') { // parseInt doesn't like leading plus signs
          airTemperature = Integer.parseInt(line.substring(88, 92));
        } else {
          airTemperature = Integer.parseInt(line.substring(87, 92));
        }
        String quality = line.substring(92, 93);
        if (airTemperature != MISSING && quality.matches("[01459]")) {
          context.write(new Text(year), new IntWritable(airTemperature));
        }
      }
    }


    The Mapper class is a generic type, with four formal type parameters that specify the input key, input value, output key, and output value types of the map function. For the present example, the input key is a long integer offset, the input value is a line of text, the output key is a year, and the output value is an air temperature (an integer).

    Rather than using built-in Java types, Hadoop provides its own set of basic types that are optimized for network serialization. These are found in the org.apache.hadoop.io package. Here we use LongWritable, which corresponds to a Java Long, Text (like Java String), and IntWritable (like Java Integer).

    The map() method is passed a key and a value. We convert the Text value containing the line of input into a Java String, then use its substring() method to extract the columns we are interested in. The map() method also provides an instance of Context to write the output to. In this case, we write the year as a Text object (since we are just using it as a key), and the temperature is wrapped in an IntWritable. We write an output record only if the temperature is present and the quality code indicates the temperature reading is OK.

    The reduce function is similarly defined using a Reducer, as illustrated in Example 2-4.
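    Before moving on to the reducer, the correspondence between Hadoop's box types and plain Java types can be made concrete with a tiny standalone sketch (the class name and sample values below are made up for illustration and are not part of the book's listing):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;

    public class WritableDemo {
      public static void main(String[] args) {
        // Wrap plain Java values in Hadoop's serializable box types...
        Text year = new Text("1950");
        IntWritable temp = new IntWritable(22);

        // ...and unwrap them again with toString() and get().
        String javaYear = year.toString(); // "1950"
        int javaTemp = temp.get();         // 22

        System.out.println(javaYear + "\t" + javaTemp);
      }
    }

    Compiling and running this only requires the Hadoop client libraries (hadoop-common) on the classpath; no cluster is involved.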
    Example 2-4. Reducer for the maximum temperature example

    import java.io.IOException;

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;

    public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {

        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
          maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
      }
    }


    Again, four formal type parameters are used to specify the input and output types, this time for the reduce function. The input types of the reduce function must match the output types of the map function: Text and IntWritable. And in this case, the output types of the reduce function are Text and IntWritable, for a year and its maximum temperature, which we find by iterating through the temperatures and comparing each with a record of the highest found so far.
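    The max-finding loop can be sanity-checked outside the MapReduce framework with plain Java; the class name and temperatures below are hypothetical, chosen only to mimic the list of values the framework would pass to reduce() for a single year key:

    import java.util.Arrays;
    import java.util.List;

    import org.apache.hadoop.io.IntWritable;

    public class MaxLogicCheck {
      public static void main(String[] args) {
        // Roughly what the framework would hand to reduce() for one key,
        // e.g. (1950, [0, 22, -11]).
        List<IntWritable> values = Arrays.asList(
            new IntWritable(0), new IntWritable(22), new IntWritable(-11));

        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
          maxValue = Math.max(maxValue, value.get());
        }
        System.out.println(maxValue); // prints 22
      }
    }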
    The third piece of code runs the MapReduce job (see Example 2-5).
    Example 2-5. Application to find the maximum temperature in the weather dataset

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;

    public class MaxTemperature {

      public static void main(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.println("Usage: MaxTemperature <input path> <output path>");
          System.exit(-1);
        }

        Job job = new Job();
        job.setJarByClass(MaxTemperature.class);
        job.setJobName("Max temperature");

        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));

        job.setMapperClass(MaxTemperatureMapper.class);
        job.setReducerClass(MaxTemperatureReducer.class);

        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);

        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }


    A Job object forms the specification of the job and gives you control over how the job is run. When we run this job on a Hadoop cluster, we will package the code into a JAR file (which Hadoop will distribute around the cluster). Rather than explicitly specifying the name of the JAR file, we can pass a class in the Job's setJarByClass() method, which Hadoop will use to locate the relevant JAR file by looking for the JAR file containing this class.
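    One caveat: the new Job() constructor used in Example 2-5 is deprecated in Hadoop 2 in favour of the Job.getInstance() factory method. An equivalent construction would look like the following sketch (the class name MaxTemperatureV2 is hypothetical):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class MaxTemperatureV2 {
      public static void main(String[] args) throws Exception {
        // Equivalent to "new Job()" followed by setJobName(), but not deprecated.
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "Max temperature");
        job.setJarByClass(MaxTemperatureV2.class);
        // ...the rest of the configuration is the same as in Example 2-5.
      }
    }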
    Having constructed a Job object, we specify the input and output paths. An input path is specified by calling the static addInputPath() method on FileInputFormat, and it can be a single file, a directory (in which case the input is formed by all the files in that directory), or a file pattern. As the name suggests, addInputPath() can be called more than once to use input from multiple paths.
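    For example, a job could draw its input from a directory, a single file, and a file pattern all at once; the class name and paths below are hypothetical:

    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class MultipleInputPathsDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();

        // A directory: all files inside it become input.
        FileInputFormat.addInputPath(job, new Path("/input/ncdc/1901"));
        // A single file.
        FileInputFormat.addInputPath(job, new Path("/input/extra/records.txt"));
        // A file pattern (glob).
        FileInputFormat.addInputPath(job, new Path("/input/ncdc/19*"));
      }
    }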
    The output path (of which there is only one) is specified by the static setOutputPath() method on FileOutputFormat. It specifies a directory where the output files from the reduce function are written. The directory shouldn't exist before running the job, because Hadoop will complain and not run the job. This precaution is to prevent data loss (it can be very annoying to accidentally overwrite the output of a long job with that of another).
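    During development it can be convenient to clear a leftover output directory before resubmitting a job; a minimal sketch using the FileSystem API (the output path is hypothetical, and the book's example does not do this):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class DeleteStaleOutput {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        FileSystem fs = FileSystem.get(conf);

        Path output = new Path("/user/demo/max-temp-output"); // hypothetical path
        if (fs.exists(output)) {
          fs.delete(output, true); // recursive delete -- use with care
        }
      }
    }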
    Next, we specify the map and reduce types to use via the setMapperClass() and setReducerClass() methods.

    The setOutputKeyClass() and setOutputValueClass() methods control the output types for the reduce function, and must match what the Reducer class produces. The map output types default to the same types, so they do not need to be set if the mapper produces the same types as the reducer (as it does in our case). However, if they are different, the map output types must be set using the setMapOutputKeyClass() and setMapOutputValueClass() methods.
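    As an illustration, consider a hypothetical variant of the job in which the mapper still emits (Text, IntWritable) pairs but the reducer writes a DoubleWritable (say, a mean temperature). The driver would then need the two extra calls:

    import org.apache.hadoop.io.DoubleWritable;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;

    public class DifferentMapOutputTypesDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();

        // Intermediate (map output) types: year -> individual temperature.
        job.setMapOutputKeyClass(Text.class);
        job.setMapOutputValueClass(IntWritable.class);

        // Final (reduce output) types: year -> mean temperature.
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(DoubleWritable.class);
      }
    }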
    The input types are controlled via the input format, which we have not explicitly set because we are using the default TextInputFormat.
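    Setting it explicitly would be a single extra call on the Job; a minimal sketch (the class name is hypothetical):

    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

    public class ExplicitInputFormatDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // Explicitly selecting the default: keys are file offsets (LongWritable)
        // and values are lines of text (Text).
        job.setInputFormatClass(TextInputFormat.class);
      }
    }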
    After setting the classes that define the map and reduce functions, we are ready to run the job. The waitForCompletion() method on Job submits the job and waits for it to finish. The single argument to the method is a flag indicating whether verbose output is generated; when true, the job writes information about its progress to the console.

    The return value of the waitForCompletion() method is a Boolean indicating success (true) or failure (false), which we translate into the program's exit code of 0 or 1.
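    Job also offers a non-blocking alternative: submit() returns as soon as the job has been handed off, and the job can then be polled for completion. A minimal sketch (not something the book's example does; the job configuration is elided):

    import org.apache.hadoop.mapreduce.Job;

    public class NonBlockingSubmitDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance();
        // ...mapper, reducer, and path configuration as in Example 2-5...

        job.submit();                // returns without waiting for the job
        while (!job.isComplete()) {  // poll the job's status
          Thread.sleep(5000);
        }
        System.exit(job.isSuccessful() ? 0 : 1);
      }
    }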
