MapReduce Examples

Author: kangapp | Published 2018-10-28 15:58
    MapReduce execution flow diagram (figure not reproduced here)

    Core concepts

    • Split: the chunk of data a MapReduce job processes and the smallest unit of computation in MapReduce. By default one split corresponds to one HDFS block, although the ratio between them can be configured manually (not recommended; see the sketch after the InputFormat interface below).
    • InputFormat: splits the input data into input splits.
    package org.apache.hadoop.mapred;
    
    import java.io.IOException;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.fs.FileSystem;
    
    /** 
     * <code>InputFormat</code> describes the input-specification for a 
     * Map-Reduce job. 
     * 
     * <p>The Map-Reduce framework relies on the <code>InputFormat</code> of the
     * job to:<p>
     * <ol>
     *   <li>
     *   Validate the input-specification of the job. 
     *   <li>
     *   Split-up the input file(s) into logical {@link InputSplit}s, each of 
     *   which is then assigned to an individual {@link Mapper}.
     *   </li>
     *   <li>
     *   Provide the {@link RecordReader} implementation to be used to glean
     *   input records from the logical <code>InputSplit</code> for processing by 
     *   the {@link Mapper}.
     *   </li>
     * </ol>
     * 
     * <p>The default behavior of file-based {@link InputFormat}s, typically 
     * sub-classes of {@link FileInputFormat}, is to split the 
     * input into <i>logical</i> {@link InputSplit}s based on the total size, in 
     * bytes, of the input files. However, the {@link FileSystem} blocksize of  
     * the input files is treated as an upper bound for input splits. A lower bound 
     * on the split size can be set via 
     * <a href="{@docRoot}/../hadoop-mapreduce-client/hadoop-mapreduce-client-core/mapred-default.xml#mapreduce.input.fileinputformat.split.minsize">
     * mapreduce.input.fileinputformat.split.minsize</a>.</p>
     * 
     * <p>Clearly, logical splits based on input-size is insufficient for many 
     * applications since record boundaries are to be respected. In such cases, the
     * application has to also implement a {@link RecordReader} on whom lies the
     * responsibilty to respect record-boundaries and present a record-oriented
     * view of the logical <code>InputSplit</code> to the individual task.
     *
     * @see InputSplit
     * @see RecordReader
     * @see JobClient
     * @see FileInputFormat
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface InputFormat<K, V> {
    
      /** 
       * Logically split the set of input files for the job.  
       * 
       * <p>Each {@link InputSplit} is then assigned to an individual {@link Mapper}
       * for processing.</p>
       *
       * <p><i>Note</i>: The split is a <i>logical</i> split of the inputs and the
       * input files are not physically split into chunks. For e.g. a split could
       * be <i>&lt;input-file-path, start, offset&gt;</i> tuple.
       * 
       * @param job job configuration.
       * @param numSplits the desired number of splits, a hint.
       * @return an array of {@link InputSplit}s for the job.
       */
      InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    
      /** 
       * Get the {@link RecordReader} for the given {@link InputSplit}.
       *
       * <p>It is the responsibility of the <code>RecordReader</code> to respect
       * record boundaries while processing the logical split to present a 
       * record-oriented view to the individual task.</p>
       * 
       * @param split the {@link InputSplit}
       * @param job the job that this split belongs to
       * @return a {@link RecordReader}
       */
      RecordReader<K, V> getRecordReader(InputSplit split,
                                         JobConf job, 
                                         Reporter reporter) throws IOException;
    }
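
    As the Split bullet above notes, splits of file-based InputFormats line up with HDFS blocks by default, but the bounds can be tuned. A minimal sketch of doing so through the mapreduce API (the sizes are examples only, and overriding the defaults is generally not recommended):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;

    public class SplitSizeDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "split size demo");
        // Lower and upper bounds on the logical split size (example values):
        FileInputFormat.setMinInputSplitSize(job, 64L * 1024 * 1024);   // mapreduce.input.fileinputformat.split.minsize
        FileInputFormat.setMaxInputSplitSize(job, 256L * 1024 * 1024);  // mapreduce.input.fileinputformat.split.maxsize
      }
    }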
    

    • OutputFormat: writes the job's output to the file system.

    package org.apache.hadoop.mapred;
    
    import java.io.IOException;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.util.Progressable;
    
    /** 
     * <code>OutputFormat</code> describes the output-specification for a 
     * Map-Reduce job.
     *
     * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
     * job to:<p>
     * <ol>
     *   <li>
     *   Validate the output-specification of the job. For e.g. check that the 
     *   output directory doesn't already exist. 
     *   <li>
     *   Provide the {@link RecordWriter} implementation to be used to write out
     *   the output files of the job. Output files are stored in a 
     *   {@link FileSystem}.
     *   </li>
     * </ol>
     * 
     * @see RecordWriter
     * @see JobConf
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public interface OutputFormat<K, V> {
    
      /** 
       * Get the {@link RecordWriter} for the given job.
       *
       * @param ignored
       * @param job configuration for the job whose output is being written.
       * @param name the unique name for this part of the output.
       * @param progress mechanism for reporting progress while writing to file.
       * @return a {@link RecordWriter} to write the output for the job.
       * @throws IOException
       */
      RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                         String name, Progressable progress)
      throws IOException;
    
      /** 
       * Check for validity of the output-specification for the job.
       *  
       * <p>This is to validate the output specification for the job when it is
       * a job is submitted.  Typically checks that it does not already exist,
       * throwing an exception when it already exists, so that output is not
       * overwritten.</p>
       *
       * @param ignored
       * @param job job configuration.
       * @throws IOException when output should not be attempted
       */
      void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
    }
    
    • Mapper
    package org.apache.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.io.RawComparator;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.mapreduce.task.MapContextImpl;
    
    /** 
     * Maps input key/value pairs to a set of intermediate key/value pairs.  
     * 
     * <p>Maps are the individual tasks which transform input records into a 
     * intermediate records. The transformed intermediate records need not be of 
     * the same type as the input records. A given input pair may map to zero or 
     * many output pairs.</p> 
     * 
     * <p>The Hadoop Map-Reduce framework spawns one map task for each 
     * {@link InputSplit} generated by the {@link InputFormat} for the job.
     * <code>Mapper</code> implementations can access the {@link Configuration} for 
     * the job via the {@link JobContext#getConfiguration()}.
     * 
     * <p>The framework first calls 
     * {@link #setup(org.apache.hadoop.mapreduce.Mapper.Context)}, followed by
     * {@link #map(Object, Object, Context)} 
     * for each key/value pair in the <code>InputSplit</code>. Finally 
     * {@link #cleanup(Context)} is called.</p>
     * 
     * <p>All intermediate values associated with a given output key are 
     * subsequently grouped by the framework, and passed to a {@link Reducer} to  
     * determine the final output. Users can control the sorting and grouping by 
     * specifying two key {@link RawComparator} classes.</p>
     *
     * <p>The <code>Mapper</code> outputs are partitioned per 
     * <code>Reducer</code>. Users can control which keys (and hence records) go to 
     * which <code>Reducer</code> by implementing a custom {@link Partitioner}.
     * 
     * <p>Users can optionally specify a <code>combiner</code>, via 
     * {@link Job#setCombinerClass(Class)}, to perform local aggregation of the 
     * intermediate outputs, which helps to cut down the amount of data transferred 
     * from the <code>Mapper</code> to the <code>Reducer</code>.
     * 
     * <p>Applications can specify if and how the intermediate
     * outputs are to be compressed and which {@link CompressionCodec}s are to be
     * used via the <code>Configuration</code>.</p>
     *  
     * <p>If the job has zero
     * reduces then the output of the <code>Mapper</code> is directly written
     * to the {@link OutputFormat} without sorting by keys.</p>
     * 
     * <p>Example:</p>
     * <p><blockquote><pre>
     * public class TokenCounterMapper 
     *     extends Mapper&lt;Object, Text, Text, IntWritable&gt;{
     *    
     *   private final static IntWritable one = new IntWritable(1);
     *   private Text word = new Text();
     *   
     *   public void map(Object key, Text value, Context context) throws IOException, InterruptedException {
     *     StringTokenizer itr = new StringTokenizer(value.toString());
     *     while (itr.hasMoreTokens()) {
     *       word.set(itr.nextToken());
     *       context.write(word, one);
     *     }
     *   }
     * }
     * </pre></blockquote></p>
     *
     * <p>Applications may override the {@link #run(Context)} method to exert 
     * greater control on map processing e.g. multi-threaded <code>Mapper</code>s 
     * etc.</p>
     * 
     * @see InputFormat
     * @see JobContext
     * @see Partitioner  
     * @see Reducer
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    
      /**
       * The <code>Context</code> passed on to the {@link Mapper} implementations.
       */
      public abstract class Context
        implements MapContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
      }
      
      /**
       * Called once at the beginning of the task.
       */
      protected void setup(Context context
                           ) throws IOException, InterruptedException {
        // NOTHING
      }
    
      /**
       * Called once for each key/value pair in the input split. Most applications
       * should override this, but the default is the identity function.
       */
      @SuppressWarnings("unchecked")
      protected void map(KEYIN key, VALUEIN value, 
                         Context context) throws IOException, InterruptedException {
        context.write((KEYOUT) key, (VALUEOUT) value);
      }
    
      /**
       * Called once at the end of the task.
       */
      protected void cleanup(Context context
                             ) throws IOException, InterruptedException {
        // NOTHING
      }
      
      /**
       * Expert users can override this method for more complete control over the
       * execution of the Mapper.
       * @param context
       * @throws IOException
       */
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKeyValue()) {
            map(context.getCurrentKey(), context.getCurrentValue(), context);
          }
        } finally {
          cleanup(context);
        }
      }
    }
    
    • Reducer
    package org.apache.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.task.annotation.Checkpointable;
    
    import java.util.Iterator;
    
    /** 
     * Reduces a set of intermediate values which share a key to a smaller set of
     * values.  
     * 
     * <p><code>Reducer</code> implementations 
     * can access the {@link Configuration} for the job via the 
     * {@link JobContext#getConfiguration()} method.</p>
    
     * <p><code>Reducer</code> has 3 primary phases:</p>
     * <ol>
     *   <li>
     *   
     *   <h4 id="Shuffle">Shuffle</h4>
     *   
     *   <p>The <code>Reducer</code> copies the sorted output from each 
     *   {@link Mapper} using HTTP across the network.</p>
     *   </li>
     *   
     *   <li>
     *   <h4 id="Sort">Sort</h4>
     *   
     *   <p>The framework merge sorts <code>Reducer</code> inputs by 
     *   <code>key</code>s 
     *   (since different <code>Mapper</code>s may have output the same key).</p>
     *   
     *   <p>The shuffle and sort phases occur simultaneously i.e. while outputs are
     *   being fetched they are merged.</p>
     *      
     *   <h5 id="SecondarySort">SecondarySort</h5>
     *   
     *   <p>To achieve a secondary sort on the values returned by the value 
     *   iterator, the application should extend the key with the secondary
     *   key and define a grouping comparator. The keys will be sorted using the
     *   entire key, but will be grouped using the grouping comparator to decide
     *   which keys and values are sent in the same call to reduce.The grouping 
     *   comparator is specified via 
     *   {@link Job#setGroupingComparatorClass(Class)}. The sort order is
     *   controlled by 
     *   {@link Job#setSortComparatorClass(Class)}.</p>
     *   
     *   
     *   For example, say that you want to find duplicate web pages and tag them 
     *   all with the url of the "best" known example. You would set up the job 
     *   like:
     *   <ul>
     *     <li>Map Input Key: url</li>
     *     <li>Map Input Value: document</li>
     *     <li>Map Output Key: document checksum, url pagerank</li>
     *     <li>Map Output Value: url</li>
     *     <li>Partitioner: by checksum</li>
     *     <li>OutputKeyComparator: by checksum and then decreasing pagerank</li>
     *     <li>OutputValueGroupingComparator: by checksum</li>
     *   </ul>
     *   </li>
     *   
     *   <li>   
     *   <h4 id="Reduce">Reduce</h4>
     *   
     *   <p>In this phase the 
     *   {@link #reduce(Object, Iterable, Context)}
     *   method is called for each <code>&lt;key, (collection of values)&gt;</code> in
     *   the sorted inputs.</p>
     *   <p>The output of the reduce task is typically written to a 
     *   {@link RecordWriter} via 
     *   {@link Context#write(Object, Object)}.</p>
     *   </li>
     * </ol>
     * 
     * <p>The output of the <code>Reducer</code> is <b>not re-sorted</b>.</p>
     * 
     * <p>Example:</p>
     * <p><blockquote><pre>
     * public class IntSumReducer&lt;Key&gt; extends Reducer&lt;Key,IntWritable,
     *                                                 Key,IntWritable&gt; {
     *   private IntWritable result = new IntWritable();
     * 
     *   public void reduce(Key key, Iterable&lt;IntWritable&gt; values,
     *                      Context context) throws IOException, InterruptedException {
     *     int sum = 0;
     *     for (IntWritable val : values) {
     *       sum += val.get();
     *     }
     *     result.set(sum);
     *     context.write(key, result);
     *   }
     * }
     * </pre></blockquote></p>
     * 
     * @see Mapper
     * @see Partitioner
     */
    @Checkpointable
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public class Reducer<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
    
      /**
       * The <code>Context</code> passed on to the {@link Reducer} implementations.
       */
      public abstract class Context 
        implements ReduceContext<KEYIN,VALUEIN,KEYOUT,VALUEOUT> {
      }
    
      /**
       * Called once at the start of the task.
       */
      protected void setup(Context context
                           ) throws IOException, InterruptedException {
        // NOTHING
      }
    
      /**
       * This method is called once for each key. Most applications will define
       * their reduce class by overriding this method. The default implementation
       * is an identity function.
       */
      @SuppressWarnings("unchecked")
      protected void reduce(KEYIN key, Iterable<VALUEIN> values, Context context
                            ) throws IOException, InterruptedException {
        for(VALUEIN value: values) {
          context.write((KEYOUT) key, (VALUEOUT) value);
        }
      }
    
      /**
       * Called once at the end of the task.
       */
      protected void cleanup(Context context
                             ) throws IOException, InterruptedException {
        // NOTHING
      }
    
      /**
       * Advanced application writers can use the 
       * {@link #run(org.apache.hadoop.mapreduce.Reducer.Context)} method to
       * control how the reduce task works.
       */
      public void run(Context context) throws IOException, InterruptedException {
        setup(context);
        try {
          while (context.nextKey()) {
            reduce(context.getCurrentKey(), context.getValues(), context);
            // If a back up store is used, reset it
            Iterator<VALUEIN> iter = context.getValues().iterator();
            if(iter instanceof ReduceContext.ValueIterator) {
              ((ReduceContext.ValueIterator<VALUEIN>)iter).resetBackupStore();        
            }
          }
        } finally {
          cleanup(context);
        }
      }
    }
    
    • Combiner: optional local aggregation of the map output before it is shuffled to the reducers (registered via Job.setCombinerClass; see the WordCount driver below)
    • Partitioner: decides which reducer each intermediate key/value pair is sent to (registered via Job.setPartitionerClass); a minimal example follows
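    For illustration, a minimal custom Partitioner for a Text/IntWritable job such as WordCount below (the class name and routing rule are hypothetical; Hadoop's default is HashPartitioner):

    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class FirstLetterPartitioner extends Partitioner<Text, IntWritable> {
      @Override
      public int getPartition(Text key, IntWritable value, int numPartitions) {
        // Words starting with a-m go to bucket 0, everything else to bucket 1;
        // the modulo keeps the result inside [0, numPartitions).
        String s = key.toString().toLowerCase();
        int bucket = (!s.isEmpty() && s.charAt(0) <= 'm') ? 0 : 1;
        return bucket % numPartitions;
      }
    }

    It would be registered in the driver with job.setPartitionerClass(FirstLetterPartitioner.class) together with job.setNumReduceTasks(2).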

    WordCount: word-frequency analysis

    • WordCount 1.0
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
    
      /**
       * The Mapper implementation processes one line at a time via its map() method;
       * the lines are supplied by the configured TextInputFormat.
       */
      public static class TokenizerMapper
           extends Mapper<Object, Text, Text, IntWritable>{
    
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
    
        public void map(Object key, Text value, Context context
                        ) throws IOException, InterruptedException {
          StringTokenizer itr = new StringTokenizer(value.toString()); // tokenizes on whitespace by default
          while (itr.hasMoreTokens()) {
            word.set(itr.nextToken());
            context.write(word, one);
          }
        }
      }
      /**
       * The Reducer implementation sums up, in its reduce() method, the values seen for each key.
       */
      public static class IntSumReducer
           extends Reducer<Text,IntWritable,Text,IntWritable> {
        private IntWritable result = new IntWritable();
    
        public void reduce(Text key, Iterable<IntWritable> values,
                           Context context
                           ) throws IOException, InterruptedException {
          int sum = 0;
          for (IntWritable val : values) {
            sum += val.get();
          }
          result.set(sum);
          context.write(key, result);
        }
      }
    
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        Job job = Job.getInstance(conf, "word count");
        job.setJarByClass(WordCount.class);
        job.setMapperClass(TokenizerMapper.class);
        // After each map task's output is sorted by key, the Combiner performs local (map-side) aggregation
        job.setCombinerClass(IntSumReducer.class);
        job.setReducerClass(IntSumReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        System.exit(job.waitForCompletion(true) ? 0 : 1);
      }
    }
    
    • Mapper
      The MapReduce framework spawns one map task for each InputSplit generated by the job's InputFormat.
      Output pairs are collected by calling context.write(WritableComparable, Writable).
      Grouping can be controlled by specifying a Comparator via Job.setGroupingComparatorClass(Class); it usually extends WritableComparator.
      The order in which keys are passed to the Reducer can be controlled by specifying a Comparator via Job.setSortComparatorClass(Class); it also usually extends WritableComparator.
      The mapping from keys to reducers can be controlled by specifying a Partitioner via Job.setPartitionerClass(Class); the number of partitions equals the number of reduce tasks.
      Job.setCombinerClass(Class) registers local aggregation of the intermediate map output, reducing the amount of data transferred from map to reduce; the combiner must extend Reducer. (A driver-side wiring sketch follows the two snippets below.)
    • Reducer
      A Reducer has three primary phases: shuffle, sort, and reduce.
    public static class GroupingComparator extends WritableComparator {
        protected GroupingComparator() {
            super(IntPair.class, true);
        }

        // Compare two WritableComparables by the first field of the composite key only.
        @Override
        public int compare(WritableComparable w1, WritableComparable w2) {
            IntPair ip1 = (IntPair) w1;
            IntPair ip2 = (IntPair) w2;
            int l = ip1.getFirst();
            int r = ip2.getFirst();
            return l == r ? 0 : (l < r ? -1 : 1);
        }
    }
    
    public class KeyPartitioner extends Partitioner<TextInt, IntWritable> {
        @Override
        public int getPartition(TextInt key, IntWritable value, int numPartitions) {
            // Partition on the first (natural) key only, so that all records sharing
            // the same first key are routed to the same reducer.
            return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
        }
    }
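
    A sketch of how these hooks are registered on the Job in the driver. The fragment is illustrative only: KeyPartitioner and GroupingComparator are the classes above (in a real job the partitioner, comparators, and map output key must all agree on one composite key type), and FullKeyComparator stands for a hypothetical WritableComparator that orders the complete composite key.

    // Driver-side wiring (sketch): place these calls in a driver such as WordCount's main().
    job.setPartitionerClass(KeyPartitioner.class);              // which reducer each key goes to
    job.setSortComparatorClass(FullKeyComparator.class);        // order in which keys reach reduce()
    job.setGroupingComparatorClass(GroupingComparator.class);   // which keys share one reduce() call
    job.setCombinerClass(IntSumReducer.class);                  // optional local aggregation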
    

    Run: $ bin/hadoop jar wc.jar WordCount /user/joe/wordcount/input /user/joe/wordcount/output

    • -files: a comma-separated list of files to be copied into the working directory of each task
    • -libjars: a comma-separated list of jar files to add to the classpath of the map and reduce tasks
    • -archives: a comma-separated list of archives; each archive is unpacked on the task nodes and a link with the same name as the archive is created in the task's working directory.
      In the command below, myarchive.zip is unpacked into a directory named "myarchive.zip":
      bin/hadoop jar hadoop-mapreduce-examples-<ver>.jar wordcount -files cachefile.txt -libjars mylib.jar -archives myarchive.zip input output
    • WordCount 2.0
    package cn.test.mapreduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.GenericOptionsParser;
    import org.apache.hadoop.util.StringUtils;
    
    import java.io.BufferedReader;
    import java.io.FileReader;
    import java.io.IOException;
    import java.net.URI;
    import java.util.*;
    
    public class WordCount2 {
    
        public static class TokenizerMapper extends Mapper<Object, Text,Text, IntWritable>{
    
            static enum CountersEnum {INPUT_WORDS}
    
            private final static IntWritable one = new IntWritable(1);
            private Text word = new Text();
    
            private boolean caseSensitive;
            private Set<String> patternsToSkip = new HashSet<String>();
    
            private Configuration conf;
            private BufferedReader fis;
    
            @Override
            protected void setup(Context context) throws IOException, InterruptedException {
                conf = context.getConfiguration();
                // read the configured property values, falling back to the given defaults
                caseSensitive = conf.getBoolean("wordcount.case.sensitive",true);
                if(conf.getBoolean("wordcount.skip.patterns",false)){
                    URI[] patternsURIs = Job.getInstance(conf).getCacheFiles();
                    for(URI patternsURI : patternsURIs){
                        Path patternsPath = new Path(patternsURI.getPath());
                        String patternsFileName = patternsPath.getName().toString();
                        parseSkipFile(patternsFileName);
                    }
                }
            }
    
            private void parseSkipFile(String fileName) {
                try{
                    fis = new BufferedReader(new FileReader(fileName));
                    String pattern = null;
                    while((pattern = fis.readLine())!=null){
                        patternsToSkip.add(pattern);
                    }
                } catch (IOException ioe){
                    System.err.println("Caught exception while parsing the cached file'"
                    + StringUtils.stringifyException(ioe));
                }
            }
    
            @Override
            protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line = caseSensitive ? value.toString(): value.toString().toLowerCase();
                for (String pattern : patternsToSkip){
                    line = line.replaceAll(pattern, "");
                }
                StringTokenizer itr = new StringTokenizer(line);
                while (itr.hasMoreTokens()){
                    word.set(itr.nextToken());
                    context.write(word,one);
                    Counter counter = context.getCounter(CountersEnum.class.getName(),CountersEnum.INPUT_WORDS.toString());
                    counter.increment(1);
    
                }
            }
        }
    
        public static class IntSumReducer extends Reducer<Text,IntWritable,Text,IntWritable>{
    
            private IntWritable result = new IntWritable();
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for(IntWritable val : values){
                    sum += val.get();
                }
                result.set(sum);
                context.write(key,result);
            }
        }
    
        public static void main(String args[]) throws Exception{
            Configuration conf = new Configuration();
            GenericOptionsParser optionsParser = new GenericOptionsParser(conf, args);
            // getRemainingArgs() returns only the application-specific (non-generic) arguments
            String[] remainingArgs = optionsParser.getRemainingArgs();
            if (remainingArgs.length!=2&&remainingArgs.length!=4){
                System.out.println("Usage: wordcount <in> <out> [-skip skipPatternFile]");
                System.exit(2);
            }
            Job job = Job.getInstance(conf,"wordCount");
            job.setJarByClass(WordCount2.class);
            job.setMapperClass(TokenizerMapper.class);
            job.setCombinerClass(IntSumReducer.class);
            job.setReducerClass(IntSumReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            List<String> otherArgs = new ArrayList<String>();
            for (int i=0;i<remainingArgs.length;i++){
                if("-skip".equals(remainingArgs[i])){
                    job.addCacheFile(new Path(remainingArgs[++i]).toUri());
                    // record in the job configuration that a skip-pattern file was supplied
                    job.getConfiguration().setBoolean("wordcount.skip.patterns", true);
                } else{
                    otherArgs.add(remainingArgs[i]);
                }
            }
            FileInputFormat.addInputPath(job, new Path(otherArgs.get(0)));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs.get(1)));
    
            System.exit(job.waitForCompletion(true)?0:1);
        }
    }
    
    • GenericOptionsParser
      Parses the Hadoop framework's generic command-line options, so that applications can easily specify the namenode, the ResourceManager, extra configuration resources, and so on.
      Supported usage: bin/hadoop command [genericOptions] [commandOptions]
      -conf <configuration file>        load the given configuration file, e.g. $ bin/hadoop dfs -conf core-site.xml -conf hdfs-site.xml -ls /data
      -D <property=value>               set the given property, e.g. $ bin/hadoop dfs -D fs.default.name=darwin:8020 -ls /data
      -fs <local|namenode:port>         specify the namenode, e.g. $ bin/hadoop dfs -fs darwin:8020 -ls /data
      -jt <local|resourcemanager:port>  specify the ResourceManager, e.g. $ bin/hadoop job -jt local -submit job.xml
      -files
      -libjars
      -archives
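      In practice these options are often handled by implementing the Tool interface and launching the driver through ToolRunner, which applies GenericOptionsParser before run(args) is invoked. A minimal sketch (the class and job names are illustrative, and the mapper/reducer setup is elided):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;

    public class WordCountDriver extends Configured implements Tool {
      @Override
      public int run(String[] args) throws Exception {
        // getConf() already reflects anything passed via -D/-conf/-files/-libjars/-archives.
        Job job = Job.getInstance(getConf(), "word count");
        job.setJarByClass(WordCountDriver.class);
        // ... same mapper/reducer/output-type setup as the WordCount driver above ...
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        return job.waitForCompletion(true) ? 0 : 1;
      }

      public static void main(String[] args) throws Exception {
        // ToolRunner strips the generic options before calling run(args).
        System.exit(ToolRunner.run(new Configuration(), new WordCountDriver(), args));
      }
    }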
    • DistributedCache
      Efficiently distributes large, read-only, application-specific files (plain files, archives, jars, etc.) that the application needs.
      Files/archives can be distributed by setting the property mapreduce.job.cache.{files|archives} to a comma-separated list,
      or in code via Job.addCacheFile(URI) / Job.addCacheArchive(URI) (by default the URIs refer to files on HDFS).
      Streaming jobs can distribute files on the command line via -cacheFile / -cacheArchive.
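      A minimal sketch of the API route (the HDFS paths are hypothetical); a "#name" fragment on the URI creates a symlink with that name in the task's working directory:

    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.mapreduce.Job;

    public class CacheDemo {
      public static void main(String[] args) throws Exception {
        Job job = Job.getInstance(new Configuration(), "cache demo");
        // Copied to every task node; readable in setup() as the local file "stopwords".
        job.addCacheFile(new URI("hdfs:///apps/wordcount/stopwords.txt#stopwords"));
        // Unpacked on the task nodes into a directory named "dict.zip".
        job.addCacheArchive(new URI("hdfs:///apps/wordcount/dict.zip"));
      }
    }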
    • Counter
      Use an enum: the enum's class name becomes the counter group and its members become the counter names. (A sketch of reading the counter back in the driver follows.)
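      Reading the custom counter back in the driver once the job finishes (CountersEnum is the enum declared in TokenizerMapper of WordCount 2.0 above; call job.waitForCompletion(true) first rather than passing it straight to System.exit):

    long inputWords = job.getCounters()
        .findCounter(WordCount2.TokenizerMapper.CountersEnum.INPUT_WORDS)
        .getValue();
    System.out.println("INPUT_WORDS = " + inputWords);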
            // Delete the output directory if it already exists, since
            // checkOutputSpecs() rejects a pre-existing output path.
            Path outputPath = new Path(args[1]);
            FileSystem fileSystem = FileSystem.get(conf);
            if (fileSystem.exists(outputPath)) {
                fileSystem.delete(outputPath, true);
                System.out.println("existing output directory has been deleted");
            }
    
    • JobHistory
      Records information about completed MapReduce jobs under a configured HDFS directory; the feature is not enabled by default.
      Add the configuration below to mapred-site.xml.
      Start the server with: mr-jobhistory-daemon.sh start historyserver
      jps should then show a JobHistoryServer process.
        <property>
            <name>mapreduce.jobhistory.address</name>
            <value>master:10020</value>
        </property>
    
        <property>
            <name>mapreduce.jobhistory.webapp.address</name>
            <value>master:19888</value>
        </property>
    
        <property>
            <name>mapreduce.jobhistory.done-dir</name>
            <value>/history/done</value>
        </property>
    
        <property>
            <name>mapreduce.jobhistory.intermediate-done-dir</name>
            <value>/history/done_intermediate</value>
        </property>
    

    Add the configuration below to yarn-site.xml to enable log aggregation, so that task logs can be viewed (for example via yarn logs -applicationId <appId> or the JobHistory web UI).

        <property>
            <name>yarn.log-aggregation-enable</name>
            <value>true</value>
        </property>
    
