美文网首页
Hadoop之HDFS详解(三)

Hadoop之HDFS详解(三)

作者: RalapHao | 来源:发表于2019-02-28 22:46 被阅读0次

    简介

    1. 分布式文件存储,文件在物理上是分块存储(block),可以通过dfs.blocksizes设定,2.x默认是128M,老版本是64M;
    2. 为客户端提供统一的抽象目录树,客户端通过路径来访问;
    3. 目录结构及分块信息(元数据)由namenode节点承担,namenode是HDFS集群的主节点,维护整个目录结构block块信息以及datanode信息;
    4. 文件的各个block的存储管理由datanode负责,DataNode是HDFS的从节点,每一个block都可以在多个DataNode上存储多个副本;
    5. 一次写入,多次读取,不支持修改;

    注意:HDFS适合做数据分析,并不适合做网盘应用,不便修改,延迟 大,网络开销大,成本太高。

    常用Shell命令

    1. 显示目录信息
       hadoop fs -sl /
    2. 创建目录
      hadoop fs -mkdir -p /dir1/dir2
    3. 本地截切粘贴
       hadoop fs -moveFromLocal  /root/1.txt  /dir1/dir2
    4. 追加文件到已存在文件末尾
      hadoop fs -appendToFile  /root/2.txt /dir1/dir2/1.txt
    5. 查看内容
      hadoop fs -cat  /dir1/dir2/1.txt
      hadoop fs -tail  /dir1/dir2/1.txt
      hadoop fs -text /dir1/dir2/1.txt
    6. 拷贝到HDFS
      hadoop fs -copyFromLocal ./1.txt  /dir1/dir2
      hadoop fs -put ./1.txt  /dir1/dir2
    7. HDFS内部拷贝
      hadoop fs -cp /dir1/dir2/1.txt /dir1/dir2/2.txt
    8. HDFS内部移动
      hadoop fs -mv ./2.txt /
    9. 从HDFS拷贝到本地
      hadoop fs -copyToLocal  /2.txt
      hadoop fs -get  /2.txt
    10. 合并下载多个文件
      hadoop fs -getmerage /dir1/dir2/1.txt /2.txt
    11. 删除文件或文件夹
      hadoop fs -rm -r  /dir1/dir2
      hadoop fs -rmdir /dir1/dir2(删除空文件夹)
    12. 统计文件系统可用空间
      hadoop fs -df -h /
      hadoop fs -du  -h /dir1/dir2
    13. 统计一个指定目录下的文件节点数据量
      hadoop fs -count  /dir1/
    14. 设置HDFS中文件的副本数量
      hadoop fs -setrep 3 /dir1/dir2/1.txt
    

    HDFS 工作机制

    1. 文件写入过程
      客户端向HDFS写数据,首先要与NameNode通信,以确认可以写文件并获取DataNode节点信息,然后客户端根据DateNode信息按顺序上传数据,DataNode将Block副本复制到其他DataNode;
      1. 与NameNode 通信,请求上传数据,NameNode检测是否存在、父目录是否存在;
      2. 返回确认信息;
      3. 客户端请求NameNode,block1上传到那些DataNode;
      4. NameNode返回DataNode信息
      5. 客户端根据返回的DateNode信息,请求其中一台,上传数据(本质是RPC调用,建立pipeline),A到B,B到C根据副本数量依次类推,完成后逐级返回客户端。
      6. 客户端上传block,以packet为单位,A到B,B到C,A每传一个packet都会放入应答队列,等待应答。
      7. 当一个block传输完成,客户端上传第二个block。
    2. 文件读取流程
      客户端将要获取的文件路径发送给NameNode,NameNode获取文件的元数据信息(Block存放位置)返回给客户端,客户端通过相应信息找到DataNode,逐个获取文件的block并在客户端本地进行数据追加合并,从而得到完整文件。
      1. 通过NameNode获取元数据信息,找到文件块所在DataNode。
      2. 挑选一台DataNode(就近原则,然后随机),建立Socket流。
      3. datanode开始发送数据(从磁盘读取数据放入流,以packet为单位来做校验);
      4. 客户端以packet为单位接收,先在本地缓存,然后写入目标文件

    NameNode机制

    1. 元数据存储机制
      1. 内存数据(完整的元数据信息)
      2. 磁盘元数据镜像文件(“准完整”,在namenode的工作目录)
      3. 数据操作日志文件(可通过日志文件运算出元数据),用于衔接内存和持久化元数据镜像的操作日志(edits文件),
        注:当客户端新增,操作日志首先被记入edits文件,当客户端操作成功,相应的元数据会更新到内存中。
    2. 元数据手动查看
      可以通过hdfs的一个工具来查看edits中的信息
    bin/hdfs oev -i edits -o edits.xml
    
    bin/hdfs oiv -i fsimage_0000000000000000087 -p XML -o fsimage.xml
    
    1. 元数据的checkpoint
      每隔一段时间,会由secondary namenode将namenode上积累的所有edits和一个最新的fsimage下载到本地,并加载到内存进行merage(这个过程称作checkpoint)
    2. checkpoint操作的触发条件配置参数
    dfs.namenode.checkpoint.check.period=60  #检查触发条件是否满足的频率,60秒
    dfs.namenode.checkpoint.dir=file://${hadoop.tmp.dir}/dfs/namesecondary
    #以上两个参数做checkpoint操作时,secondary namenode的本地工作目录
    dfs.namenode.checkpoint.edits.dir=${dfs.namenode.checkpoint.dir}
    
    dfs.namenode.checkpoint.max-retries=3  #最大重试次数
    dfs.namenode.checkpoint.period=3600  #两次checkpoint之间的时间间隔3600秒
    dfs.namenode.checkpoint.txns=1000000 #两次checkpoint之间最大的操作记录
    
    
    1. checkpoint的附带作用
      namenode和secondary namenode的工作目录存储结构完全相同,所以,当namenode故障退出需要重新恢复时,可以从secondary namenode的工作目录中将fsimage拷贝到namenode的工作目录,以恢复namenode的元数据

    DataNode工作机制

    1. 工作职责
      存储用户的文件块数据
      定期向namenode汇报自身所持有的block信息,(通过心跳信息上报)
    2. Datanode掉线判断时限参数
      datanode进程死亡或者网络故障造成datanode无法与namenode通信,namenode不会立即把该节点判定为死亡,要经过一段时间,这段时间暂称作超时时长。HDFS默认的超时时长为10分钟+30秒。如果定义超时时间为timeout,则超时时长的计算公式为:
      timeout = 2 * heartbeat.recheck.interval + 10 * dfs.heartbeat.interval。
      而默认的heartbeat.recheck.interval 大小为5分钟,dfs.heartbeat.interval默认为3秒。
      需要注意的是hdfs-site.xml 配置文件中的heartbeat.recheck.interval的单位为毫秒,dfs.heartbeat.interval的单位为秒。所以,举个例子,如果heartbeat.recheck.interval设置为5000(毫秒),dfs.heartbeat.interval设置为3(秒,默认),则总的超时时间为40秒。
     <property>
           <name>heartbeat.recheck.interval</name>
           <value>2000</value>
    </property>
    <property>
           <name>dfs.heartbeat.interval</name>
           <value>1</value>
    </property>
    

    Java API

    1. HDFS客户端依赖
    <dependency>
     <groupId>org.apache.hadoop</groupId>
     <artifactId>hadoop-client</artifactId>
     <version>2.6.1</version>
    </dependency>
    

    注意:org.apache.hadoop.security.AccessControlException: org.apache.hadoop.security .AccessControlException: Permission denied
    没有权限访问:core-site.xml添加

    ```
    <property>
    
      <name>hadoop.security.authorization</name>
    
      <value>false</value>
    
    </property>
    
    2. windows上开发注意
       1. 解压hadoop安装包;
       2. 将安装包下的lib和bin目录用对应windows版本平台编译的本地库替换
       3. 在window系统中配置HADOOP_HOME指向你解压的安装包
       4. 在windows系统的path变量中加入hadoop的bin目录
    3. 代码
     ```
     //创建连接
     Configuration conf = new Configuration();
     //配置连接服务器
     conf.set("fs.defaultFS", "hdfs://hdp-node01:9000");
     //副本数量 :参数优先级: 1、客户端代码中设置的值 2、classpath下的用户自定义配置文件 3、然后是服务器的默认配置
     conf.set("dfs.replication", "3");
     //获取Hdfs客户端
     FileSystem fs = FileSystem.get(conf);
    
    // 如果这样去获取,那conf里面就可以不要配"fs.defaultFS"参数,而且,这个客户端的身份标识已经是hadoop用户
     FileSystem fs = FileSystem.get(new URI("hdfs://hdp-node01:9000"), conf, "hadoop");
    
    @Test
     public void testAddFileToHdfs() throws Exception {
    
         // 要上传的文件所在的本地路径
         Path src = new Path("g:/redis-recommend.zip");
         // 要上传到hdfs的目标路径
         Path dst = new Path("/aaa");
         fs.copyFromLocalFile(src, dst);
         fs.close();
     }
    
     /**
      * 从hdfs中复制文件到本地文件系统
      * 
      * @throws IOException
      * @throws IllegalArgumentException
      */
     @Test
     public void testDownloadFileToLocal() throws IllegalArgumentException, IOException {
         fs.copyToLocalFile(new Path("/jdk-7u65-linux-i586.tar.gz"), new Path("d:/"));
         fs.close();
     }
    
     @Test
     public void testMkdirAndDeleteAndRename() throws IllegalArgumentException, IOException {
    
         // 创建目录
         fs.mkdirs(new Path("/a1/b1/c1"));
    
         // 删除文件夹 ,如果是非空文件夹,参数2必须给值true
         fs.delete(new Path("/aaa"), true);
    
         // 重命名文件或文件夹
         fs.rename(new Path("/a1"), new Path("/a2"));
    
     }
    
     /**
      * 查看目录信息,只显示文件
      * 
      * @throws IOException
      * @throws IllegalArgumentException
      * @throws FileNotFoundException
      */
     @Test
     public void testListFiles() throws FileNotFoundException, IllegalArgumentException, IOException {
    
         // 思考:为什么返回迭代器,而不是List之类的容器
         RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);
    
         while (listFiles.hasNext()) {
             LocatedFileStatus fileStatus = listFiles.next();
             System.out.println(fileStatus.getPath().getName());
             System.out.println(fileStatus.getBlockSize());
             System.out.println(fileStatus.getPermission());
             System.out.println(fileStatus.getLen());
             BlockLocation[] blockLocations = fileStatus.getBlockLocations();
             for (BlockLocation bl : blockLocations) {
                 System.out.println("block-length:" + bl.getLength() + "--" + "block-offset:" + bl.getOffset());
                 String[] hosts = bl.getHosts();
                 for (String host : hosts) {
                     System.out.println(host);
                 }
             }
             System.out.println("--------------为angelababy打印的分割线--------------");
         }
     }
    
     /**
      * 查看文件及文件夹信息
      * 
      * @throws IOException
      * @throws IllegalArgumentException
      * @throws FileNotFoundException
      */
     @Test
     public void testListAll() throws FileNotFoundException, IllegalArgumentException, IOException {
    
         FileStatus[] listStatus = fs.listStatus(new Path("/"));
    
         String flag = "d--             ";
         for (FileStatus fstatus : listStatus) {
             if (fstatus.isFile())  flag = "f--         ";
             System.out.println(flag + fstatus.getPath().getName());
         }
    
    通过流的方式访问hdfs
    
    @Test
     public void testDownLoadFileToLocal() throws IllegalArgumentException, IOException{
         
         //先获取一个文件的输入流----针对hdfs上的
         FSDataInputStream in = fs.open(new Path("/jdk-7u65-linux-i586.tar.gz"));
         
         //再构造一个文件的输出流----针对本地的
         FileOutputStream out = new FileOutputStream(new File("c:/jdk.tar.gz"));
         
         //再将输入流中数据传输到输出流
         IOUtils.copyBytes(in, out, 4096);
         
         
     }
     
     
     /**
      * hdfs支持随机定位进行文件读取,而且可以方便地读取指定长度
      * 用于上层分布式运算框架并发处理数据
      * @throws IllegalArgumentException
      * @throws IOException
      */
     @Test
     public void testRandomAccess() throws IllegalArgumentException, IOException{
         //先获取一个文件的输入流----针对hdfs上的
         FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
         
         
         //可以将流的起始偏移量进行自定义
         in.seek(22);
         
         //再构造一个文件的输出流----针对本地的
         FileOutputStream out = new FileOutputStream(new File("c:/iloveyou.line.2.txt"));
         
         IOUtils.copyBytes(in,out,19L,true);
         
     }
     
     
     
     /**
      * 显示hdfs上文件的内容
      * @throws IOException 
      * @throws IllegalArgumentException 
      */
     @Test
     public void testCat() throws IllegalArgumentException, IOException{
         
         FSDataInputStream in = fs.open(new Path("/iloveyou.txt"));
         
         IOUtils.copyBytes(in, System.out, 1024);
     }
    
    
     ```

    相关文章

      网友评论

          本文标题:Hadoop之HDFS详解(三)

          本文链接:https://www.haomeiwen.com/subject/rhixuqtx.html