美文网首页自由世界
hadoop2.*学习笔记三(YARN+ MapReduce+E

hadoop2.*学习笔记三(YARN+ MapReduce+E

作者: Notput | 来源:发表于2016-07-19 15:20 被阅读1215次

    关于hadoop hdfs配置请参考hadoop学习笔记二


    内容概括:在笔记二所在的环境上配置 YARNMapReduce。然后在eclipse中远程连接hdfs并运行一个简单的demo。这有一篇IBM的文章写得非常好关于YARN的值得一看。

    个人观点:由于我们平时使用的服务器上大多有多个网卡也就是可能会有多个ip所以在配置关于ip时还是配置特定的ip比较保险,在多网卡环境下不同网卡可能处在不同的网络中对于不明确的配置对远程连接有一定的误导性。所以这里先改下之前带有ip配置的配置文件,改为固定某个ip。

    • 在hosts中添加ip和名字的映射,这样就可以在配置文件中使用同一个名称就可以代表同一个ip,同时方便修改名字所对应的ip地址。
      编辑/etc/hosts添加下面内容到末尾
    172.16.94.128 master
    

    修改core-site.xml

    <configuration>
        <property>
            <name>hadoop.tmp.dir</name>
            <value>file:/usr/local/hadoop/tmp</value>
            <description>Abase for other temporary directories.</description>
        </property>
        <property>
            <name>fs.defaultFS</name>
            <!-- 修改localhost为master-->
            <value>hdfs://master:9000</value>
        </property>
    </configuration>
    

    配置YARN:

    重命名mapred-site.xml.template

    mv ./etc/hadoop/mapred-site.xml.template ./etc/hadoop/mapred-site.xml
    

    修改mapred-site.xml为:

    <configuration>
    <!--新框架支持第三方 MapReduce 开发框架以支持如 SmartTalk/DGSG 等非 Yarn 架构,注意通常情况下这个配置的值都设置为 Yarn,如果没有配置这项,那么提交的 Yarn job 只会运行在 locale 模式,而不是分布式模式。-->
     <property>
        <name>mapreduce.framework.name</name>
        <value>yarn</value>
     </property>
    
    <!--添加此属性以便远程创建MapReduce Job使用-->
     <property>
        <name>mapreduce.jobtracker.address</name>
        <value>master:9001</value>
     </property>
    </configuration>
    

    修改yarn-site.xml为:

    <configuration>
    
    <!-- Site specific YARN configuration properties -->
     <property>
        <name>yarn.nodemanager.aux-services</name>
        <value>mapreduce_shuffle</value>
     </property>
    </configuration>
    

    启动yarn(必须保证hdfs已经启动),查看hadoop启动了哪些可以用jps命令。hadoop启动后有相应的web页面可以查看关于hadoop的hdfs相关信息,在浏览器输入http://172.16.94.128:50070 就可以看到如下页面:

    Paste_Image.png
    $ jps
      4564 NameNode
      4692 DataNode
      4857 SecondaryNameNode
      8861 Jps
    
    --启动yarn
    $ ./sbin/start-yarn.sh
    
    --查看是否启动成,出现ResourceManager和NodeManager表示yarn启动成功
    $ jps
      9042 NodeManager
      9347 Jps
      4564 NameNode
      4692 DataNode
      4857 SecondaryNameNode
      8925 ResourceManager
    
    --如果要记录job执行日志则需要启动historyserver
    $ ./sbin/mr-jobhistory-daemon.sh start historyserver
    
    $ jps
    9042 NodeManager
    4564 NameNode
    4692 DataNode
    9605 JobHistoryServer
    4857 SecondaryNameNode
    8925 ResourceManager
    9647 Jps
    --出现JobHistoryServer就表示启动成,关于historyserver的目录或者ip和端口可以在mapred-site.xml中配置,这里先用默认的配置。
    

    yarn启动后可以用web页面查看任务,在浏览器里面输入http://172.16.94.128:8088 就可看见如下界面:

    Paste_Image.png

    historyServer 也有web界面可以查看记录,在浏览器里面输入http://172.16.94.128:19888 就可以看见如下界面:

    Paste_Image.png

    如果无法访问请查看网络和端口是否被防火墙屏蔽,一开始学习建议关闭防火墙。centos 7防火墙操作

    到这里基础模块都启动完成,下面先跑一个hadoop自带的例子然后再在eclipse上面配置hdfs和一个demo。

    同linux文件系统一样在使用hdfs之前先创建用户目录:

    -- -p创建多层目录
    $ ./bin/hdfs dfs -mkdir -p /user/hadoop
    --创建好用户目录后我们在当前用户目录下面建一个存储mapreduce输入文件的目录
    $ ./bin/hdfs dfs -mkdir input
    

    运行hadoop自带的demo:

    --为了测试我们把hadoop的配置文件导入到 input中
    $ ./bin/hdfs dfs -put ./etc/hadoop/*.xml input
    --查看是否成功导入
    $ ./bin/hdfs dfs -ls input
    -rw-r--r--   1 hadoop   supergroup       4436 2016-07-15 00:50 input/capacity-scheduler.xml
    -rw-r--r--   1 hadoop   supergroup       1071 2016-07-15 00:50 input/core-site.xml
    -rw-r--r--   1 hadoop   supergroup       9683 2016-07-15 00:50 input/hadoop-policy.xml
    -rw-r--r--   1 hadoop   supergroup       1233 2016-07-15 00:50 input/hdfs-site.xml
    -rw-r--r--   1 hadoop   supergroup        620 2016-07-15 00:50 input/httpfs-site.xml
    -rw-r--r--   1 hadoop   supergroup       3518 2016-07-15 00:50 input/kms-acls.xml
    -rw-r--r--   1 hadoop   supergroup       5511 2016-07-15 00:50 input/kms-site.xml
    -rw-r--r--   1 hadoop   supergroup        938 2016-07-15 00:50 input/mapred-site.xml
    -rw-r--r--   1 hadoop   supergroup        792 2016-07-15 00:50 input/yarn-site.xml
    --运行hadoop的一个例子,找出所有符合正则表达式的词语并存储在output(由程序新建,如果已经存在需删除后再运行demo)目录下面
    $ ./bin/hadoop jar ./share/hadoop/mapreduce/hadoop-mapreduce-examples-*.jar grep input output 'dfs[a-z.]+'
    --运行后可以在对应的web界面看到任何的运行状态
    --查看执行后的output目录
    $ ./bin/hdfs dfs -ls output/
    -rw-r--r--   1 hadoop supergroup          0 2016-07-15 00:53 output/_SUCCESS
    -rw-r--r--   1 hadoop supergroup         95 2016-07-15 00:53 output/part-r-00000
    --我们所需要的运行结果存储在part*所在的文件中,打开这些文件就可看到我们需要的结果
    $ ./bin/hdfs dfs -cat output/part-r-00000
    1   dfsadmin
    1   dfs.replication
    1   dfs.permissions
    1   dfs.namenode.name.dir
    1   dfs.datanode.data.dir
    --列出了出现次数和对应的词语,这里可以把在hdfs中的文件提取到系统文件系统中来。
    $ ./bin/hdfs dfs -get output ./output 
    --hdfs还有很多命令用来完成不同的功能这里就不一一说了。
    

    远程eclipse查看当前hadoop hdfs中的文件配置:

    1 在hdfs-site.xml中添加一个关于权限的配置属性并设置成false(因为在eclipse hadoop插件访问hdfs是获取的eclipse所在的计算机的用户名只要不和hadoop hdfs的用户名(hadoop)不一致就无法进行读取和上传操作)

        <property>
             <name>dfs.permissions</name>
             <value>false</value>
         </property>
    

    2 下载eclipse ide和hadoop eclipse 插件,这里我贴出我所使用的eclipse 版本和 hadoop eclipse 插件版本,直接到eclipse官网下载,插件到hadoop github上下载就行:

    Paste_Image.png

    插件:hadoop2x-eclipse-plugin-2.6.0.zip

    3 解压插件并将 release 中的 hadoop-eclipse-kepler-plugin-2.6.0.jar 复制到 Eclipse 安装目录的 plugins 文件夹中,重启eclipse 插件安装成功,并会出现以下图标。

    Paste_Image.png

    4 在eclipse显示出hdfs的配置窗口和配置相应参数
    点击eclipse 菜单Window -> Show View -> Ohter则会出现下面界面并选择Map/Reduce Locations然后ok。

    Paste_Image.png Paste_Image.png

    在上图中右击选择New Hadoop location...

    Paste_Image.png Paste_Image.png

    说明:Location name自定义
    Map/Reduce(V2) Master 同mapred-site.xml mapreduce.jobtracker.address属性配置。
    DFS Master 同core-site.xml fs.defaultFS属性的配置。
    User name 填写hadoop hdfs用户名。

    5 点击Finish并在Project Explorer中查看hdfs的内容,如果没有出现以下内容可以右键刷新

    Paste_Image.png

    在eclipse中实现一个远程MapReduce demo(WordCount)

    先了解下如何提交jobjob的执行过程,以及java对hdfs的CRUD的一些api demo。还有关于hdfs中文件的权限说明多用户对文件的权限设置。在这个测试之前一定要确保hadoop环境中的防火墙是关闭了或者hadoop所使用的端口都加入防火墙端口信任列表了(血的教训啊,为一个端口没有加入信任列表无法远程提交job害我找了好久的问题)。

    1 创建一个maven project并在pom.xml添加如下依赖和编译和打包插件。(这里我们使用maven来管理hadoop的依赖和项目打包)

        --添加依赖
           <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-common</artifactId>
                <version>2.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-hdfs</artifactId>
                <version>2.7.2</version>
            </dependency>
            <dependency>
                <groupId>org.apache.hadoop</groupId>
                <artifactId>hadoop-client</artifactId>
                <version>2.7.2</version>
            </dependency>
    
    --添加编译和打包插件
      <build>
      <plugins>
       <!--打包工具-->
      <plugin> 
        <groupId> org.apache.maven.plugins </groupId> 
        <artifactId> maven-jar-plugin </artifactId> 
        <configuration> 
          <archive> 
            <manifest> 
              <addClasspath>true</addClasspath> 
              <!--指定程序入口-->
              <mainClass>WordCount</mainClass> 
            </manifest> 
          </archive> 
        </configuration> 
      </plugin>
       <!--指定编译jdk版本-->
      <plugin>
       <groupId>org.apache.maven.plugins</groupId>
       <artifactId>maven-compiler-plugin</artifactId>
       <version>3.5.1</version>
       <configuration>
        <source>1.8</source>
        <target>1.8</target>
       </configuration>
      </plugin>
      </plugins>
     </build>
    

    2 创建一个M/R job(这里代码是从其他地方拷贝过来修改了一点点):

    import java.text.SimpleDateFormat;
    import java.util.Date;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class WordCount {
    
        public static void main(String[] args) throws Exception {
            /**
             * 连接hadoop集群配置,也可以直接把hadoop的配置文件拷贝到当前项目的resource目录下并修改里面带有IP的配置为服务器的IP地址,然后使用Configuration的api加载那些配置文件(这里就不举例了),在代码中没有配置或者在你复制出来的配置文件中没有配置的属性都是使用的默认配置,也就是说里面包含一些默认IP的配置大多都是0.0.0.0,请把可能用到的带有ip的配置都把ip改为服务器真实ip,以免在测试过程中导致无法连接而失败,这里我通过代码把我所可能用到的都重新配置了。对于这些配置的解释可以看对应配置文件的说明。
             */
            Configuration conf = new Configuration(true);
            conf.set("fs.defaultFS", "hdfs://172.16.94.128:9000");
            conf.set("hadoop.job.user", "hadoop");
            conf.set("mapreduce.framework.name", "yarn");
          //配置job提交的目录 '/user'表示提交到当前用户(这里是hadoop)的目录下
            conf.set("yarn.app.mapreduce.am.staging-dir", "/user");
            conf.set("mapreduce.jobhistory.address", "172.16.94.128:10020");
            conf.set("mapreduce.jobtracker.address", "172.16.94.128:9001");
            conf.set("yarn.resourcemanager.hostname", "172.16.94.128");
            conf.set("yarn.resourcemanager.admin.address", "172.16.94.128:8033");
            conf.set("yarn.resourcemanager.address", "172.16.94.128:8032");
            conf.set("yarn.resourcemanager.resource-tracker.address", "172.16.94.128:8031");
            conf.set("yarn.resourcemanager.scheduler.address", "172.16.94.128:8030");
            //使支持跨系统提交job
            conf.set("mapreduce.app-submission.cross-platform", "true");
            //指定我们打包后的jar包位置,在eclipse中运行此项目时会根据这个路径找到jar包上传到hadoop中yarn.app.mapreduce.am.staging-dir所对应的地方,否则会出现CLass Not Found的错误。
            // 所以在执行前应该先使用maven clean install把当前项目打包,才能在运行时找到jar包。
            //网上还有介绍了一些通过代码实现打包的方法,这里不介绍,因为在正常开发时都是把生成的jar包之间上传到想要的目录,只是测试时我们用得到而已,所以如果是要手动上传jar包下面这句可以注释掉。
            conf.set("mapred.jar","target/baosh-storage-hadoop-0.0.1-SNAPSHOT.jar");
            String[] otherArgs = new String[2];
            otherArgs[0] = "hdfs://172.16.94.128:9000/user/hadoop/input";//计算原文件目录,需提前在里面存入文件
            String time = new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
            otherArgs[1] = "hdfs://172.16.94.128:9000/user/hadoop/test_out/" + time;//计算后的计算结果存储目录,每次程序执行的结果目录不能相同,所以添加时间标签
            /*
             * setJobName()方法命名这个Job。对Job进行合理的命名有助于更快地找到Job,
             * 以便在JobTracker和Tasktracker的页面中对其进行监视
             */
            Job job = Job.getInstance(conf, "word count");
            
            job.setJarByClass(WordCount.class);
            System.out.println(job.getJar());
      // job.setMaxMapAttempts(100);//设置最大试图产生底map数量,该命令不一定会设置该任务运行过车中的map数量
            // job.setNumReduceTasks(5);//设置reduce数量,即最后生成文件的数量
    
            /*
             * Job处理的Map(拆分)、Combiner(中间结果合并)以及Reduce(合并)的相关处理类。
             * 这里用Reduce类来进行Map产生的中间结果合并,避免给网络数据传输产生压力。
             */
            job.setMapperClass(TokenizerMapper.class);// 执行用户自定义map函数
            job.setCombinerClass(IntSumReducer.class);// 对用户自定义map函数的数据处理结果进行合并,可以减少带宽消耗
            job.setReducerClass(IntSumReducer.class);// 执行用户自定义reduce函数
    
            /*
             * 接着设置Job输出结果<key,value>的中key和value数据类型,因为结果是<单词,个数>,
             * 所以key设置为"Text"类型,相当于Java中String类型
             * 。Value设置为"IntWritable",相当于Java中的int类型。
             */
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            /*
             * 加载输入文件夹或文件路径,即输入数据的路径
             * 将输入的文件数据分割成一个个的split,并将这些split分拆成<key,value>对作为后面用户自定义map函数的输入
             * 其中,每个split文件的大小尽量小于hdfs的文件块大小
             * (默认64M),否则该split会从其它机器获取超过hdfs块大小的剩余部分数据,这样就会产生网络带宽造成计算速度影响
             * 默认使用TextInputFormat类型,即输入数据形式为文本类型数据文件
             */
            System.out.println("Job start!");
            FileInputFormat.addInputPath(job, new Path(otherArgs[0]));
    
            /*
             * 设置输出文件路径 默认使用TextOutputFormat类型,即输出数据形式为文本类型文件,字段间默认以制表符隔开
             */
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    
            /*
             * 开始运行上面的设置和算法
             */
            if (job.waitForCompletion(true)) {
                System.out.println("ok!");
            } else {
                System.out.println("error!");
                System.exit(0);
            }
        }
    }
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /*
     * 用户自定义map函数,对以<key, value>为输入的结果文件进行处理
     * Map过程需要继承org.apache.hadoop.mapreduce包中Mapper类,Mapper类是一个泛型类,带有4个参数(输入的键,输入的值,输出的键,输出的值,并重写其map方法。
     * 通过在map方法中添加两句把key值和value值输出到控制台的代码
     * ,可以发现map方法中value值存储的是文本文件中的一行(以回车符为行结束标记),而key值为该行的首字母相对于文本文件的首地址的偏移量。
     * 然后StringTokenizer类将每一行拆分成为一个个的单词
     * ,并将<word,1>作为map方法的结果输出,其余的工作都交有MapReduce框架处理。 每行数据调用一次 Tokenizer:单词分词器
     */
    public  class TokenizerMapper extends
            Mapper<Object, Text, Text, IntWritable> {
        private final static IntWritable one = new IntWritable(1);
        private Text word = new Text();
    
        /*
         * 重写Mapper类中的map方法
         */
        public void map(Object key, Text value, Context context)
                throws IOException, InterruptedException {
            StringTokenizer itr = new StringTokenizer(value.toString());
            //System.out.println(value.toString());
            while (itr.hasMoreTokens()) {
                word.set(itr.nextToken());// 获取下个字段的值并写入文件
                context.write(word, one);
            }
        }
    }
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /*
     * 用户自定义reduce函数,如果有多个热度测,则每个reduce处理自己对应的map结果数据
     * Reduce过程需要继承org.apache.hadoop.mapreduce包中Reducer类,Reducer类是一个泛型类,带有4个参数(输入的键,输入的值,输出的键,输出的值)。并重写其reduce方法。
     * Map过程输出<key,values>中key为单个单词,而values是对应单词的计数值所组成的列表,Map的输出就是Reduce的输入,
     * 所以reduce方法只要遍历values并求和,即可得到某个单词的总次数。
     */
    public  class IntSumReducer extends
            Reducer<Text, IntWritable, Text, IntWritable> {
        private IntWritable result = new IntWritable();
        public void reduce(Text key, Iterable<IntWritable> values,
                Context context) throws IOException, InterruptedException {
            int sum = 0;
            for (IntWritable val : values) {
                sum += val.get();
            }
            result.set(sum);
            context.write(key, result);
        }
    }
    

    以上就是这个demo的所有代码了。

    3 运行WordCount.java:

    Paste_Image.png

    运行结果:


    Paste_Image.png

    这是job提交后在hdfs中的文件


    Paste_Image.png

    4 查看运行结果:

    Paste_Image.png

    在DFS Locations中可以看到我们输出的文件,在eclipse中双击打开part-r-00000文件会看到类似以下内容:

    Paste_Image.png

    运行成功。

    运行遇到的一些问题:

    1 用户权限问题

    Exception in thread "main" org.apache.hadoop.security.AccessControlException: 
    Permission denied: user=***, access=EXECUTE, inode="/tmp/hadoop-yarn/staging/***/.staging/job_1468660403917_0003":hadoop:supergroup:drwxrwx---
    --这表示user所指的用户在inode所指定的路径下没有执行文件的权限,解决方式有两种:1 要么在运行这个案例时所在环境的环境变量里添加HADOOP_USER_NAME=“有权限的用户名”。2 要么去hdfs中把这个目录的权限改成任何人都可以执行。
    

    这里我们只是在eclipse上跑demo所以我采用第一种方式,而且只是在eclipse中配置了一个环境变量:

    Paste_Image.png

    这里hadoop这个用户是有权限的。

    2 jar包没有上传导致的Class Not found的异常。这个问题在代码中有解释。

    3 默认配置ip问题。在代码中也有解释。

    结束语:由于参考的连接太多了就不一一列出来了,在文章中关键位置都加了连接。笔记粗糙如有不适请多包涵。

    提示:请根据个人的环境修改上面的IP和端口。所有job的运行记录的状态都可以在对应的web 界面可以看见,对于web 界面上面有描述。

    相关文章

      网友评论

      • 89a1e66423b4:想求教下,yarn 框架下没有mapreduce.jobtracker.address,为什么在mapreduce-site.xml文件配置的时候还要设置这个参数?有一点我很矛盾,在map/reduce location 设置的时候需要填写map/reduce端口号,如果不设置,这边又要写什么呢?因为yarn框架把原本的jobtracker功能分开了,请指导下
      • bc47b6bc1b21:你好mapreduce.jobtracker.address设置的9001 有端口监听吗
      • angelosLei:赞一个 :+1:
      • ad3575b0aad7:不敢说话了 :mask:

      本文标题:hadoop2.*学习笔记三(YARN+ MapReduce+E

      本文链接:https://www.haomeiwen.com/subject/ekqsjttx.html