HBase 1.3
HTable is the client's entry point for reading and writing data; it implements HTableInterface and RegionLocator.
Internal structure:
// Interface for connection-internal methods: probe cluster status, check whether a table is available, etc.
protected ClusterConnection connection;
private final TableName tableName;
private volatile Configuration configuration;
/**
 * Connection configuration parameters; wraps the 0.9x-era writeBufferSize
 * (flush buffer size) and maxKeyValueSize.
 */
private ConnectionConfiguration connConfiguration;
/**
 * Communicates with a single table, but on behalf of batch operations,
 * i.e. asynchronous puts. Usable from multiple threads, but with great care.
 */
protected BufferedMutatorImpl mutator;
private boolean autoFlush = true;    // flush automatically after each put
private boolean closed = false;
protected int scannerCaching;
protected long scannerMaxResultSize; // max size of the data a scanner returns
private ExecutorService pool;        // For Multi & Scan
private int operationTimeout; // global timeout for each blocking method with retrying rpc
private int rpcTimeout;       // timeout for each rpc request
private final boolean cleanupPoolOnClose;       // shutdown the pool in close()
private final boolean cleanupConnectionOnClose; // close the connection in close()
private Consistency defaultConsistency = Consistency.STRONG;
private HRegionLocator locator; // locates the region distribution of a single HBase table
The put operation
There is a check step here that deserves a closer look.
@Override
public void put(final Put put) throws IOException {
  getBufferedMutator().mutate(put);
  if (autoFlush) {
    flushCommits(); // flush immediately when autoFlush is enabled
  }
}
BufferedMutator buffers client operations. HBase abstracts client DML as Mutation, whose subclasses are Append, Delete, Increment, and Put.
The put method hands the Put (a Mutation) to the BufferedMutator. When the buffer reaches its configured size limit, or flush is called explicitly, backgroundFlushCommits(boolean synchronous) is triggered and the mutations are submitted by AsyncProcess; see BufferedMutatorImpl for details.
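The buffer-then-flush behavior described above can be sketched with a tiny stand-in class. This is not the real BufferedMutatorImpl; the class and field names here are hypothetical, and it only illustrates the pattern of accumulating mutations until a byte threshold (analogous to writeBufferSize) forces a flush:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the buffer-and-flush pattern; not the real HBase class.
class MutationBuffer {
    private final long flushThresholdBytes;           // analogous to writeBufferSize
    private final List<String> buffer = new ArrayList<>();
    private long bufferedBytes = 0;
    int flushCount = 0;                               // how many flushes were triggered

    MutationBuffer(long flushThresholdBytes) {
        this.flushThresholdBytes = flushThresholdBytes;
    }

    // Analogous to BufferedMutatorImpl.mutate(Put): buffer, then flush on overflow.
    void mutate(String row) {
        buffer.add(row);
        bufferedBytes += row.length();
        if (bufferedBytes >= flushThresholdBytes) {
            flush();
        }
    }

    // Analogous to backgroundFlushCommits: drain the buffer toward the server.
    void flush() {
        if (buffer.isEmpty()) return;
        buffer.clear();
        bufferedBytes = 0;
        flushCount++;
    }
}
```

With a 10-byte threshold, a 4-byte mutation is only buffered, while a second 7-byte mutation pushes the total past the limit and triggers a flush.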
After AsyncProcess takes over (note: the Action class binds a row to its operation), the connection looks up the region location for each row, wraps the action together with its server and region information, and adds it to a MultiAction, which holds the actions grouped by region:
try {
  if (r == null) {
    throw new IllegalArgumentException("#" + id + ", row cannot be null");
  }
  // Make sure we get 0-s replica.
  RegionLocations locs = connection.locateRegion(
      tableName, r.getRow(), true, true, RegionReplicaUtil.DEFAULT_REPLICA_ID);
  if (locs == null || locs.isEmpty() || locs.getDefaultRegionLocation() == null) {
    throw new IOException("#" + id + ", no location found, aborting submit for"
        + " tableName=" + tableName + " rowkey=" + Bytes.toStringBinary(r.getRow()));
  }
  loc = locs.getDefaultRegionLocation();
} catch (IOException ex) {
  locationErrors = new ArrayList<Exception>();
  locationErrorRows = new ArrayList<Integer>();
  LOG.error("Failed to get region location ", ex);
  // This action failed before creating ars. Retain it, but do not add to submit list.
  // We will then add it to ars in an already-failed state.
  retainedActions.add(new Action<Row>(r, ++posInList));
  locationErrors.add(ex);
  locationErrorRows.add(posInList);
  it.remove();
  break; // Backward compat: we stop considering actions on location error.
}
// ... (code omitted) ...
return submitMultiActions(tableName, retainedActions, nonceGroup, callback, null, needResults,
locationErrors, locationErrorRows, actionsByServer, pool);
ars.sendMultiAction(actionsByServer, 1, null, false);
Then, for each server's batch of actions, a SingleServerRequestRunnable is created (an RPC caller plus an RPC callable; the caller invokes the callable) and handed to the thread pool to run.
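The grouping step that precedes dispatch can be sketched generically. The types below are hypothetical stand-ins (plain strings instead of Row/ServerName, a Function instead of connection.locateRegion); the point is only the shape of building one per-server bucket, mirroring how AsyncProcess fills actionsByServer:

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch: group row actions by the server hosting their region,
// mirroring how AsyncProcess builds one MultiAction per server.
class ActionGrouper {
    static Map<String, List<String>> groupByServer(
            List<String> rows, Function<String, String> locateServer) {
        Map<String, List<String>> actionsByServer = new HashMap<>();
        for (String row : rows) {
            // In the real client this is a region lookup via the connection.
            String server = locateServer.apply(row);
            actionsByServer.computeIfAbsent(server, s -> new ArrayList<>()).add(row);
        }
        return actionsByServer; // one runnable is then submitted per entry
    }
}
```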
Delete
The delete operation is straightforward: create a RegionServerCallable, then have the RPC caller factory create an RPC caller to invoke it:
@Override
public void delete(final Delete delete) throws IOException {
  RegionServerCallable<Boolean> callable = new RegionServerCallable<Boolean>(connection,
      tableName, delete.getRow()) {
    @Override
    public Boolean call(int callTimeout) throws IOException {
      PayloadCarryingRpcController controller = rpcControllerFactory.newController();
      controller.setPriority(tableName);
      controller.setCallTimeout(callTimeout);
      try {
        MutateRequest request = RequestConverter.buildMutateRequest(
            getLocation().getRegionInfo().getRegionName(), delete);
        MutateResponse response = getStub().mutate(controller, request);
        return Boolean.valueOf(response.getProcessed());
      } catch (ServiceException se) {
        throw ProtobufUtil.getRemoteException(se);
      }
    }
  };
  rpcCallerFactory.<Boolean> newCaller(rpcTimeout).callWithRetries(callable,
      this.operationTimeout);
}
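The caller-calls-callable pattern used by callWithRetries can be sketched in isolation. This is a simplified stand-in, not the real RpcRetryingCaller (which also sleeps with exponential backoff between attempts and honors the operation timeout):

```java
import java.util.concurrent.Callable;

// Hypothetical sketch of the retry loop behind callWithRetries.
class RetryingCaller {
    static <T> T callWithRetries(Callable<T> callable, int maxAttempts) throws Exception {
        Exception last = null;
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                return callable.call();  // e.g. the mutate() RPC against the region server
            } catch (Exception e) {
                last = e;                // real caller sleeps with backoff before retrying
            }
        }
        throw last;                      // retries exhausted
    }
}
```

A callable that fails transiently a couple of times is simply retried until it succeeds or the attempt budget runs out.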
get
Both Get and Scan extend Query.
get is simple: it first checks whether this Get is an existence check only and whether a consistency level has been set (the default is Consistency.STRONG), then builds the RPC request. If the consistency is not STRONG but Consistency.TIMELINE, it uses RpcRetryingCallerWithReadReplicas, which can read from replicas; such results are marked stale. (With Consistency.TIMELINE, the read RPC is sent to the primary region server first; if the primary does not respond within a short window (hbase.client.primaryCallTimeout.get, default 10 ms), the RPC is also sent to the secondary regions, and the first RPC to complete supplies the result. If the response came from the primary replica, the data is known to be current. Result.isStale() reports whether the data is stale: if the result came from a secondary region, isStale() returns true, and the user can then investigate the possible causes of the staleness.)
private Result get(Get get, final boolean checkExistenceOnly) throws IOException {
  // if we are changing settings to the get, clone it.
  if (get.isCheckExistenceOnly() != checkExistenceOnly || get.getConsistency() == null) {
    get = ReflectionUtils.newInstance(get.getClass(), get);
    get.setCheckExistenceOnly(checkExistenceOnly);
    if (get.getConsistency() == null) {
      get.setConsistency(defaultConsistency);
    }
  }

  if (get.getConsistency() == Consistency.STRONG) {
    // Good old call.
    final Get getReq = get;
    RegionServerCallable<Result> callable = new RegionServerCallable<Result>(this.connection,
        getName(), get.getRow()) {
      @Override
      public Result call(int callTimeout) throws IOException {
        ClientProtos.GetRequest request =
            RequestConverter.buildGetRequest(getLocation().getRegionInfo().getRegionName(), getReq);
        PayloadCarryingRpcController controller = rpcControllerFactory.newController();
        controller.setPriority(tableName);
        controller.setCallTimeout(callTimeout);
        try {
          ClientProtos.GetResponse response = getStub().get(controller, request);
          if (response == null) return null;
          return ProtobufUtil.toResult(response.getResult(), controller.cellScanner());
        } catch (ServiceException se) {
          throw ProtobufUtil.getRemoteException(se);
        }
      }
    };
    return rpcCallerFactory.<Result>newCaller(rpcTimeout).callWithRetries(callable,
        this.operationTimeout);
  }

  // Call that takes into account the replica
  RpcRetryingCallerWithReadReplicas callable = new RpcRetryingCallerWithReadReplicas(
      rpcControllerFactory, tableName, this.connection, get, pool,
      connConfiguration.getRetriesNumber(),
      operationTimeout,
      connConfiguration.getPrimaryCallTimeoutMicroSecond());
  return callable.call();
}
When the replica_id=0 region is unavailable, the request is sent to all replica regions and the first reply wins; the client can call Result.isStale() to check whether the data came from a replica.
addCallsForReplica(cs, rl, 1, rl.size() - 1);
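The "primary first, then fan out to replicas" race can be sketched with a CompletionService, which is also what the real RpcRetryingCallerWithReadReplicas builds on. The method below is a hypothetical simplification (two callables standing in for the primary and replica RPCs, a millisecond timeout standing in for primaryCallTimeout):

```java
import java.util.concurrent.Callable;
import java.util.concurrent.CompletionService;
import java.util.concurrent.ExecutorCompletionService;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of a TIMELINE read: try the primary first; if it does not
// answer within the primary-call timeout, also ask a replica and take the first result.
class TimelineRead {
    static String read(ExecutorService pool,
                       Callable<String> primary,
                       Callable<String> replica,
                       long primaryTimeoutMs) throws Exception {
        CompletionService<String> cs = new ExecutorCompletionService<>(pool);
        cs.submit(primary);
        Future<String> first = cs.poll(primaryTimeoutMs, TimeUnit.MILLISECONDS);
        if (first != null) {
            return first.get();  // primary answered in time: data is known fresh
        }
        cs.submit(replica);      // fan out, like addCallsForReplica above
        return cs.take().get();  // first responder wins (result may be stale)
    }
}
```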
scan
The Scan class can configure many properties: start key, end key, filters, versions, caching, max result size, and so on, but the data itself is fetched through the ResultScanner returned by getScanner(Scan).
The returned ResultScanner comes in small, reversed, big, and plain client variants:
if (scan.isReversed()) {
  if (scan.isSmall()) {
    return new ClientSmallReversedScanner(getConfiguration(), scan, getName(),
        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
  } else {
    return new ReversedClientScanner(getConfiguration(), scan, getName(),
        this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
        pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
  }
}
if (scan.isSmall()) {
  return new ClientSmallScanner(getConfiguration(), scan, getName(),
      this.connection, this.rpcCallerFactory, this.rpcControllerFactory,
      pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
} else {
  return new ClientScanner(getConfiguration(), scan, getName(), this.connection,
      this.rpcCallerFactory, this.rpcControllerFactory,
      pool, connConfiguration.getReplicaCallTimeoutMicroSecondScan());
}
What is a small scan?
See https://issues.apache.org/jira/browse/HBASE-7266
HBase has two kinds of read operations: pread and seek+read.
pread is a positional read: it atomically reads data from a file at a given offset.
Read path: https://yq.aliyun.com/articles/602377/
seek+read is fast but can cause two problems:
(1) resource contention
(2) too much network IO
A further note: pread suits small scans of less than one data block (default 64 KB), where openScanner, next, and closeScanner complete within a single RPC; seek+read instead uses HDFS readahead to cache extra data on the DataNode (HBASE-7266, HBASE-9488).
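The pread style described above maps directly onto Java's FileChannel, whose positional read takes an absolute offset and does not move the channel's own position, so concurrent readers avoid contending on shared seek state (the problem seek+read has). A minimal sketch:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Sketch of a pread-style positioned read using FileChannel.read(dst, position).
class PositionedRead {
    static byte[] pread(Path file, long offset, int len) throws IOException {
        ByteBuffer buf = ByteBuffer.allocate(len);
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            // Reads at the absolute offset; the channel's position is unchanged.
            int n = ch.read(buf, offset);
            buf.flip();
            byte[] out = new byte[Math.max(n, 0)]; // n == -1 at EOF
            buf.get(out);
            return out;
        }
    }
}
```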
Look at the most common case, ClientScanner. During construction it calls initializeScannerInConstruction, which calls nextScanner() to obtain the scanner prepared for the next region. That scanner issues an RPC whose parameter is a ScannerCallable object, the abstraction of a scanner action; the reversed and small variants each have their own ScannerCallable subclasses.
After a ScannerCallable is initialized, its prepare phase resolves the server for the region, obtains that server's ClientService.BlockingInterface stub, and sets it, so that when the callable is invoked it knows which server to send the RPC to:
ServerName dest = location.getServerName();
setStub(super.getConnection().getClient(dest));
In the call phase, when building the ScanRequest, the nextCallSeq parameter is maintained on both the client and the server and incremented on every call, so that the client reliably fetches the next batch of data.
CellScanner cellScanner = controller.cellScanner();
rrs = ResponseConverter.getResults(cellScanner, response);
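The nextCallSeq handshake described above can be sketched as two counters that must stay in lockstep. This is a hypothetical simplification, not HBase code; it only shows why a replayed or out-of-order request is rejected so the client never skips or re-reads a batch:

```java
// Hypothetical sketch of the nextCallSeq protocol between client and server.
class ScanSequence {
    private long clientSeq = 0;  // maintained in ScannerCallable
    private long serverSeq = 0;  // maintained alongside the server-side scanner

    // Client stamps the next ScanRequest with its current sequence.
    long nextRequestSeq() {
        return clientSeq;
    }

    // Server accepts only the expected sequence, then advances.
    boolean serverAccept(long requestSeq) {
        if (requestSeq != serverSeq) {
            return false;        // HBase raises OutOfOrderScannerNextException here
        }
        serverSeq++;
        return true;
    }

    // Client advances only after a successful call.
    void clientAdvance() {
        clientSeq++;
    }
}
```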