HBase Get 请求源码分析

作者: pcqlegend | 来源:发表于2018-08-24 19:32 被阅读0次

HBase Get请求源码分析

这里分析的 client 是 hbase-client-1.2.0-cdh5.11.1.jar 这个包。
先回顾一下 HBase Get 请求的流程:

  1. 请求 ZK,获得 hbase:meta 表所在的 regionserver(这个版本已经没有 -ROOT- 表了,代码中也有体现)
  2. 请求 hbase:meta 所在的 regionserver,获得表 table、行 row 所在的 regionserver 信息;这一步会缓存 hbase:meta 信息
  3. 向包含 table、row 的 regionserver 发起请求;这一步会缓存对应 table 和 row 的信息
private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
    // if we are changing settings to the get, clone it.
    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
      get = ReflectionUtils.newInstance(get.getClass(), get);
      if (get.getConsistency() == null){

    if (get.getConsistency() == Consistency.STRONG) {
      // Good old call.
      final Get getReq = get;
      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
          getName(), get.getRow()) {
        public Result call(int callTimeout) throws IOException {
          ClientProtos.GetRequest request =
            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
          try {
            ClientProtos.GetResponse response = getStub().get(controller, request);
            if (response == null) return null;
            return ProtobufUtil.toResult(response.getResult());
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
      return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,

首先,Get 的 Consistency 默认是 STRONG。HBase 的一致性可以参考:https://www.w3cschool.cn/hbase_doc/hbase_doc-rm8b2ruf.html

public T callWithRetries(RetryingCallable<T> callable, int callTimeout)
  throws IOException, RuntimeException {
    List<RetriesExhaustedException.ThrowableWithExtraContext> exceptions =
      new ArrayList<RetriesExhaustedException.ThrowableWithExtraContext>();
    this.globalStartTime = EnvironmentEdgeManager.currentTime();
    for (int tries = 0;; tries++) {
      long expectedSleep;
      try {
        callable.prepare(tries != 0); // if called with false, check table status on ZK
        interceptor.intercept(context.prepare(callable, tries));
        return callable.call(getTimeout(callTimeout));
      } catch (PreemptiveFastFailException e) {
        throw e;
      } catch (Throwable t) {
        if (tries > startLogErrorsCnt) {
          LOG.info("Call exception, tries=" + tries + ", retries=" + retries + ", started=" +
              (EnvironmentEdgeManager.currentTime() - this.globalStartTime) + " ms ago, "
              + "cancelled=" + cancelled.get() + ", msg="
              + callable.getExceptionMessageAdditionalDetail());

        // translateException throws exception when should not retry: i.e. when request is bad.
        interceptor.handleFailure(context, t);
        t = translateException(t);
        callable.throwable(t, retries != 1);
        RetriesExhaustedException.ThrowableWithExtraContext qt =
            new RetriesExhaustedException.ThrowableWithExtraContext(t,
                EnvironmentEdgeManager.currentTime(), toString());
        if (tries >= retries - 1) {
          throw new RetriesExhaustedException(tries, exceptions);
        // If the server is dead, we need to wait a little before retrying, to give
        //  a chance to the regions to be
        // tries hasn't been bumped up yet so we use "tries + 1" to get right pause time
        expectedSleep = callable.sleep(pause, tries + 1);

        // If, after the planned sleep, there won't be enough time left, we stop now.
        long duration = singleCallDuration(expectedSleep);
        if (duration > callTimeout) {
          String msg = "callTimeout=" + callTimeout + ", callDuration=" + duration +
              ": " + callable.getExceptionMessageAdditionalDetail();
          throw (SocketTimeoutException)(new SocketTimeoutException(msg).initCause(t));
      } finally {
      try {
        if (expectedSleep > 0) {
          synchronized (cancelled) {
            if (cancelled.get()) return null;
        if (cancelled.get()) return null;
      } catch (InterruptedException e) {
        throw new InterruptedIOException("Interrupted after " + tries + " tries  on " + retries);


callable.prepare(tries != 0); // if called with false, check table status on ZK
interceptor.intercept(context.prepare(callable, tries));
return callable.call(getTimeout(callTimeout));


  public void prepare(final boolean reload) throws IOException {
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      this.location = regionLocator.getRegionLocation(row, reload);
    if (this.location == null) {
      throw new IOException("Failed to find location, tableName=" + tableName +
        ", row=" + Bytes.toString(row) + ", reload=" + reload);

org.apache.hadoop.hbase.client.ConnectionManager.HConnectionImplementation.locateRegion(org.apache.hadoop.hbase.TableName, byte[], boolean, boolean, int)

    public RegionLocations locateRegion(final TableName tableName,
      final byte [] row, boolean useCache, boolean retry, int replicaId)
    throws IOException {
      if (this.closed) throw new IOException(toString() + " closed");
      if (tableName== null || tableName.getName().length == 0) {
        throw new IllegalArgumentException(
            "table name cannot be null or zero length");
      if (tableName.equals(TableName.META_TABLE_NAME)) {
        if (useMetaReplicas) {
          return locateMeta(tableName, useCache, replicaId);
        } else {
          return this.registry.getMetaRegionLocation();
      } else {
        // Region not in the cache - have to go to the meta RS
        return locateRegionInMeta(tableName, row, useCache, retry, replicaId);

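根据传入的 tablename 和 row 生成一个 metaKey,用它到 meta 表中定位对应的行。从下面 locateRegionInMeta 的代码可以看到,metaKey 由 HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false) 生成,形如 tablename,row,99999999999999(末尾的一串 9 是为了在不知道确切 region 名的情况下也能"精确"匹配)。
然后发起 ClientSmallReversedScanner 调用,meta 表中对应的记录如下: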

session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7 column=info:regioninfo, timestamp=1534252868373, value={ENCODED => 3921e38c29c3e99f8d4b8f857447d2e7, NAME => 'session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7.
 .                                                       ', STARTKEY => '', ENDKEY => '2|8620485395|2'}
 session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7 column=info:seqnumDuringOpen, timestamp=1534252868373, value=\x00\x00\x00\x00\x06Oq\xE5
 session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7 column=info:server, timestamp=1534252868373, value=hb3.hdp.cn:60020
 session,,1515750236668.3921e38c29c3e99f8d4b8f857447d2e7 column=info:serverstartcode, timestamp=1534252868373, value=1531762503774


 private RegionLocations locateRegionInMeta(TableName tableName, byte[] row,
                   boolean useCache, boolean retry, int replicaId) throws IOException {

      // If we are supposed to be using the cache, look in the cache to see if
      // we already have the region.
      if (useCache) {
        RegionLocations locations = getCachedLocation(tableName, row);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;

      // build the key of the meta region we should be looking for.
      // the extra 9's on the end are necessary to allow "exact" matches
      // without knowing the precise region names.
      byte[] metaKey = HRegionInfo.createRegionName(tableName, row, HConstants.NINES, false);

      Scan s = null;
      if (useMetaReplicas) {
        // for CDH-5.0 compatibility, we are not going to use reverse scan
        // unless the user enabled meta replicas. In this case we know that
        // the server support reverse scan.
        s = new Scan();

      int localNumRetries = (retry ? numTries : 1);

      for (int tries = 0; true; tries++) {
        if (tries >= localNumRetries) {
          throw new NoServerForRegionException("Unable to find region for "
              + Bytes.toStringBinary(row) + " in " + tableName +
              " after " + localNumRetries + " tries.");
        if (useCache) {
          RegionLocations locations = getCachedLocation(tableName, row);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;
        } else {
          // If we are not supposed to be using the cache, delete any existing cached location
          // so it won't interfere.
          metaCache.clearCache(tableName, row);

        RegionLocations metaLocation = null;
        // Query the meta region
        try {
          Result regionInfoRow = null;
          if (s != null) {
            ReversedClientScanner rcs = null;
            try {
              rcs = new ClientSmallReversedScanner(conf, s, TableName.META_TABLE_NAME, this,
                rpcCallerFactory, rpcControllerFactory, getMetaLookupPool(), 0);
              regionInfoRow = rcs.next();
            } finally {
              if (rcs != null) {
          // if we are using meta replicas, we may end up with an empty row,
          // if the replica did not received the update yet.
          // In this case we fallback to the old method for safety.
          if (regionInfoRow == null) {
            // locate the meta region (compatible with versions that does not support reverse scan)
            metaLocation = locateRegion(TableName.META_TABLE_NAME, metaKey, false, false);
            // If null still, go around again.
            if (metaLocation == null) continue;

            ClientService.BlockingInterface service =

            // This block guards against two threads trying to load the meta
            // region at the same time. The first will load the meta region and
            // the second will use the value that the first one found.
            if (useCache) {
              if (TableName.META_TABLE_NAME.equals(tableName) && getRegionCachePrefetch(tableName)) {
                // Check the cache again for a hit in case some other thread made the
                // same query while we were waiting on the lock.
                RegionLocations locations = getCachedLocation(tableName, row);
                if (locations != null && locations.getRegionLocation(replicaId) != null) {
                  return locations;
                // If the parent table is META, we may want to pre-fetch some
                // region info into the global region cache for this table.
                prefetchRegionCache(tableName, row);
              RegionLocations locations = getCachedLocation(tableName, row);
              if (locations != null && locations.getRegionLocation(replicaId) != null) {
                return locations;
            } else {
              // If we are not supposed to be using the cache, delete any existing cached location
              // so it won't interfere.
              metaCache.clearCache(tableName, row);

            // Query the meta region for the location of the meta region
            regionInfoRow = ProtobufUtil.getRowOrBefore(service,
              metaKey, HConstants.CATALOG_FAMILY);

          if (regionInfoRow == null) {
            throw new TableNotFoundException(tableName);

          // convert the row result into the HRegionLocation we need!
          RegionLocations locations = MetaTableAccessor.getRegionLocations(regionInfoRow);
          if (locations == null || locations.getRegionLocation(replicaId) == null) {
            throw new IOException("HRegionInfo was null in " +
              tableName + ", row=" + regionInfoRow);
          HRegionInfo regionInfo = locations.getRegionLocation(replicaId).getRegionInfo();
          if (regionInfo == null) {
            throw new IOException("HRegionInfo was null or empty in " +
              TableName.META_TABLE_NAME + ", row=" + regionInfoRow);

          // possible we got a region of a different table...
          if (!regionInfo.getTable().equals(tableName)) {
            throw new TableNotFoundException(
                  "Table '" + tableName + "' was not found, got: " +
                  regionInfo.getTable() + ".");
          if (regionInfo.isSplit()) {
            throw new RegionOfflineException("the only available region for" +
              " the required row is a split parent," +
              " the daughters should be online soon: " +
          if (regionInfo.isOffline()) {
            throw new RegionOfflineException("the region is offline, could" +
              " be caused by a disable table call: " +

          ServerName serverName = locations.getRegionLocation(replicaId).getServerName();
          if (serverName == null) {
            throw new NoServerForRegionException("No server address listed " +
              "in " + TableName.META_TABLE_NAME + " for region " +
              regionInfo.getRegionNameAsString() + " containing row " +

          if (isDeadServer(serverName)){
            throw new RegionServerStoppedException("hbase:meta says the region "+
                regionInfo.getRegionNameAsString()+" is managed by the server " + serverName +
                ", but it is dead.");
          // Instantiate the location
          cacheLocation(tableName, locations);
          return locations;
        } catch (TableNotFoundException e) {
          // if we got this error, probably means the table just plain doesn't
          // exist. rethrow the error immediately. this should always be coming
          // from the HTable constructor.
          throw e;
        } catch (IOException e) {

          if (e instanceof RemoteException) {
            e = ((RemoteException)e).unwrapRemoteException();
          if (tries < localNumRetries - 1) {
            if (LOG.isDebugEnabled()) {
              LOG.debug("locateRegionInMeta parentTable=" +
                  TableName.META_TABLE_NAME + ", metaLocation=" + metaLocation +
                ", attempt=" + tries + " of " +
                localNumRetries + " failed; retrying after sleep of " +
                ConnectionUtils.getPauseTime(this.pause, tries) + " because: " + e.getMessage());
          } else {
            throw e;
          // Only relocate the parent region if necessary
          if(!(e instanceof RegionOfflineException ||
              e instanceof NoServerForRegionException)) {
            relocateRegion(TableName.META_TABLE_NAME, metaKey, replicaId);
          Thread.sleep(ConnectionUtils.getPauseTime(this.pause, tries));
        } catch (InterruptedException e) {
          throw new InterruptedIOException("Giving up trying to location region in " +
            "meta: thread is interrupted.");

因为 useMetaReplicas 是 false(即兼容不支持 reverse scan 的版本;关于 reverse scan 可参考官方介绍),所以这里的 Scan 对象 s 是 null,于是会调用到
metaLocation = locateRegion(TableName.META_TABLE_NAME, metaKey, false, false);
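看上面 locateRegion 的代码:这次传入的是 META_TABLE_NAME,如果 useMetaReplicas 为 true 会调用 locateMeta,否则直接调用 this.registry.getMetaRegionLocation()。locateMeta 在缓存未命中时同样是通过 registry.getMetaRegionLocation() 去 ZK 查询,其代码如下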

 private RegionLocations locateMeta(final TableName tableName,
        boolean useCache, int replicaId) throws IOException {
      // HBASE-10785: We cache the location of the META itself, so that we are not overloading
      // zookeeper with one request for every region lookup. We cache the META with empty row
      // key in MetaCache.
      byte[] metaCacheKey = HConstants.EMPTY_START_ROW; // use byte[0] as the row for meta
      RegionLocations locations = null;
      if (useCache) {
        locations = getCachedLocation(tableName, metaCacheKey);
        if (locations != null && locations.getRegionLocation(replicaId) != null) {
          return locations;

      // only one thread should do the lookup.
      synchronized (metaRegionLock) {
        // Check the cache again for a hit in case some other thread made the
        // same query while we were waiting on the lock.
        if (useCache) {
          locations = getCachedLocation(tableName, metaCacheKey);
          if (locations != null && locations.getRegionLocation(replicaId) != null) {
            return locations;

        // Look up from zookeeper
        locations = this.registry.getMetaRegionLocation();
        if (locations != null) {
          cacheLocation(tableName, locations);
      return locations;

然后看一下 org.apache.hadoop.hbase.client.ZooKeeperRegistry.getMetaRegionLocation

  public RegionLocations getMetaRegionLocation() throws IOException {
    ZooKeeperKeepAliveConnection zkw = hci.getKeepAliveZooKeeperWatcher();

    try {
      if (LOG.isTraceEnabled()) {
        LOG.trace("Looking up meta region location in ZK," + " connection=" + this);
      List<ServerName> servers = new MetaTableLocator().blockUntilAvailable(zkw, hci.rpcTimeout,
      if (LOG.isTraceEnabled()) {
        if (servers == null) {
          LOG.trace("Looked up meta region location, connection=" + this +
            "; servers = null");
        } else {
          StringBuilder str = new StringBuilder();
          for (ServerName s : servers) {
            str.append(" ");
          LOG.trace("Looked up meta region location, connection=" + this +
            "; servers = " + str.toString());
      if (servers == null) return null;
      HRegionLocation[] locs = new HRegionLocation[servers.size()];
      int i = 0;
      for (ServerName server : servers) {
        HRegionInfo h = RegionReplicaUtil.getRegionInfoForReplica(
                HRegionInfo.FIRST_META_REGIONINFO, i);
        if (server == null) locs[i++] = null;
        else locs[i++] = new HRegionLocation(h, server, 0);
      return new RegionLocations(locs);
    } catch (InterruptedException e) {
      return null;
    } finally {

返回的region location 如下
region=hbase:meta,,1.1588230740, hostname=tr4.hdp.cn,60021,1514197831557, seqNum=0
数据结构很明显:它告诉了我们 hbase:meta 表所在的机器信息,包括主机名 tr4.hdp.cn、端口号 60021 等。
然后查询 meta 表所在的 regionserver,获得目标 region 的信息:

{ENCODED => c6b3becb3eddc91b4ae69d77c1c4eaf5, NAME => 'V3_UserProfile,22592009084298381312286811,1509355646476.c6b3becb3eddc91b4ae69d77c1c4eaf5.', STARTKEY => '22592009084298381312286811', ENDKEY => '34362040585555200000613694'}

这里的结构也比较明显:包含表名(此处为 V3_UserProfile)、region 名,以及该 region 的起始 key(STARTKEY)和结束 key(ENDKEY)

  public void prepare(final boolean reload) throws IOException {
    try (RegionLocator regionLocator = connection.getRegionLocator(tableName)) {
      this.location = regionLocator.getRegionLocation(row, reload);
    if (this.location == null) {
      throw new IOException("Failed to find location, tableName=" + tableName +
        ", row=" + Bytes.toString(row) + ", reload=" + reload);




