HBase (Part 1): HTable


Author: 以梦为马驾驾驾 | Published 2019-10-27 16:09

    Based on HBase 1.3.
    HTable is our entry point for reading and manipulating data; it implements HTableInterface and RegionLocator.

    Internal structure

      // Interface of internal methods used by the connection; it can probe the cluster state, check whether a table is available, etc.
      protected ClusterConnection connection;
      private final TableName tableName;
      private volatile Configuration configuration;
    
      /**
       * Connection configuration parameters; wraps writeBufferSize (the size
       * of the flush buffer) and maxKeyValueSize from the 0.9x versions.
       */
      private ConnectionConfiguration connConfiguration;
    
      /**
       * Used to communicate with a single table, but meant for batches,
       * i.e. asynchronous puts. Although it can be used from multiple
       * threads, it still needs to be handled with great care.
       */
      protected BufferedMutatorImpl mutator;
      private boolean autoFlush = true; // flush automatically after each put
      private boolean closed = false;
      protected int scannerCaching;
      protected long scannerMaxResultSize; // maximum size of the results a scanner returns
      private ExecutorService pool;  // For Multi & Scan
      private int operationTimeout; // global timeout for each blocking method with retrying rpc
      private int rpcTimeout; // timeout for each rpc request
      private final boolean cleanupPoolOnClose; // shutdown the pool in close()
      private final boolean cleanupConnectionOnClose; // close the connection in close()
      private Consistency defaultConsistency = Consistency.STRONG;
      private HRegionLocator locator; // looks up the region layout of a single HBase table and locates regions
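The two cleanup flags decide what close() is allowed to tear down. Below is a minimal sketch of that ownership rule, assuming hypothetical `FakeConnection` and `OwnedResourcesTable` classes (not HBase code): a table shuts down the pool only if it created the pool itself, and closes the connection only if it owns it, so a pool or connection shared by several tables keeps running.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

// Hypothetical stand-in for a cluster connection (not the HBase Connection API).
final class FakeConnection implements AutoCloseable {
    boolean closed = false;

    @Override
    public void close() {
        closed = true;
    }
}

// Hypothetical sketch of an ownership-aware close(), modelled on HTable's two
// cleanup flags; this is not the HBase implementation.
final class OwnedResourcesTable implements AutoCloseable {
    private final FakeConnection connection;
    private final ExecutorService pool;
    private final boolean cleanupPoolOnClose;       // true only if this table created the pool
    private final boolean cleanupConnectionOnClose; // true only if this table owns the connection
    private boolean closed = false;

    OwnedResourcesTable(FakeConnection connection, ExecutorService sharedPool, boolean ownsConnection) {
        this.connection = connection;
        if (sharedPool == null) {
            this.pool = Executors.newFixedThreadPool(2); // private pool ...
            this.cleanupPoolOnClose = true;              // ... so close() must shut it down
        } else {
            this.pool = sharedPool;                      // borrowed pool: other tables may use it
            this.cleanupPoolOnClose = false;
        }
        this.cleanupConnectionOnClose = ownsConnection;
    }

    ExecutorService pool() {
        return pool;
    }

    @Override
    public void close() {
        if (closed) {
            return; // calling close() twice is harmless
        }
        closed = true;
        if (cleanupPoolOnClose) {
            pool.shutdown();
        }
        if (cleanupConnectionOnClose) {
            connection.close();
        }
    }
}
```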
    

    put operation

    There is a check step in here that still needs a closer look.

      @Override
      public void put(final Put put) throws IOException {
        getBufferedMutator().mutate(put);
        if (autoFlush) {
          flushCommits(); // flush right away if autoFlush is on
        }
      }
    

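    About BufferedMutator: it buffers the client's operations. HBase abstracts client-side DML as Mutation, whose subclasses are Append, Delete, Increment and Put.
    The put method hands the Put (itself a Mutation) to the BufferedMutator. When the configured buffer size limit is reached, or when flush is called explicitly, backgroundFlushCommits(boolean synchronous) is triggered and the Mutations are submitted by AsyncProcess; see the BufferedMutatorImpl class for details.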
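A minimal sketch of the buffering behaviour described above, assuming hypothetical `SketchMutation`, `SketchBufferedMutator` and `SketchTable` classes that measure each mutation by an estimated byte size. It models the two flush triggers (buffer over the size limit, explicit flush) plus HTable's autoFlush check; a plain `Consumer` stands in for AsyncProcess, and none of this is the real BufferedMutatorImpl.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical mutation: a row key plus an estimated heap size in bytes.
final class SketchMutation {
    final String row;
    final long heapSize;

    SketchMutation(String row, long heapSize) {
        this.row = row;
        this.heapSize = heapSize;
    }
}

// Hypothetical sketch of BufferedMutator-style buffering (not BufferedMutatorImpl).
final class SketchBufferedMutator {
    private final long writeBufferSize;                // flush threshold, in bytes
    private final Consumer<List<SketchMutation>> sink; // stands in for AsyncProcess submission
    private final List<SketchMutation> buffer = new ArrayList<>();
    private long currentSize = 0;

    SketchBufferedMutator(long writeBufferSize, Consumer<List<SketchMutation>> sink) {
        this.writeBufferSize = writeBufferSize;
        this.sink = sink;
    }

    synchronized void mutate(SketchMutation m) {
        buffer.add(m);
        currentSize += m.heapSize;
        if (currentSize > writeBufferSize) {
            flush(); // over the limit: roughly where the real class triggers backgroundFlushCommits
        }
    }

    synchronized void flush() {
        if (buffer.isEmpty()) {
            return;
        }
        sink.accept(new ArrayList<>(buffer)); // hand a copy of the batch to the sink
        buffer.clear();
        currentSize = 0;
    }

    synchronized int buffered() {
        return buffer.size();
    }
}

// Hypothetical table wrapper: with autoFlush on, every put is flushed at once,
// mirroring the autoFlush check in HTable.put.
final class SketchTable {
    private final SketchBufferedMutator mutator;
    private final boolean autoFlush;

    SketchTable(SketchBufferedMutator mutator, boolean autoFlush) {
        this.mutator = mutator;
        this.autoFlush = autoFlush;
    }

    void put(SketchMutation m) {
        mutator.mutate(m);
        if (autoFlush) {
            mutator.flush();
        }
    }
}
```

With autoFlush on, every put turns into its own flush, so the buffer never gets to accumulate a batch.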
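    After AsyncProcess receives the submission (note: the Action class pairs a row with its operation), the connection locates the region for each row, and the action, server and region information is wrapped up and added to a MultiAction, which holds the actions grouped by region: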

           try{
              if (r == null) {
                throw new IllegalArgumentException("#" + id + ", row cannot be null");
              }
              // Make sure we get 0-s replica.
              RegionLocations locs = connection.locateRegion(
                  tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID); 
              if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
                throw new IOException("#" + id + ", no location found, aborting submit for"
                    + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
              }
              loc = locs.getDefaultRegionLocation();
            } catch (IOException ex) {
              locationErrors = new ArrayList<Exception>();
              locationErrorRows = new ArrayList<Integer>();
              LOG.error("Failed to get region location ", ex);
              // This action failed before creating ars. Retain it, but do not add to submit list.
              // We will then add it to ars in an already-failed state.
              retainedActions.add(new Action<Row>(r, ++posInList));
              locationErrors.add(ex);
              locationErrorRows.add(posInList);
              it.remove();
              break; // Backward compat: we stop considering actions on location error.
            }
    // ...
    
       return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
            locationErrors, locationErrorRows, actionsByServer, pool);
      
    
        ars.sendMultiAction(actionsByServer, 1, null, false);
        
    

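    Then, for each server's MultiAction in actionsByServer, a SingleServerRequestRunnable is created (it holds an RPC caller and an RPC callable; the caller calls the callable) and handed to the thread pool to run.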
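The locate, group and dispatch steps can be sketched without a cluster. This sketch assumes a hypothetical static region map (region start key to server name) in place of connection.locateRegion, groups rows into one list per server (the role actionsByServer and MultiAction play) and submits one task per server to a thread pool, analogous to SingleServerRequestRunnable. All names here are illustrative, not HBase APIs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of grouping actions by server and dispatching them.
final class SketchDispatcher {
    // Hypothetical region map: region start key -> hosting server.
    private final TreeMap<String, String> regionStartToServer;

    SketchDispatcher(TreeMap<String, String> regionStartToServer) {
        this.regionStartToServer = regionStartToServer;
    }

    // Stand-in for locateRegion: the region whose start key is the greatest key <= row.
    String locateServer(String row) {
        Map.Entry<String, String> e = regionStartToServer.floorEntry(row);
        if (e == null) {
            throw new IllegalArgumentException("no region for row " + row);
        }
        return e.getValue();
    }

    // Group rows by hosting server, like actionsByServer / MultiAction.
    Map<String, List<String>> groupByServer(List<String> rows) {
        Map<String, List<String>> byServer = new TreeMap<>();
        for (String row : rows) {
            byServer.computeIfAbsent(locateServer(row), k -> new ArrayList<>()).add(row);
        }
        return byServer;
    }

    // One task per server; returns what each simulated server received.
    Map<String, List<String>> dispatch(List<String> rows) {
        Map<String, List<String>> received = new ConcurrentHashMap<>();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        try {
            for (Map.Entry<String, List<String>> e : groupByServer(rows).entrySet()) {
                // In HBase this task would send one multi RPC to e.getKey().
                pool.submit(() -> {
                    received.put(e.getKey(), e.getValue());
                });
            }
        } finally {
            pool.shutdown();
        }
        try {
            pool.awaitTermination(10, TimeUnit.SECONDS);
        } catch (InterruptedException ie) {
            Thread.currentThread().interrupt();
        }
        return received;
    }
}
```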

    Delete

    Delete is simple: it creates a RegionServerCallable, and an RPC caller created by the RPC caller factory invokes it with retries (callWithRetries).

      @Override
      public void delete(final Delete delete)
      throws IOException {
        RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
            tableName, delete.getRow()) {
          @Override
          public Boolean call(int callTimeout) throws IOException {
            PayloadCarryingRpcController controller = rpcControllerFactory.newController();
            controller.setPriority(tableName);
            controller.setCallTimeout(callTimeout);
    
            try {
              MutateRequest request = RequestConverter.buildMutateRequest(
                getLocation().getRegionInfo().getRegionName(), delete);
              MutateResponse response = getStub().mutate(controller, request);
              return Boolean.valueOf(response.getProcessed());
            } catch (ServiceException se) {
              throw ProtobufUtil.getRemoteException(se);
            }
          }
        };
        rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
            this.operationTimeout);
      }
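callWithRetries(callable, operationTimeout) keeps retrying one RPC until it succeeds or the overall operation timeout is used up. A minimal sketch of that time budgeting, assuming a hypothetical `Attempt` interface, a fixed pause between attempts, and that each attempt gets the smaller of the remaining budget and rpcTimeout; the real RpcRetryingCaller also uses a backoff table, relocates regions between attempts and treats some exceptions as non-retriable.

```java
import java.io.IOException;
import java.io.InterruptedIOException;

// Hypothetical single RPC attempt; receives the time budget (ms) it may use.
interface Attempt<T> {
    T call(int callTimeoutMs) throws IOException;
}

// Hypothetical sketch of callWithRetries budgeting (not RpcRetryingCaller itself).
final class SketchRetryingCaller {
    private final int rpcTimeoutMs; // cap for a single RPC
    private final long pauseMs;     // fixed pause between attempts (a simplification)
    private final int maxAttempts;

    SketchRetryingCaller(int rpcTimeoutMs, long pauseMs, int maxAttempts) {
        this.rpcTimeoutMs = rpcTimeoutMs;
        this.pauseMs = pauseMs;
        this.maxAttempts = maxAttempts;
    }

    <T> T callWithRetries(Attempt<T> attempt, int operationTimeoutMs) throws IOException {
        long deadline = System.currentTimeMillis() + operationTimeoutMs;
        IOException last = null;
        for (int i = 0; i < maxAttempts; i++) {
            long remaining = deadline - System.currentTimeMillis();
            if (remaining <= 0) {
                break; // operationTimeout used up
            }
            // Each attempt gets the smaller of the remaining budget and rpcTimeout.
            int callTimeout = (int) Math.min(remaining, rpcTimeoutMs);
            try {
                return attempt.call(callTimeout);
            } catch (IOException e) {
                last = e; // this sketch treats every IOException as retriable
            }
            if (i + 1 < maxAttempts) {
                try {
                    Thread.sleep(pauseMs);
                } catch (InterruptedException ie) {
                    Thread.currentThread().interrupt();
                    throw new InterruptedIOException("interrupted while retrying");
                }
            }
        }
        throw new IOException("retries exhausted (operation timeout or max attempts)", last);
    }
}
```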
    
    

    get

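    Get and Scan both extend Query.
    get is simple. It first checks whether this Get only checks for existence and whether a consistency level has been specified (the default is Consistency.STRONG), and then builds the RPC request. If the consistency is not strong, i.e. it is Consistency.TIMELINE, it uses RpcRetryingCallerWithReadReplicas, which can read from replicas; data returned by a replica is marked as stale.
    With Consistency.TIMELINE, the read RPC is first sent to the primary region server. If the primary region does not respond within a short time (hbase.client.primaryCallTimeout.get, 10 ms by default), the RPC is also sent to the secondary regions, and the result comes from whichever RPC completes first. If the response comes from the primary replica, we know the data is the latest. The Result.isStale() API checks for stale data: if the result came from a secondary region, Result.isStale() returns true, and the user can then look into possible reasons for the staleness.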
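The TIMELINE path can be modelled with an ExecutorCompletionService: ask the primary first, wait up to the primary call timeout, then also ask the secondaries and take whichever answer arrives first, marking it stale unless it came from replica 0. A minimal sketch, assuming hypothetical `Supplier`s in place of the RPCs to each replica; the real logic lives in RpcRetryingCallerWithReadReplicas and also handles retries and failed replicas.

```java
import java.util.List;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.function.Supplier;

// Hypothetical sketch of a TIMELINE-consistency read (not RpcRetryingCallerWithReadReplicas).
final class TimelineRead {
    // Hypothetical result: the value plus whether it came from a secondary replica.
    static final class Result {
        final String value;
        final boolean stale;

        Result(String value, boolean stale) {
            this.value = value;
            this.stale = stale;
        }
    }

    // replicas.get(0) is the primary (replica_id = 0); the others are secondaries.
    static Result read(List<Supplier<String>> replicas, long primaryCallTimeoutMs) {
        ExecutorService pool = Executors.newFixedThreadPool(replicas.size());
        CompletionService<Result> cs = new ExecutorCompletionService<>(pool);
        try {
            cs.submit(() -> new Result(replicas.get(0).get(), false)); // primary first
            Future<Result> first = cs.poll(primaryCallTimeoutMs, TimeUnit.MILLISECONDS);
            if (first != null) {
                return first.get(); // primary answered in time: fresh data
            }
            // Primary is slow: also ask replicas 1..n-1 (cf. addCallsForReplica(cs, rl, 1, rl.size() - 1)).
            for (int id = 1; id < replicas.size(); id++) {
                final int replicaId = id;
                cs.submit(() -> new Result(replicas.get(replicaId).get(), true));
            }
            // First call to finish wins; the primary can still win and is then not stale.
            // Simplification: a failed call is not retried against the other replicas.
            return cs.take().get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        } finally {
            pool.shutdownNow(); // cancel calls that are still running
        }
    }
}
```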

    
      private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
        // if we are changing settings to the get, clone it.
        if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
          get = ReflectionUtils.newInstance(get.getClass(), get);
          get.setCheckExistenceOnly(checkExistenceOnly);
          if (get.getConsistency() == null){
            get.setConsistency(defaultConsistency);
          }
        }
    
        if (get.getConsistency() == Consistency.STRONG) {
          // Good old call.
          final Get getReq = get;
          RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
              getName(), get.getRow()) {
            @Override
            public Result call(int callTimeout) throws IOException {
              ClientProtos.GetRequest request =
                RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
              PayloadCarryingRpcController controller = rpcControllerFactory.newController();
              controller.setPriority(tableName);
              controller.setCallTimeout(callTimeout);
              try {
                ClientProtos.GetResponse response = getStub().get(controller, request);
                if (response == null) return null;
                return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
              } catch (ServiceException se) {
                throw ProtobufUtil.getRemoteException(se);
              }
            }
          };
          return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
              this.operationTimeout);
        }
    
        // Call that takes into account the replica
        RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
          rpcControllerFactory, tableName, this.connection, get, pool,
          connConfiguration.getRetriesNumber(),
          operationTimeout,
          connConfiguration.getPrimaryCallTimeoutMicroSecond());
        return callable.call();
      }
    

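    When the region with replica_id=0 (the primary) is unavailable or does not answer in time, the request is sent to all replica regions and the first result returned by any of them is used; the client can call Result.isStale() to check whether the data came from a replica.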

          addCallsForReplica(cs, rl, 1, rl.size() - 1);
    

    scan

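    The Scan class lets you set a range of attributes (start key, end key, filters, versions, caching, maximum result size and so on), but the data is actually fetched through the ResultScanner returned by getScanner(Scan).
    The returned ResultScanner differs depending on whether the scan is small and/or reversed: small reversed, reversed, small, or the regular (big) ClientScanner: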

        if (scan.isReversed()) {
          if (scan.isSmall()) {
            return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
                this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
                pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
          } else {
            return new ReversedClientScanner(getConfiguration(), scan, getName(),
                this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
                pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
          }
        }
    
        if (scan.isSmall()) {
          return new ClientSmallScanner(getConfiguration(), scan, getName(),
              this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
              pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
        } else {
          return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
              this.rpcCallerFactory, this.rpcControllerFactory,
              pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
        }
    

    What is a small scan?

    https://issues.apache.org/jira/browse/HBASE-7266

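    HBase has two kinds of read: pread and seek+read.
    pread is a function that atomically reads data from a file at a given offset.
    Read path: https://yq.aliyun.com/articles/602377/
    seek + read is fast but can cause two problems:
    (1) resource contention
    (2) too much network I/O

    Another explanation: the former (pread) suits small scans whose data is smaller than one data block (64 KB by default), where openScanner, next and closeScanner are all done in a single RPC; the latter uses HDFS read-ahead and caches some data in the DataNode (HBASE-7266, HBASE-9488).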
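Java's FileChannel happens to expose both access styles, which makes the difference easy to see on a local file: `position(long)` followed by `read(ByteBuffer)` is seek+read and changes shared channel state, while `read(ByteBuffer, long)` is a positional, pread-style read that leaves the position untouched. This is only an analogy for the two patterns, not how the HDFS input stream implements them.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.charset.StandardCharsets;

// Illustration of the two access patterns on a local file (not HDFS code).
final class ReadStyles {
    // seek + read: moves the channel's shared position, so callers must serialize
    // (here via the class lock) or they would read from each other's offsets.
    static synchronized String seekThenRead(FileChannel ch, long offset, int len) {
        try {
            ch.position(offset);
            ByteBuffer buf = ByteBuffer.allocate(len);
            while (buf.hasRemaining() && ch.read(buf) > 0) {
                // keep reading until the buffer is full or EOF is reached
            }
            return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // pread: positional read at an absolute offset; the channel position is untouched,
    // so concurrent readers do not disturb each other's offsets.
    static String pread(FileChannel ch, long offset, int len) {
        try {
            ByteBuffer buf = ByteBuffer.allocate(len);
            while (buf.hasRemaining()) {
                int n = ch.read(buf, offset + buf.position());
                if (n <= 0) {
                    break; // EOF
                }
            }
            return new String(buf.array(), 0, buf.position(), StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```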

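    Looking at the most common scanner, ClientScanner: its constructor calls initializeScannerInConstruction, which calls nextScanner() to get a scanner for the next region. That scanner makes RPC calls whose argument is a ScannerCallable object, the abstraction of a scanner's actions; reversed and small scans each have their own ScannerCallable subclass.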
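The "scan one region, then open a scanner on the next" loop can be sketched as follows, assuming a hypothetical in-memory table split into regions by start key and a `caching` value that caps the rows returned by one simulated next() RPC. Fetching a batch stands in for a ScannerCallable call, and the sketch's `nextScanner()` only mimics the role of ClientScanner.nextScanner().

```java
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.NavigableMap;
import java.util.NoSuchElementException;

// Hypothetical sketch of region-by-region scanning (not the HBase ClientScanner).
final class SketchClientScanner implements Iterator<String> {
    private final Iterator<List<String>> regions; // regions in start-key order
    private final int caching;                    // rows per simulated next() RPC
    private List<String> currentRegion;           // rows of the region being scanned
    private int offsetInRegion = 0;
    private final List<String> cache = new ArrayList<>();
    private int rpcCount = 0;

    // Hypothetical layout: region start key -> sorted rows stored in that region.
    SketchClientScanner(NavigableMap<String, List<String>> regionsByStartKey, int caching) {
        this.regions = regionsByStartKey.values().iterator();
        this.caching = caching;
    }

    // Like nextScanner(): move to the next region, or return false if none is left.
    private boolean nextScanner() {
        if (!regions.hasNext()) {
            return false;
        }
        currentRegion = regions.next();
        offsetInRegion = 0;
        return true;
    }

    // Refill the client-side cache with one simulated RPC of up to `caching` rows.
    private void loadCache() {
        while (cache.isEmpty()) {
            if (currentRegion == null || offsetInRegion >= currentRegion.size()) {
                if (!nextScanner()) {
                    return; // no more regions
                }
                continue;
            }
            int end = Math.min(offsetInRegion + caching, currentRegion.size());
            cache.addAll(currentRegion.subList(offsetInRegion, end));
            offsetInRegion = end;
            rpcCount++;
        }
    }

    @Override
    public boolean hasNext() {
        loadCache();
        return !cache.isEmpty();
    }

    @Override
    public String next() {
        if (!hasNext()) {
            throw new NoSuchElementException();
        }
        return cache.remove(0);
    }

    int rpcCount() {
        return rpcCount;
    }
}
```

With caching = 2 and regions holding 3 and 2 rows, the scanner issues three batches: two for the first region and one for the second.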
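    After a ScannerCallable is created, its prepare phase looks up the server hosting the corresponding region, obtains that server's ClientService.BlockingInterface and sets it as the stub, so that when the callable is called it knows which server to send the RPC to: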

        ServerName dest = location.getServerName();
        setStub(super.getConnection().getClient(dest));
    

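    In the call phase, when the ScanRequest is built, the nextCallSeq parameter is maintained on both the client and the server and incremented on every call, so that the client reliably gets the next batch of data. The results are then read from the controller's cell scanner: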

          CellScanner cellScanner = controller.cellScanner();
          rrs = ResponseConverter.getResults(cellScanner, response);
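Why both sides track the sequence number: if a next() response is lost and the client retries with the same number, the server must not hand out the following batch and silently skip the lost one. A minimal sketch of the check, assuming a hypothetical server-side `SketchRegionScanner`; as far as I can tell, HBase 1.x reports such a mismatch as OutOfOrderScannerNextException, after which the client reopens the scanner.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical server-side scanner that enforces the nextCallSeq protocol.
final class SketchRegionScanner {
    private final List<String> rows;
    private int offset = 0;
    private long expectedCallSeq = 0; // server-side copy of nextCallSeq

    SketchRegionScanner(List<String> rows) {
        this.rows = rows;
    }

    // Returns the next batch only if the client's sequence number matches.
    synchronized List<String> next(long clientCallSeq, int caching) {
        if (clientCallSeq != expectedCallSeq) {
            throw new IllegalStateException(
                "expected nextCallSeq " + expectedCallSeq + " but got " + clientCallSeq);
        }
        int end = Math.min(offset + caching, rows.size());
        List<String> batch = new ArrayList<>(rows.subList(offset, end));
        offset = end;
        expectedCallSeq++; // the client increments its copy after each successful call
        return batch;
    }
}
```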
    


    Article link: https://www.haomeiwen.com/subject/vjcuvctx.html