Hadoop Study Notes (5) -- MapReduce Development (1)


Author: millions_chan | Published 2017-05-08 23:05

    1 The Configuration API

    Hadoop components are configured with XML files, in which one property's value can reference another using the ${property-name} syntax, for example:

    <?xml version="1.0"?>
    <configuration>
      <property>
         <name>color</name> 
         <value>yellow</value>
         <description>Color</description>
      </property>
      <property>
        <name>size</name>
        <value>10</value>         
        <description>Size</description>
      </property>
    
      <property>
        <name>weight</name>
        <value>heavy</value>
        <final>true</final>     
        <description>Weight</description>
      </property>
    
      <property>
        <name>size-weight</name>
        <value>${size},${weight}</value>       
        <description>Size and weight</description>
      </property>
    </configuration>
    

    The Configuration class can then be used to read the values:

    Configuration conf = new Configuration();
    conf.addResource("configuration-1.xml");
    assertThat(conf.get("color"), is("yellow")); 
    assertThat(conf.getInt("size", 0), is(10)); 
    assertThat(conf.get("breadth", "wide"), is("wide"));
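    // the ${size},${weight} reference defined in the XML is expanded on read
    assertThat(conf.get("size-weight"), is("10,heavy"));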
    

    Multiple configuration files can also be added. Properties defined in later resources override earlier ones, except for properties marked final (like weight above), whose earlier values win:

    Configuration conf = new Configuration(); 
    conf.addResource("configuration-1.xml");
    conf.addResource("configuration-2.xml");
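
    For example, assuming a configuration-2.xml that redefines size as 12 and weight as light (its contents are not shown in the original, so these values are illustrative):

    assertThat(conf.getInt("size", 0), is(12));           // overridden by the later file
    assertThat(conf.get("weight"), is("heavy"));          // final, so the attempted override is ignored
    assertThat(conf.get("size-weight"), is("12,heavy"));  // expansion uses the effective values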
    

    2 Setting Up the Development Environment

    Hadoop's I/O layer works with multiple file systems, so the same program can run in the development environment, a local (pseudo-distributed) environment, or a cluster. During development, Maven makes it easy to pull in the required libraries:

     <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <hadoop.version>2.5.1</hadoop.version>
      </properties>
      <dependencies>
        <!-- Hadoop main client artifact -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-client</artifactId>
          <version>${hadoop.version}</version>
        </dependency>
        <!-- Unit test artifacts -->
        <dependency>
          <groupId>junit</groupId>
          <artifactId>junit</artifactId>
          <version>4.11</version>
          <scope>test</scope>
        </dependency>
    
        <dependency>
          <groupId>org.apache.mrunit</groupId>
          <artifactId>mrunit</artifactId>
          <version>1.1.0</version>
          <classifier>hadoop2</classifier>
          <scope>test</scope>
        </dependency>
        <!-- Hadoop test artifact for running mini clusters -->
        <dependency>
          <groupId>org.apache.hadoop</groupId>
          <artifactId>hadoop-minicluster</artifactId>
          <version>${hadoop.version}</version>
          <scope>test</scope>
        </dependency>
      </dependencies>
      <build>
        <finalName>hadoop-examples</finalName>
        <plugins>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-compiler-plugin</artifactId>
            <version>3.1</version>
            <configuration>
              <source>1.6</source>
              <target>1.6</target>
            </configuration>
          </plugin>
          <plugin>
            <groupId>org.apache.maven.plugins</groupId>
            <artifactId>maven-jar-plugin</artifactId>
            <version>2.5</version>
            <configuration>
              <outputDirectory>${basedir}</outputDirectory>
            </configuration>
          </plugin>
        </plugins>
      </build>
    

    To switch between the development, pseudo-distributed, and cluster environments, let's create three configuration files:

    (1) A development configuration that uses the local file system:

    <?xml version="1.0"?>
    <configuration>
        <property> 
          <name>fs.defaultFS</name>
          <value>file:///</value>
        </property>
        <property>
          <name>mapreduce.framework.name</name>       
          <value>local</value>
        </property>
    </configuration>
    

    (2) Pseudo-distributed mode on the local machine:

    <?xml version="1.0"?>
    <configuration>
        <property> 
          <name>fs.defaultFS</name>
          <value>hdfs://localhost:9000/</value>
        </property>
        <property>
          <name>mapreduce.framework.name</name>       
          <value>yarn</value>
        </property>
        <property>
           <name>yarn.resourcemanager.address</name>         
           <value>localhost:8032</value>
        </property>
    </configuration>
    

    (3) The cluster:

    <?xml version="1.0"?>
    <configuration>
        <property> 
          <name>fs.defaultFS</name>
          <value>hdfs://namenode/</value>
        </property>
        <property>
          <name>mapreduce.framework.name</name>       
          <value>yarn</value>
        </property>
        <property>
           <name>yarn.resourcemanager.address</name>         
           <value>resourcemanager:8032</value>
        </property>
    </configuration>
    

    With these in place, the -conf option selects which configuration to run with:

    hadoop fs -conf conf/hadoop-localhost.xml -ls .

    Without -conf, the configuration files are read from the etc/hadoop directory under HADOOP_HOME.

    Alternatively, copy the files from $HADOOP_HOME/etc/hadoop into separate directories and switch environments by pointing the HADOOP_CONF_DIR environment variable at the right one.

    3 Writing Unit Tests with MRUnit

    Create the test classes under the test directory. Start with a record that should produce no output: a MapDriver with no withOutput calls asserts that the mapper emits nothing, and in general each expected output gets its own withOutput call:

    import java.io.IOException;
    import java.util.Arrays;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mrunit.mapreduce.MapDriver;
    import org.apache.hadoop.mrunit.mapreduce.ReduceDriver;
    import org.junit.Test;
    
    public class MaxTemperatureMapperTest {
      @Test
      public void ignoresMissingTemperatureRecord() throws IOException, InterruptedException {
        Text value = new Text("0043011990999991950051518004+68750+023550FM-12+0382" + // contains the year, 1950
            "99999V0203201N00261220001CN9999999N9+99991+99999999999"); // temperature field is +9999, i.e. missing
        // no withOutput calls, so the test only passes if the mapper emits nothing
        new MapDriver<LongWritable, Text, Text, IntWritable>()
            .withMapper(new v2.MaxTemperatureMapper())
            .withInput(new LongWritable(0), value)
            .runTest();
      }
    }
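
    For reference, here is a minimal sketch of what the mapper under test might look like. The post never shows v2.MaxTemperatureMapper, so this is an assumption based on the standard NCDC max-temperature example, with field offsets matching that record format:

    public class MaxTemperatureMapper
        extends Mapper<LongWritable, Text, Text, IntWritable> {

      private static final int MISSING = 9999;

      @Override
      public void map(LongWritable key, Text value, Context context)
          throws IOException, InterruptedException {
        String line = value.toString();
        String year = line.substring(15, 19);
        // drop the leading plus sign before parsing the temperature field
        String temp = line.substring(87, 92);
        int airTemperature = Integer.parseInt(
            temp.startsWith("+") ? temp.substring(1) : temp);
        // a value of 9999 marks a missing reading; such records emit nothing,
        // which is exactly what ignoresMissingTemperatureRecord asserts
        if (airTemperature != MISSING) {
          context.write(new Text(year), new IntWritable(airTemperature));
        }
      }
    }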
    

    Next, add a test for the reducer:

    
      @Test
      public void returnsMaximumIntegerInValues() throws IOException, InterruptedException {
        new ReduceDriver<Text, IntWritable, Text, IntWritable>().withReducer(
            new v2.MaxTemperatureReducer())
            .withInput(new Text("1950"),
                Arrays.asList(new IntWritable(10), new IntWritable(5)))
            .withOutput(new Text("1950"), new IntWritable(10))
            .runTest();
      }
    

    Given the inputs 10 and 5 for the year 1950, the output should be 1950 10 (the mapper and reducer are the ones from the earlier max-temperature example).
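
    Likewise, a minimal sketch of the reducer under test, again assumed to follow the standard example since the post does not show v2.MaxTemperatureReducer:

    public class MaxTemperatureReducer
        extends Reducer<Text, IntWritable, Text, IntWritable> {

      @Override
      public void reduce(Text key, Iterable<IntWritable> values, Context context)
          throws IOException, InterruptedException {
        // emit the maximum of all temperatures recorded for this year
        int maxValue = Integer.MIN_VALUE;
        for (IntWritable value : values) {
          maxValue = Math.max(maxValue, value.get());
        }
        context.write(key, new IntWritable(maxValue));
      }
    }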

    Once the tests pass, we can run the program on a small local dataset, where debugging is much easier. Here the driver extends Configured, implements Tool, and is launched through ToolRunner:

    public class MaxTemperatureDriver extends Configured implements Tool {
      @Override
      public int run(String[] args) throws Exception {
        if (args.length != 2) {
          System.err.printf("Usage: %s [generic options] <input> <output>\n",
              getClass().getSimpleName());
          ToolRunner.printGenericCommandUsage(System.err);
          return -1;
        }
        Job job = Job.getInstance(getConf(), "Max temperature");
        job.setJarByClass(getClass());
        FileInputFormat.addInputPath(job, new Path(args[0]));
        FileOutputFormat.setOutputPath(job, new Path(args[1]));
        job.setMapperClass(MaxTemperatureMapper.class);
        job.setCombinerClass(MaxTemperatureReducer.class);
        job.setReducerClass(MaxTemperatureReducer.class);
        job.setOutputKeyClass(Text.class);
        job.setOutputValueClass(IntWritable.class);
        return job.waitForCompletion(true) ? 0 : 1;
      }
    
      public static void main(String[] args) throws Exception {
        int exitCode = ToolRunner.run(new MaxTemperatureDriver(), args);
        System.exit(exitCode);
      }
    }
    

    ToolRunner's role is to parse the command-line arguments into a Configuration; Configured gives the class getConf and setConf, and Tool contributes the run method. The code below shows how ToolRunner invokes GenericOptionsParser to handle the generic options:

    public static int run(Configuration conf, Tool tool, String[] args) 
        throws Exception{
        if(conf == null) {
          conf = new Configuration();
        }
        GenericOptionsParser parser = new GenericOptionsParser(conf, args);
        //set the configuration back, so that Tool can configure itself
        tool.setConf(conf);
        
        //get the args w/o generic hadoop args
        String[] toolArgs = parser.getRemainingArgs();
        return tool.run(toolArgs);
      }
    

    Now run it:

    % mvn compile
    % export HADOOP_CLASSPATH=target/classes/
    % hadoop v2.MaxTemperatureDriver -conf conf/hadoop-local.xml \
          input/ncdc/micro output
    

    Or skip the configuration file and use -fs to set the file system and -jt to set where the job runs:

    # "local" here means the local job runner
    hadoop v2.MaxTemperatureDriver -fs file:/// -jt local input/ncdc/micro output
    

    The driver can also be exercised from a test:

    @Test
      public void test() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "file:///");
        conf.set("mapreduce.framework.name", "local");
        conf.setInt("mapreduce.task.io.sort.mb", 1);
        Path input = new Path("input/sample");
        Path output = new Path("output");
        FileSystem fs = FileSystem.getLocal(conf);
        fs.delete(output, true); // delete old output
        MaxTemperatureDriver driver = new MaxTemperatureDriver();
        driver.setConf(conf);
        int exitCode = driver.run(new String[] {input.toString(), output.toString()});
        assertThat(exitCode, is(0));
        // check the output line by line against the expected results
        checkOutput(conf, output);
      }
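
    The checkOutput helper is not shown in the post. A minimal sketch, assuming the expected results live in a local fixture file (the file name below is hypothetical), could compare the job output line by line:

      private void checkOutput(Configuration conf, Path output) throws IOException {
        FileSystem fs = FileSystem.getLocal(conf);
        // with a single reducer, the local runner writes its output here
        Path actualFile = new Path(output, "part-r-00000");
        BufferedReader actual = new BufferedReader(
            new InputStreamReader(fs.open(actualFile)));
        BufferedReader expected = new BufferedReader(
            new FileReader("expected.txt")); // hypothetical fixture file
        try {
          String line;
          while ((line = expected.readLine()) != null) {
            assertThat(actual.readLine(), is(line));
          }
          assertThat(actual.readLine(), nullValue());
        } finally {
          actual.close();
          expected.close();
        }
      }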
    

    4 Running on a Cluster

    Once the tests pass and the results look correct, it is time to run the program for real on a cluster. That requires packaging the code into a JAR; the POM above configures maven-jar-plugin for this, so mvn package -DskipTests builds it. If the main class is not set in the manifest, remember to name it on the command line. For convenience, dependencies can be bundled much as in a WAR file: dependent JARs go into the job JAR's lib directory and configuration files into its classes directory, both easy to arrange with Maven plugins.

    The client-side classpath is made up of three parts:

    1. the job JAR;
    2. the lib and classes directories inside the job JAR;
    3. the directories on the HADOOP_CLASSPATH environment variable.

    On the cluster the situation changes: HADOOP_CLASSPATH no longer applies, since it only affects the JVM that runs the driver. The task classpath consists of:

    1. the job JAR;
    2. the lib and classes directories inside the job JAR;
    3. any files added to the distributed cache with the -libjars command-line option.

    This leaves three ways to ship dependencies:

    1. unpack the dependency JARs and repackage their contents into the job JAR;
    2. package the dependency JARs into the job JAR's lib directory;
    3. use HADOOP_CLASSPATH to put the dependencies on the client classpath, and -libjars to add them to the distributed cache.

    On the client side, setting the HADOOP_USER_CLASSPATH_FIRST environment variable gives user-supplied libraries precedence; on the cluster, setting mapreduce.job.user.classpath.first to true has the same effect.

    The following command submits the job to a Hadoop cluster:

    hadoop jar hadoop-examples.jar v2.MaxTemperatureDriver -fs hdfs://192.168.0.133:9000 -jt 192.168.0.133:8032 file:///home/hadoop/input max-temp

    Because the input is a local file, it carries the file:/// prefix.

    The final output:

        File System Counters
            FILE: Number of bytes read=46341975
            FILE: Number of bytes written=317163
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=176
            HDFS: Number of bytes written=27
            HDFS: Number of read operations=7
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=2
        Job Counters
            Launched map tasks=2
            Launched reduce tasks=1
            Data-local map tasks=2
            Total time spent by all maps in occupied slots (ms)=27621
            Total time spent by all reduces in occupied slots (ms)=13410
            Total time spent by all map tasks (ms)=27621
            Total time spent by all reduce tasks (ms)=13410
            Total vcore-seconds taken by all map tasks=27621
            Total vcore-seconds taken by all reduce tasks=13410
            Total megabyte-seconds taken by all map tasks=28283904
            Total megabyte-seconds taken by all reduce tasks=13731840
        Map-Reduce Framework
            Map input records=211054
            Map output records=208834
            Map output bytes=1879506
            Map output materialized bytes=56
            Input split bytes=176
            Combine input records=208834
            Combine output records=4
            Reduce input groups=3
            Reduce shuffle bytes=56
            Reduce input records=4
            Reduce output records=3
            Spilled Records=8
            Shuffled Maps =2
            Failed Shuffles=0
            Merged Map outputs=2
            GC time elapsed (ms)=456
            CPU time spent (ms)=8530
            Physical memory (bytes) snapshot=660267008
            Virtual memory (bytes) snapshot=5742837760
            Total committed heap usage (bytes)=464519168
        Shuffle Errors
            BAD_ID=0
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        File Input Format Counters
            Bytes Read=46341925
        File Output Format Counters
            Bytes Written=27
    
