hdfs写之写数据<二>

作者: 古语1 | 来源:发表于2019-08-27 16:59 被阅读0次

    一、写数据流程图

    该流程主要是客户端开始写数据,然后把数据切分为多个chunk,多个chunk组成一个packet,发送到dataQueue队列中,等待DataStreamer线程发送给datanode写数据。

    image.png

    二、写数据流程

    1、FSDataOutputStream.write写方法

    调用父类的FSOutputSummer.write方法

        @Override
        public void write(byte b[], int off, int len) throws IOException {
          // Forward the bytes to the wrapped stream; DFSOutputStream inherits
          // this write implementation from FSOutputSummer.
          out.write(b, off, len);
          if (statistics != null) {
            statistics.incrementBytesWritten(len);
          }
          position += len;  // keep the reported stream position in sync
        }
    

    该方法检查客户端状态是否正常

     /**
       * Writes <code>len</code> bytes from <code>b</code> starting at offset
       * <code>off</code>, producing a checksum for every data chunk.
       *
       * <p>Bytes are normally staged in this stream's internal buffer and get
       * checksummed and flushed to the underlying stream once a whole checksum
       * chunk has accumulated.  When the buffer is empty and the request is at
       * least one buffer long, the data is checksummed and written directly
       * from the caller's array, avoiding an unnecessary copy.
       *
       * @param      b     the data.
       * @param      off   the start offset in the data.
       * @param      len   the number of bytes to write.
       * @exception  IOException  if an I/O error occurs.
       */
      @Override
      public synchronized void write(byte b[], int off, int len)
          throws IOException {
        // Fail fast if this stream has already been closed.
        checkClosed();

        // Standard (off, len) bounds check; the subtraction form avoids
        // integer overflow when off + len would exceed Integer.MAX_VALUE.
        if (off < 0 || len < 0 || off > b.length - len) {
          throw new ArrayIndexOutOfBoundsException();
        }
        // write1() consumes as much as it can per call; loop until all
        // requested bytes have been handed off.
        int written = 0;
        while (written < len) {
          written += write1(b, off + written, len - written);
        }
      }
    

    该方法最终调用writeChecksumChunks,包括flushBuffer也是调用writeChecksumChunks。写chunk数据到packet中。

    /**
     * Writes a portion of an array, flushing the internal buffer to the
     * underlying stream at most once.
     *
     * @return the number of bytes consumed from {@code b}.
     */
      private int write1(byte b[], int off, int len) throws IOException {
        // Fast path: nothing buffered yet and the caller supplied at least a
        // full internal buffer's worth of data (buf.length is presumably
        // bytesPerChecksum * BUFFER_NUM_CHUNKS -- confirm in FSOutputSummer).
        // Checksum straight out of the caller's array, skipping the copy.
        final int fullBuffer = buf.length;
        if (count == 0 && len >= fullBuffer) {
          writeChecksumChunks(b, off, fullBuffer);
          return fullBuffer;
        }
        // Slow path: stage bytes in the local buffer.  Once the buffer fills,
        // flushBuffer() runs the same writeChecksumChunks() path.
        final int room = buf.length - count;
        final int toCopy = Math.min(len, room);
        System.arraycopy(b, off, buf, count, toCopy);
        count += toCopy;
        if (count == buf.length) {
          // local buffer is full
          flushBuffer();
        }
        return toCopy;
      }
    
    

    根据写入数据的大小,切分为多个chunk,分别由writeChunk写入。

      /**
       * Checksums <code>len</code> bytes of <code>b</code> and forwards the
       * data, one checksum chunk at a time, to writeChunk() together with the
       * offset of that chunk's checksum.
       */
      private void writeChecksumChunks(byte b[], int off, int len)
      throws IOException {
        // Compute the checksums of all chunks in the range in a single pass.
        sum.calculateChunkedSums(b, off, len, checksum, 0);
        final int chunkSize = sum.getBytesPerChecksum();
        final int checksumSize = getChecksumSize();
        for (int done = 0; done < len; done += chunkSize) {
          // The trailing chunk may be shorter than a full checksum chunk.
          final int chunkLen = Math.min(chunkSize, len - done);
          final int ckOffset = done / chunkSize * checksumSize;
          writeChunk(b, off + done, chunkLen, checksum, ckOffset, checksumSize);
        }
      }
    

    调用writeChunkImpl处理

      @Override
      protected synchronized void writeChunk(byte[] b, int offset, int len,
          byte[] checksum, int ckoff, int cklen) throws IOException {
        // Record this chunk write in a trace span, delegating the actual
        // packet assembly to writeChunkImpl().
        final TraceScope traceScope =
            dfsClient.getPathTraceScope("DFSOutputStream#writeChunk", src);
        try {
          writeChunkImpl(b, offset, len, checksum, ckoff, cklen);
        } finally {
          traceScope.close();  // close the span even if writeChunkImpl throws
        }
      }
    

    该方法主要将数据和校验和写入packet中,如果packet写满了chunk或者达到blocksize,就会将整个packet发送到dataQueue队列中,等待线程DataStreamer发送;当写满一个block时,最后发送一个空的packet告诉DataStreamer已经写完了一个完整的block。

      /**
       * Appends one data chunk and its checksum to the current packet.
       *
       * <p>A new packet is created lazily when none is in progress.  Once the
       * packet holds its maximum number of chunks, or the block boundary is
       * reached, the packet is queued on the data queue for the DataStreamer
       * thread to transmit.  At an exact block boundary an additional empty
       * packet (lastPacketInBlock = true) is queued to signal end-of-block,
       * and the per-block counters are reset.
       *
       * @param b        array holding the chunk data
       * @param offset   start offset of the chunk in {@code b}
       * @param len      chunk length; must not exceed bytesPerChecksum
       * @param checksum array holding the chunk's checksum bytes
       * @param ckoff    offset of this chunk's checksum in {@code checksum}
       * @param cklen    checksum length; must be 0 or exactly getChecksumSize()
       * @throws IOException if the client or stream is closed, or the chunk /
       *         checksum sizes are inconsistent with the configuration
       */
      private synchronized void writeChunkImpl(byte[] b, int offset, int len,
              byte[] checksum, int ckoff, int cklen) throws IOException {
        dfsClient.checkOpen();
        checkClosed();

        // A chunk may never be larger than one checksum block
        // (bytesPerChecksum, 512 by default).
        if (len > bytesPerChecksum) {
          throw new IOException("writeChunk() buffer size is " + len +
                                " is larger than supported  bytesPerChecksum " +
                                bytesPerChecksum);
        }
        // The supplied checksum length must match the configured size.
        if (cklen != 0 && cklen != getChecksumSize()) {
          throw new IOException("writeChunk() checksum size is supposed to be " +
                                getChecksumSize() + " but found to be " + cklen);
        }

        // Lazily allocate a packet when none is in progress.
        if (currentPacket == null) {
          currentPacket = createPacket(packetSize, chunksPerPacket, 
              bytesCurBlock, currentSeqno++, false);
          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DFSClient writeChunk allocating new packet seqno=" + 
                currentPacket.getSeqno() +
                ", src=" + src +
                ", packetSize=" + packetSize +
                ", chunksPerPacket=" + chunksPerPacket +
                ", bytesCurBlock=" + bytesCurBlock);
          }
        }

        // Append checksum, then data, and account for the chunk.
        currentPacket.writeChecksum(checksum, ckoff, cklen);
        currentPacket.writeData(b, offset, len);
        currentPacket.incNumChunks();
        bytesCurBlock += len;

        // If packet is full, or we hit the block boundary exactly, enqueue it
        // for transmission by the DataStreamer thread.
        if (currentPacket.getNumChunks() == currentPacket.getMaxChunks() ||
            bytesCurBlock == blockSize) {
          if (DFSClient.LOG.isDebugEnabled()) {
            DFSClient.LOG.debug("DFSClient writeChunk packet full seqno=" +
                currentPacket.getSeqno() +
                ", src=" + src +
                ", bytesCurBlock=" + bytesCurBlock +
                ", blockSize=" + blockSize +
                ", appendChunk=" + appendChunk);
          }
          // Hand currentPacket to the data queue; blocks if the queue is full.
          waitAndQueueCurrentPacket();

          // If the reopened file did not end at chunk boundary and the above
          // write filled up its partial chunk, tell the summer to generate
          // full crc chunks from now on.
          if (appendChunk && bytesCurBlock%bytesPerChecksum == 0) {
            appendChunk = false;
            resetChecksumBufSize();
          }

          if (!appendChunk) {
            // Size the next packet so it never crosses the block boundary;
            // writePacketSize is dfs.client-write-packet-size (64KB default).
            int psize = Math.min((int)(blockSize-bytesCurBlock), dfsClient.getConf().writePacketSize);
            computePacketChunkSize(psize, bytesPerChecksum);
          }

          //
          // if encountering a block boundary, send an empty packet to 
          // indicate the end of block and reset bytesCurBlock.
          //
          if (bytesCurBlock == blockSize) {
            currentPacket = createPacket(0, 0, bytesCurBlock, currentSeqno++, true);
            currentPacket.setSyncBlock(shouldSyncBlock);
            waitAndQueueCurrentPacket();
            // Start counting the next block from zero.
            bytesCurBlock = 0;
            lastFlushOffset = 0;
          }
        }
      }
    

    相关文章

      网友评论

        本文标题:hdfs写之写数据<二>

        本文链接:https://www.haomeiwen.com/subject/hqgeectx.html