美文网首页
hbase(一) : HTable

hbase(一) : HTable

作者: 以梦为马驾驾驾 | 来源:发表于2019-10-27 16:09 被阅读0次

hbase1.3
HTable 是我们对数据读取,操作的入口, implements HTableInterface, RegionLocator

内部构造

  // connection使用的内部方法的接口, 可以探测集群状态, 表是否可用等
  protected ClusterConnection connection;
  private final TableName tableName;
  private volatile Configuration configuration; //

/**
 * connection的配置参数, 封装了0.9x版本的writeBufferSize(flush缓冲的大小
 *)和maxKeyValueSize
*/
  private ConnectionConfiguration connConfiguration; 

  /**
   *  用来和单个表communicate,但是是给batch用的,用于异步put
   *  虽然可以多线程下使用,但是任然需要十分小心
   */
  protected BufferedMutatorImpl mutator;
  private boolean autoFlush = true;// 自动flush
  private boolean closed = false;
  protected int scannerCaching;
  protected long scannerMaxResultSize;// scanner返回的数据结构最大大小
  private ExecutorService pool;  // For Multi & Scan
  private int operationTimeout; // global timeout for each blocking method with retrying rpc
  private int rpcTimeout; // timeout for each rpc request
  private final boolean cleanupPoolOnClose; // shutdown the pool in close()
  private final boolean cleanupConnectionOnClose; // close the connection in close()
  private Consistency defaultConsistency = Consistency.STRONG;
  private HRegionLocator locator;// 查看单个Hbase table 的region分布, 定位

put操作

有一个检查 的动作待详细查看.

  @Override
  public void put(final Put put) throws IOException {
    getBufferedMutator().mutate(put);
    if (autoFlush) {
      flushCommits(); // 如果自动刷新
    }
  }

关于BufferedMutator, 是用来缓存客户端的操作的, hbase 将客户端的DML抽象成了Mutation, 子类有: Append, Delete, Increment, Put操作.
put方法将Put对象包装成Mutation,交给BufferedMutator, 到达设置的大小限制,或者主动调用flush操作, 会触发backgroundFlushCommits(boolean synchronous)操作, 然后Mutation由AsyncProcess提交,详细查看BufferedMutatorImpl类.
AscncProcess提交后, (注释:Action类是将行与对应操作结合的类), 由connection去寻找每一行对应的region位置, 包装action, server, region等信息添加到MutiAction中去, 这个类持有按照region分组的actions,

       try{
          if (r == null) {
            throw new IllegalArgumentException("#" + id + ", row cannot be null");
          }
          // Make sure we get 0-s replica.
          RegionLocations locs = connection.locateRegion(
              tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); 
          if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
            throw new IOException("#" + id + ", no location found, aborting submit for"
                + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
          }
          loc = locs.getDefaultRegionLocation();
        } catch (IOException ex) {
          locationErrors = new ArrayList<Exception>();
          locationErrorRows = new ArrayList<Integer>();
          LOG.error("Failed to get region location ", ex);
          // This action failed before creating ars. Retain it, but do not add to submit list.
          // We will then add it to ars in an already-failed state.
          retainedActions.add(new Action<Row>(r, ++posInList));
          locationErrors.add(ex);
          locationErrorRows.add(posInList);
          it.remove();
          break; // Backward compat: we stop considering actions on location error.
        }
.....................................................

   return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
        locationErrors, locationErrorRows, actionsByServer, pool);
  
    ars.sendMultiAction(actionsByServer, 1, null, false);
    

然后会对每个action都创建SingleServerRequestRunnable(rpc caller 和rpc callable, caller call callable), 交给线程池去运行.

Delete

删除操作很简单: 创建RegionServerCallable, 然后rpc工厂类创建rpc caller来调用它

  @Override
  public void delete(final Delete delete)
  throws IOException {
    RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
        tableName, delete.getRow()) {
      @Override
      public Boolean call(int callTimeout) throws IOException {
        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
        controller.setPriority(tableName);
        controller.setCallTimeout(callTimeout);

        try {
          MutateRequest request = RequestConverter.buildMutateRequest(
            getLocation().getRegionInfo().getRegionName(), delete);
          MutateResponse response = getStub().mutate(controller, request);
          return Boolean.valueOf(response.getProcessed());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    };
    rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
        this.operationTimeout);
  }

get

