HDFS Read: Block Read Analysis (Part 1)

Author: 古语1 | Published 2019-08-15 10:52

    I. The HDFS read flow

    • First, obtain an input stream for the file:
      FSDataInputStream fsIn = fs.open(path);   // fs is a FileSystem instance, path a Path
    • Then read the file contents:
      fsIn.read(buf, off, toRead);
    • The overall flow is shown in the diagram below


      (flow diagram from the original post; image not reproduced here)
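
    For orientation, here is a minimal, self-contained version of those two calls
    (the path is a placeholder and error handling is omitted):

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FSDataInputStream;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;

      public class HdfsReadExample {
        public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // "/user/demo/input.txt" is a placeholder path
          try (FileSystem fs = FileSystem.get(conf);
               FSDataInputStream fsIn = fs.open(new Path("/user/demo/input.txt"))) {
            byte[] buf = new byte[4096];
            int n;
            while ((n = fsIn.read(buf, 0, buf.length)) != -1) {
              System.out.write(buf, 0, n);
            }
            System.out.flush();
          }
        }
      }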

    II. How the HDFS client opens the file stream

    1. Opening the file stream: FileSystem.open
      /**
       * Opens an FSDataInputStream at the indicated Path.
       * @param f the file to open
       */
      public FSDataInputStream open(Path f) throws IOException {
        // the call below dispatches to DistributedFileSystem.open
        return open(f, getConf().getInt("io.file.buffer.size", 4096));
      }
    
    2. DistributedFileSystem.open (DistributedFileSystem extends FileSystem)
      @Override
      public FSDataInputStream open(Path f, final int bufferSize)
          throws IOException {
        statistics.incrementReadOps(1);
        Path absF = fixRelativePart(f);
        return new FileSystemLinkResolver<FSDataInputStream>() {
          @Override
          public FSDataInputStream doCall(final Path p) throws IOException, UnresolvedLinkException {
            // delegates to DFSClient.open
            final DFSInputStream dfsis =
              dfs.open(getPathName(p), bufferSize, verifyChecksum);
            return dfs.createWrappedInputStream(dfsis);
          }
          @Override
          public FSDataInputStream next(final FileSystem fs, final Path p)
              throws IOException {
            return fs.open(p, bufferSize);
          }
        }.resolve(this, absF);
      }
    
    
    3. DFSClient.open
      /**
       * Create an input stream that obtains a nodelist from the
       * namenode, and then reads from all the right places.  Creates
       * inner subclass of InputStream that does the right out-of-band
       * work.
       */
      public DFSInputStream open(String src, int buffersize, boolean verifyChecksum)
          throws IOException, UnresolvedLinkException {
        // make sure this DFSClient has not been closed
        checkOpen();
        //    Get block info from namenode
        TraceScope scope = getPathTraceScope("newDFSInputStream", src);
        try {
          // construct and return the DFSInputStream; see its constructor below
          return new DFSInputStream(this, src, verifyChecksum);
        } finally {
          scope.close();
        }
      }
    
    4. The DFSInputStream constructor
     DFSInputStream(DFSClient dfsClient, String src, boolean verifyChecksum
                     ) throws IOException, UnresolvedLinkException {
        this.dfsClient = dfsClient;
        this.verifyChecksum = verifyChecksum;
        this.src = src;
        synchronized (infoLock) {
          /**
           * Read caching strategy, controlled by two knobs: readDropBehind and
           * readahead. A read is usually a disk operation: each read pulls a page
           * of data (512 bytes or more) into memory and ships it to the client.
           * readDropBehind ("drop cache behind reads") discards cached data as soon
           * as it has been read. This saves memory when many clients read files
           * concurrently, at the cost of more frequent disk I/O; with the feature
           * off, data stays in the cache after a read, which speeds up repeated
           * reads of the same file but consumes more memory.
           * readahead makes the DataNode read extra bytes ahead in a single disk
           * operation, cutting I/O latency for sequential reads.
           * These features require the native libraries on the DataNode; otherwise
           * they have no effect.
           */
          this.cachingStrategy = dfsClient.getDefaultReadCachingStrategy();
        }
        // fetch the block locations
        openInfo();
      }
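
    The strategy returned by dfsClient.getDefaultReadCachingStrategy() is built
    from client configuration. As a hedged sketch, the two keys below are the
    client-side knobs in Hadoop 2.x as I understand them; verify them against your
    version's DFSConfigKeys before relying on the names:

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.FileSystem;

      public class ReadCachingConfigSketch {
        public static void main(String[] args) throws Exception {
          Configuration conf = new Configuration();
          // drop the OS page cache right after each read: saves memory on one-pass
          // scans, costs extra disk I/O on repeated reads; needs native libraries
          // on the DataNode side (assumed key name)
          conf.setBoolean("dfs.client.cache.drop.behind.reads", true);
          // ask DataNodes to read ahead 4 MB on sequential reads (assumed key name)
          conf.setLong("dfs.client.cache.readahead", 4L * 1024 * 1024);
          // streams opened from this FileSystem pick the strategy up as their default
          try (FileSystem fs = FileSystem.get(conf)) {
            // ... open and read files here ...
          }
        }
      }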
    
    5. DFSInputStream.openInfo()
     void openInfo() throws IOException, UnresolvedLinkException {
        synchronized(infoLock) {
          // fetch the located blocks and get the length of the last block
          lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
          // defaults to 3: dfs.client.retry.times.get-last-block-length
          int retriesForLastBlockLength = dfsClient.getConf().retryTimesForGetLastBlockLength;
          while (retriesForLastBlockLength > 0) {
            // Getting last block length as -1 is a special case. When cluster
            // restarts, DNs may not report immediately. At this time partial block
            // locations will not be available with NN for getting the length. Lets
            // retry for 3 times to get the length.
            if (lastBlockBeingWrittenLength == -1) {
              DFSClient.LOG.warn("Last block locations not available. "
                  + "Datanodes might not have reported blocks completely."
                  + " Will retry for " + retriesForLastBlockLength + " times");
              // defaults to 4000 ms: dfs.client.retry.interval-ms.get-last-block-length
              waitFor(dfsClient.getConf().retryIntervalForGetLastBlockLength);
              lastBlockBeingWrittenLength = fetchLocatedBlocksAndGetLastBlockLength();
            } else {
              break;
            }
            retriesForLastBlockLength--;
          }
          if (retriesForLastBlockLength == 0) {
            throw new IOException("Could not obtain the last block locations.");
          }
        }
      }
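
    Both retry knobs mentioned in the comments above are plain client
    configuration; a small sketch of raising them beyond the defaults (3 retries,
    4000 ms apart), which helps clients ride out a cluster restart where DataNodes
    report their blocks slowly:

      import org.apache.hadoop.conf.Configuration;

      public class LastBlockLengthRetrySketch {
        public static void main(String[] args) {
          Configuration conf = new Configuration();
          // key names as given in the code comments above
          conf.setInt("dfs.client.retry.times.get-last-block-length", 5);
          conf.setInt("dfs.client.retry.interval-ms.get-last-block-length", 8000);
        }
      }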
    
    6. DFSInputStream.fetchLocatedBlocksAndGetLastBlockLength()
    private long fetchLocatedBlocksAndGetLastBlockLength() throws IOException {
        // ask the namenode for the file's block locations
        final LocatedBlocks newInfo = dfsClient.getLocatedBlocks(src, 0);
        if (DFSClient.LOG.isDebugEnabled()) {
          DFSClient.LOG.debug("newInfo = " + newInfo);
        }
        if (newInfo == null) {
          throw new IOException("Cannot open filename " + src);
        }
    
        if (locatedBlocks != null) {
          Iterator<LocatedBlock> oldIter = locatedBlocks.getLocatedBlocks().iterator();
          Iterator<LocatedBlock> newIter = newInfo.getLocatedBlocks().iterator();
          while (oldIter.hasNext() && newIter.hasNext()) {
            if (! oldIter.next().getBlock().equals(newIter.next().getBlock())) {
              throw new IOException("Blocklist for " + src + " has changed!");
            }
          }
        }
        locatedBlocks = newInfo;
        long lastBlockBeingWrittenLength = 0;
        if (!locatedBlocks.isLastBlockComplete()) {
          final LocatedBlock last = locatedBlocks.getLastLocatedBlock();
          if (last != null) {
            if (last.getLocations().length == 0) {
              if (last.getBlockSize() == 0) {
                // if the length is zero, then no data has been written to
                // datanode. So no need to wait for the locations.
                return 0;
              }
              return -1;
            }
            final long len = readBlockLength(last);
            last.getBlock().setNumBytes(len);
            lastBlockBeingWrittenLength = len; 
          }
        }
        fileEncryptionInfo = locatedBlocks.getFileEncryptionInfo();
        return lastBlockBeingWrittenLength;
      }
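
    From user code, the closest public view of these located blocks is
    FileSystem.getFileBlockLocations(), which returns one BlockLocation per block
    in the requested byte range. A short example (placeholder path):

      import org.apache.hadoop.conf.Configuration;
      import org.apache.hadoop.fs.BlockLocation;
      import org.apache.hadoop.fs.FileStatus;
      import org.apache.hadoop.fs.FileSystem;
      import org.apache.hadoop.fs.Path;

      public class BlockLocationsExample {
        public static void main(String[] args) throws Exception {
          try (FileSystem fs = FileSystem.get(new Configuration())) {
            FileStatus st = fs.getFileStatus(new Path("/user/demo/input.txt"));
            for (BlockLocation loc : fs.getFileBlockLocations(st, 0, st.getLen())) {
              // each BlockLocation lists the datanodes holding a replica
              System.out.println("offset=" + loc.getOffset()
                  + " len=" + loc.getLength()
                  + " hosts=" + String.join(",", loc.getHosts()));
            }
          }
        }
      }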
    
    7. Fetching block info: DFSClient.getLocatedBlocks()
      public LocatedBlocks getLocatedBlocks(String src, long start)
          throws IOException {
        // delegates to the three-argument overload below, which calls callGetBlockLocations
        return getLocatedBlocks(src, start, dfsClientConf.prefetchSize);
      }
    
      /*
       * This is just a wrapper around callGetBlockLocations, but non-static so that
       * we can stub it out for tests.
       */
      @VisibleForTesting
      public LocatedBlocks getLocatedBlocks(String src, long start, long length)
          throws IOException {
        TraceScope scope = getPathTraceScope("getBlockLocations", src);
        try {
          // remote RPC to the namenode server
          return callGetBlockLocations(namenode, src, start, length);
        } finally {
          scope.close();
        }
      }
    
    8. The remote RPC: DFSClient.callGetBlockLocations()
    /**
       * @see ClientProtocol#getBlockLocations(String, long, long)
       */
      static LocatedBlocks callGetBlockLocations(ClientProtocol namenode,
          String src, long start, long length) 
          throws IOException {
        try {
          // send the request to the namenode via the ClientProtocol stub (ClientNamenodeProtocolTranslatorPB)
          return namenode.getBlockLocations(src, start, length);
        } catch(RemoteException re) {
          throw re.unwrapRemoteException(AccessControlException.class,
                                         FileNotFoundException.class,
                                         UnresolvedPathException.class);
        }
      }
    
    
    9. RPC to the NameNode: ClientNamenodeProtocolTranslatorPB.getBlockLocations()
    @Override
      public LocatedBlocks getBlockLocations(String src, long offset, long length)
          throws AccessControlException, FileNotFoundException,
          UnresolvedLinkException, IOException {
         
        GetBlockLocationsRequestProto req = GetBlockLocationsRequestProto
            .newBuilder()
            .setSrc(src)
            .setOffset(offset)
            .setLength(length)
            .build();
        try {
          // the RPC machinery itself is not analyzed here;
          // on the server side this lands in NameNodeRpcServer.getBlockLocations
          // rpcProxy: localhost/127.0.0.1:51397, ProtobufRpcEngine, ClientNamenodeProtocolPB
          GetBlockLocationsResponseProto resp = rpcProxy.getBlockLocations(null, req);
          return resp.hasLocations() ? PBHelper.convert(resp.getLocations()) : null;
        } catch (ServiceException e) {
          throw ProtobufHelper.getRemoteException(e);
        }
      }
    

    III. How the NameNode server fetches block info

    1. The server entry point: NameNodeRpcServer.getBlockLocations()
      public LocatedBlocks getBlockLocations(String src, long offset,  long length) 
          throws IOException {
        checkNNStartup();
        metrics.incrGetBlockLocations();
        // delegates to FSNamesystem.getBlockLocations()
        return namesystem.getBlockLocations(getClientMachine(),  src, offset, length);
      }
    
    2. Fetching and sorting the blocks: FSNamesystem.getBlockLocations()
    /**
       * Get block locations within the specified range.
       * @see ClientProtocol#getBlockLocations(String, long, long)
       */
      LocatedBlocks getBlockLocations(String clientMachine, String srcArg,
          long offset, long length) throws IOException {
        checkOperation(OperationCategory.READ);
        GetBlockLocationsResult res = null;
        FSPermissionChecker pc = getPermissionChecker();
        readLock();
        try {
          checkOperation(OperationCategory.READ);
          // fetch the blocks in the requested range
          res = getBlockLocations(pc, srcArg, offset, length, true, true);
        } catch (AccessControlException e) {
          logAuditEvent(false, "open", srcArg);
          throw e;
        } finally {
          readUnlock();
        }
    
        logAuditEvent(true, "open", srcArg);
    
        if (res.updateAccessTime()) {
          byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(
              srcArg);
          String src = srcArg;
          writeLock();
          final long now = now();
          try {
            checkOperation(OperationCategory.WRITE);
            /**
             * Resolve the path again and update the atime only when the file
             * exists.
             *
             * XXX: Races can still occur even after resolving the path again.
             * For example:
             *
             * <ul>
             *   <li>Get the block location for "/a/b"</li>
             *   <li>Rename "/a/b" to "/c/b"</li>
             *   <li>The second resolution still points to "/a/b", which is
             *   wrong.</li>
             * </ul>
             *
             * The behavior is incorrect but consistent with the one before
             * HDFS-7463. A better fix is to change the edit log of SetTime to
             * use inode id instead of a path.
             */
            src = dir.resolvePath(pc, srcArg, pathComponents);
            final INodesInPath iip = dir.getINodesInPath(src, true);
            INode inode = iip.getLastINode();
            boolean updateAccessTime = inode != null &&
                now > inode.getAccessTime() + getAccessTimePrecision();
            if (!isInSafeMode() && updateAccessTime) {
              boolean changed = FSDirAttrOp.setTimes(dir,
                  inode, -1, now, false, iip.getLatestSnapshotId());
              if (changed) {
                getEditLog().logTimes(src, -1, now);
              }
            }
          } catch (Throwable e) {
            LOG.warn("Failed to update the access time of " + src, e);
          } finally {
            writeUnlock();
          }
        }
    
        LocatedBlocks blocks = res.blocks;
        // sort each block's datanodes by network-topology distance to the client
        if (blocks != null) {
          blockManager.getDatanodeManager().sortLocatedBlocks(
              clientMachine, blocks.getLocatedBlocks());
          // lastBlock is not part of getLocatedBlocks(), might need to sort it too
          LocatedBlock lastBlock = blocks.getLastLocatedBlock();
          if (lastBlock != null) {
            ArrayList<LocatedBlock> lastBlockList = Lists.newArrayList(lastBlock);
            blockManager.getDatanodeManager().sortLocatedBlocks(
                clientMachine, lastBlockList);
          }
        }
        // return the sorted blocks to the client
        return blocks;
      }
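
    The effect of sortLocatedBlocks is that replicas "closer" to the reading
    client come first: node-local, then rack-local, then off-rack. A toy
    illustration of the idea, using made-up distance weights rather than Hadoop's
    real NetworkTopology.sortByDistance():

      import java.util.ArrayList;
      import java.util.Comparator;
      import java.util.List;

      public class SortByDistanceSketch {
        // 0 = same host as the client, 2 = same rack, 4 = off-rack
        // (roughly how NetworkTopology counts hops to a common ancestor)
        static int distance(String clientRack, String clientHost,
                            String replicaRack, String replicaHost) {
          if (clientHost.equals(replicaHost)) return 0;
          if (clientRack.equals(replicaRack)) return 2;
          return 4;
        }

        public static void main(String[] args) {
          String clientHost = "host-a", clientRack = "/rack1";
          List<String[]> replicas = new ArrayList<>(List.of(
              new String[]{"/rack2", "host-c"},    // off-rack
              new String[]{"/rack1", "host-b"},    // rack-local
              new String[]{"/rack1", "host-a"}));  // node-local
          replicas.sort(Comparator.comparingInt(
              r -> distance(clientRack, clientHost, r[0], r[1])));
          // prints host-a, host-b, host-c: the client will read from the first entry
          replicas.forEach(r -> System.out.println(r[1] + " @ " + r[0]));
        }
      }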
    
    
    3. The internal FSNamesystem.getBlockLocations() overload
    /**
       * Get block locations within the specified range.
       * @see ClientProtocol#getBlockLocations(String, long, long)
       * @throws IOException
       */
      GetBlockLocationsResult getBlockLocations(
          FSPermissionChecker pc, String src, long offset, long length,
          boolean needBlockToken, boolean checkSafeMode) throws IOException {
        if (offset < 0) {
          throw new HadoopIllegalArgumentException(
              "Negative offset is not supported. File: " + src);
        }
        if (length < 0) {
          throw new HadoopIllegalArgumentException(
              "Negative length is not supported. File: " + src);
        }
        final GetBlockLocationsResult ret = getBlockLocationsInt(
            pc, src, offset, length, needBlockToken);
    
        if (checkSafeMode && isInSafeMode()) {
          for (LocatedBlock b : ret.blocks.getLocatedBlocks()) {
            // if safemode & no block locations yet then throw safemodeException
            if ((b.getLocations() == null) || (b.getLocations().length == 0)) {
              SafeModeException se = new SafeModeException(
                  "Zero blocklocations for " + src, safeMode);
              if (haEnabled && haContext != null &&
                  haContext.getState().getServiceState() == HAServiceState.ACTIVE) {
                throw new RetriableException(se);
              } else {
                throw se;
              }
            }
          }
        }
        return ret;
      }
    
    4. Finally, FSNamesystem.getBlockLocationsInt()
    private GetBlockLocationsResult getBlockLocationsInt(
          FSPermissionChecker pc, final String srcArg, long offset, long length,
          boolean needBlockToken)
          throws IOException {
        String src = srcArg;
        byte[][] pathComponents = FSDirectory.getPathComponentsForReservedPath(src);
        src = dir.resolvePath(pc, srcArg, pathComponents);
        final INodesInPath iip = dir.getINodesInPath(src, true);
        // resolve the path to an INodeFile
        final INodeFile inode = INodeFile.valueOf(iip.getLastINode(), src);
        if (isPermissionEnabled) {
          dir.checkPathAccess(pc, iip, FsAction.READ);
          checkUnreadableBySuperuser(pc, inode, iip.getPathSnapshotId());
        }
    
        // compute the file length; for a file under construction the unfinished
        // last block is not counted
        final long fileSize = iip.isSnapshot()
            ? inode.computeFileSize(iip.getPathSnapshotId())
            : inode.computeFileSizeNotIncludingLastUcBlock();
        boolean isUc = inode.isUnderConstruction();
        if (iip.isSnapshot()) {
          // if src indicates a snapshot file, we need to make sure the returned
          // blocks do not exceed the size of the snapshot file.
          length = Math.min(length, fileSize - offset);
          isUc = false;
        }
    
        final FileEncryptionInfo feInfo =
            FSDirectory.isReservedRawName(srcArg) ? null
                : dir.getFileEncryptionInfo(inode, iip.getPathSnapshotId(), iip);
        // build the LocatedBlocks via the blockManager
        final LocatedBlocks blocks = blockManager.createLocatedBlocks(
            inode.getBlocks(iip.getPathSnapshotId()), fileSize,
            isUc, offset, length, needBlockToken, iip.isSnapshot(), feInfo);
        // Set caching information for the located blocks.
        for (LocatedBlock lb : blocks.getLocatedBlocks()) {
          cacheManager.setCachedLocations(lb);
        }
        final long now = now();
        boolean updateAccessTime = isAccessTimeSupported() && !isInSafeMode()
            && !iip.isSnapshot()
            && now > inode.getAccessTime() + getAccessTimePrecision();
        return new GetBlockLocationsResult(updateAccessTime, blocks);
      }
    

    IV. BlockManager builds the LocatedBlocks

    1. BlockManager.createLocatedBlocks()
    /** Create a LocatedBlocks. */
      public LocatedBlocks createLocatedBlocks(final BlockInfoContiguous[] blocks,
          final long fileSizeExcludeBlocksUnderConstruction,
          final boolean isFileUnderConstruction, final long offset,
          final long length, final boolean needBlockToken,
          final boolean inSnapshot, FileEncryptionInfo feInfo)
          throws IOException {
        assert namesystem.hasReadLock();
        if (blocks == null) {
          return null;
        } else if (blocks.length == 0) {
          return new LocatedBlocks(0, isFileUnderConstruction,
              Collections.<LocatedBlock>emptyList(), null, false, feInfo);
        } else {
          if (LOG.isDebugEnabled()) {
            LOG.debug("blocks = " + java.util.Arrays.asList(blocks));
          }
          final AccessMode mode = needBlockToken? AccessMode.READ: null;
          // build the LocatedBlock list for the requested range;
          // createLocatedBlockList calls createLocatedBlock for each block
          final List<LocatedBlock> locatedblocks = createLocatedBlockList(
              blocks, offset, length, Integer.MAX_VALUE, mode);
    
          final LocatedBlock lastlb;
          final boolean isComplete;
          if (!inSnapshot) {
            final BlockInfoContiguous last = blocks[blocks.length - 1];
            final long lastPos = last.isComplete()?
                fileSizeExcludeBlocksUnderConstruction - last.getNumBytes()
                : fileSizeExcludeBlocksUnderConstruction;
            lastlb = createLocatedBlock(last, lastPos, mode);
            isComplete = last.isComplete();
          } else {
            lastlb = createLocatedBlock(blocks,
                fileSizeExcludeBlocksUnderConstruction, mode);
            isComplete = true;
          }
          return new LocatedBlocks(
              fileSizeExcludeBlocksUnderConstruction, isFileUnderConstruction,
              locatedblocks, lastlb, isComplete, feInfo);
        }
      }
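
    To make the lastPos arithmetic concrete, a worked example with illustrative
    numbers: with a 128 MB block size and 300 MB written so far, blocks b0 and b1
    are complete and b2 (44 MB) is under construction, so
    fileSizeExcludeBlocksUnderConstruction is 256 MB:

      public class LastBlockPosSketch {
        public static void main(String[] args) {
          long blockSize = 128L * 1024 * 1024;

          // file under construction: the UC block is excluded from the size, so
          // the last block starts exactly at fileSizeExcludeBlocksUnderConstruction
          long sizeExcludingUcBlock = 2 * blockSize;      // 256 MB
          long lastPosUnderConstruction = sizeExcludingUcBlock;

          // closed 256 MB file: the last (complete) block is b1, so its starting
          // offset is fileSize - last.getNumBytes()
          long fileSize = 2 * blockSize;
          long lastPosComplete = fileSize - blockSize;    // 128 MB

          System.out.println(lastPosUnderConstruction + " " + lastPosComplete);
        }
      }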
    
    2. Building a single LocatedBlock: BlockManager.createLocatedBlock()
      /** @return a LocatedBlock for the given block */
      private LocatedBlock createLocatedBlock(final BlockInfoContiguous blk, final long pos
          ) throws IOException {
        if (blk instanceof BlockInfoContiguousUnderConstruction) {
          if (blk.isComplete()) {
            throw new IOException(
                "blk instanceof BlockInfoUnderConstruction && blk.isComplete()"
                + ", blk=" + blk);
          }
          final BlockInfoContiguousUnderConstruction uc =
              (BlockInfoContiguousUnderConstruction) blk;
          final DatanodeStorageInfo[] storages = uc.getExpectedStorageLocations();
          final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
          return new LocatedBlock(eb, storages, pos, false);
        }
    
    
        // get block locations
        // count the datanodes whose replica of blk is corrupt (unreadable)
        final int numCorruptNodes = countNodes(blk).corruptReplicas();
        // corruptReplicasMap records, for each corrupt block, the datanodes that
        // hold a bad copy and the reason the copy is considered corrupt. The count
        // below comes from that map and normally equals numCorruptNodes, which is
        // derived from the in-memory block-to-datanode map
        final int numCorruptReplicas = corruptReplicas.numCorruptReplicas(blk);
        if (numCorruptNodes != numCorruptReplicas) {
          LOG.warn("Inconsistent number of corrupt replicas for "
              + blk + " blockMap has " + numCorruptNodes
              + " but corrupt replicas map has " + numCorruptReplicas);
        }
        // count the datanodes that store a replica of blk
        final int numNodes = blocksMap.numNodes(blk);
        // if every replica is corrupt, mark the whole block as corrupt
        final boolean isCorrupt = numCorruptNodes == numNodes;
        // if isCorrupt, return all datanodes; otherwise return only those holding a readable replica
        final int numMachines = isCorrupt ? numNodes: numNodes - numCorruptNodes;
        final DatanodeStorageInfo[] machines = new DatanodeStorageInfo[numMachines];
        int j = 0;
        if (numMachines > 0) {
          for(DatanodeStorageInfo storage : blocksMap.getStorages(blk)) {
            final DatanodeDescriptor d = storage.getDatanodeDescriptor();
            final boolean replicaCorrupt = corruptReplicas.isReplicaCorrupt(blk, d);
            if (isCorrupt || (!replicaCorrupt))
              machines[j++] = storage;
          }
        }
        assert j == machines.length :
          "isCorrupt: " + isCorrupt + 
          " numMachines: " + numMachines +
          " numNodes: " + numNodes +
          " numCorrupt: " + numCorruptNodes +
          " numCorruptRepls: " + numCorruptReplicas;
        final ExtendedBlock eb = new ExtendedBlock(namesystem.getBlockPoolId(), blk);
        // return a LocatedBlock built from blk and the selected datanodes
        return new LocatedBlock(eb, machines, pos, isCorrupt);
      }
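
    The replica-selection rule is easy to miss in the code above, so here is a
    tiny standalone sketch of the same logic (the Replica type is hypothetical,
    not a Hadoop class): keep only healthy replicas, unless every replica is
    corrupt, in which case return all of them so the client still learns where
    the copies live.

      import java.util.ArrayList;
      import java.util.Arrays;
      import java.util.List;

      public class ReplicaFilterSketch {
        static class Replica {
          final String datanode;
          final boolean corrupt;
          Replica(String datanode, boolean corrupt) {
            this.datanode = datanode;
            this.corrupt = corrupt;
          }
          @Override public String toString() {
            return datanode + (corrupt ? "(corrupt)" : "");
          }
        }

        // mirrors the numMachines/machines logic in createLocatedBlock()
        static List<Replica> selectMachines(List<Replica> replicas) {
          int corrupt = 0;
          for (Replica r : replicas) {
            if (r.corrupt) corrupt++;
          }
          boolean isCorrupt = corrupt == replicas.size();
          List<Replica> machines = new ArrayList<>();
          for (Replica r : replicas) {
            if (isCorrupt || !r.corrupt) {
              machines.add(r);
            }
          }
          return machines;
        }

        public static void main(String[] args) {
          // 3 replicas, 1 corrupt -> only the 2 healthy locations are returned
          System.out.println(selectMachines(Arrays.asList(
              new Replica("dn1", false), new Replica("dn2", true),
              new Replica("dn3", false))));
          // all replicas corrupt -> all 3 returned, block flagged corrupt
          System.out.println(selectMachines(Arrays.asList(
              new Replica("dn1", true), new Replica("dn2", true),
              new Replica("dn3", true))));
        }
      }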
    
    
