一、背景
在大数据的场景下,单台物理机无法满足数据的存储和运算的需求,所以我们需要有一个分布式集群来做数据的存储和运算。先讨论下使用分布式集群会面临的问题:
-
1、网络通信:分布式集群中多台机器之间需要通过网络进行通信,因此需要在不可靠的网络上保障集群的可靠性,这引入了网络编程的复杂性。
-
2、高可用:用集群来存储文件,这样的文件系统称为分布式文件系统(Distributed FileSystem),部分节点故障不能影响集群的使用。
-
3、集群管理:
1)元数据管理:需要有一个单独的地方来存储分布式文件系统的元数据,比如文件树的文件目录结构信息,这些元数据不应该因为部分存储数据的节点故障而丢失,数据的存储跟元数据的存储应该分离。
2)节点管理:集群中的部分节点故障要能及时发现。
3)数据安全:同一份数据应该在多个节点上备份,否则单点存储可能会因为单点故障而永远丢失数据。 -
4、成本问题:为了保证节点可靠,减少故障发生概率,可以采购高可靠的硬件,但是高可靠硬件成本比较高,集群中成千上万的节点都用成本太高。
二、HDFS架构
我们从上述的背景需求出发,一步步推导出分布式文件系统应该具备的架构,最终讨论HDFS的架构实现。
1、推演
(1)满足存储需求
首先,客户端要向服务端上传数据存储,在大数据场景下存储的数据超过单台物理机器的磁盘空间是很常见的事,单台服务器存不下就需要有多台服务器组成的集群来存储,并且服务端磁盘要保证有足够存储空间,并且能够水平扩展,这样才能满足海量数据存储的要求。
现在假设客户端应用要上传1000G的数据到服务端存储,而服务端每台服务器只有400G磁盘,则至少需要3台服务器的存储空间。
综上所述,目前的架构图如下:
(2)集群通信管理
解决了海量数据存储的问题,接着需要对集群内部进行治理,对一个分布式集群来说,最经典最常用的架构即主-从架构,主节点管理维护整个集群的状态,接收客户端请求,与集群节点通信并给从节点分发要执行的任务。
这样一来,分布式文件系统中的文件目录结构信息作为元数据就理所当然地保存在主节点上,对目录信息,全部保存在主节点上;对文件,主节点应该存储该文件保存在哪个从节点上,什么时候创建,创建用户和组是什么,文件的读写执行权限是什么,这些构成了文件的元数据。
对于主节点,要能够及时发现从节点的故障,以便做出决策(比如客户端上传文件,不把数据保存在故障从节点中),所以从节点应该定期向主节点发送心跳信息。
如果数据只在一个从节点上保存一份,万一这个从节点故障,可能会永远丢失数据,所以同份数据应该在多个从节点上冗余备份,假设文件a.txt在node1、node2、node3存储了3个副本,那么即使这3个node有部分发生故障,也不影响客户端对数据的读取。当然如果3个node都同时故障了,那么数据依然永远丢失了,而这发生的可能性很低。
综上所述,架构图演化为:
(3)响应客户端请求
对客户端,应该先请求主节点,再由主节点通知客户端把你的请求放在哪个从节点上执行。对DFS,客户端请求通常是上传和下载数据,假如是上传数据,则应该由主节点先创建文件的元信息(保存在哪个从节点、客户端操作的用户组是什么、文件读写权限),分配要将副本存储在哪些从节点,再将这些信息返回客户端,客户端再跟从节点通信,将数据上传到各个从节点上。
如果客户端因为有多少个副本就上传多少次数据给集群,会严重消耗网络带宽,既然第一次上传的时候保存副本的 slave 已经有了一份数据,其他要保存副本的 slave 何不直接从该 slave 上拷贝?如果副本数设置较大,那其他 slave 都从第一个 slave 拷贝数据会影响第一个 slave 的性能,所以比较好的做法是水平复制,即从 slave1 复制到 slave2,从 slave2 拷贝到 slave3,以此类推,如下所示:
2、HDFS体系架构的组成和概念
到这里为止,已经非常接近HDFS的架构了,HDFS的架构图如下:
下面继续详细讨论HDFS架构中的各个部分。
(1)NameNode
HDFS集群属于主从架构,集群中有两类节点:NameNode、DataNode。
NameNode是集群的主节点,也叫管理节点。维护文件系统树及整颗树内所有的文件和目录,每个数据块的数量、副本和其他细节等元信息都存储在NameNode中。同时,一般NameNode所在服务器上一般还运行一个Yarn 的 ResourceManager 的进程,用来对客户端的请求生成作业并调度MapReduce的执行,因为元数据在NameNode上,NameNode 进程和 ResourceManager 进行在同一台机器上,可避免元数据在网络上的传输开销,提高效率。
NameNode 的职责汇总如下:
① 管理文件系统命名空间。
HDFS所有目录的元数据都保存在NameNode上。
② 接受客户端上传下载文件请求,规范客户端文件访问权限。
③ 执行文件系统相关的操作。
如关闭、打开目录,以及给文件/目录命名。
④ 维护和管理DataNode。
所有DataNode都会给NameNode定时发送心跳和块信息报告。心跳用来检测DataNode是否处于正常状态,块信息是DataNode上所有块信息的列表。
⑤ 处理所有块的副本因子。
DataNode在存储数据到数据块中时,会计算和存储数据的校验和到数据块中。当客户端读取数据时,会计算数据块的校验和并和数据块中存储的校验和进行比较,若不一致,则客户端会向NameNode汇报损坏的数据块;DataNode本身也会有一个后台线程定期验证存储在datanode上的所有数据块,若发现有数据损坏,也会向namenode汇报。
对客户端,namenode会找到损坏数据块的在其他datanode上无损副本,告诉客户端将请求发给新的datanode处理。然后namenode后台从无损副本datanode复制数据块到当前datanode中,或者复制到另一个节点上,从而满足数据块副本数的要求。
NameNode是整个集群的中心,一旦发生故障,整个集群都不可用,因此最好把它部署在可靠的硬件机器上!!!
(2)DataNode
datanode是集群的从节点,也叫工作节点。是实际存储数据的节点,需要根据namenode和客户端指令存储和检索数据块。
DataNode的职责汇总如下:
① 根据 namenode 的指令创建、删除和复制数据库。
② datanode 管理系统的数据存储。
③ datanode 定期给 namenode 发送心跳告知自身的健康状态,默认3秒发送一次心跳。
若心跳发送超时,namenode 会认为该 datanode 发生故障,先将其从可用节点池中移除,并复制节点上的数据块的其他副本到新的数据节点,维持副本数不变。
④ datanode 的后台线程定期给 namenode 发送所有数据块的健康状况。
PS.设置数据块冗余度的一般原则:冗余度跟数据节点个数一致,最大不超过3。
(3)数据块
磁盘有默认的数据块大小(一般为512字节),文件系统也有文件系统块的大小(一般为几千字节),它们分别是磁盘读写和文件读写的最小单位。HDFS中同样有数据块(block)的概念,默认大小为128M(hadoop 2.x,hadoop1.x是64M),它是MapReduce作业中Map任务数据处理的单位。
文件系统块基于磁盘块,同样HDFS数据块也基于文件块,但有一点不同的是,HDFS的块是一个逻辑概念,假如数据不满128M,还是HDFS会分配一个数据块,但是在磁盘中不会实际占用128M的存储空间,如下所示:
假如需要把一个 518MB 的文本文件 Example.txt 存储到 HDFS,在块大小默认情况下,HDFS 将会创建 5 个数据块,前面4个数据块大小将是 128MB,最后一个是 6MB,而不是 128MB。这样会节省不少存储空间。
为什么HDFS中数据块的大小远比磁盘块和文件系统块大,达到128M?
1)最小化寻址开销。
传输数据前需要先定位数据块位置,同样大小的数据,如果数据块比较小,就需要用更多的块来存储,就产生更多的寻址开销,大文件块有利于提高磁盘传输效率。
2)减少元数据的管理维护开销。
HDFS 存储的数据集一般比较大,数据量级一般是 TB 级别或者 PB 级别的。如果像 Linux 系统那样每个块只有 4KB。那么 HDFS 将会存储非常多的数据块,这将导致元数据暴增,NameNode 管理维护这些元数据将非常吃力。且很快会成为集群性能的瓶颈。
既然HDFS大数据块有这么多好处,能不能设置得更大?
随着新一代磁盘驱动器传输速率的提升,块的大小会被设置得更大。但是也不宜过大,而Map任务通常一次只处理一个块中的任务,处理相同的数据集,如果数据块过大,划分成的块数量就比较少,会导致任务数太少(少于集群中的节点数量),作业的运行速度就会比较慢。
使用HDFS数据块的好处:
① 存储大文件。
一个文件的大小可以大于网络中任意一个磁盘的容量。
② 方便管理,简化存储子系统设计。
磁盘很容易计算出能存储多少个数据块。
③ 简化 DataNode 存储机制。
所有块的元数据都是在 Namenode 维护的。Datanode 不需要关心块的元数据,比如文件权限,存储位置等。
④ 容错性和高可用。
块非常适合数据备份,块被复制到物理独立的不同机器上,保证某个块、磁盘、机器发生故障后数据不会丢失。
可以通过以下命令检查HDFS中数据块的详情:
hdfs fsck / -files -blocks
假设现在HDFS文件系统的文件目录结构如下所示:
使用检查命令查询文件块信息:
如上图所示,HDFS的文件系统中存在两个目录(/
、/HadoopGuide
),两个文件(/HadoopGuide/hadoop-2.7.3.tar
、/HadoopGuide/hadoop-2.7.3.tar.gz
),hadoop-2.7.3.tar 文件的大小为 323.56 MB,所以需要用3个blocks,前两个block大小为128M(128 x 1024 x 1024 = 134217728 byte),第三个block大小为67M左右,而 hadoop-2.7.3.tar.gz 文件的大小为 204.17 MB,只需要两个block。
下面还列出数据块的完整状态信息。
(4)机架感知
在HDFS集群中,物理机器部署在数据中心内部的很多个机架上,每个机架上多有多个机器,如果为了实现双活灾备,机器还可能部署在不同的数据中心,按照网络带宽的开销,下面的场景依次递减:
- 同一节点上的进程
- 同一机架上的不同节点
- 同一数据中心中不同机架上的节点
- 不同数据中心中的节点
例如,假设与数据中心 d1 机架 r1 中的节点 n1。该节点可以表示为 /d1/r1/n1
,利用这种标记,这里给出四种距离描述:
- distance(/d1/r1/n1, /d1/r1/n1) = 0(同一节点上的进程)
- distance(/d1/r1/n1, /d1/r1/n2) = 2(同一机架上的不同节点)
- distance(/d1/r1/n1, /d1/r2/n3) = 4(同一数据中心不同机架上的节点)
- distance(/d1/r1/n1, /d2/r3/n4) = 6(不同数据中心中的节点)
数据块的多个副本不能保存在同个节点上,否则节点故障一锅端,也没有意义,如果全部放在不同机架,则会增加网络传输的开销。为了兼顾效率和安全,部分副本放到同一机架上,部分副本放到不同机架上,如果为了不同数据中心的灾备,还得放到不同数据中心的不同机架上(不过一般不这么做,开销太大)。
HDFS支持机架感知策略,确保将一个数据块复制到不同机架来实现容错的目的,这样,如果网络被关闭或者整个机架下架,仍然能够对数据进行访问。
Namenode会选择给同一个机架或者最近的机架上的Datanode发送读写请求。NameNode通过维护每个DataNode的机架id来获取机架信息。HDFS机架感知就是基于机架信息选择Datanode的过程。
在HDFS架构里面,Namenode需要确保所有的块副本不能存储在同一个机架上。它根据机架感知算法减少延迟时间和容错。我们知道默认副本因子是3,根据机架感知算法,第一个块副本将存储在本地机架,第二个副本存储在同一个机架的不同节点,第三个将存储在另一个机架上。
机架感知的好处:
1)提升数据的高可用和可靠性。
2)改善集群性能。
3)改善网络带宽。
(5)元数据
对于HDFS的元数据来说,创建之后会持久化保存到磁盘,并且后续会有非常多的读操作,如果每次读取元数据时都需要从磁盘中加载数据,就会产生磁盘IO的开销,从而降低响应速度,并且也会影响 NameNode 的性能。所以元数据必须被缓存在内存中,每次客户端请求直接从内存中读取,速度就非常快!
如果发生了元数据的写,就暂时先写到内存中,并且将这笔操作记录在日志中,然后有个后台进程负责定时地将日志中的操作更新到元数据磁盘文件中。这里非常类似 MySQL 的内存和日志,虽然写日志也会发生磁盘IO,但因为是追加写,而元数据是随机IO,前者速度远大于后者,因此同样会发生磁盘IO,但是前者更快。
在HDFS中,元数据包含了上面说的日志,NameNode 中元数据主要是两个文件:一个是FSImage,一个是EditLog。
在HDFS中,元数据所在的目录是由core-site.xml(位于Hadoop安装目录下的etc/hadoop/core-site.xml)中的 hadoop.tmp.dir
属性决定的,如下图所示:
FsImage
FsImage是一个“镜像文件”。它保存了最新的元数据检查点,包含了整个HDFS文件系统的所有目录和文件的信息。对于文件来说包括了数据块描述信息、修改时间、访问时间等;对于目录来说包括修改时间、访问权限控制信息(目录所属用户,所在组)等。
EditLog
EditLog主要是在NameNode已经启动情况下对HDFS进行的各种更新操作进行记录,HDFS客户端执行所有的写操作都会被记录到editlog中。
进入数据目录,可以看到有 data、name、namesecondary 三个子目录,分别对应 datanode、namenode、secondarynamenode,name目录下的current子目录有 edits、fsimage 文件。
edits 和 fsimage 都是二进制文件,无法直接查看,需要使用专用的命令来转化为 xml 等格式。edits_inprogress_0000000000000000038
记录了当前线程的最新操作。
可以使用如下命令查看 edits 文件,格式如下:
hdfs oev -i [editsfile] -o [outputfile]
// oev: e是edits的意思
// -i: input
// -o: output
执行一个 hdfs dfs -mkdir /tools
命令,并执行上述命令转换为 xml 文件,可以看到记录下来的操作信息。
可以使用如下命令查看 fsimage 文件,格式如下:
hdfs oiv -i [fsimagefile] -o [outputfile] -p XML
执行一个文件上传命令
并执行上述命令将编号最大 fsimage 文件转换为 xml 格式,如果没看到上传的文件的元信息,是因为 SecondaryNameNode 还没到合并镜像和日志文件的时机,将dfs关闭再重启,会自动更新 fsimage,更新后编号最大的 fsimage 文件为 048,执行命令生成 xml 文件,并查看:
[图片上传失败...(image-a8d9f8-1645950703478)]
将 xml 格式化一下:
// 其他信息
<inode>
<id>16390</id>
<type>FILE</type>
<name>hadoop-2.7.3.tar.gz</name>
<replication>1</replication>
<mtime>1645740258249</mtime>
<atime>1645740248487</atime>
<perferredBlockSize>134217728</perferredBlockSize>
<permission>root:supergroup:rw-r--r--</permission>
<blocks>
<block>
<id>1073741830</id>
<genstamp>1006</genstamp>
<numBytes>134217728</numBytes>
</block>
<block>
<id>1073741831</id>
<genstamp>1007</genstamp>
<numBytes>79874467</numBytes>
</block>
</blocks>
</inode>
</INodeSection>
// 其他信息
<block> 标签下的信息中,<id> 为 block id,<genstamp> 为生成该image时的时间戳信息,这个时间戳不是真正意义上的时间戳,可以看成是一个创建数据块的顺序编号。
通过数据块检查命令,可以看到上传的数据块信息,blk_1073741830_1006 和 blk_1073741831_1007,跟查看xml的信息刚好吻合。
具体的数据块数据在 datanode 端,会保存在目录 /root/training/hadoop-2.7.3/tmp/dfs/data/current/BP-2119504323-192.168.153.111-1644881216510/current/finalized/subdir0/subdir0 下,查看目录下的文件:
不带 meta 的是数据文件,单位字节,换算一下可知,blk_1073741830 大小为128M,blk_1073741831 大小约为 76M,加起来正好是上传的文件大小 204M,而 meta 文件是响应数据块的元信息,大小约为 1M,由此可知 datanode 也会保存数据块的元信息。
(6)SecondaryNameNode
SecondaryNameNode 就是上述所说的后台负责合并的进程,是一个辅助的 NameNode,不能代替 NameNode,主要作用是用于合并 FsImage 和 editlog 文件。SecondaryNameNode 会将 NameNode 元数据目录下的文件复制到自己的目录,执行合并后再发给 NameNode,NameNode 校验文件后替换掉旧的 FsImage 文件,并创建一个新的 editlog 文件。
两者的工作机制如下图所示:
第一阶段:NameNode 启动
1、创建 NameNode时,会在
hadoop.tmp.dir
参数设置的目录下,执行 hdfs namenode -format 命令进行格式化,创建 fsimage 和 editlog 文件;若已经格式化过,则加载 fsimage、editslog 文件到内存中;
2、客户端对 NameNode 发出请求,触发对元数据的增删改的操作;
3、NameNode 记录操作日志,更新滚动日志;
4、NameNode 对内存中的元数据进行增删改。
【下面的操作对 NameNode中第一个白框描述的日志滚动作出了演示】
在 NameNode 被格式化创建时,会创建一个 fsimage_0000000000000000000
和 一个 edits_inprogress_0000000000000000001
文件。
这里的序号是有含义的,对 hdfs 元数据进行增删改时,每个操作都会对应一个 txid,从 0000000000000000001 开始计算,每次递增, edits_inprogress_0000000000000000001
文件实时记录 hdfs 最近一次启动以来的所有客户端请求触发的对元数据的增删改,并且 hdfs 最近一次启动以来的第一个操作id是 0000000000000000001,并且会记录之后的所有元数据写操作。
接着执行 hdfs dfs -mkdir /data
命令,触发一个元数据写,生成 xml 格式的元数据文件。
可以看到,id 为1的是一个操作码为 OP_START_LOG_SEGMENT
的操作,是每次生成一个新的edits日志文件都会自动做的一个操作,所以上述命令的id为2。
接着关闭 dfs 服务并重启,可以发生 inprogress 文件滚动日志生成了一个新的 edits_0000000000000000001-0000000000000000002
文件,表示该文件中记录了第1~2个对元数据的操作日志,并且原来的 edits_inprogress_0000000000000000001
文件不见了,实际上这只是一个文件重命名操作,生成一个新的 edits_inprogress_0000000000000000003
文件。
接着连着执行3个触发元数据写的目录创建命令(这里特别注意 inprogress 的最后修改时间,因为命令执行,有数据写入,所以由上面的 00:16 变成了 00:19)
再次关闭重启 dfs 服务
可以看到生成了一个新的 inprogress 文件,刚才执行了3个写操作,加上日志文件创建的默认操作,一共四个,序号为 3~6,因此滚动后的日志名为 edits_0000000000000000003-0000000000000000006
,而新的 inprogress 文件序号从 007 开始。
第二阶段:SecondaryNameNode 工作
1、Secondary NameNode询问 NameNode 是否需要checkpoint,直接带回NameNode是否检查结果;
2、当触发合并时机,Secondary NameNode请求执行CheckPoint;
3、NameNode滚动正在写的Edits日志;
4、将滚动后的编辑日志和镜像文件拷贝到Secondary NameNode;
5、加载编辑日志和镜像文件到内存,并合并;
6、生成新的镜像文件 fsimage.chkpoint;
7、拷贝 fsimage.chkpoint 到 NameNode;
8、NameNode 将 fsimage.chkpoint 重新命名成 fsimage。
SecondrayNameNode 的 checkpoint 可以理解为触发镜像和编辑日志文件合并的时机,一般是每隔1小时合并一次,或者操作次数达到100万次时合并一次,对应的参数配置(在 hdfs-site.xml 中配置)如下:
<configuration>
<property>
<name>dfs.namenode.checkpoint.period</name>
<value>3600</value>
</property >
<property>
<name>dfs.namenode.checkpoint.txns</name>
<value>1000000</value>
<description>操作动作次数</description>
</property>
<property>
<name>dfs.namenode.checkpoint.check.period</name>
<value>60</value>
<description> 1分钟检查一次操作次数</description>
</property >
</configuration>
还有其他相关的参数,可以查阅hadoop官方文档进行设置。
三、HDFS的设计
HDFS脱胎于GFS(Google File System),有非常优秀的设计理念和特性,汇总如下:
1、超大文件,海量数据存储,支持水平扩展
简单地说,就是单挑不行(数据量超过单台机器的存储极限),就群殴,并且支持水平扩展,粗暴地说,就是如果人不够,继续叫兄弟
2、流式数据访问
简单的说,就是数据来多少处理多少,比如用迅雷下载电影可以一边下一边看,就是流式数据访问,而不是等一整部电影都下载好了再看,很明显前者效率高很多。
3、高可用
对于一个拥有成千上万机器的集群来说,出现一些节点故障的可能性是很高的,HDFS并不需要使用昂贵的高可靠硬件,因为它自身使用分块和备份机制实现了高可用。在讨论分块和备份之前,先考虑这么一个文件:
往HDFS里存储一个大文件,是将它整体保存在一个有足够空间的节点好,还是先将其分成一块块文件块,再保存在不同的节点上好?
答案是分成一块块好,可以充分利用集群性能,加快数据处理,比如有一个640M的文件,如果分成数据块存储(HDFS中每个数据块大小为128M),假设每块都不在同一个集群数据节点上,那么存储和下载时5个集群数据节点可以并行,有利于提高数据处理的吞吐量。
如果一个数据块只保存在一个DataNode上,若这个DataNode故障,依然无法读取该数据块,需要有一个冗余备份机制,简单的说就是将同一数据块在多个的DataNode上各保存一份,份数称为副本数。多个节点同时故障的概率非常低,只要有一个节点正常,则该数据块不会丢失无法读取,可靠性大大提高,当然需要付出存储空间的代价。
4、与分布式运算结合
移动计算比移动数据更高效,假设要对上面存储的640M文件进行运算,文件是行式存储的数据,要求统计出某个列的最大值。
- 移动数据的处理:将5个block下载到一台机器上再执行运算。
- 移动计算的处理:在每个block所在的机器上,计算出该block的最大值,再汇总到某台机器上比较得到最大值。
很明显,移动计算的效率更高,不仅体现在计算性能上的提升,还节省了网络数据的传输的开销(在数据中心,网络带宽是非常宝贵的资源)。
5、简单计算模型
只允许写,不支持在文件的任意位置进行增删改,适合一次写入、多次读取的数据,不适合需要频繁增删改的数据。
四、高可用
1、实现HA的HDFS
NameNode 存储HDFS元数据,并且响应客户端请求,一旦发生故障,整个集群都无法正常工作,为了防止单点故障,需要实现 NameNode 的高可用,这通过 zookeeper 来实现。
zookeeper 监控所有 NameNode 的状态,所有 NameNode 定时向 zookeeper 发送心跳,其中正充当整个集群 master 的 NameNode 的状态为 Active,其余备用 NameNode 的状态为 Standby。
当活跃状态的 NameNode 发生故障而不再发送心跳时,zookeeper 认为该节点已宕掉,从其余 NameNode 中选举出一个新的 master,被选中的 NameNode 状态转换为 Active,接收处理客户端请求。
HDFS 的主节点状态信息由谁维护?
每个NameNode上都会运行一个FailOverController,它负责跟zookeeper通讯,定时发送心跳信息,当活跃的主节点挂掉时,zk就会通知其他NameNode参与选举,选举出的NameNode称为新的主节点,所有DataNode都会向它发送周期性通讯(心跳信息、汇报数据块健康状况)。
HA架构下的元数据由谁维护?
这里需要引入高可用共享存储,可以选择NFS过滤器或群体日志管理器QJM(Quorum Journal Manager)。
QJM是一个专用的HDFS实现,为提供一个高可用的编辑日志而设计,被推荐用于大多数HDFS的部署中。
QJM以一组日志节点(Journal Node)的形式运行,每一次编辑必须写入多数日志节点。典型的,有三个JournalNode系统能忍受其中任何一个的丢失,跟zk类似,但是并不依赖zk。
每次活动 NameNode 发生对元数据的写操作时,都会写到 JournalNode 中,备用 NameNode 则从 JournalNode 同步数据并加载到内存中,这保证了活跃 NameNode 故障时,备用节点能快速地实现任务的接管。
2、联邦HDFS
对于一个HDFS集群,NameNode 是单点,即使实现了高可用,但它的存储空间和处理能力终究是有限的,对于每个文件或目录的元数据,大概占用150字节,10亿个文件目录需要几百G的磁盘和内存,超出单台 NameNode 的负载能力,内存称为限制系统横向扩展的瓶颈。
因此需要有多台 NameNode 来支撑,允许添加 NameNode 实现扩展,这就是联邦HDFS的能力。
每个 NameNode 管理文件系统命名空间中的一部分,比如 namenode1 管理 /movies
目录下的所有文件,namenode2 管理 /mp3
目录下的所有文件。
- 1、HDFS联邦使用了多个独立的NameNode/namespace 来使得 HDFS 的命名服务能够水平扩展。
每个 namenode 维护一个命名空间卷(namespace ),由命名空间的元数据和一个数据块池(block pool)组成,数据池块包含了该命名空间下文件的所有数据块。
-
2、多个NameNode 之间是联邦的关系,它们之间互相独立且不需要相互协调。
-
3、datanode 被所有 namenode 当做公共存储的地方。
每个 datanode 都要向所有 namenode 注册,并且存储着来自多个数据块池中的数据块。
五、HDFS的环境搭建
1、本地模式
本地模式的特点如下:
(1)环境安装
① hadoop 环境对 Java 有依赖,因此需要先安装 JDK。
解压安装包
tar -zxvf jdk-8u181-linux-x64.tar.gz -C ~/training/
设置环境变量(~/.bash_profile)
JAVA_HOME=/root/training/jdk1.8.0_181
export JAVA_HOME
PATH=$JAVA_HOME/bin:$PATH
export PATH
重载配置
source ~/.bash_profile
验证是否生效
java -version
② 关闭防火墙
systemctl stop firewalld.service
systemctl disable firewalld.service
③ 安装 hadoop
解压安装包
tar -zxvf hadoop-2.7.3.tar.gz -C ~/training/
设置环境变量(~/.bash_profile)
HADOOP_HOME=/root/training/hadoop-2.7.3
export HADOOP_HOME
PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export PATH
重载配置文件生效
source ~/.bash_profile
验证配置是否生效
echo $HADOOP_HOME
echo $PATH
④ 安装 tree 工具查看安装目录层次
rpm -ivh tree-1.6.0-10.el7.x86_64.rpm
tree -d -L 3 hadoop-2.7.3/
# -d:只查看目录
# -L 3:查看深度为3
bin/sbin: 二进制命令脚本
etc:配置文件
include:用c++写的库
lib:本地调用库
libexec:调用本地库的可执行脚本
share:文档和jar包依赖库
(2)hadoop配置
修改 hadoop-env.sh (目录:$HADOOP_HOME/etc/hadoop
)
将 export JAVA_HOME=${JAVA_HOME}
修改成 export JAVA_HOME=/root/training/jdk1.8.0_181
(3)程序测试
在 /root/temp
目录下创建一个 data.txt 文件,内容如下:
I love Beijing
I love China
Beijing is the capital of China
用自带的测试程序测试,这个程序的效果是对输入数据进行分词,并统计每个单词出现的次数
cd $HADOOP_HOME/share/hadoop/mapreduce
hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /root/temp /root/output/wc
# /root/temp:程序执行的输入数据目录,这里会扫描该目录下的所有文件的内容作为输入
# /root/output/wc:程序执行输出结果的目录,不必事先提前创建
输出显示程序执行结果:
cat /root/output/wc/part-r-00000
2、伪分布模式
伪分布模式,即在单机上运行 Hadoop,具备 hadoop 的所有功能,包括 HDFS 和 Yarn,多用于开发和测试。
(1)环境配置
需要修改的配置文件均位于 $HADOOP_HOME/etc/hadoop
① hdfs-site.xml
<configuration>
<!--数据块的冗余度,默认是3-->
<!--一般来说,数据块冗余度跟数据节点的个数一致,最大不超过3-->
<property>
<name>dfs.replication</name>
<value>1</value>
</property>
<!--禁用了HDFS的权限检查-->
<!--暂时先不配置-->
<!--property>
<name>dfs.permissions</name>
<value>false</value>
</property-->
</configuration>
② core-site.xml
<configuration>
<!--9000是RPC通信的端口,默认端口是8082,可配置修改
主机可用host也可用ip
-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://bigdata114:9000</value>
</property>
<!--HDFS对应的操作系统目录-->
<!--默认值是Linux的tmp目录-->
<property>
<name>hadoop.tmp.dir</name>
<value>/root/training/hadoop-2.7.3/tmp</value>
</property>
</configuration>
③ mapred-site.xml
该文件不存在,需要自行创建,从 mapred-site.xml.template 拷贝即可。
<configuration>
<!--配置MapReduce运行的框架是Yarn-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
④ yarn-site.xml
<configuration>
<!--配置ResourceManager的地址-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>bigdata111</value>
</property>
<!--MapReduce运行的方式是洗牌-->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
(2)初始化生成 namenode
① 创建初始化目目录
即在 /root/training/hadoop-2.7.3/etc/hadoop/core-site.xml
中配置的hadoop.tmp.dir
参数的值 /root/training/hadoop-2.7.3/tmp
② 格式化生成NameNode
hdfs namenode -format
③ 启动Hadoop
start-all.sh
start-all.sh 位于/root/training/hadoop-2.7.3/sbin,因为前面已经配置了该目录在环境变量HADOOP_HOME中,所以可以直接执行,无需先切换到该目录下。
查看后台进程
成功启动后,在 /root/training/hadoop-2.7.3/tmp/dfs 目录下可以看到生成了name、namesecondary的子目录
(3)执行MapReduce任务
创建hdfs目录作为数据输入目录
hdfs dfs -mkdir /input
上传数据到hdfs目录
hdfs dfs -put ~/temp/data.txt /input
切换到jar目录下执行程序
hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /input /output/wc
PS.注意虽然跟在本地执行程序的命令相同,但是这里的 /input 不是linux文件系统下的文件目录,而是hdfs文件系统中的目录,输出目录也一样。
程序执行完,可以查看输出目录的数据
hdfs dfs -cat /output/wc/part-r-00000
打开yarn的控制台可以看到任务执行状态:http://192.168.153.114:8088/cluster
打开dfs控制台可以看到hdfs的节点信息和文件系统:http://192.168.153.114:50070/
可以点击下载文件
(4)停止hadoop
stop-all.sh
是对启动Hadoop的一个逆过程,同样需要输入多次密码
(5)配置免密登录
为了避免每次启动和停止Hadoop都要输入多次密码的麻烦,需要配置好免密登录,步骤如下:
① 生成ssh密钥对
ssh-keygen -t rsa
生成的密钥对在 /root/.ssh/ 目录下,id_rsa为私钥,id_rsa.pub为公钥
② 拷贝公钥到需要开启免密登录的主机
ssh-copy-id -i root@bigdata114
③ 测试是否可以免密登录(无需输入密码即为成功)
ssh bigdata114
④ 启动Hadoop再验证下是否需要输入密码
start-all.sh
没有再次出现需要输入密码的步骤
3、全分布模式
使用三台主机,分别是 192.168.153.111、192.168.153.112、192.168.153.113,主机名分别为 bigdata111、bigdata112、bigdata113。
(1)关闭防火墙
systemctl stop firewalld.service
systemctl disable firewalld.service
(2)设置主机名
vi /etc/hosts
配置集群中所有主机的hosts,方便集群通信
192.168.153.111 bigdata111
192.168.153.112 bigdata112
192.168.153.113 bigdata113
(3)配置免密登录
ssh-keygen -t rsa
ssh-copy-id -i ~/.ssh/id_rsa.pub root@bigdata111
ssh-copy-id -i ~/.ssh/id_rsa.pub root@bigdata112
ssh-copy-id -i ~/.ssh/id_rsa.pub root@bigdata113
验证
ssh root@bigdata111
ssh root@bigdata112
ssh root@bigdata113
(4)安装JDK
参考本地模式,可以只在111机器上安装,然后拷贝到112、113即可。
(5)同步时间
在三台主机同时执行:data -s 具体时间
(6)安装Hadoop
现在有三台主机,分别是:
192.168.153.111(bigdata111)
192.168.153.112(bigdata112)
192.168.153.113(bigdata113)
现在将bigdata111设定为集群中的主节点,在上面安装hadoop环境:
PS.未特别说明只在bigdata111上执行!!!
① 解压安装hadoop
tar -zxvf hadoop-2.7.3.tar.gz -C ~/training/
② 配置环境变量(三台主机都要设置)
HADOOP_HOME=/root/training/hadoop-2.7.3
export HADOOP_HOME
PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
export PATH
重载配置文件生效
source ~/.bash_profile
验证配置是否生效
echo $HADOOP_HOME
echo $PATH
③ 修改hadoop-env.sh
将 export JAVA_HOME=${JAVA_HOME}
修改成 export JAVA_HOME=/root/training/jdk1.8.0_181
(7)hadoop配置
① hdfs-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
dfs.replication | 2 | 设置数据块副本数为2 |
dfs.permissions | false | 禁用权限检查 |
<configuration>
<!--数据块的冗余度,默认是3-->
<!--一般来说,数据块冗余度跟数据节点的个数一致,最大不超过3-->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<!--禁用了HDFS的权限检查-->
<!--暂时先不配置-->
<!--property>
<name>dfs.permissions</name>
<value>false</value>
</property-->
</configuration>
② core-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
fs.defaultFS | hdfs://bigdata111:9000 | 默认文件系统的名称。 一个URI,其方案和权限确定文件系统实现。 URI的方案确定命名文件系统实现类的Config属性(FS.Scheme.impl)。 URI的权限用于确定文件系统的主机,端口等。 |
hadoop.tmp.dir | /root/training/hadoop-2.7.3/tmp | 其他临时目录的基础。 |
<configuration>
<!--9000是RPC通信的端口-->
<property>
<name>fs.defaultFS</name>
<value>hdfs://bigdata111:9000</value>
</property>
<!--HDFS对应的操作系统目录-->
<!--默认值是Linux的tmp目录-->
<property>
<name>hadoop.tmp.dir</name>
<value>/root/training/hadoop-2.7.3/tmp</value>
</property>
</configuration>
③ mapred-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
mapreduce.framework.name | yarn | 执行MapReduce作业的运行时框架。 |
<configuration>
<!--配置MapReduce运行的框架是Yarn-->
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
④ yarn-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
yarn.resourcemanager.hostname | bigdata111 | yarn的主节点 |
yarn.nodemanager.aux-services | mapreduce_shuffle |
<configuration>
<!--配置ResourceManager的地址-->
<property>
<name>yarn.resourcemanager.hostname</name>
<value>bigdata111</value>
</property>
<!--MapReduce运行的方式是洗牌-->
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
⑤ 设置从节点
配置 salves 文件
bigdata112
bigdata113
⑥ 格式化NameNode(只在bigdata111执行)
cd $HADOOP_HOME/tmp
hdfs dfs -format
⑦ 复制bigdata111上的hadoop安装目录到两个从节点
scp -r $HADOOP_HOME/ root@bigdata112:/root/training
scp -r $HADOOP_HOME/ root@bigdata113:/root/training
(8)启动集群
在bigdata111上启动hadoop
start-all.sh
可以看到bigdata111上有namenode、secondarynamenode、resourcemanage
bigdata112上有datanode、nodemanager
bigdata113上有datanode、nodemanager
登录主节点的hdfs控制台,可以看到有两个datanode
创建hdfs目录并上传输入数据文件
hdfs dfs -mkdir /input
hdfs dfs -put ~/temp/input/data.txt /input
(9)程序测试
cd ~/training/hadoop-2.7.3/share/hadoop/mapreduce/
hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /input /output/wc
查看两个从节点,看看任务在那个节点上执行(可以看到是112)
查看程序执行结果
hdfs dfs -cat /output/wc/part-r-00000
(10)停止Hadoop
在bigdata111执行:
stop-all.sh
查看三台主机,相关的进程都已停止运行
4、HA模式
HA模式需要有 zookeeper 集群、HDFS集群、JournalNode集群,下面使用四台机器来模拟HA的实现。
192.168.190.112 bigdata112
192.168.190.113 bigdata113
192.168.190.114 bigdata114
192.168.190.115 bigdata115
首先这四台机器需要先进行初始化,即关闭防火墙、设置主机名、配置互相之间的免密登录,参照全分布模式前三步。
【zookeeper集群】
bigdata112、bigdata113、bigdata114
【HDFS集群】
namenode:bigdata112、bigdata113
datanode:bigdata114、bigdata115
【Yarn集群】
ResourceManager:bigdata112、bigdata113
NodeManager:bigdata114、bigdata115
【JournalNode集群】
bigdata112、bigdata113
(1)配置启动zk集群
① 在主节点(bigdata112)上配置ZooKeeper
配置 /root/training/zookeeper-3.4.10/conf/zoo.cfg
文件
// 其他配置
dataDir=/root/training/zookeeper-3.4.6/tmp
server.1=bigdata112:2888:3888
server.2=bigdata113:2888:3888
server.3=bigdata114:2888:3888
在 /root/training/zookeeper-3.4.10/tmp
目录下创建一个myid的空文件,在bigdata112、bigdata113、bidata114上的文件内存分别为1、2、3,这时唯一标识每个zk节点的id,所以不可以重复。
将配置好的zookeeper拷贝到其他节点,同时修改各自的myid文件
scp -r /root/training/zookeeper-3.4./ bigdata113:/root/training
scp -r /root/training/zookeeper-3.4.6/ bigdata114:/root/training
启动zk集群(在每个节点上执行)
zkServer.sh start
查看java进程,可以看到每台机器上都出现了一个 QuorumPeerMain
的进程,说明zk集群正常启动了。
通过 zkServer.sh status
查看节点状态,可以看到 bigdata114 当选为 leader,bigdata112、bigdata113 为 follower。
(2)hadoop配置
所有配置文件均位于:$HADOOP_HOME/etc/hadoop
目录下,并且只在bigdata112安装和配置hadoop,完成之后复制到另外三台机器即可。
① core-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
fs.defaultFS | hdfs://ns1 | 默认文件系统的名称。 一个URI,其方案和权限确定文件系统实现。 URI的方案确定命名文件系统实现类的Config属性(FS.Scheme.impl)。 URI的权限用于确定文件系统的主机,端口等。 |
hadoop.tmp.dir | /root/training/hadoop-2.7.3/tmp | 其他临时目录的基础。 |
ha.zookeeper.quorum | bigdata112:2181,bigdata113:2181,bigdata114:2181 | zk服务器列表,用逗号分隔;被ZKFailoverController用于失败转移中 |
<configuration>
<!-- 指定hdfs的nameservice为ns1 -->
<property>
<name>fs.defaultFS</name>
<value>hdfs://ns1</value>
</property>
<!-- 指定hadoop临时目录 -->
<property>
<name>hadoop.tmp.dir</name>
<value>/root/training/hadoop-2.7.3/tmp</value>
</property>
<!-- 指定zookeeper地址 -->
<property>
<name>ha.zookeeper.quorum</name>
<value>bigdata112:2181,bigdata113:2181,bigdata114:2181</value>
</property>
</configuration>
② hdfs-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
dfs.nameservices | ns1 | 逗号分隔的nameservices |
dfs.ha.namenodes.ns1 | nn1,nn2 | 给定nameservice的前缀,包含了一组给定nameservice的逗号分隔的NameNode列表 |
dfs.namenode.rpc-address.ns1.nn1 | bigdata112:9000 | nn1的RPC通讯地址 |
dfs.namenode.http-address.ns1.nn1 | bigdata112:50070 | nn1的HTTP通讯地址 |
dfs.namenode.rpc-address.ns1.nn2 | bigdata113:9000 | nn2的RPC通讯地址 |
dfs.namenode.http-address.ns1.nn2 | bigdata113:50070 | nn2的RPC通讯地址 |
dfs.namenode.shared.edits.dir | qjournal://bigdata112:8485;bigdata113:8485;/ns1 | 一个在HA集群中多个namenode的共享存储目录。该目录会被active的namenode所写,被standby的namenode所读,来保持命名空间的同步;设置本参数则不应该使用 dfs.namenode.name.dir参数;一个非HA的集群本配置应该留空。 |
dfs.journalnode.edits.dir | /root/training/hadoop-2.7.3/journal | 指定JournalNode在本地磁盘存放数据的位置 |
dfs.ha.automatic-failover.enabled | true | 开启namenode的自动故障转移 |
dfs.client.failover.proxy.provider.ns1 | org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider | 配置失败自动切换实现方式 |
dfs.ha.fencing.methods | sshfence shell(/bin/true) |
配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行 |
dfs.ha.fencing.ssh.private-key-files | /root/.ssh/id_rsa | 使用sshfence隔离机制时需要ssh免登陆 |
dfs.ha.fencing.ssh.connect-timeout | 30000 | 配置sshfence隔离机制超时时间 |
<configuration>
<!--指定hdfs的nameservice为ns1,需要和core-site.xml中的保持一致 -->
<property>
<name>dfs.nameservices</name>
<value>ns1</value>
</property>
<!-- ns1下面有两个NameNode,分别是nn1,nn2 -->
<property>
<name>dfs.ha.namenodes.ns1</name>
<value>nn1,nn2</value>
</property>
<!-- nn1的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn1</name>
<value>bigdata112:9000</value>
</property>
<!-- nn1的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns1.nn1</name>
<value>bigdata112:50070</value>
</property>
<!-- nn2的RPC通信地址 -->
<property>
<name>dfs.namenode.rpc-address.ns1.nn2</name>
<value>bigdata113:9000</value>
</property>
<!-- nn2的http通信地址 -->
<property>
<name>dfs.namenode.http-address.ns1.nn2</name>
<value>bigdata113:50070</value>
</property>
<!-- 指定NameNode的日志在JournalNode上的存放位置 -->
<property>
<name>dfs.namenode.shared.edits.dir</name>
<value>qjournal://bigdata112:8485;bigdata113:8485;/ns1</value>
</property>
<!-- 指定JournalNode在本地磁盘存放数据的位置 -->
<property>
<name>dfs.journalnode.edits.dir</name>
<value>/root/training/hadoop-2.7.3/journal</value>
</property>
<!-- 开启NameNode失败自动切换 -->
<property>
<name>dfs.ha.automatic-failover.enabled</name>
<value>true</value>
</property>
<!-- 配置失败自动切换实现方式 -->
<property>
<name>dfs.client.failover.proxy.provider.ns1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<!-- 配置隔离机制方法,多个机制用换行分割,即每个机制暂用一行-->
<property>
<name>dfs.ha.fencing.methods</name>
<value>
sshfence
shell(/bin/true)
</value>
</property>
<!-- 使用sshfence隔离机制时需要ssh免登陆 -->
<property>
<name>dfs.ha.fencing.ssh.private-key-files</name>
<value>/root/.ssh/id_rsa</value>
</property>
<!-- 配置sshfence隔离机制超时时间 -->
<property>
<name>dfs.ha.fencing.ssh.connect-timeout</name>
<value>30000</value>
</property>
</configuration>
③ mapred-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
mapreduce.framework.name | yarn | 执行MapReduce作业的运行时框架。 |
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
④ yarn-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
yarn.resourcemanager.ha.enabled | true | 开启RM高可靠。启用后, (1)默认情况下RM在待机模式下启动,并在提示时转换到活动模式; (2)所有的RM列表由参数yarn.resourcemanager.ha.rm-ids设置; |
yarn.resourcemanager.cluster-id | yrc | 群集的名称。 在HA设置中,这用于确保RM参与此集群的领导者选举,并确保它不会影响其他集群 |
yarn.resourcemanager.ha.rm-ids | rm1,rm2 | RM节点的列表 |
yarn.resourcemanager.hostname.rm1 | bigdata112 | 指定rm1的主机名 |
yarn.resourcemanager.hostname.rm2 | bigdata113 | 指定rm2的主机名 |
yarn.resourcemanager.zk-address | bigdata112:2181,bigdata113:2181,bigdata114:2181 | 指定zk集群的地址 |
yarn.resourcemanager.zk-address | mapreduce_shuffle | 指定MapReduce的运行方式是洗牌 |
<configuration>
<!-- 开启RM高可靠 -->
<property>
<name>yarn.resourcemanager.ha.enabled</name>
<value>true</value>
</property>
<!-- 指定RM的cluster id -->
<property>
<name>yarn.resourcemanager.cluster-id</name>
<value>yrc</value>
</property>
<!-- 指定RM的名字 -->
<property>
<name>yarn.resourcemanager.ha.rm-ids</name>
<value>rm1,rm2</value>
</property>
<!-- 分别指定RM的地址 -->
<property>
<name>yarn.resourcemanager.hostname.rm1</name>
<value>bigdata112</value>
</property>
<property>
<name>yarn.resourcemanager.hostname.rm2</name>
<value>bigdata113</value>
</property>
<!-- 指定zk集群地址 -->
<property>
<name>yarn.resourcemanager.zk-address</name>
<value>bigdata112:2181,bigdata113:2181,bigdata114:2181</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
⑤ slaves
指定集群的从节点,这里设置为 bigdata114、bigdata115。
bigdata114
bigdata115
(3)拷贝hadoop
scp -r /root/training/hadoop-2.7.3/ root@bigdata113:/root/training/
scp -r /root/training/hadoop-2.7.3/ root@bigdata114:/root/training/
scp -r /root/training/hadoop-2.7.3/ root@bigdata115:/root/training/
(4)启动JournalNode
在bigdata112、bigdata113上执行:
hadoop-daemon.sh start journalnode
(5)格式化HDFS
① 在bigdata112上格式化生成namenode
hdfs namenode -format
② 将格式化后tmp目录下生成的文件拷贝到bigdata113
scp -r dfs/ root@bigdata113:/root/training/hadoop-2.7.3/tmp
③ 格式化zookeeper
hdfs zkfc -formatZK
通过zkfc来开启对namenode的监控。
(6)启动Hadoop集群
start-all.sh
可以看到以下日志:
// 1、启动 namenode 和 datanode
Starting namenodes on [bigdata112 bigdata113]
bigdata112: starting namenode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-namenode-bigdata112.out
bigdata113: starting namenode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-namenode-bigdata113.out
bigdata115: starting datanode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-datanode-bigdata115.out
bigdata114: starting datanode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-datanode-bigdata114.out
// 2、启动 JournalNode
Starting journal nodes [bigdata112 bigdata113 ]
bigdata113: journalnode running as process 1554. Stop it first.
bigdata112: journalnode running as process 1597. Stop it first.
// 3、启动ZK失败转移控制器
Starting ZK Failover Controllers on NN hosts [bigdata112 bigdata113]
bigdata113: starting zkfc, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-zkfc-bigdata113.out
bigdata112: starting zkfc, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-zkfc-bigdata112.out
// 4、启动Yarn
starting yarn daemons
starting resourcemanager, logging to /root/training/hadoop-2.7.3/logs/yarn-root-resourcemanager-bigdata112.out
bigdata114: starting nodemanager, logging to /root/training/hadoop-2.7.3/logs/yarn-root-nodemanager-bigdata114.out
bigdata115: starting nodemanager, logging to /root/training/hadoop-2.7.3/logs/yarn-root-nodemanager-bigdata115.out
bigdata113上 的 ResourceManager 需要单独启动:
yarn-daemon.sh start resourcemanag
启动后的HA Hadoop集群中所有节点上的 java 进程如下
(7)测试HA
通过命令检查两个namenode的状态,其中nn1是bigdata112,nn2是bigdata113。
[root@bigdata112 hadoop]# hdfs haadmin -getServiceState nn1
active
[root@bigdata112 hadoop]# hdfs haadmin -getServiceState nn2
standby
可以看到 bigdata112 是活跃 namenode,bigdata113 是备用 namenode。
现在杀死 bigdata112 上的 namenode 进程,再查看两个节点的状态。
[root@bigdata112 hadoop]# jps
1776 NameNode
2578 Jps
2197 ResourceManager
2092 DFSZKFailoverController
1597 JournalNode
1406 QuorumPeerMain
[root@bigdata112 hadoop]# kill -9 1776
[root@bigdata112 hadoop]# hdfs haadmin -getServiceState nn1
22/02/28 20:29:16 INFO ipc.Client: Retrying connect to server: bigdata112/192.168.190.112:9000. Already tried 0 time(s); retry policy is RetryUpToMaximumCountWithFixedSleep(maxRetries=1, sleepTime=1000 MILLISECONDS)
Operation failed: Call From bigdata112/192.168.190.112 to bigdata112:9000 failed on connection exception: java.net.ConnectException: 拒绝连接; For more details see: http://wiki.apache.org/hadoop/ConnectionRefused
[root@bigdata112 hadoop]# hdfs haadmin -getServiceState nn2
active
可以看到 nn1,无法连接,而 nn2 的状态变成了 active,证明 HA 的配置生效了。
在 bigdata112 上重启 hdfs,可以看到又有了 namenode 的进程,nn1 的状态为 standby。
[root@bigdata112 hadoop]# start-dfs.sh
Starting namenodes on [bigdata112 bigdata113]
bigdata112: starting namenode, logging to /root/training/hadoop-2.7.3/logs/hadoop-root-namenode-bigdata112.out
bigdata113: namenode running as process 1642. Stop it first.
bigdata115: datanode running as process 1313. Stop it first.
bigdata114: datanode running as process 1535. Stop it first.
Starting journal nodes [bigdata112 bigdata113 ]
bigdata113: journalnode running as process 1554. Stop it first.
bigdata112: journalnode running as process 1597. Stop it first.
Starting ZK Failover Controllers on NN hosts [bigdata112 bigdata113]
bigdata113: zkfc running as process 1764. Stop it first.
bigdata112: zkfc running as process 2092. Stop it first.
[root@bigdata112 hadoop]# jps
2197 ResourceManager
3094 Jps
2092 DFSZKFailoverController
2780 NameNode
1597 JournalNode
1406 QuorumPeerMain
[root@bigdata112 hadoop]# hdfs haadmin -getServiceState nn1
standby
使用zk工具查看znode的数据信息,可以看到 hdfs、yarn 的注册信息。
5、联邦HDFS
(1)hadoop配置
① core-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
hadoop.tmp.dir | /root/training/hadoop-2.7.3/tmp | 其他临时目录的基础。 |
fs.viewfs.mounttable.xdl1.homedir | /home | 视图文件系统home目录 |
fs.viewfs.mounttable.xdl1.link./movie | hdfs://bigdata112:9000/movie | viewFs挂载/movies目录到bigdata112 |
fs.viewfs.mounttable.xdl1.link./mp3 | hdfs://bigdata113:9000/mp3 | viewFs挂载/mp3目录到bigdata113 |
fs.default.name | viewfs://xdl1 | 使用视图文件系统,并使用路由规则xdl1 |
<configuration>
<property>
<name>hadoop.tmp.dir</name>
<value>/root/training/hadoop-2.7.3/tmp</value>
</property>
<!-- 下面的配置信息定义的是路由规则 -->
<property>
<name>fs.viewfs.mounttable.xdl1.homedir</name>
<value>/home</value>
</property>
<property>
<name>fs.viewfs.mounttable.xdl1.link./movie</name>
<value>hdfs://bigdata112:9000/movie</value>
</property>
<property>
<name>fs.viewfs.mounttable.xdl1.link./mp3</name>
<value>hdfs://bigdata113:9000/mp3</value>
</property>
<property>
<name>fs.default.name</name>
<value>viewfs://xdl1</value>
</property>
</configuration>
② hdfs-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
dfs.replication | 2 | 逗号分隔的nameservices |
dfs.permissions | false | 不开启权限检验 |
dfs.nameservices | ns1,ns2 | 配置联盟中有两个名称服务,即有两个namenode |
dfs.namenode.rpc-address.ns1 | bigdata112:9000 | ns1的RPC通讯地址 |
dfs.namenode.http-address.ns1 | bigdata112:50070 | ns1的HTTP通讯地址 |
dfs.namenode.secondaryhttp-address.ns1 | bigdata112:50090 | ns1的SecondaryNameNode的RPC通讯地址 |
dfs.namenode.rpc-address.ns2 | bigdata113:9000 | ns2的RPC通讯地址 |
dfs.namenode.http-address.ns2 | bigdata113:50070 | ns2的HTTP通讯地址 |
dfs.namenode.secondaryhttp-address.ns2 | bigdata113:50090 | ns2的SecondaryNameNode的RPC通讯地址 |
<configuration>
<!-- 只有两个datanode,副本数设为2 -->
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
<!--配置联盟有几个名称服务-->
<!--有几个NameNode-->
<property>
<name>dfs.nameservices</name>
<value>ns1,ns2</value>
</property>
<!--配置第一个NameNode-->
<property>
<name>dfs.namenode.rpc-address.ns1</name>
<value>bigdata112:9000</value>
</property>
<property>
<name>dfs.namenode.http-address.ns1</name>
<value>bigdata112:50070</value>
</property>
<property>
<name>dfs.namenode.secondaryhttp-address.ns1</name>
<value>bigdata112:50090</value>
</property>
<!--配置第二个NameNode-->
<property>
<name>dfs.namenode.rpc-address.ns2</name>
<value>bigdata113:9000</value>
</property>
<property>
<name>dfs.namenode.http-address.ns2</name>
<value>bigdata113:50070</value>
</property>
<property>
<name>dfs.namenode.secondaryhttp-address.ns2</name>
<value>bigdata113:50090</value>
</property>
</configuration>
③ mapred-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
mapreduce.framework.name | yarn | 执行MapReduce作业的运行时框架。 |
<configuration>
<property>
<name>mapreduce.framework.name</name>
<value>yarn</value>
</property>
</configuration>
④ yarn-site.xml
参数名 | 参数值 | 含义 |
---|---|---|
yarn.resourcemanager.hostname | bigdata112 | 指定rm2的主机名 |
yarn.nodemanager.aux-services | mapreduce_shuffle | 指定MapReduce的运行方式是洗牌 |
<configuration>
<property>
<name>yarn.resourcemanager.hostname</name>
<value>bigdata112</value>
</property>
<property>
<name>yarn.nodemanager.aux-services</name>
<value>mapreduce_shuffle</value>
</property>
</configuration>
⑤ 设置从节点
配置 salves 文件
bigdata112
bigdata113
(2)拷贝hadoop
scp -r hadoop-2.7.3/ root@bigdata113:/root/training
scp -r hadoop-2.7.3/ root@bigdata114:/root/training
scp -r hadoop-2.7.3/ root@bigdata115:/root/training
(3)格式化namenode
在bigdata112、bigdata113上指定ID格式化
hdfs namenode -format -clusterId xdl1
(4)启动Hadoop集群
start-all.sh
(5)创建路由目录
在每个namenode上分别创建对应的目录
hadoop fs -mkdir hdfs://bigdata112:9000/movie
hadoop fs -mkdir hdfs://bigdata113:9000/mp3
查看hdfs文件目录
[root@bigdata112 hadoop]# hdfs dfs -ls /
Found 2 items
-r-xr-xr-x - root root 0 2022-02-28 22:30 /movie
-r-xr-xr-x - root root 0 2022-02-28 22:30 /mp3
分别查看bigdata112、bigdata113的hdfs网页控制台,都只看到一个目录。
(6)上传文件测试
[root@bigdata112 ~]# hdfs dfs -put data1.txt /movie
[root@bigdata112 ~]# hdfs dfs -put data2.txt /mp3
查看hdfs网页控制台
五、HDFS命令
hdfs dfs
-mkdir [-p]: 创建目录
-ls: 查询目录
-ls -R / -lsr: 查询目录,包括子目录
-put: 上传数据
-copyFromLocal: 上传数据
-moveFromLocal: 上传数据
-get: 下载数据
-copyToLocal: 下载数据
-rm: 删除目录
-rmr: 删除目录,包括子目录
-getmerge: 先把某个目录下的文件合并,再下载
vi student01.txt
vi student02.txt
hdfs dfs -mkdir /students
hdfs dfs -put student0* /students
hdfs dfs -getmerge /students ./allstudents.txt
-cp:拷贝 hdfs dfs -cp /input/data.txt /input/data2.txt
-mv:剪切
-text、-cat:查看文本文件的内容
balancer:平衡操作
hdfs balancer
目录创建与查询
[root@bigdata111 temp]# hdfs dfs -mkdir /data
[root@bigdata111 temp]# hdfs dfs -ls /
Found 11 items
drwxr-xr-x - root supergroup 0 2022-02-11 17:57 /data
创建和查询多层目录
[root@bigdata111 temp]# hdfs dfs -mkdir -p /data/aaa/bbb
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - root supergroup 0 2022-03-01 01:06 /data/aaa
drwxr-xr-x - root supergroup 0 2022-03-01 01:06 /data/aaa/bbb
上传文件(-put
、-copyFromLocal
效果一样)
[root@bigdata111 temp]# ll
总用量 60
-rw-r--r--. 1 root root 33 1月 14 05:25 data01.txt
-rw-r--r--. 1 root root 13 1月 14 05:26 data02.txt
-rw-r--r--. 1 root root 32 1月 14 05:26 data03.txt
...
[root@bigdata111 temp]# hdfs dfs -put data01.txt /data
[root@bigdata111 temp]# hdfs dfs -copyFromLocal data02.txt /data
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - root supergroup 0 2022-03-01 01:06 /data/aaa
drwxr-xr-x - root supergroup 0 2022-03-01 01:06 /data/aaa/bbb
-rw-r--r-- 2 root supergroup 33 2022-03-01 01:08 /data/data01.txt
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:08 /data/data02.txt
上传文件并且从本地文件系统中删除
[root@bigdata111 temp]# hdfs dfs -moveFromLocal data03.txt /data
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - root supergroup 0 2022-03-01 01:06 /data/aaa
drwxr-xr-x - root supergroup 0 2022-03-01 01:06 /data/aaa/bbb
-rw-r--r-- 2 root supergroup 33 2022-03-01 01:08 /data/data01.txt
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:08 /data/data02.txt
-rw-r--r-- 2 root supergroup 32 2022-03-01 01:10 /data/data03.txt
[root@bigdata111 temp]# ll
总用量 56
-rw-r--r--. 1 root root 33 1月 14 05:25 data01.txt
-rw-r--r--. 1 root root 13 1月 14 05:26 data02.txt
-rw-r--r--. 1 root root 60 12月 22 07:04 data.txt
...
下载数据(-get
、-moveFromLocal
的效果一样)
[root@bigdata111 temp]# hdfs dfs -get /data/data03.txt
[root@bigdata111 temp]# ll
总用量 60
-rw-r--r--. 1 root root 33 1月 14 05:25 data01.txt
-rw-r--r--. 1 root root 13 1月 14 05:26 data02.txt
-rw-r--r--. 1 root root 32 3月 1 01:11 data03.txt
-rw-r--r--. 1 root root 60 12月 22 07:04 data.txt
...
删除文件
[root@bigdata111 temp]# hdfs dfs -rm /data/data01.txt
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
drwxr-xr-x - root supergroup 0 2022-03-01 01:06 /data/aaa
drwxr-xr-x - root supergroup 0 2022-03-01 01:06 /data/aaa/bbb
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:08 /data/data02.txt
-rw-r--r-- 2 root supergroup 32 2022-03-01 01:10 /data/data03.txt
...
删除目录,包括目录下的文件和子目录
[root@bigdata111 temp]# hdfs dfs -rmr /data/aaa
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:08 /data/data02.txt
-rw-r--r-- 2 root supergroup 32 2022-03-01 01:10 /data/data03.txt
合并文件再下载
[root@bigdata111 temp]# cat student01.txt
1,Tom,23
2,Mary,22
[root@bigdata111 temp]# cat student02.txt
3,Mike,24
[root@bigdata111 temp]# cat student03.txt
4,Jone,21
[root@bigdata111 temp]# hdfs dfs -put student0* /data
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:08 /data/data02.txt
-rw-r--r-- 2 root supergroup 32 2022-03-01 01:10 /data/data03.txt
-rw-r--r-- 2 root supergroup 19 2022-03-01 01:19 /data/student01.txt
-rw-r--r-- 2 root supergroup 10 2022-03-01 01:19 /data/student02.txt
-rw-r--r-- 2 root supergroup 10 2022-03-01 01:19 /data/student03.txt
[root@bigdata111 temp]# hdfs dfs -getmerge /data/student* ./allstudents.txt
[root@bigdata111 temp]# cat allstudents.txt
1,Tom,23
2,Mary,22
3,Mike,24
4,Jone,21
文件复制与剪切
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:08 /data/data02.txt
-rw-r--r-- 2 root supergroup 32 2022-03-01 01:10 /data/data03.txt
-rw-r--r-- 2 root supergroup 19 2022-03-01 01:19 /data/student01.txt
-rw-r--r-- 2 root supergroup 10 2022-03-01 01:19 /data/student02.txt
-rw-r--r-- 2 root supergroup 10 2022-03-01 01:19 /data/student03.txt
[root@bigdata111 temp]# hdfs dfs -cp /data/data02.txt /data/data.txt
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:22 /data/data.txt
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:08 /data/data02.txt
-rw-r--r-- 2 root supergroup 32 2022-03-01 01:10 /data/data03.txt
-rw-r--r-- 2 root supergroup 19 2022-03-01 01:19 /data/student01.txt
-rw-r--r-- 2 root supergroup 10 2022-03-01 01:19 /data/student02.txt
-rw-r--r-- 2 root supergroup 10 2022-03-01 01:19 /data/student03.txt
[root@bigdata111 temp]# hdfs dfs -mv /data/data02.txt /data/data01.txt
[root@bigdata111 temp]# hdfs dfs -lsr /data
lsr: DEPRECATED: Please use 'ls -R' instead.
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:22 /data/data.txt
-rw-r--r-- 2 root supergroup 13 2022-03-01 01:08 /data/data01.txt
-rw-r--r-- 2 root supergroup 32 2022-03-01 01:10 /data/data03.txt
-rw-r--r-- 2 root supergroup 19 2022-03-01 01:19 /data/student01.txt
-rw-r--r-- 2 root supergroup 10 2022-03-01 01:19 /data/student02.txt
-rw-r--r-- 2 root supergroup 10 2022-03-01 01:19 /data/student03.txt
查看文本文件内容
[root@bigdata111 temp]# hdfs dfs -cat /data/data.txt
I love China
[root@bigdata111 temp]# hdfs dfs -text /data/data.txt
I love China
六、数据上传下载工作原理
1、数据上传
数据上传的流程步骤如下:
① 客户端创建 DistributedFileSystem 对象;
② DistributedFileSystem 对象内部会封装创建一个 DFSClient 对象,并依赖该对象与 namenode 进行RPC通讯;
③ DFSClient 与 namenode 建立 RPC 通信,得到一个 NameNode Proxy 的代理对象;
④ 客户端请求创建文件的元信息;
⑤ NameNode 接收到请求,创建文件的元信息,一般占用150个字节,根据文件大小划分为多个数据块,并根据副本数设置和机架感知,指定要存储在哪些 DataNode 上;
元信息:大概150个字节 { 文件名:a.avi 路径:/movie 大小:200M 数据块:2个 数据块1:DN1, DN2, DN3 数据块2:DN1, DN2, DN3 }
⑥ 将元信息写入内存和编辑日志;
⑦ 后台进程会定期生成与合并 fsimage 文件;
⑧ NameNode 将生成的元信息数据返回给客户端;
⑨ 客户端创建输出流对象 FSDataOutputStream,对文件进行切块;
⑩ 将第一份数据上传到 DN1,并根据机架感知水平复制到 DN2、DN3;
⑪ 循环上传所有的数据块。
【详细设计】
FSDataOutputStream 封装一个 DFSOutputStream 对象,该对象负责处理 datanode 和 namenode 之间的通信。在客户端写入数据时,DFSOutputStream 将它分成一个个数据包,并写入内部队列,称为 “数据队列”(data queue),DFSOutputStream 内部封装了一个 DataStream,它会处理数据队列,挑选出适合存储数据复本的一组 datanode,并据此来要求 namenode 分配新的数据块。
DFSOutputStream 也维护者一个内部数据包队列来等待 dataname 收到确认回执,表示数据块已正常存储,称为 “确认队列”(ack queue)。收到所有 datanode 确认信息后,该数据包才会从确认队列删除。
若任何 datanode 在数据写入期间发生故障,则按照以下步骤处理:
1)关闭管线,把确认队列中的所有数据包添加会队列的最前端,以保障故障节点下游的 datanode 不会漏掉任何一个数据包;
2)为存储在另一正常 datanode 的当前数据块指定一个新的标识,并将该标识传给 namenode,以便故障 datanode 在恢复后删除存储的部分数据块(即DN5中的block2不是完整的四个包);
3)从管线中删除故障 datanode,基于两个正常 datanode 构建一条新管线;
4)余下的数据块写入管线中正常的 datanode;
5)namenode 注意到副本数不足,会在另一个节点上创建一个新的副本。
相关设置参数:
参数名 | 参数值 | 含义 |
---|---|---|
dfs.namenode.replication.min | 1 | 认为数据块同步写入成功的最小副本 |
dfs.replication | 3 | 写操作成功后,最终数据块在集群中异步复制要达到的目标副本数 |
2、数据下载
数据下载的流程步骤如下:
① 客户端创建 DistributedFileSystem 对象;
② DistributedFileSystem 对象内部会封装创建一个 DFSClient 对象,并依赖该对象与 namenode 进行RPC通讯;
③ DFSClient 与 namenode 建立 RPC 通信,得到一个 NameNode Proxy 的代理对象;
④ 客户端请求获取文件的元信息;
⑤ NameNode 接收到请求,从内存中快速找到文件的元信息;
⑥ 若内存没有则从 fsimage 中读取;
⑦ 将元信息返回给客户端;
元信息:大概150个字节 { 文件名:a.avi 路径:/movie 大小:200M 数据块:2个 数据块1:DN1, DN2, DN3 数据块2:DN1, DN2, DN3 }
⑧ 客户端创建输入流对象 FSDataInputStream;
⑨ 下载第一个数据块;
⑩ 循环下载所有数据块。
七、Java API的使用
启动全分布集群,namenode 为 bigdata111(192.168.190.111)。
1、目录操作
@Test
public void test1() throws Exception{
// 指定NameNode地址
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
// 创建一个HDFS的客户端 DistributedFileSystem
FileSystem client = FileSystem.get(conf);
// 创建目录
client.mkdirs(new Path("/folder1"));
// 关闭客户端
client.close();
}
错误:
- org.apache.hadoop.security.AccessControlException:
Permission denied: user=huyihao, access=WRITE, inode="/folder1":root:supergroup:drwxr-xr-x
若发生上述错误,说明当前用户没有写入权限,有几种方法可以解决:
(1) 设置执行用户是 root 用户
① 使用 System.setProperty("HADOOP_USER_NAME", "root");
直接设置
@Test
public void test2() throws Exception{
System.setProperty("HADOOP_USER_NAME", "root");
// 指定NameNode地址
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
// 创建一个HDFS的客户端 DistributedFileSystem
FileSystem client = FileSystem.get(conf);
// 创建目录
client.mkdirs(new Path("/folder1"));
// 关闭客户端
client.close();
}
② 使用传JVM参数的方式设置 -DHADOOP_USER_NAME=root
(2)修改文件权限
使用chmod的命令: hdfs dfs -chmod 777 /folder2
@Test
public void test3() throws Exception{
// 指定NameNode地址
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
// 创建一个HDFS的客户端 DistributedFileSystem
FileSystem client = FileSystem.get(conf);
// 创建目录
client.mkdirs(new Path("/folder2"));
// 关闭客户端
client.close();
}
(3)直接关闭 dfs 的权限校验
hdfs-site.xml
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
2、读取数据
(1)使用标准输出读取数据
package chapter03.hdfs;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
/**
* 1. 以标准输出方式显示Hadoop文件系统中的文件
*/
public class URLCat {
static {
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
public static void main(String[] args) throws Exception {
String url = "hdfs://bigdata111:9000/data/data.txt";
InputStream input = null;
OutputStream output = System.out;
try {
input = new URL(url).openStream();
//构造一个缓冲区
byte[] buffer = new byte[1024];
int len = 0;
while((len=input.read(buffer)) > 0) {
output.write(buffer, 0, len);
}
} finally {
if (input != null) {
input.close();
}
output.flush();
output.close();
}
}
}
在 windows 下的IDE执行上述程序时,会报错
需要在 windows 上安装 hadoop 的环境,步骤如下:
① 下载包
最好跟服务端的版本一致,不一致修改下载链接的版本号即可:
http://archive.apache.org/dist/hadoop/core/hadoop-2.7.3/hadoop-2.7.3.tar.gz
https://github.com/amihalik/hadoop-common-2.6.0-bin (github上没找到2.7.3版本)
② 解压
将第一个链接下载的包解压到 D:\soft\hadoop-2.7.3
, 将第二个链接下载的文件中的 bin 目录下的所有文件拷贝到 D:\soft\hadoop-2.7.3\bin
。
③ 配置环境变量
变量名 | 参数值 |
---|---|
HADOOP_HOME | D:\soft\hadoop-2.7.3 |
JAVA_HOME | 将原来设置的C:\Program Files\Java\jdk1.8.0_202改为C:\Progra~1\Java\jdk1.8.0_202 |
CLASSPATH | 在原来的值后面追加 D:\soft\hadoop-2.7.3\bin\winutils.exe |
PATH | 在原来的值后面追加 %HADOOP_HOME%\bin |
④ 验证环境变量是否生效
配置完成后,正常把hdfs读取到的文件内容打印到输出流
(2)使用 Hadoop API 读取数据
@Test
public void standardOutput() throws IOException {
//指定NameNode的地址
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
//创建HDFS的客户端
FileSystem client = FileSystem.get(conf);
//得到一个输入流
InputStream input = client.open(new Path("/data/data.txt"));
OutputStream output = System.out;
IOUtils.copyBytes(input, output, 1024);
// 上面一行代码的作用相当于下面的这段代码
// 构造一个缓冲区
/*byte[] buffer = new byte[1024];
int len = 0;
while((len=input.read(buffer)) > 0) {
output.write(buffer, 0, len);
}
output.flush();
output.close();
input.close();*/
}
使用命令查看文件内容:
[root@bigdata111 hadoop]# hdfs dfs -text /data/data.txt
I love China
代码执行结果
3、写入数据
将本地文件写到HDFS中,实际上就是个上传的过程。
@Test
public void upload() throws Exception {
// 构造一个输入流,代表要上传的数据
InputStream input = new FileInputStream("D:\\Hadoop\\软件包\\Hadoop生态圈组件\\hadoop-2.7.3.tar.gz");
// 指定NameNode 地址
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
// 创建HDFS的客户端
FileSystem client = FileSystem.get(conf);
//构造一个输出流,指向HDFS
OutputStream output = client.create(new Path("/folder1/b.tag.gz"));
// 使用工具类简化
IOUtils.copyBytes(input, output, 1024);
}
带进度条的上传
@Test
public void uploadWithProgress() throws Exception {
// 构造一个输入流,代表要上传的数据
InputStream input = new FileInputStream("D:\\Hadoop\\软件包\\Hadoop生态圈组件\\hadoop-2.7.3.tar.gz");
// 指定NameNode 地址
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
// 创建HDFS的客户端
FileSystem client = FileSystem.get(conf);
//构造一个输出流,指向HDFS
OutputStream output = client.create(new Path("/folder1/b.tag.gz"), new Progressable() {
@Override
public void progress() {
System.out.print(".");
}
});
// 使用工具类简化
IOUtils.copyBytes(input, output, 1024);
}
可以看到文件上传成功
4、删除数据
@Test
public void delete() throws IOException {
// 指定NameNode 地址
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
// 创建HDFS的客户端
FileSystem client = FileSystem.get(conf);
boolean ret = client.delete(new Path("/folder1/a.tag.gz"));
System.out.println(ret ? "删除成功" : "删除失败");
}
5、查询文件系统
显示一个文件、目录的状态信息
import java.io.IOException;
import java.io.OutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
public class ShowFileStatus {
// 显示文件状态信息
@Test
public void fileStatusForFile() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
FileSystem fs = FileSystem.get(conf);
// 创建一个文件
String filePath = "/dir/file";
OutputStream out = fs.create(new Path(filePath));
out.write("content".getBytes("UTF-8"));
// 读取文件状态
FileStatus fileStatus = fs.getFileStatus(new Path(filePath));
System.out.println("文件路径为:\t" + fileStatus.getPath().toUri().getPath());
System.out.println("是否为目录:\t" + fileStatus.isDirectory());
System.out.println("是否为文件:\t" + fileStatus.isFile());
System.out.println("文件权限:\t" + fileStatus.getPermission().toString());
System.out.println("用户:\t" + fileStatus.getOwner());
System.out.println("组:\t" + fileStatus.getGroup());
System.out.println("文件大小:\t" + fileStatus.getLen());
System.out.println("创建时间:\t" + fileStatus.getAccessTime());
System.out.println("最近修改时间:\t" + fileStatus.getModificationTime());
System.out.println("文件副本数:\t" + fileStatus.getReplication());
System.out.println("文件占用数据块数:\t" + fileStatus.getBlockSize() / (128 * 1024 * 1024L));
//System.out.println("符号链路的内容(有可能是软链接):\t" + fileStatus.getSymlink());
}
// 显示目录状态信息
@Test
public void fileStatusForDirectory() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
FileSystem fs = FileSystem.get(conf);
Path dir = new Path("/dir");
FileStatus fileStatus = fs.getFileStatus(dir);
System.out.println("文件路径为:\t" + fileStatus.getPath().toUri().getPath());
System.out.println("是否为目录:\t" + fileStatus.isDirectory());
System.out.println("是否为文件:\t" + fileStatus.isFile());
System.out.println("文件权限:\t" + fileStatus.getPermission().toString());
System.out.println("用户:\t" + fileStatus.getOwner());
System.out.println("组:\t" + fileStatus.getGroup());
System.out.println("文件大小:\t" + fileStatus.getLen());
System.out.println("创建时间:\t" + fileStatus.getAccessTime());
System.out.println("最近修改时间:\t" + fileStatus.getModificationTime());
System.out.println("文件副本数:\t" + fileStatus.getReplication());
System.out.println("文件占用数据块数:\t" + fileStatus.getBlockSize() / (128 * 1024 * 1024L));
}
}
获取文件状态信息列表
import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
public class ListStatus {
// 显示一组路径的文件信息
@Test
public void listGroupStatus() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
FileSystem fs = FileSystem.get(conf);
Path[] paths = new Path[2];
paths[0] = new Path("/data/student01.txt");
paths[1] = new Path("/data/student02.txt");
FileStatus[] status = fs.listStatus(paths);
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths)
{
System.out.println(p);
}
showStatus(status);
}
// 显示一个路径下的所有文件和目录(不会迭代遍历)
@Test
public void listDirectoryStatus() throws IOException {
Configuration conf = new Configuration();
conf.set("fs.defaultFS", "hdfs://192.168.190.111:9000");
FileSystem fs = FileSystem.get(conf);
FileStatus[] status = fs.listStatus(new Path("/data"));
Path[] listedPaths = FileUtil.stat2Paths(status);
for (Path p : listedPaths)
{
System.out.println(p);
}
showStatus(status);
}
private void showStatus(FileStatus[] status) {
for (FileStatus fs : status) {
showStatus(fs);
}
}
private void showStatus(FileStatus fileStatus) {
System.out.println("-------------------------------------------------------");
System.out.println("文件路径为:\t" + fileStatus.getPath().toUri().getPath());
System.out.println("是否为目录:\t" + fileStatus.isDirectory());
System.out.println("是否为文件:\t" + fileStatus.isFile());
System.out.println("文件权限:\t" + fileStatus.getPermission().toString());
System.out.println("用户:\t" + fileStatus.getOwner());
System.out.println("组:\t" + fileStatus.getGroup());
System.out.println("文件大小:\t" + fileStatus.getLen());
System.out.println("创建时间:\t" + fileStatus.getAccessTime());
System.out.println("最近修改时间:\t" + fileStatus.getModificationTime());
System.out.println("文件副本数:\t" + fileStatus.getReplication());
System.out.println("文件占用数据块数:\t" + fileStatus.getBlockSize() / (128 * 1024 * 1024L));
System.out.println("-------------------------------------------------------");
}
}
八、并行复制
Hadoop 自带一个可并行复制大量数据的命令 distcp,执行一个 distcp 命令会触发一个 MapReduce 作业的执行,在大文件复制时具有较高的性能。
文件复制
hadoop distcp [-overwrite] file1 file2
# -overwrite: 若目标文件存在强制覆盖。
目录复制
hadoop distcp [-overwrite] [-update] [-delete] [-p] dir1 dir2
# -update: 仅更新发生变化的文件。
# -delete: 删除目标路径中任意没在源路径中出现的文件或目录。
# -p: 文件状态属性如权限、块大小和复本数被保留。
可以方便地在不同的HDFS集群间拷贝数据
hadoop distcp hdfs://nn1:8020/foo/bar hdfs://nn2:8020/bar/foo
更多用法参考官方文档:https://hadoop.apache.org/docs/r2.7.3/hadoop-distcp/DistCp.html。
网友评论