get和scan都是继承了Query
get很简单:首先检查,这个get是否只是检查数据存在否, 并且检查是否指定了一致性等级(默认(Consistency.STRONG)), 之后创建rpc请求Request, 如果不是强一致性Consistency.TIMELINE, 则调用RpcRetryingCallerWithReadReplicas , 它可以从replica上读取, 返回的数据被标记为stale(读操作是通过Consistency.TIMELINE,然后读RPC将会首先发送到主region服务器上,在短时间内(hbase.client.primaryCallTimeout.get默认为10ms),如果主region没有响应RPC会被发送到从region。 之后结果会从第一个完成RPC的返回。如果响应是来自主region副本,我们就会知道数据是最新的,Result.isStale() API是检查过期数据,如果结果是 从region返回,那么Result.isStale()为true,然后用户就可以检查关于过期数据可能的原因。).


  private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
    // if we are changing settings to the get, clone it.
    if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
      get = ReflectionUtils.newInstance(get.getClass(), get);
      get.setCheckExistenceOnly(checkExistenceOnly);
      if (get.getConsistency() == null){
        get.setConsistency(defaultConsistency);
      }
    }

    if (get.getConsistency() == Consistency.STRONG) {
      // Good old call.
      final Get getReq = get;
      RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
          getName(), get.getRow()) {
        @Override
        public Result call(int callTimeout) throws IOException {
          ClientProtos.GetRequest request =
            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
          PayloadCarryingRpcController controller = rpcControllerFactory.newController();
          controller.setPriority(tableName);
          controller.setCallTimeout(callTimeout);
          try {
            ClientProtos.GetResponse response = getStub().get(controller, request);
            if (response == null) return null;
            return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
          } catch (ServiceException se) {
            throw ProtobufUtil.getRemoteException(se);
          }
        }
      };
      return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
          this.operationTimeout);
    }

    // Call that takes into account the replica
    RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
      rpcControllerFactory, tableName, this.connection, get, pool,
      connConfiguration.getRetriesNumber(),
      operationTimeout,
      connConfiguration.getPrimaryCallTimeoutMicroSecond());
    return callable.call();
  }

当replica_id=0的regin不可以时候, 给所有的replica region发送请求,获取第一个从这些replica返回的数据, 客户端可以 Result.isStale()检查是否是来自副本的数据

      addCallsForReplica(cs, rl, 1, rl.size() - 1);

scan

Scan类可以设置一系列的属性, startkey,endkey, 过滤器, 版本,缓存,最大取回大小等等, 但是获取数据是由 getScanner(Scan)返回的ResultScanner操作的.
返回的ResultScanner有small, Reversed,big和纯client 的不同,

    if (scan.isReversed()) {
      if (scan.isSmall()) {
        return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
      } else {
        return new ReversedClientScanner(getConfiguration(), scan, getName(),
            this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
            pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
      }
    }

    if (scan.isSmall()) {
      return new ClientSmallScanner(getConfiguration(), scan, getName(),
          this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
          pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
    } else {
      return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
          this.rpcCallerFactory, this.rpcControllerFactory,
          pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
    }

什么是small scan?

https://issues.apache.org/jira/browse/HBASE-7266

hbase里面有两种读操作:pread and seek+read.
pread是一个函数,用于带偏移量地原子的从文件中读取数据。
读路径:https://yq.aliyun.com/articles/602377/
seek + read is fast but can cause two problem:
(1) resource contention
(2) cause too much network io

另外,一些其他的解释:前者适合小于一个数据块(默认64k)的smallScan(openScanner, next, closeScanner将在同一次rpc中实现);而后者则会使用hdfs预读取,在DN中缓存一些数据(HBASE-7266、HBASE-9488)

查看最普通的ClientScanner, 初始化的时候调用initializeScannerInConstruction, 这个方法去调用nextScanner(), 获取为下一个region准备的scanner , 这个scanner会发起rpc调用, 参数是ScannerCallable对象, 这是scanner 的动作的抽象, reversed 和 small 都有对应的ScannerCallable子类
ScannerCallable 在初始化后, 的prepare阶段, 会从对应的region,获取对应的server, 获取这个server的ClientService.BlockingInterface接口, 并设置, 以便被调用的时候知道该向谁发起rpc请求

    ServerName dest = location.getServerName();
    setStub(super.getConnection().getClient(dest));

call阶段, 在创建 RequestScan的时候, 参数nextCallSeq在client和server端都维持,,每次调用都会增加, 是为了client能正确获取下一批的数据 .

      CellScanner cellScanner = controller.cellScanner();
      rrs = ResponseConverter.getResults(cellScanner, response);

相关文章

  • HBase优化四——HTable优化

    HTable是HBase客户端与HBase服务端通讯的Java API对象,客户端可以通过HTable对象与服务端...

  • HTable和HTablePool使用注意事项

    HTable和HTablePool使用注意事项 HTable和HTablePool都是HBase客户端API的一部...

  • hbase(一) : HTable

    hbase1.3HTable 是我们对数据读取,操作的入口, implements HTableInterface...

  • Hbase客户端API(1)

    Hbase的主要客户端接口通过org.apache.hadoop.hbase.client包中的HTable类来实...

  • HBase CURD之Delete

    HBase CURD之Delete HTable提供了删除方法,同时与之前的方法一样有一个相应的类为Delete。...

  • hBase之HTable踩坑

    刚发布完,异常暴增,报警电话响个不停,看了下异常信息,竟然是这货: 看到这异常第一反应就是,完了,HTable线程...

  • Hbase性能调优

    client端 HTable.setAutoFlush(false) 关闭客户端自动刷新 HTable.setWr...

  • Hbase运行机制

    本文思路 Hbase是什么 Hbase的优劣 Hbase架构 Hbase容错 Hbase使用总结 HBase是什么...

  • 云平台配置4:配置完全分布式的HBase

    配置完全分布式的 HBase 一、 配置 HBase 下载 HBase ,例如 hbase-1.2.5.tar.g...

  • HBase原理I-HLog文件

    一、Hbase介绍 1、Hbase简介 Hbase是Hadoop Database的简称 ,Hbase项目是由Po...

网友评论

      本文标题:hbase(一) : HTable

      本文链接:https://www.haomeiwen.com/subject/vjcuvctx.html