简介
- 分布式文件存储,文件在物理上是分块存储(block),可以通过dfs.blocksizes设定,2.x默认是128M,老版本是64M;
- 为客户端提供统一的抽象目录树,客户端通过路径来访问;
- 目录结构及分块信息(元数据)由namenode节点承担,namenode是HDFS集群的主节点,维护整个目录结构block块信息以及datanode信息;
- 文件的各个block的存储管理由datanode负责,DataNode是HDFS的从节点,每一个block都可以在多个DataNode上存储多个副本;
- 一次写入,多次读取,不支持修改;
注意:HDFS适合做数据分析,并不适合做网盘应用,不便修改,延迟 大,网络开销大,成本太高。
常用Shell命令
1. 显示目录信息
hadoop fs -sl /
2. 创建目录
hadoop fs -mkdir -p /dir1/dir2
3. 本地截切粘贴
hadoop fs -moveFromLocal /root/1.txt /dir1/dir2
4. 追加文件到已存在文件末尾
hadoop fs -appendToFile /root/2.txt /dir1/dir2/1.txt
5. 查看内容
hadoop fs -cat /dir1/dir2/1.txt
hadoop fs -tail /dir1/dir2/1.txt
hadoop fs -text /dir1/dir2/1.txt
6. 拷贝到HDFS
hadoop fs -copyFromLocal ./1.txt /dir1/dir2
hadoop fs -put ./1.txt /dir1/dir2
7. HDFS内部拷贝
hadoop fs -cp /dir1/dir2/1.txt /dir1/dir2/2.txt
8. HDFS内部移动
hadoop fs -mv ./2.txt /
9. 从HDFS拷贝到本地
hadoop fs -copyToLocal /2.txt
hadoop fs -get /2.txt
10. 合并下载多个文件
hadoop fs -getmerage /dir1/dir2/1.txt /2.txt
11. 删除文件或文件夹
hadoop fs -rm -r /dir1/dir2
hadoop fs -rmdir /dir1/dir2(删除空文件夹)
12. 统计文件系统可用空间
hadoop fs -df -h /
hadoop fs -du -h /dir1/dir2
13. 统计一个指定目录下的文件节点数据量
hadoop fs -count /dir1/
14. 设置HDFS中文件的副本数量
hadoop fs -setrep 3 /dir1/dir2/1.txt
HDFS 工作机制
- 文件写入过程
客户端向HDFS写数据,首先要与NameNode通信,以确认可以写文件并获取DataNode节点信息,然后客户端根据DateNode信息按顺序上传数据,DataNode将Block副本复制到其他DataNode;- 与NameNode 通信,请求上传数据,NameNode检测是否存在、父目录是否存在;
- 返回确认信息;
- 客户端请求NameNode,block1上传到那些DataNode;
- NameNode返回DataNode信息
- 客户端根据返回的DateNode信息,请求其中一台,上传数据(本质是RPC调用,建立pipeline),A到B,B到C根据副本数量依次类推,完成后逐级返回客户端。
- 客户端上传block,以packet为单位,A到B,B到C,A每传一个packet都会放入应答队列,等待应答。
- 当一个block传输完成,客户端上传第二个block。
- 文件读取流程
客户端将要获取的文件路径发送给NameNode,NameNode获取文件的元数据信息(Block存放位置)返回给客户端,客户端通过相应信息找到DataNode,逐个获取文件的block并在客户端本地进行数据追加合并,从而得到完整文件。- 通过NameNode获取元数据信息,找到文件块所在DataNode。
- 挑选一台DataNode(就近原则,然后随机),建立Socket流。
- datanode开始发送数据(从磁盘读取数据放入流,以packet为单位来做校验);
- 客户端以packet为单位接收,先在本地缓存,然后写入目标文件
NameNode机制
- 元数据存储机制
- 内存数据(完整的元数据信息)
- 磁盘元数据镜像文件(“准完整”,在namenode的工作目录)
- 数据操作日志文件(可通过日志文件运算出元数据),用于衔接内存和持久化元数据镜像的操作日志(edits文件),
注:当客户端新增,操作日志首先被记入edits文件,当客户端操作成功,相应的元数据会更新到内存中。
- 元数据手动查看
可以通过hdfs的一个工具来查看edits中的信息
bin/hdfs oev -i edits -o edits.xml
bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
- 元数据的checkpoint
每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merage(这个过程称作checkpoint) - checkpoint操作的触发条件配置参数
dfs.namenode.checkpoint.check.period=60 #检查触发条件是否满足的频率,60秒
dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
#以上两个参数做checkpoint操作时,secondary namenode的本地工作目录
dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
dfs.namenode.checkpoint.max-retries=3 #最大重试次数
dfs.namenode.checkpoint.period=3600 #两次checkpoint之间的时间间隔3600秒
dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间最大的操作记录
- checkpoint的附带作用
namenode和secondary namenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据
DataNode工作机制
- 工作职责
存储用户的文件块数据
定期向namenode汇报自身所持有的block信息,(通过心跳信息上报) - Datanode掉线判断时限参数
datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:
timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。
<property>
<name>heartbeat.recheck.interval</name>
<value>2000</value>
</property>
<property>
<name>dfs.heartbeat.interval</name>
<value>1</value>
</property>
Java API
- HDFS客户端依赖
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>2.6.1</version>
</dependency>
注意:org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security .AccessControlException: Permission denied
没有权限访问:core-site.xml添加
```
<property>
<name>hadoop.security.authorization</name>
<value>false</value>
</property>
2. windows上开发注意
1. 解压hadoop安装包;
2. 将安装包下的lib和bin目录用对应windows版本平台编译的本地库替换
3. 在window系统中配置HADOOP_HOME指向你解压的安装包
4. 在windows系统的path变量中加入hadoop的bin目录
3. 代码
```
//创建连接
Configuration conf = new Configuration();
//配置连接服务器
conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
//副本数量 :参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置
conf.set("dfs.replication", "3");
//获取Hdfs客户端
FileSystem fs = FileSystem.get(conf);
// 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户
FileSystem fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
@Test
public void testAddFileToHdfs() throws Exception {
// 要上传的文件所在的本地路径
Path src = new Path("g:/redis-recommend.zip");
// 要上传到hdfs的目标路径
Path dst = new Path("/aaa");
fs.copyFromLocalFile(src, dst);
fs.close();
}
/**
* 从hdfs中复制文件到本地文件系统
*
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
fs.close();
}
@Test
public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
// 创建目录
fs.mkdirs(new Path("/a1/b1/c1"));
// 删除文件夹 ,如果是非空文件夹,参数2必须给值true
fs.delete(new Path("/aaa"), true);
// 重命名文件或文件夹
fs.rename(new Path("/a1"), new Path("/a2"));
}
/**
* 查看目录信息,只显示文件
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
// 思考:为什么返回迭代器,而不是List之类的容器
RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
while (listFiles.hasNext()) {
LocatedFileStatus fileStatus = listFiles.next();
System.out.println(fileStatus.getPath().getName());
System.out.println(fileStatus.getBlockSize());
System.out.println(fileStatus.getPermission());
System.out.println(fileStatus.getLen());
BlockLocation[] blockLocations = fileStatus.getBlockLocations();
for (BlockLocation bl : blockLocations) {
System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
String[] hosts = bl.getHosts();
for (String host : hosts) {
System.out.println(host);
}
}
System.out.println("--------------为angelababy打印的分割线--------------");
}
}
/**
* 查看文件及文件夹信息
*
* @throws IOException
* @throws IllegalArgumentException
* @throws FileNotFoundException
*/
@Test
public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
FileStatus[] listStatus = fs.listStatus(new Path("/"));
String flag = "d-- ";
for (FileStatus fstatus : listStatus) {
if (fstatus.isFile()) flag = "f-- ";
System.out.println(flag + fstatus.getPath().getName());
}
通过流的方式访问hdfs
@Test
public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
//先获取一个文件的输入流----针对hdfs上的
FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
//再构造一个文件的输出流----针对本地的
FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
//再将输入流中数据传输到输出流
IOUtils.copyBytes(in, out, 4096);
}
/**
* hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度
* 用于上层分布式运算框架并发处理数据
* @throws IllegalArgumentException
* @throws IOException
*/
@Test
public void testRandomAccess() throws IllegalArgumentException, IOException{
//先获取一个文件的输入流----针对hdfs上的
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
//可以将流的起始偏移量进行自定义
in.seek(22);
//再构造一个文件的输出流----针对本地的
FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
IOUtils.copyBytes(in,out,19L,true);
}
/**
* 显示hdfs上文件的内容
* @throws IOException
* @throws IllegalArgumentException
*/
@Test
public void testCat() throws IllegalArgumentException, IOException{
FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
IOUtils.copyBytes(in, System.out, 1024);
}
```
网友评论