美文网首页
Hadoop源码分析-HDFS写数据之创建packet

Hadoop源码分析-HDFS写数据之创建packet

作者: 晨磊的微博 | 来源:发表于2021-01-21 19:13 被阅读0次

    [TOC]
    契约机制深度剖析那里,我们提到过两个类,DFSOutputStreamDataStreamer 这两个类是写数据的核心类。我们先看看注释:

    1. DFSOutputStream注释

     * DFSOutputStream creates files from a stream of bytes.
     *
     * The client application writes data that is cached internally by
     * this stream. Data is broken up into packets, each packet is
     * typically 64K in size. A packet comprises of chunks. Each chunk
     * is typically 512 bytes and has an associated checksum with it.
     *
     * When a client application fills up the currentPacket, it is
     * enqueued into dataQueue.  The DataStreamer thread picks up
     * packets from the dataQueue, sends it to the first datanode in
     * the pipeline and moves it from the dataQueue to the ackQueue.
     * The ResponseProcessor receives acks from the datanodes. When an
     * successful ack for a packet is received from all datanodes, the
     * ResponseProcessor removes the corresponding packet from the
     * ackQueue.
     *
     * In case of error, all outstanding packets and moved from
     * ackQueue. A new pipeline is setup by eliminating the bad
     * datanode from the original pipeline. The DataStreamer now
     * starts sending packets from the dataQueue.
    
    • block 分解成多个 packet (每个64K),packet分解成多个chunk(每个512B),每个chunk都有个校验和。

    • DFSOutputStream负责把数据写入dataQueue,写入单位为packet

    • DataStreamerdataQueue中提取packet,发送到管道中的第一个datanode,同时将该packet写入 ackQueue中(大家都知道block是有副本的,在写block之前就已经确定了,这些副本要写到哪些datanode上,这些datanode形成一个数据管道,DataStreamer只会把数据写入管道的第一个datanode,然后第一个dataNode向第二个datanode写数据,第二个再向第三个写,它们之间使用socket传输数据)

    • ResponseProcessor会从datanodes接收ack(此ackDatanode接收packet成功后的确定),ResponseProcessor接收到所有Datanoteack后,就从ackQueue中移除相应的packet

    • 如果遇到异常,所有的packets都从ackQueue移除,并排除异常的Datanode,再重新申请一个管道 。

    • 然后 DataStreamer又重新从dataQueue中,获得packets并发送。

    2. DataStreamer 类注释

      // The DataStreamer class is responsible for sending data packets to the
      // datanodes in the pipeline. It retrieves a new blockid and block locations
      // from the namenode, and starts streaming packets to the pipeline of
      // Datanodes. Every packet has a sequence number associated with
      // it. When all the packets for a block are sent out and acks for each
      // if them are received, the DataStreamer closes the current block.
    
    • DataStreamer 负责发送packet到管道中的datanode.
    • DataStreamerNamenode中检索新的blockidblock位置,并开始将packet流式传输到datanode的管道。
    • 每个packet都有序列号。
    • 当一个block的所有packet都被发送出去,并且收到每个packetack时,DataStreamer关闭当前block

    3. 创建packet

    根据DFSOutputStreamDataStream的注释,我们知道这两个类是HDFS写数据的关键。那么他们在哪里被使用呢?我们直接跟进fos.write("abc".getBytes());看看。

    1. 顺着fos.write("abc".getBytes());一直跟进,就跟到了OutputStream.write(int b)方法。并且这个还是个抽象方法。一路跟下来并没有发现调用HDFS的什么,那这样跟肯定是不对的。在最初的地方肯定是返回的并不是实际的对象。我们要去找到真实的对象。

      image
      image
      image
      image
    2. 我们在通过fileSystem.create方法去寻找真实返回的对象类型。当然我们之前讲过fileSystem.create实际上是调用了DistributedFileSystem.create。我们直接看DistributedFileSystem.create就可以了。这里最后就是返回的return dfs.createWrappedOutputStream(dfsos, statistics);,我们接着跟进去看看。

      image
    3. 下面都是一行代码,直接过了。


      image
      image
    4. 可以看到,这里最后返回的其实是HdfsDataOutputStream类。那么我们就直接看HdfsDataOutputStream.write()方法吧。

      image
    5. 结果在 HdfsDataOutputStream里没有找到,那么我们到他的父类里去看看。

      image
    6. HdfsDataOutputStream的父类FSDataOutputStream里,还真找到了。里面调用了out.write(b);,跟进去结果发现又到了OutputStream.write()。那么我们就想下,是不是out.write()out被赋值为其他子类了呢?

      image
    7. 仔细看看代码,发现原来这个out变量是由构造函数赋值的,并且还是调用了父类的构造函数赋值。那我们看看在构建 HdfsDataOutputStream时传入的实际对象是哪个?

      image
    8. 可以看到其实是DFSOutputStream。那么我们看看DFSOutputStream.write()方法吧。

      image
    9. 结果又是没有找到,那么我们再看看他的父类FSOutputStream

      image
    10. 终于找到了,这里调用了flushBuffer();。我们在跟进下。

      image
    11. FSOutputSummer.flushBuffer(),这里除了writeChecksumChunks(buf, 0, lenToFlush);之外,其他都是定义变量,或者判断之类的。

      image
      image
    12. FSOutputSummer.writeChecksumChunks()1:这里是计算校验和;2:按照chunk的大小来遍历字节;3:把每个chunk发送出去;FSOutputSummer.writeChunk()这个方法是个抽象方法,需要看他的实现DFSOutputStream.writeChunk()

      image
    13. DFSOutputStream.writeChunk(),代码writeChunkImpl()就是写Chunk的实现。跟进。

      image
    14. DFSOutputStream.writeChunkImpl(),这里代码比较多,分开看看
      14.1 这块没什么可说的, 都是写检查

      image
      14.2 这里是把 chunk 写入 packet,有校验和,数据,和计数
      image
      14.3 1:这里就是判断是不是写够一个packet了;2:这里就是个debug日志;3:这里这开始写数据了;
      image
    15. DFSOutputStream.waitAndQueueCurrentPacket().
      15.1 这里没什么可看的。就是有个while循环。就是当dataQueue+ackQueue超过配置的大小时,就进行等待。

      image
      15.2 DFSOutputStream.queueCurrentPacket(),这个才是我们要找的代码。
      image
    16. DFSOutputStream.queueCurrentPacket(),这里就是把packet添加到dataQueue队列了,后面还有个notifyAll(),因为前面判断如何dataQueue满了,会wait。

      image

    以上就是创建packet并把packet写入 dataQueue 的过程了。这都发生在客户端。以下是个简单的总结

    image

    相关文章

      网友评论

          本文标题:Hadoop源码分析-HDFS写数据之创建packet

          本文链接:https://www.haomeiwen.com/subject/iqvdzktx.html