Hadoop框架基础(四)

作者: Z尽际 | 来源:发表于2017-04-11 11:17 被阅读336次

    ** Hadoop框架基础(四)

    上一节虽然大概了解了一下mapreduce,徒手抓了海胆,不对,徒手写了mapreduce代码,也运行了出来。但是没有做更深入的理解和探讨。

    那么……

    本节目标:

    * 深入了解mapreduce过程

    * 成功部署Hadoop集群

    ** mapreduce原理

    想要了解mapreduce原理,我们必须搞清楚处理数据时的每一个重要阶段,首先,贴上一张官方的图:

    我们依次讨论每一个过程以及该过程对应的作用:

    我先在这里假设一个情景,我现在有一个10G大小的words.txt,里面存放的是N多个英文单词。

    这10G大小的文件分为若干个128M的文件块block分布存储于若干个服务器。

    好,现在我要统计这10G文件中的单词出现频率。

    ** input split

    一个split会对应一个map任务。

    一般来讲,split的个数和block的个数相同,当然也可以多个block属于同一个split,但是后者会产生大量的网络和磁盘IO,原因在于一个split对应一个map任务,一个map任务肯定跑在某一台机器上,如果某个split所包含的多个block分布于不同的机器,首先需要做的操作就是把其他机器的block拷贝到运行map任务的机器上,这会耗费一定时间,所以,默认情况下,一个block对应一个split,源码中设定如下:

    mapreduce.input.fileinputformat.split.minsize == 0

    mapreduce.input.fileinputformat.split.maxsize == 10000

    splitSize=max(minSize,min(maxSize, blockSize)),此为默认split大小

    如果要修改,则如下方式:

    recordSize表示一个记录的大小,分块要保证数据的完整性,所以:

    int blockSize = Integer.parseInt(x); //x表示你希望的split大小

    int splitSize = blockSize / recordSize * recordSize;

    conf.setLong("mapred.max.split.size",splitSize);

    conf.setLong("mapred.min.split.size",splitSize);

    ** map

    此时输入的到map中的数据形式大致为:

    <0, cat one hadoop element...>  ---> 调用一次map

    <30, dog two one hadoop....>  ---> 调用一次map

    ……

    省略号表示后边还有,其中0,30表示的是偏移量,每次从当前split中读取1行数据,比如第一次读取第一行,偏移量为0~29,第二次是第二行数据,偏移量是30~?,以此类推。每次读取都执行一次map任务,并调用一次map方法。map阶段结束,输出结果形式大致为:

    <cat , 1>  <one, 1>  <hadoop, 1>  <element, 1> ……等等


    从此进入shuffle阶段

    ** buffer in memory

    这是一个状态描述,表明此刻在内存中开始操作,buffer在这里是内存中的一个环形数组。

    之所以用环形数组来存放数据,是因为这样可以最大化的利用存储空间。

    这个环形数组中存放数据分为两个类别:

    1、元数据区(Kvmeta):

    里面存放的每组数据都包含:

    ** value的起始位置

    ** key的起始位置

    ** partition值

    ** value的长度

    2、数据区(Kvbuffer):

    里面存放的每组数据都包含:

    ** key值,例如<cat ,1>中的cat

    ** value值,例如<cat, 1>中的1

    注意:

    * 以上两个区域的分界点为0,即0以上存储数据区内容,0以下存储元数据区的内容。

    * 这个环形buffer虽然实际为一个字节数组,但抽象为一个IntBuffer,无论哪个区域中的数据,每组数据中的每个元素都占用4个字节,也就是每组中的每个元素的存储,数组下标都将移动4位(因为一个int为4个字节)。

    * partition

    分区的意义在于把一系列相似的单词分为同一个区。即单词归类处理,这样不同机器上的不同map任务输出的单词可以依据分区递交给相同的reduce做处理。

    注意:

    * 相关类: HashPartitioner

    * 这里的“相似”,指的是:键(此例中为单词)的hash值在某一个范围内

    * sort

    map排序阶段,在buffer中把数据按照partion和key两个关键字做升序排序,这个排序只需要移动“元数据区”中的每组数据顺序即可。排序结果是“元数据区”中的每组数据按照partition分区聚集在一起,同一个partition分区内的key按照字典顺序排序。

    * combine(可选)

    结合阶段,可以在map阶段简化数据输出,减少后边spill溢写过程中,spill溢写文件的大小,例如:可以将<cat, 1> <cat, 1>这样的数据在map阶段合并为<cat, 2>这样的数据作为map输出,默认没有开启。

    * spill

    溢写阶段,当内存中的环形存储结构占用率达到一定程度(默认占用80%时,则开始溢写),则将环形数据区中的所有内容,刷入到当前本地硬盘能够存的下这些数据的目录中,以使内容腾出空间供后边继续使用。

    相同的partition分区的数据写入到同一个文件中,类似:“spill10.out”,“spill11.out”这样的文件,每一个partition分区所产生的文件的存放位置和一些相关信息,存放在另一个“元数据”文件中,类似“spill10.out.index”,“spill11.out.index”(注意,这个元数据文件和刚才说的元数据区不是一码事)。

    这个元数据文件包含:

    ** 起始位置

    ** 原始数据长度

    ** 压缩之后的数据长度

    ** crc32的校验数据

    该文件的作用是:标记某个partition对应的文件在哪个目录,哪个索引中存放。

    注意:

    * spill10.out.index这样的文件不一定会产生,如果内存中放得下(针对这个文件数据的存放,内存只提供1M空间可用),就放在内存中。

    * 内存占用达到80%,开始溢写,那么此时map任务还在进行,还在往内存里添加数据,新的数据的起始点(0点)为剩余空间的中间部分,然后数据区和元数据区分别往两边递增即可,溢写后释放内存后也不必改变什么,继续写入即可。

    ** map merge

    map融合阶段,将溢写阶段产生的多个文件,根据所属分区,把具有相同partition分区的“元数据(从spill10.out.index这样的文件中读取的)”放置于同一个segment列表中,最后根据segment列表,把数据从spill溢写出来的文件一个一个中读取出来,写入到file.out文件中,同时将这一批段的数据索引(元数据分区等)写入到file.out.index文件中,最终生成两个文件,file.out和file.out.index,其中包含了多段数据,每段数据对应一个分区。

    ** compress (可选)

    map压缩阶段,对map merge阶段产生的文件进行压缩处理,以便于在后边的网络传输过程中减少网络IO压力,提升效率。

    至此,map端的shuffle过程结束。

    ** sort merge

    reduce任务会根据分区数据段拉取每个map任务产生的数据,拉取后,因为可能涉及到多个map产生的数据,所以要进行排序,一边copy一边排序,最后把多个map产生的具有相同分区的数据合并为一个分区数据段,这个merge过程和map的merge算法过程一样。

    在此完成shuffle阶段


    ** reduce

    对于本例而言,此时产生的某个分区中的某个单词形式大概如下:

    <cat, [1, 1, 1, 1, 1, 1]>,在调用reduce方法时,进行values各个元素的叠加操作即可。

    ** output

    统计完成后,输出数据到文件目录,文件格式为part-r-00000这样形式的文件,存放于HDFS中。文件中key和value默认的分隔符为:\t

    ** Hadoop集群部署

    之前我们在yarn框架中运行mapreduce任务,或者操作hdfs,其中的各种节点都是运行在一台虚拟机上的,现在我们要将hadoop部署在一个多台虚拟机构成的完全分布式集群中(全部都在一个机器节点上的叫做伪分布式,比如之前的方式)。部署前,我们先勾画一下各个节点的部署结构,如下图所示:

    描述:

    3台机器共有进程:HDFS的datanode,yarn的nodemanager

    其中,HDFS的namenode开在z01这台机器上,secondarynamenode开在z03这台机器上

    YARN的resourcemanager开在z02这台机器上。

    注:SecondaryNameNode是用来协助NameNode整合fsimage和edits的。

    一、准备系统环境

    1、修改主机名

    # vi /etc/hostname

    2、主机名和ip地址的映射

    # vi /etc/hosts,我的机器修改如图,注意,三台机器都要这么设置:

    3、关闭防火墙和selinux

    请跳转至Linux基础04查看相关方法。

    4、创建普通用户

    # useradd 用户名,如果已经有普通用户,则无需再次创建

    # echo 666666 | passwd --stdin 用户名

    5、配置静态IP和DNS

    请参看Linux基础01内容

    6、把后面两个虚拟机的系统启动级别改成“字符模式”(就是没有桌面,这样可以减少虚拟机负担,加速系统启动和运行)

    # cat /etc/inittab,内容如图所示:

    根据文件中的提示,可以使用命令:

    systemctl set-default multi-user.target,来设置无界面启动linux

    systemctl set-default graphical.target,来设置有界面启动linux

    7、卸载服务器JDK

    请参看Linux基础02中的内容

    二、配置NTP时间服务器

    对于我们当前这种案例,主要目标是把z01这台服务器设置为时间服务器,剩下的z02,z03这两台机器同步z01的时间,我们需要这样做的原因是因为,整个集群架构中的时间,要保持一致。

    ** 检查当前系统时区,使用命令:

    # date -R,如图:

    注意这里,如果显示的时区不是+0800,你可以删除localtime文件夹后,再关联一个正确时区的链接过去,命令如下:

    # rm -rf /etc/localtime

    # ln -s /usr/share/zoneinfo/Asia/Shanghai /etc/localtime

    ** 同步时间

    # ntpdate pool.ntp.org

    ** 检查软件包

    查看ntp软件包是否已安装,使用命令:

    # rpm -qa | grep ntp,如图,红框中的内容:

    如果没有红框中的内容,则可以使用命令:

    # yum -y install ntp,来进行安装

    ** 修改ntp配置文件

    # vi /etc/ntp.conf

    去掉下面这行前面的# ,并把网段修改成自己的网段:

    restrict 192.168.122.0 mask 255.255.255.0 nomodify notrap

    注释掉以下几行:

    #server 0.centos.pool.ntp.org

    #server 1.centos.pool.ntp.org

    #server 2.centos.pool.ntp.org

    把下面两行前面的#号去掉,如果没有这两行内容,需要手动添加

    server  127.127.1.0    # local clock

    fudge  127.127.1.0 stratum 10

    最后,如图所示:

    ** 重启ntp服务

    # systemctl start  ntpd.service,注意,如果是centOS7以下的版本,使用命令:service ntpd start

    # systemctl enable ntpd.service,注意,如果是centOS7以下的版本,使用命令:chkconfig ntpd on

    ** z03,z03去同步z01这台时间服务器时间

    首先需要关闭这两台计算机的ntp服务

    # systemctl stop  ntpd.service,centOS7以下,则:service ntpd stop

    # systemctl disable ntpd.service,centOS7以下,则:chkconfig ntpd off

    # systemctl status ntpd,查看ntp服务状态

    # pgrep ntpd,查看ntp服务进程id

    同步第一台服务器z01的时间:

    # ntpdate z01,如图:

    ** 制定计划任务,周期性同步时间

    # crontab -e

    */10 * * * * /usr/sbin/ntpdate z01,如图所示:

    重启定时任务:

    # systemctl restart  crond.service,centOS7以下使用:service crond restart,z03这台机器的配置同理

    三、配置无密钥登录

    配置hadoop集群,首先需要配置集群中的各个主机的ssh无密钥访问

    在z01上,通过如下命令,生成一对公私钥对

    $ ssh-keygen -t rsa,一顿回车操作,这条命令执行完毕后(注意使用普通用户执行该命令),会在/home/z/.ssh/目录下生成两个文件:id_rsa 和 id_rsa.pub,如图所示:

    生成之后呢,把z01生成的公钥拷贝给z01,z02,z03这三台机器,对,没错,包含当前机器。

    $ ssh-copy-id z01

    $ ssh-copy-id z02

    $ ssh-copy-id z03

    完成后,z02机器如图(z03同理):

    以上完成了z01生成私钥,公钥并把公钥拷贝给z01,z02,z03三台机器的过程,z02,z03这两台机器也需要进行如上操作。全部完成后,我们可以在任意一台机器上,无密钥的连接到另外一台机器,比如,我们在z01连接到z02这台机器,使用命令:

    $ ssh z02,如图:

    这样就成功的在z01的机器登录到z02机器了。

    四、安装配置JDK

    使用root用户,在后面两台机器上创建/opt/modules文件夹,并使该文件夹的所属改为普通用户。

    接着便可以使用远程命令scp,把已经在z01中安装好的jdk目录拷贝给另外两台机器。

    $ scp -r /opt/modules/jdk1.7.0_67/ z02:/opt/modules/

    $ scp -r /opt/modules/jdk1.7.0_67/ z03:/opt/modules/

    注意中间有空格分开。配置完成后,记得去z02,z03修改/etc/profile环境变量

    五、安装配置Hadoop

    ** 首先,需要先删除z01中的/opt/modules/hadoop-2.5.0/data目录,执行命令:

    $ rm -rf /opt/modules/hadoop-2.5.0/data

    ** 在如下文件中,修改JAVA_HOME

    hadoop-env.sh  yarn-env.sh  mapred-env.sh

    export JAVA_HOME=/opt/modules/jdk1.8.0_121

    ** 修改HDFS默认地址、HDFS临时存储路径

    涉及文件:core-site.xml

    fs.defaultFS:hdfs://z01:8020

    hadoop.tmp.dir:/opt/modules/hadoop-2.5.0/data

    如图:

    ** 声明哪些服务器是datanode

    涉及文件:slaves

    z01

    z02

    z03

    如图:

    ** 修改数据存放的副本数,SecondaryNameNode节点地址

    涉及文件:hdfs-site.xml

    dfs.replication:3

    dfs.namenode.secondary.http-address:z03:50090

    dfs.namenode.http-address:z01:50070

    dfs.permissions.enabled:false

    如图:

    **resourcemanager节点配置,以及一些其他配置

    涉及文件:yarn-site.xml

    yarn.resourcemanager.hostname:z02

    yarn.nodemanager.aux-services:mapreduce_shuffle

    yarn.log-aggregation-enable:true

    yarn.log-aggregation.retain-seconds:86400

    如图:

    ** jobhistory服务以及其他设置

    涉及文件:mapred-site.xml

    mapreduce.framework.name:yarn

    mapreduce.jobhistory.address:z01:10020

    mapreduce.jobhistory.webapp.address:z01:19888

    如图:

    ** 配置好后,拷贝hadoop安装目录给其他服务器

    $ rm -rf /opt/modules/hadoop-2.5.0/share/doc/,删除该文档目录,以减少远程拷贝的体积

    $ scp -r /opt/modules/hadoop-2.5.0/ z02:/opt/modules/

    $ scp -r/opt/modules/ hadoop-2.5.0/ z03:/opt/modules/

    全部搞定后,接下来我们就可以启动这个分布式系统了

    六、启动Hadoop

    ** 在z01需要先格式化hdfs的namenode:

    $ bin/hdfs namenode -format

    ** 使用start的脚本启动集群中所有的hdfs服务,包含namenode和datanode节点

    $ sbin/start-dfs.sh

    ** 在z02中启动yarn服务,包含resourcemanager和nodemanager,注意,如果resourcemanger和namenode服务不在同一台机器上,那么启动resourcemanager服务必须在所在的机器启动,这里参看我们之前设定的集群配置图,所以需要在z02机器上执行如下命令:

    $ sbin/start-yarn.sh

    启动完成后,分别查看z01,z02,z03机器的jps,如下图:

    z01:

    z02:

    z03:

    在对比一下之前的集群配置图,是符合我们的期望的。

    ** 总结

    本节主要深入讨论mapreduce的运算原理及过程,以及如何配置一个hadoop完全分布式集群。


    IT全栈公众号:

    QQ大数据技术交流群(广告勿入):476966007


    下一节:Hadoop框架基础(五)

    相关文章

      网友评论

        本文标题:Hadoop框架基础(四)

        本文链接:https://www.haomeiwen.com/subject/sxeggttx.html