[TOC]
在契约机制深度剖析那里,我们提到过两个类,DFSOutputStream
和 DataStreamer
这两个类是写数据的核心类。我们先看看注释:
1. DFSOutputStream
注释
* DFSOutputStream creates files from a stream of bytes.
*
* The client application writes data that is cached internally by
* this stream. Data is broken up into packets, each packet is
* typically 64K in size. A packet comprises of chunks. Each chunk
* is typically 512 bytes and has an associated checksum with it.
*
* When a client application fills up the currentPacket, it is
* enqueued into dataQueue. The DataStreamer thread picks up
* packets from the dataQueue, sends it to the first datanode in
* the pipeline and moves it from the dataQueue to the ackQueue.
* The ResponseProcessor receives acks from the datanodes. When an
* successful ack for a packet is received from all datanodes, the
* ResponseProcessor removes the corresponding packet from the
* ackQueue.
*
* In case of error, all outstanding packets and moved from
* ackQueue. A new pipeline is setup by eliminating the bad
* datanode from the original pipeline. The DataStreamer now
* starts sending packets from the dataQueue.
-
block
分解成多个packet
(每个64K),packet
分解成多个chunk
(每个512B),每个chunk
都有个校验和。 -
DFSOutputStream
负责把数据写入dataQueue
,写入单位为packet
。 -
DataStreamer
从dataQueue
中提取packet
,发送到管道中的第一个datanode
,同时将该packet
写入ackQueue
中(大家都知道block是有副本的,在写block之前就已经确定了,这些副本要写到哪些datanode上,这些datanode形成一个数据管道,DataStreamer
只会把数据写入管道的第一个datanode
,然后第一个dataNode
向第二个datanode写数据,第二个再向第三个写,它们之间使用socket传输数据) -
ResponseProcessor
会从datanodes
接收ack
(此ack
是Datanode
接收packet
成功后的确定),ResponseProcessor
接收到所有Datanote
的ack
后,就从ackQueue
中移除相应的packet
。 -
如果遇到异常,所有的
packets
都从ackQueue
移除,并排除异常的Datanode
,再重新申请一个管道 。 -
然后
DataStreamer
又重新从dataQueue
中,获得packets
并发送。
2. DataStreamer
类注释
// The DataStreamer class is responsible for sending data packets to the
// datanodes in the pipeline. It retrieves a new blockid and block locations
// from the namenode, and starts streaming packets to the pipeline of
// Datanodes. Every packet has a sequence number associated with
// it. When all the packets for a block are sent out and acks for each
// if them are received, the DataStreamer closes the current block.
-
DataStreamer
负责发送packet
到管道中的datanode
. -
DataStreamer
从Namenode
中检索新的blockid
和block
位置,并开始将packet
流式传输到datanode
的管道。 - 每个
packet
都有序列号。 - 当一个
block
的所有packet
都被发送出去,并且收到每个packet
的ack
时,DataStreamer
关闭当前block
。
3. 创建packet
根据DFSOutputStream
和DataStream
的注释,我们知道这两个类是HDFS写数据的关键。那么他们在哪里被使用呢?我们直接跟进fos.write("abc".getBytes());
看看。
-
顺着
imagefos.write("abc".getBytes());
一直跟进,就跟到了OutputStream.write(int b)
方法。并且这个还是个抽象方法。一路跟下来并没有发现调用HDFS的什么,那这样跟肯定是不对的。在最初的地方肯定是返回的并不是实际的对象。我们要去找到真实的对象。
image
image
image -
我们在通过
imagefileSystem.create
方法去寻找真实返回的对象类型。当然我们之前讲过fileSystem.create
实际上是调用了DistributedFileSystem.create
。我们直接看DistributedFileSystem.create
就可以了。这里最后就是返回的return dfs.createWrappedOutputStream(dfsos, statistics);
,我们接着跟进去看看。
-
下面都是一行代码,直接过了。
image
image -
可以看到,这里最后返回的其实是
imageHdfsDataOutputStream
类。那么我们就直接看HdfsDataOutputStream.write()
方法吧。
-
结果在
imageHdfsDataOutputStream
里没有找到,那么我们到他的父类里去看看。
-
在
imageHdfsDataOutputStream
的父类FSDataOutputStream
里,还真找到了。里面调用了out.write(b);
,跟进去结果发现又到了OutputStream.write()
。那么我们就想下,是不是out.write()
的out
被赋值为其他子类了呢?
-
仔细看看代码,发现原来这个
imageout
变量是由构造函数赋值的,并且还是调用了父类的构造函数赋值。那我们看看在构建HdfsDataOutputStream
时传入的实际对象是哪个?
-
可以看到其实是
imageDFSOutputStream
。那么我们看看DFSOutputStream.write()
方法吧。
-
结果又是没有找到,那么我们再看看他的父类
imageFSOutputStream
。
-
终于找到了,这里调用了
imageflushBuffer();
。我们在跟进下。
-
imageFSOutputSummer.flushBuffer()
,这里除了writeChecksumChunks(buf, 0, lenToFlush);
之外,其他都是定义变量,或者判断之类的。
image -
imageFSOutputSummer.writeChecksumChunks()
,1
:这里是计算校验和;2
:按照chunk的大小来遍历字节;3
:把每个chunk发送出去;FSOutputSummer.writeChunk()
这个方法是个抽象方法,需要看他的实现DFSOutputStream.writeChunk()
-
imageDFSOutputStream.writeChunk()
,代码writeChunkImpl()
就是写Chunk
的实现。跟进。
-
imageDFSOutputStream.writeChunkImpl()
,这里代码比较多,分开看看
14.1 这块没什么可说的, 都是写检查
14.2 这里是把chunk
写入packet
,有校验和,数据,和计数
image
14.31
:这里就是判断是不是写够一个packet
了;2
:这里就是个debug日志;3
:这里这开始写数据了;
image -
imageDFSOutputStream.waitAndQueueCurrentPacket()
.
15.1 这里没什么可看的。就是有个while循环。就是当dataQueue
+ackQueue
超过配置的大小时,就进行等待。
15.2DFSOutputStream.queueCurrentPacket()
,这个才是我们要找的代码。
image -
imageDFSOutputStream.queueCurrentPacket()
,这里就是把packet添加到dataQueue
队列了,后面还有个notifyAll(),因为前面判断如何dataQueue
满了,会wait。
以上就是创建packet
并把packet
写入 dataQueue
的过程了。这都发生在客户端。以下是个简单的总结
网友评论