HDFS Read Path: read() Analysis (Part 2)

Author: 古语1 | Published 2019-08-20 17:01

    I. Method flow for reading block data

    (Figure: call flow of the block-read methods; original image not preserved.)

    Data is transferred over a socket set up between a Sender and a Receiver; both classes implement the DataTransferProtocol interface.
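
    To see where these calls originate, here is a minimal client-side sketch using the public FileSystem API; the namenode URI and file path are placeholders for illustration. Each read() on the returned stream is served by the DFSInputStream.read() shown below.

    import java.io.InputStream;
    import java.net.URI;

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;

    public class HdfsReadExample {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // hypothetical namenode address and file path, for illustration only
        try (FileSystem fs = FileSystem.get(URI.create("hdfs://namenode:8020"), conf);
             InputStream in = fs.open(new Path("/tmp/demo.txt"))) {
          byte[] buf = new byte[4096];
          int n;
          // each call below lands in DFSInputStream.read(byte[], int, int)
          while ((n = in.read(buf, 0, buf.length)) > 0) {
            System.out.write(buf, 0, n);
          }
        }
      }
    }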

    II. The read methods

    1. The read operation, DFSInputStream.read():
      /**
       * Read the entire buffer.
       */
      @Override
      public synchronized int read(final byte buf[], int off, int len) throws IOException {
        ReaderStrategy byteArrayReader = new ByteArrayStrategy(buf);
        TraceScope scope =
            dfsClient.getPathTraceScope("DFSInputStream#byteArrayRead", src);
        try {
          // read according to the ByteArrayStrategy strategy
          return readWithStrategy(byteArrayReader, off, len);
        } finally {
          scope.close();
        }
      }
    
    2. Reading with ByteArrayStrategy: DFSInputStream.readWithStrategy()

    This method selects the best datanode and reads the data from it; the retry loop below allows two attempts (retries = 2) before rethrowing the IOException. A read can fail because no suitable datanode is available (for example, all replica datanodes are down), or because the block data is lost and cannot be read.

    private synchronized int readWithStrategy(ReaderStrategy strategy, int off, int len) throws IOException {
        dfsClient.checkOpen();
        if (closed.get()) {
          throw new IOException("Stream closed");
        }
        Map<ExtendedBlock,Set<DatanodeInfo>> corruptedBlockMap 
          = new HashMap<ExtendedBlock, Set<DatanodeInfo>>();
        failures = 0;
        if (pos < getFileLength()) {
          int retries = 2;
          while (retries > 0) {
            try {
              // currentNode can be left as null if previous read had a checksum
              // error on the same block. See HDFS-3067
              if (pos > blockEnd || currentNode == null) {
                // Pick the best datanode for the block containing the current
                // position, and determine whether the blockReader is local or remote
                // 1. Get the datanode to read from and the read strategy (remote or local)
                currentNode = blockSeekTo(pos);
              }
              int realLen = (int) Math.min(len, (blockEnd - pos + 1L));
              synchronized(infoLock) {
                if (locatedBlocks.isLastBlockComplete()) {
                  realLen = (int) Math.min(realLen,
                      locatedBlocks.getFileLength() - pos);
                }
              }
              // 2. Read the data from the datanode
              int result = readBuffer(strategy, off, realLen, corruptedBlockMap);
              
              if (result >= 0) {
                pos += result;
              } else {
                // got a EOS from reader though we expect more data on it.
                throw new IOException("Unexpected EOS from the reader");
              }
              if (dfsClient.stats != null) {
                dfsClient.stats.incrementBytesRead(result);
              }
              return result;
            } catch (ChecksumException ce) {
              throw ce;            
            } catch (IOException e) {
              if (retries == 1) {
                DFSClient.LOG.warn("DFS Read", e);
              }
              blockEnd = -1;
              if (currentNode != null) { addToDeadNodes(currentNode); }
              if (--retries == 0) {
                throw e;
              }
            } finally {
              // Check if need to report block replicas corruption either read
              // was successful or ChecksumException occurred.
              reportCheckSumFailure(corruptedBlockMap, 
                  currentLocatedBlock.getLocations().length);
            }
          }
        }
        return -1;
      }
    

    III. Choosing the best datanode

    1. DFSInputStream.blockSeekTo()

    This method decides whether the read will be local or remote. The block locations are obtained from the namenode (or the client-side cache), and the best datanode for that block is chosen. Constructing the blockReader may throw an IOException, and the while loop can terminate when fetchBlockAt(target) throws.
    If a remote read is chosen, building the blockReader yields a RemoteBlockReader2 object, whose main job is to establish a socket connection between the client and the datanode. It produces two results: whether the datanode can serve the read at all, and the block data itself, which the client processes in its readNextPacket method. Sender and DataXceiver, which handle the socket requests, are covered separately later.

    /**
       * Open a DataInputStream to a DataNode so that it can be read from.
       * We get block ID and the IDs of the destinations at startup, from the namenode.
       */
      private synchronized DatanodeInfo blockSeekTo(long target) throws IOException {
        if (target >= getFileLength()) {
          throw new IOException("Attempted to read past end of file");
        }
    
        // Will be getting a new BlockReader.
        closeCurrentBlockReader();
    
        //
        // Connect to best DataNode for desired Block, with potential offset
        //
        DatanodeInfo chosenNode = null;
        int refetchToken = 1; // only need to get a new access token once
        int refetchEncryptionKey = 1; // only need to get a new encryption key once
        
        boolean connectFailedOnce = false;
    
        while (true) {
          //
          // Compute desired block
          // 1. Get the block from the locatedBlocks cache; if it is not cached, RPC to the namenode
          LocatedBlock targetBlock = getBlockAt(target);
    
          // update current position
          this.pos = target;
          this.blockEnd = targetBlock.getStartOffset() +
                targetBlock.getBlockSize() - 1;
          this.currentLocatedBlock = targetBlock;
    
          assert (target==pos) : "Wrong position " + pos + " expect " + target;
          long offsetIntoBlock = target - targetBlock.getStartOffset();
          // 2. Choose the best datanode to read the block from
          DNAddrPair retval = chooseDataNode(targetBlock, null);
          chosenNode = retval.info;
          InetSocketAddress targetAddr = retval.addr;
          StorageType storageType = retval.storageType;
    
          try {
            ExtendedBlock blk = targetBlock.getBlock();
            Token<BlockTokenIdentifier> accessToken = targetBlock.getBlockToken();
            CachingStrategy curCachingStrategy;
            boolean shortCircuitForbidden;
            synchronized(infoLock) {
              curCachingStrategy = cachingStrategy;
              shortCircuitForbidden = shortCircuitForbidden();
            }
            // Choose the read path: local file read (BlockReaderLocal) or TCP read (RemoteBlockReader2)
            blockReader = new BlockReaderFactory(dfsClient.getConf()).
                setInetSocketAddress(targetAddr).
                setRemotePeerFactory(dfsClient).
                setDatanodeInfo(chosenNode). // the best datanode
                setStorageType(storageType).
                setFileName(src).
                setBlock(blk).
                setBlockToken(accessToken).
                setStartOffset(offsetIntoBlock).
                setVerifyChecksum(verifyChecksum).
                setClientName(dfsClient.clientName).
                setLength(blk.getNumBytes() - offsetIntoBlock).
                setCachingStrategy(curCachingStrategy).
                setAllowShortCircuitLocalReads(!shortCircuitForbidden).
                setClientCacheContext(dfsClient.getClientContext()).
                setUserGroupInformation(dfsClient.ugi).
                setConfiguration(dfsClient.getConfiguration()).
                build();
            if(connectFailedOnce) {
              DFSClient.LOG.info("Successfully connected to " + targetAddr +
                                 " for " + blk);
            }
            return chosenNode;
          } catch (IOException ex) {
            if (ex instanceof InvalidEncryptionKeyException && refetchEncryptionKey > 0) {
              DFSClient.LOG.info("Will fetch a new encryption key and retry, " 
                  + "encryption key was invalid when connecting to " + targetAddr
                  + " : " + ex);
              // The encryption key used is invalid.
              refetchEncryptionKey--;
              dfsClient.clearDataEncryptionKey();
            } else if (refetchToken > 0 && tokenRefetchNeeded(ex, targetAddr)) {
              refetchToken--;
              fetchBlockAt(target);
            } else {
              connectFailedOnce = true;
              DFSClient.LOG.warn("Failed to connect to " + targetAddr + " for block"
                + ", add to deadNodes and continue. " + ex, ex);
              // Put chosen node into dead list, continue
              addToDeadNodes(chosenNode);
            }
          }
        }
      }
    
    
    2. Getting the block to read: DFSInputStream.getBlockAt():

    Gets the block from the locatedBlocks cache; if it is not cached, issues an RPC to the namenode.

    /**
       * Get block at the specified position.
       * Fetch it from the namenode if not cached.
       * 
       * @param offset block corresponding to this offset in file is returned
       * @return located block
       * @throws IOException
       */
      private LocatedBlock getBlockAt(long offset) throws IOException {
        synchronized(infoLock) {
          assert (locatedBlocks != null) : "locatedBlocks is null";
    
          final LocatedBlock blk;
    
          //check offset
          if (offset < 0 || offset >= getFileLength()) {
            throw new IOException("offset < 0 || offset >= getFileLength(), offset="
                + offset
                + ", locatedBlocks=" + locatedBlocks);
          }
          else if (offset >= locatedBlocks.getFileLength()) {
            // offset to the portion of the last block,
            // which is not known to the name-node yet;
            // getting the last block
            blk = locatedBlocks.getLastLocatedBlock();
          }
          else {
            // search cached blocks first
            // look it up in the cache first
            int targetBlockIdx = locatedBlocks.findBlock(offset);
            if (targetBlockIdx < 0) { // block is not cached
              targetBlockIdx = LocatedBlocks.getInsertIndex(targetBlockIdx);
              // fetch more blocks: not found in the cache, so ask the namenode
              final LocatedBlocks newBlocks = dfsClient.getLocatedBlocks(src, offset);
              assert (newBlocks != null) : "Could not find target position " + offset;
              // insert the newly fetched blocks into the cached list
              locatedBlocks.insertRange(targetBlockIdx, newBlocks.getLocatedBlocks());
            }
            blk = locatedBlocks.get(targetBlockIdx);
          }
          return blk;
        }
      }
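
    The negative index returned by locatedBlocks.findBlock() follows the java.util binary-search convention, which getInsertIndex() converts back into an insertion point. A standalone sketch of that convention (the block start offsets below are made up, assuming 128 MB blocks):

    import java.util.Arrays;

    public class InsertIndexDemo {
      public static void main(String[] args) {
        // start offsets of the cached blocks (128 MB each in this sketch)
        long[] starts = {0L, 134217728L, 268435456L};

        // hit: offset 134217728 is the start of the second block -> index 1
        System.out.println(Arrays.binarySearch(starts, 134217728L));  // 1

        // miss: binarySearch returns -(insertionPoint) - 1 = -(3) - 1 = -4
        int r = Arrays.binarySearch(starts, 400000000L);
        System.out.println(r);                                        // -4

        // getInsertIndex-style recovery of the insertion point
        int insertIdx = r >= 0 ? r : -(r + 1);
        System.out.println(insertIdx);                                // 3
      }
    }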
    
    
    3. Choosing the best datanode: DFSInputStream.chooseDataNode()

    If the number of read failures reaches the threshold (default 3), a BlockMissingException is thrown; the threshold can be changed via dfs.client.max.block.acquire.failures.

    private DNAddrPair chooseDataNode(LocatedBlock block,
          Collection<DatanodeInfo> ignoredNodes) throws IOException {
        while (true) {
          try {
            return getBestNodeDNAddrPair(block, ignoredNodes);
          } catch (IOException ie) {
            String errMsg = getBestNodeDNAddrPairErrorString(block.getLocations(),
              deadNodes, ignoredNodes);
            String blockInfo = block.getBlock() + " file=" + src;
            if (failures >= dfsClient.getMaxBlockAcquireFailures()) {
              String description = "Could not obtain block: " + blockInfo;
              DFSClient.LOG.warn(description + errMsg
                  + ". Throwing a BlockMissingException");
              throw new BlockMissingException(src, description,
                  block.getStartOffset());
            }
    
            DatanodeInfo[] nodes = block.getLocations();
            if (nodes == null || nodes.length == 0) {
              DFSClient.LOG.info("No node available for " + blockInfo);
            }
            DFSClient.LOG.info("Could not obtain " + block.getBlock()
                + " from any node: " + ie + errMsg
                + ". Will get new block locations from namenode and retry...");
            try {
              // Introducing a random factor to the wait time before another retry.
              // The wait time is dependent on # of failures and a random factor.
              // At the first time of getting a BlockMissingException, the wait time
              // is a random number between 0..3000 ms. If the first retry
              // still fails, we will wait 3000 ms grace period before the 2nd retry.
              // Also at the second retry, the waiting window is expanded to 6000 ms
              // alleviating the request rate from the server. Similarly the 3rd retry
              // will wait 6000ms grace period before retry and the waiting window is
              // expanded to 9000ms. 
              final int timeWindow = dfsClient.getConf().timeWindow;
              double waitTime = timeWindow * failures +       // grace period for the last round of attempt
                timeWindow * (failures + 1) * DFSUtil.getRandom().nextDouble(); // expanding time window for each failure
              DFSClient.LOG.warn("DFS chooseDataNode: got # " + (failures + 1) + " IOException, will wait for " + waitTime + " msec.");
              Thread.sleep((long)waitTime);
            } catch (InterruptedException iex) {
            }
            deadNodes.clear(); //2nd option is to remove only nodes[blockId]
            openInfo();
            block = getBlockAt(block.getStartOffset());
            failures++;
            continue;
          }
        }
      }
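
    To make the wait-time formula concrete, here is a standalone sketch of the backoff computed above; timeWindow defaults to 3000 ms (the dfs.client.retry.window.base setting), so the windows come out to 0..3000 ms, 3000..9000 ms, and 6000..15000 ms for the first three failures:

    import java.util.Random;

    public class ChooseDataNodeBackoff {
      public static void main(String[] args) {
        final int timeWindow = 3000; // dfs.client.retry.window.base, default 3000 ms
        Random rnd = new Random();
        for (int failures = 0; failures < 3; failures++) {
          // grace period for the previous round + an expanding random window
          double waitTime = timeWindow * failures
              + timeWindow * (failures + 1) * rnd.nextDouble();
          System.out.printf("failure #%d: wait %.0f ms (range %d..%d ms)%n",
              failures + 1, waitTime,
              timeWindow * failures,
              timeWindow * failures + timeWindow * (failures + 1));
        }
      }
    }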
    
    
    4. Picking the best node: DFSInputStream.getBestNodeDNAddrPair():
    
    /**
       * Get the best node from which to stream the data.
       * @param block LocatedBlock, containing nodes in priority order.
       * @param ignoredNodes Do not choose nodes in this array (may be null)
       * @return The DNAddrPair of the best node.
       * @throws IOException
       */
      private DNAddrPair getBestNodeDNAddrPair(LocatedBlock block,
          Collection<DatanodeInfo> ignoredNodes) throws IOException {
        // nodes is already sorted with the best datanode first
        DatanodeInfo[] nodes = block.getLocations();
        StorageType[] storageTypes = block.getStorageTypes();
        DatanodeInfo chosenNode = null;
        StorageType storageType = null;
        if (nodes != null) {
          for (int i = 0; i < nodes.length; i++) {
            if (!deadNodes.containsKey(nodes[i])
                && (ignoredNodes == null || !ignoredNodes.contains(nodes[i]))) {
              chosenNode = nodes[i];
              // Storage types are ordered to correspond with nodes, so use the same
              // index to get storage type.
              if (storageTypes != null && i < storageTypes.length) {
                storageType = storageTypes[i];
              }
              break;
            }
          }
        }
        if (chosenNode == null) {
          throw new IOException("No live nodes contain block " + block.getBlock() +
              " after checking nodes = " + Arrays.toString(nodes) +
              ", ignoredNodes = " + ignoredNodes);
        }
        final String dnAddr =
            chosenNode.getXferAddr(dfsClient.getConf().connectToDnViaHostname);
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("Connecting to datanode " + dnAddr);
        }
        InetSocketAddress targetAddr = NetUtils.createSocketAddr(dnAddr);
        return new DNAddrPair(chosenNode, targetAddr, storageType);
      }
    
    

    IV. Choosing the read strategy

    1. Reading the data: DFSInputStream.readBuffer():

    While reading a block, a checksum failure causes the client to keep choosing another suitable datanode until a read succeeds, or until every datanode has been tried and the ChecksumException is rethrown, ending the read. Likewise, on an IO error the client keeps iterating over the healthy datanodes until a read succeeds or an IOException terminates the read.

    /* This is used by regular read() and handles ChecksumExceptions.
       * name readBuffer() is chosen to imply similarity to readBuffer() in
       * ChecksumFileSystem
       */ 
        private synchronized int readBuffer(ReaderStrategy reader, int off, int len,
          Map<ExtendedBlock, Set<DatanodeInfo>> corruptedBlockMap)
          throws IOException {
        IOException ioe;
        
        /* we retry current node only once. So this is set to true only here.
         * Intention is to handle one common case of an error that is not a
         * failure on datanode or client : when DataNode closes the connection
         * since client is idle. If there are other cases of "non-errors" then
         * then a datanode might be retried by setting this to true again.
         */
        boolean retryCurrentNode = true;
    
        while (true) {
          // retry as many times as seekToNewSource allows.
          try {
            // the reader strategy is ByteArrayStrategy; blockReader is either a
            // remote RemoteBlockReader2 or a local BlockReaderLocal
            return reader.doRead(blockReader, off, len);
          } catch ( ChecksumException ce ) {
            DFSClient.LOG.warn("Found Checksum error for "
                + getCurrentBlock() + " from " + currentNode
                + " at " + ce.getPos());        
            ioe = ce;
            retryCurrentNode = false;
            // we want to remember which block replicas we have tried
            addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
                corruptedBlockMap);
          } catch ( IOException e ) {
            if (!retryCurrentNode) {
              DFSClient.LOG.warn("Exception while reading from "
                  + getCurrentBlock() + " of " + src + " from "
                  + currentNode, e);
            }
            ioe = e;
          }
          boolean sourceFound = false;
          if (retryCurrentNode) {
            /* possibly retry the same node so that transient errors don't
             * result in application level failures (e.g. Datanode could have
             * closed the connection because the client is idle for too long).
             */ 
            sourceFound = seekToBlockSource(pos);
          } else {
            // on failure, add the current datanode to the deadNodes map
            addToDeadNodes(currentNode);
            // find another healthy datanode to read from
            sourceFound = seekToNewSource(pos);
          }
          // no datanode found for this position: rethrow
          if (!sourceFound) {
            throw ioe;
          }
          retryCurrentNode = false;
        }
      }
    
    2. Reading via ByteArrayStrategy: ByteArrayStrategy.doRead()
     @Override
        public int doRead(BlockReader blockReader, int off, int len)
              throws ChecksumException, IOException {
          // BlockReaderLocal: local (short-circuit) read
          // RemoteBlockReader2: remote read over TCP
          int nRead = blockReader.read(buf, off, len);
          updateReadStatistics(readStatistics, nRead, blockReader);
          return nRead;
        }
    

    V. The remote read path

    1. Reading packet by packet: RemoteBlockReader2.read()
    @Override
      public synchronized int read(byte[] buf, int off, int len) 
                                   throws IOException {
    
        UUID randomId = null;
        if (LOG.isTraceEnabled()) {
          randomId = UUID.randomUUID();
          LOG.trace(String.format("Starting read #%s file %s from datanode %s",
            randomId.toString(), this.filename,
            this.datanodeID.getHostName()));
        }
        // Every ByteBuffer has four attributes: capacity, limit, position, mark,
        // and maintains the invariant mark <= position <= limit <= capacity.
        // remaining() returns limit - position, the bytes left to read.
        // If curDataSlice is null or fully consumed, read the next packet.
        if (curDataSlice == null || curDataSlice.remaining() == 0 && bytesNeededToFinish > 0) {
          TraceScope scope = Trace.startSpan(
              "RemoteBlockReader2#readNextPacket(" + blockId + ")", Sampler.NEVER);
          try {
            // read one packet; each packet consists of several chunks
            readNextPacket();
          } finally {
            scope.close();
          }
        }
    
        if (LOG.isTraceEnabled()) {
          LOG.trace(String.format("Finishing read #" + randomId));
        }
    
        if (curDataSlice.remaining() == 0) {
          // we're at EOF now
          return -1;
        }
        // read at most the smaller of the remaining bytes and len
        int nRead = Math.min(curDataSlice.remaining(), len);
        // relative read: copy nRead bytes from curDataSlice's position into buf[off..off+nRead)
        curDataSlice.get(buf, off, nRead);
        
        return nRead;
      }
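
    The comments above lean on java.nio.ByteBuffer semantics; a self-contained sketch of how remaining() and the relative bulk get() behave (the buffer sizes are made up):

    import java.nio.ByteBuffer;

    public class ByteBufferSliceDemo {
      public static void main(String[] args) {
        // a stand-in for curDataSlice: 16 bytes of packet payload
        ByteBuffer slice = ByteBuffer.wrap(new byte[16]);

        // invariant: mark <= position <= limit <= capacity
        System.out.println(slice.remaining()); // 16 (= limit - position)

        byte[] buf = new byte[64];
        int nRead = Math.min(slice.remaining(), 10);
        // relative bulk get: copies nRead bytes into buf[0..nRead)
        // and advances position by nRead
        slice.get(buf, 0, nRead);

        System.out.println(slice.remaining()); // 6
        // once remaining() hits 0, read() above fetches the next packet
      }
    }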
    
    
    2. Reading the next packet: RemoteBlockReader2.readNextPacket()
    private void readNextPacket() throws IOException {
        //Read packet headers.
        packetReceiver.receiveNextPacket(in);
    
        PacketHeader curHeader = packetReceiver.getHeader();
        curDataSlice = packetReceiver.getDataSlice();
        assert curDataSlice.capacity() == curHeader.getDataLen();
        
        if (LOG.isTraceEnabled()) {
          LOG.trace("DFSClient readNextPacket got header " + curHeader);
        }
    
        // Sanity check the lengths
        if (!curHeader.sanityCheck(lastSeqNo)) {
             throw new IOException("BlockReader: error in packet header " +
                                   curHeader);
        }
        
        if (curHeader.getDataLen() > 0) {
          // compute how many chunks make up this packet
          int chunks = 1 + (curHeader.getDataLen() - 1) / bytesPerChecksum;
          int checksumsLen = chunks * checksumSize;
    
          assert packetReceiver.getChecksumSlice().capacity() == checksumsLen :
            "checksum slice capacity=" + packetReceiver.getChecksumSlice().capacity() + 
              " checksumsLen=" + checksumsLen;
          
          lastSeqNo = curHeader.getSeqno();
          if (verifyChecksum && curDataSlice.remaining() > 0) {
            // N.B.: the checksum error offset reported here is actually
            // relative to the start of the block, not the start of the file.
            // This is slightly misleading, but preserves the behavior from
            // the older BlockReader.
            // verify the checksums
            checksum.verifyChunkedSums(curDataSlice,
                packetReceiver.getChecksumSlice(),
                filename, curHeader.getOffsetInBlock());
          }
          bytesNeededToFinish -= curHeader.getDataLen();
        }    
        
        // First packet will include some data prior to the first byte
        // the user requested. Skip it.
        if (curHeader.getOffsetInBlock() < startOffset) {
          int newPos = (int) (startOffset - curHeader.getOffsetInBlock());
          curDataSlice.position(newPos);
        }
    
        // If we've now satisfied the whole client read, read one last packet
        // header, which should be empty
        if (bytesNeededToFinish <= 0) {
          readTrailingEmptyPacket();
          // report the read result back to the datanode
          if (verifyChecksum) {
            sendReadResult(Status.CHECKSUM_OK);
          } else {
            sendReadResult(Status.SUCCESS);
          }
        }
      }
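
    A worked example of the chunk arithmetic in readNextPacket(), assuming the usual defaults of 512 bytes per checksum chunk (dfs.bytes-per-checksum) and 4-byte CRC checksums:

    public class PacketChunkMath {
      public static void main(String[] args) {
        int bytesPerChecksum = 512; // dfs.bytes-per-checksum default
        int checksumSize = 4;       // CRC32/CRC32C checksum width

        int dataLen = 65536;        // a 64 KB packet payload
        // ceiling division: a partial trailing chunk still needs a checksum
        int chunks = 1 + (dataLen - 1) / bytesPerChecksum; // 128
        int checksumsLen = chunks * checksumSize;          // 512 bytes

        System.out.println("chunks=" + chunks + " checksumsLen=" + checksumsLen);
      }
    }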
    

    VI. The local read path

    1. BlockReaderLocal.read()
    @Override
      public synchronized int read(ByteBuffer buf) throws IOException {
        boolean canSkipChecksum = createNoChecksumContext();
        try {
          String traceString = null;
          if (LOG.isTraceEnabled()) {
            traceString = new StringBuilder().
                append("read(").
                append("buf.remaining=").append(buf.remaining()).
                append(", block=").append(block).
                append(", filename=").append(filename).
                append(", canSkipChecksum=").append(canSkipChecksum).
                append(")").toString();
            LOG.info(traceString + ": starting");
          }
          int nRead;
          try {
            if (canSkipChecksum && zeroReadaheadRequested) {
              // read without checksum verification
              nRead = readWithoutBounceBuffer(buf);
            } else {
              // read with checksum verification
              nRead = readWithBounceBuffer(buf, canSkipChecksum);
            }
          } catch (IOException e) {
            if (LOG.isTraceEnabled()) {
              LOG.info(traceString + ": I/O error", e);
            }
            throw e;
          }
          if (LOG.isTraceEnabled()) {
            LOG.info(traceString + ": returning " + nRead);
          }
          return nRead;
        } finally {
          if (canSkipChecksum) releaseNoChecksumContext();
        }
      }
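
    BlockReaderLocal is only chosen when short-circuit local reads are enabled on both the client and the datanode; a minimal client-side configuration sketch, where the domain socket path is an example and must match the datanode's setting:

    import org.apache.hadoop.conf.Configuration;

    public class ShortCircuitConfig {
      public static void main(String[] args) {
        Configuration conf = new Configuration();
        // enable short-circuit local reads so BlockReaderLocal is used
        conf.setBoolean("dfs.client.read.shortcircuit", true);
        // domain socket shared between the datanode and local clients;
        // the path below is an example, it must match the datanode's setting
        conf.set("dfs.domain.socket.path", "/var/lib/hadoop-hdfs/dn_socket");
        // optionally skip checksums for short-circuit reads (use with care)
        conf.setBoolean("dfs.client.read.shortcircuit.skip.checksum", false);
      }
    }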
    
