美文网首页
Hadoop2-HDFS-read file

Hadoop2-HDFS-read file

作者: raincoffee | 来源:发表于2017-08-31 00:54 被阅读109次

Hadoop读书笔记2-HDFS-read write file

HDFS是一个分布式文件系统,在HDFS上写文件的过程与我们平时使用的单机文件系统非常不同,从宏观上来看,在HDFS文件系统上创建并写一个文件,流程如下图(来自《Hadoop:The Definitive Guide》一书)所示:
(注意L:这是读文件的示意图 。 贴错图了)



具体过程描述如下:

  1. Client调用DistributedFileSystem对象的create方法,创建一个文件输出流(FSDataOutputStream)对象
  2. 通过DistributedFileSystem对象与Hadoop集群的NameNode进行一次RPC远程调用,在HDFS的Namespace中创建一个文件条目(Entry),该条目没有任何的Block
  3. 通过FSDataOutputStream对象,向DataNode写入数据,数据首先被写入FSDataOutputStream对象内部的Buffer中,然后数据被分割成一个个Packet数据包
  4. 以Packet最小单位,基于Socket连接发送到按特定算法选择的HDFS集群中一组DataNode(正常是3个,可能大于等于1)中的一个节点上,在这组DataNode组成的Pipeline上依次传输Packet
  5. 这组DataNode组成的Pipeline反方向上,发送ack,最终由Pipeline中第一个DataNode节点将Pipeline ack发送给Client
  6. 完成向文件写入数据,Client在文件输出流(FSDataOutputStream)对象上调用close方法,关闭流
  7. 调用DistributedFileSystem对象的complete方法,通知NameNode文件写入成功

下面代码使用Hadoop的API来实现向HDFS的文件写入数据,同样也包括创建一个文件和写数据两个主要过程,代码如下所示:

static String[] contents = new String[] {
     "aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa",
     "bbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbbb",
     "cccccccccccccccccccccccccccccccccccccccccccccccccccccccccc",
     "dddddddddddddddddddddddddddddddd",
     "eeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeeee",
};

public static void main(String[] args) {
     String file = "hdfs://h1:8020/data/test/test.log";
   Path path = new Path(file);
   Configuration conf = new Configuration();
   FileSystem fs = null;
   FSDataOutputStream output = null;
   try {
          fs = path.getFileSystem(conf);
          output = fs.create(path); // 创建文件
          for(String line : contents) { // 写入数据
               output.write(line.getBytes("UTF-8"));
               output.flush();
          }
     } catch (IOException e) {
          e.printStackTrace();
     } finally {
          try {
               output.close();
          } catch (IOException e) {
               e.printStackTrace();
          }
     }
}

结合上面的示例代码,我们先从fs.create(path);开始,可以看到FileSystem的实现DistributedFileSystem中给出了最终返回FSDataOutputStream对象的抽象逻辑,代码如下所示:

public FSDataOutputStream create(Path f, FsPermission permission,
  boolean overwrite,
  int bufferSize, short replication, long blockSize,
  Progressable progress) throws IOException {

  statistics.incrementWriteOps(1);
  return new FSDataOutputStream
     (dfs.create(getPathName(f), permission, overwrite, true, replication, blockSize, progress, bufferSize), statistics);
}

下面,我们从DFSOutputStream类开始,说明其内部实现原理。

DFSOutputStream内部原理

打开一个DFSOutputStream流,Client会写数据到流内部的一个缓冲区中,然后数据被分解成多个Packet,每个Packet大小为64k字节,每个Packet又由一组chunk和这组chunk对应的checksum数据组成,默认chunk大小为512字节,每个checksum是对512字节数据计算的校验和数据。
当Client写入的字节流数据达到一个Packet的长度,这个Packet会被构建出来,然后会被放到队列dataQueue中,接着DataStreamer线程会不断地从dataQueue队列中取出Packet,发送到复制Pipeline中的第一个DataNode上,并将该Packet从dataQueue队列中移到ackQueue队列中。ResponseProcessor线程接收从Datanode发送过来的ack,如果是一个成功的ack,表示复制Pipeline中的所有Datanode都已经接收到这个Packet,ResponseProcessor线程将packet从队列ackQueue中删除。
在发送过程中,如果发生错误,所有未完成的Packet都会从ackQueue队列中移除掉,然后重新创建一个新的Pipeline,排除掉出错的那些DataNode节点,接着DataStreamer线程继续从dataQueue队列中发送Packet。
下面是DFSOutputStream的结构及其原理,如图所示:

我们从下面3个方面来描述内部流程:

  • 创建Packet

Client写数据时,会将字节流数据缓存到内部的缓冲区中,当长度满足一个Chunk大小(512B)时,便会创建一个Packet对象,然后向该Packet对象中写Chunk Checksum校验和数据,以及实际数据块Chunk Data,校验和数据是基于实际数据块计算得到的。每次满足一个Chunk大小时,都会向Packet中写上述数据内容,直到达到一个Packet对象大小(64K),就会将该Packet对象放入到dataQueue队列中,等待DataStreamer线程取出并发送到DataNode节点。

  • 发送Packet

DataStreamer线程从dataQueue队列中取出Packet对象,放到ackQueue队列中,然后向DataNode节点发送这个Packet对象所对应的数据。

  • 接收ack

发送一个Packet数据包以后,会有一个用来接收ack的ResponseProcessor线程,如果收到成功的ack,则表示一个Packet发送成功。如果成功,则ResponseProcessor线程会将ackQueue队列中对应的Packet删除。

参考:http://shiyanjun.cn/archives/942.html

相关文章

网友评论

      本文标题:Hadoop2-HDFS-read file

      本文链接:https://www.haomeiwen.com/subject/cuyfjxtx.html