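HBase's source code is fairly large. To help you find an entry point, this post briefly lists the client-side call stack for a put and then concentrates on the server-side code.
Client side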
1 org.apache.hadoop.hbase.client.HTable#put(org.apache.hadoop.hbase.client.Put)
2 org.apache.hadoop.hbase.client.BufferedMutatorImpl#backgroundFlushCommits
3 org.apache.hadoop.hbase.client.AsyncProcess#submit
4 org.apache.hadoop.hbase.client.AsyncProcess#submitMultiActions
5 org.apache.hadoop.hbase.client.AsyncProcess#createAsyncRequestFuture
6 org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl#sendMultiAction
7 org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable#run
8 org.apache.hadoop.hbase.client.MultiServerCallable#call
9 org.apache.hadoop.hbase.client.RegionServerCallable#getStub
Server side
10 org.apache.hadoop.hbase.regionserver.RSRpcServices#mutate
11 org.apache.hadoop.hbase.regionserver.HRegion#put(org.apache.hadoop.hbase.client.Put)
First, the main logic of put:
- Run the mutations as a batch (batchMutate)
- Check whether a flush is needed and, if so, request one
OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
  boolean initialized = false;
  Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
  startRegionOperation(op);
  try {
    while (!batchOp.isDone()) {
      if (!batchOp.isInReplay()) {
        checkReadOnly();
      }
      checkResources();
      if (!initialized) {
        this.writeRequestsCount.add(batchOp.operations.length);
        if (!batchOp.isInReplay()) {
          doPreMutationHook(batchOp);
        }
        initialized = true;
      }
      doMiniBatchMutation(batchOp);
      long newSize = this.getMemstoreSize();
      if (isFlushSize(newSize)) {
        requestFlush();
      }
    }
  } finally {
    closeRegionOperation(op);
  }
  return batchOp.retCodeDetails;
}
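To make the shape of this loop easier to see, here is a minimal toy model of it. This is only a sketch, not HBase code: `ToyRegion`, `flushSize` and `miniBatchLimit` are hypothetical names, and a mini-batch is reduced to adding byte counts to a counter. It assumes the flush threshold check is a simple "greater than" comparison and that requesting a flush does not shrink the memstore right away, because the flush itself runs on another thread.

```java
import java.util.ArrayList;
import java.util.List;

/** Toy model of the batchMutate loop; none of these names are HBase APIs. */
class ToyRegion {
    private final long flushSize;      // hypothetical stand-in for the memstore flush threshold
    private final int miniBatchLimit;  // hypothetical cap on operations per mini-batch
    private long memstoreSize = 0;
    // Memstore sizes observed each time a flush was requested.
    final List<Long> flushRequests = new ArrayList<>();

    ToyRegion(long flushSize, int miniBatchLimit) {
        this.flushSize = flushSize;
        this.miniBatchLimit = miniBatchLimit;
    }

    /** Applies all operations in mini-batches and checks the threshold after each one. */
    void batchMutate(long[] operationSizes) {
        int next = 0;
        while (next < operationSizes.length) {            // plays the role of !batchOp.isDone()
            int end = Math.min(next + miniBatchLimit, operationSizes.length);
            for (; next < end; next++) {                   // plays the role of doMiniBatchMutation
                memstoreSize += operationSizes[next];
            }
            if (memstoreSize > flushSize) {                // plays the role of isFlushSize(newSize)
                flushRequests.add(memstoreSize);           // plays the role of requestFlush()
            }
        }
    }

    long memstoreSize() {
        return memstoreSize;
    }

    public static void main(String[] args) {
        ToyRegion region = new ToyRegion(100, 2);
        region.batchMutate(new long[] {40, 40, 40, 40});
        System.out.println(region.flushRequests); // prints [160]
    }
}
```

With a threshold of 100 and two operations per mini-batch, the first mini-batch leaves the counter at 80 and requests nothing; the second pushes it to 160 and requests one flush.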
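Next is doMiniBatchMutation, which does the real work. The method is long (and well commented in English), so only the nine steps named in the source are summarized here:
STEP 1 Acquire as many row locks as possible, at least one
STEP 2 Update the timestamp of each cell
STEP 3 Build the WAL edit
STEP 4 Append the final edit to the WAL, without syncing
STEP 5 Write to the memstore
STEP 6 Release the row locks
STEP 7 Sync the WAL
STEP 8 Advance MVCC
STEP 9 Run the coprocessor post hooks, which must happen after the WAL sync (for coprocessors, see https://blog.csdn.net/m0_37636453/article/details/79284138)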
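The ordering of these steps is the interesting part: the WAL append and the memstore write happen under the row lock, while the slow WAL sync happens after the lock is released, and the write becomes visible to readers only once MVCC advances. The sketch below is a toy, single-threaded illustration of that ordering and nothing more. `ToyWritePath` and all of its fields are hypothetical; the WAL is a list, the sync is a counter, and MVCC is a single read point.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.TreeMap;
import java.util.concurrent.locks.ReentrantLock;

/** Toy, single-threaded model of the write path; not HBase code. */
class ToyWritePath {
    private final ReentrantLock rowLock = new ReentrantLock(); // stands in for per-row locks
    private final List<String> wal = new ArrayList<>();        // appended WAL edits
    private int syncedEdits = 0;                               // how many WAL edits are durable
    private final TreeMap<String, String> memstore = new TreeMap<>();
    private long nextWriteNumber = 1;
    private long readPoint = 0;                                // highest write number visible to readers
    final List<String> trace = new ArrayList<>();              // order in which the steps ran

    void put(String row, String value, long timestamp) {
        long writeNumber;
        rowLock.lock();                                        // STEP 1: take the row lock
        try {
            // STEP 2 and 3: stamp the cell and build the WAL edit
            String edit = row + "=" + value + "@" + timestamp;
            wal.add(edit);                                     // STEP 4: append to the WAL, no sync yet
            trace.add("append");
            writeNumber = nextWriteNumber++;
            memstore.put(row, value);                          // STEP 5: write to the memstore
            trace.add("memstore");
        } finally {
            rowLock.unlock();                                  // STEP 6: release the lock before the sync
            trace.add("unlock");
        }
        syncedEdits = wal.size();                              // STEP 7: sync the WAL (modelled as a counter)
        trace.add("sync");
        readPoint = writeNumber;                               // STEP 8: advance MVCC, write becomes visible
        trace.add("mvcc");
        trace.add("postHook");                                 // STEP 9: post hooks run only after the sync
    }

    long readPoint() {
        return readPoint;
    }

    int syncedEdits() {
        return syncedEdits;
    }

    public static void main(String[] args) {
        ToyWritePath path = new ToyWritePath();
        path.put("row1", "v1", 1L);
        System.out.println(path.trace);
    }
}
```

Releasing the lock before the sync is the design choice worth noticing: other writers to the same row do not have to wait for the disk, yet no reader sees the new value until the edit is durable.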
Next, look at how the flush is requested:
@Override
public void requestFlush(Region r, boolean forceFlushAllStores) {
  synchronized (regionsInQueue) {
    if (!regionsInQueue.containsKey(r)) {
      // This entry has no delay so it will be added at the top of the flush
      // queue. It'll come out near immediately.
      FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
      this.regionsInQueue.put(r, fqe);
      this.flushQueue.add(fqe);
    }
  }
}
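The pattern in requestFlush is a queue paired with a map, so that a region already waiting for a flush is not queued twice. Below is a minimal sketch of that pattern, assuming a `java.util.concurrent.DelayQueue` as the queue. `ToyFlushQueue`, `Entry` and `pollRegion` are hypothetical names, regions are plain strings, and, as a simplification, the map entry is removed when the region is polled.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

/** Toy de-duplicating flush queue; the names here are illustrative, not HBase APIs. */
class ToyFlushQueue {
    /** Queue entry; a zero delay makes it available to poll() immediately. */
    static final class Entry implements Delayed {
        final String region;
        final long readyAtMillis;

        Entry(String region, long delayMillis) {
            this.region = region;
            this.readyAtMillis = System.currentTimeMillis() + delayMillis;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(readyAtMillis - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    private final Map<String, Entry> regionsInQueue = new HashMap<>();
    private final DelayQueue<Entry> flushQueue = new DelayQueue<>();

    /** Returns true if a new entry was queued, false if the region was already waiting. */
    boolean requestFlush(String region) {
        synchronized (regionsInQueue) {
            if (regionsInQueue.containsKey(region)) {
                return false;
            }
            Entry entry = new Entry(region, 0);
            regionsInQueue.put(region, entry);
            flushQueue.add(entry);
            return true;
        }
    }

    /** Waits up to timeoutMillis for a ready region; returns null on timeout or interrupt. */
    String pollRegion(long timeoutMillis) {
        Entry entry;
        try {
            entry = flushQueue.poll(timeoutMillis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
            return null;
        }
        if (entry == null) {
            return null;
        }
        synchronized (regionsInQueue) {
            regionsInQueue.remove(entry.region);
        }
        return entry.region;
    }

    public static void main(String[] args) {
        ToyFlushQueue queue = new ToyFlushQueue();
        System.out.println(queue.requestFlush("region-a")); // true
        System.out.println(queue.requestFlush("region-a")); // false, already queued
        System.out.println(queue.pollRegion(1000));         // region-a
    }
}
```

A delay queue fits here because an entry can also be given a non-zero delay, which lets a consumer put a request back and have it come out again later rather than immediately.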
The entry is added to the flush queue. Next, look at the flush thread, org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushHandler#run
@Override
public void run() {
  while (!server.isStopped()) {
    FlushQueueEntry fqe = null;
    try {
      wakeupPending.set(false); // allow someone to wake us up again
      fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
      if (fqe == null || fqe instanceof WakeupFlushThread) {
        if (isAboveLowWaterMark()) {
          LOG.debug("Flush thread woke up because memory above low water="
              + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
          if (!flushOneForGlobalPressure()) {
            // Wasn't able to flush any region, but we're above low water mark
            // This is unlikely to happen, but might happen when closing the
            // entire server - another thread is flushing regions. We'll just
            // sleep a little bit to avoid spinning, and then pretend that
            // we flushed one, so anyone blocked will check again
            Thread.sleep(1000);
            wakeUpIfBlocking();
          }
          // Enqueue another one of these tokens so we'll wake up again
          wakeupFlushThread();
        }
        continue;
      }
      FlushRegionEntry fre = (FlushRegionEntry) fqe;
      if (!flushRegion(fre)) {
        break;
      }
    } catch (InterruptedException ex) {
      continue;
    } catch (ConcurrentModificationException ex) {
      continue;
    } catch (Exception ex) {
      LOG.error("Cache flusher failed for entry " + fqe, ex);
      if (!server.checkFileSystem()) {
        break;
      }
    }
  }
  synchronized (regionsInQueue) {
    regionsInQueue.clear();
    flushQueue.clear();
  }
  // Signal anyone waiting, so they see the close flag
  wakeUpIfBlocking();
  LOG.info(getName() + " exiting");
}
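run then calls the flushRegion method:
org.apache.hadoop.hbase.regionserver.MemStoreFlusher#flushRegion(org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry)
It has two branches: if the region has too many store files, the flush entry is put back on the queue; otherwise the flush runs right away, through the three-argument overload of flushRegion: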
private boolean flushRegion(final Region region, final boolean emergencyFlush,
    boolean forceFlushAllStores) {
  long startTime = 0;
  synchronized (this.regionsInQueue) {
    FlushRegionEntry fqe = this.regionsInQueue.remove(region);
    // Use the start time of the FlushRegionEntry if available
    if (fqe != null) {
      startTime = fqe.createTime;
    }
    if (fqe != null && emergencyFlush) {
      // Need to remove from region from delay queue. When NOT an
      // emergencyFlush, then item was removed via a flushQueue.poll.
      flushQueue.remove(fqe);
    }
  }
  if (startTime == 0) {
    // Avoid getting the system time unless we don't have a FlushRegionEntry;
    // shame we can't capture the time also spent in the above synchronized
    // block
    startTime = EnvironmentEdgeManager.currentTime();
  }
  lock.readLock().lock();
  try {
    notifyFlushRequest(region, emergencyFlush);
    FlushResult flushResult = region.flush(forceFlushAllStores);
    boolean shouldCompact = flushResult.isCompactionNeeded();
    // We just want to check the size
    boolean shouldSplit = ((HRegion)region).checkSplit() != null;
    if (shouldSplit) {
      this.server.compactSplitThread.requestSplit(region);
    } else if (shouldCompact) {
      server.compactSplitThread.requestSystemCompaction(
          region, Thread.currentThread().getName());
    }
    if (flushResult.isFlushSucceeded()) {
      long endTime = EnvironmentEdgeManager.currentTime();
      server.metricsRegionServer.updateFlushTime(endTime - startTime);
    }
  } catch (DroppedSnapshotException ex) {
    // Cache flush can fail in a few places. If it fails in a critical
    // section, we get a DroppedSnapshotException and a replay of wal
    // is required. Currently the only way to do this is a restart of
    // the server. Abort because hdfs is probably bad (HBASE-644 is a case
    // where hdfs was bad but passed the hdfs check).
    server.abort("Replay of WAL required. Forcing server shutdown", ex);
    return false;
  } catch (IOException ex) {
    LOG.error("Cache flush failed" + (region != null ? (" for region " +
        Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
    if (!server.checkFileSystem()) {
      return false;
    }
  } finally {
    lock.readLock().unlock();
    wakeUpIfBlocking();
  }
  return true;
}
Main flush logic
First take the MemStoreFlusher read lock: lock.readLock().lock();
Run the flush: FlushResult flushResult = region.flush(forceFlushAllStores);
Check whether a compaction is needed: boolean shouldCompact = flushResult.isCompactionNeeded();
Check whether a split is needed and, if so, call requestSplit on the split thread: this.server.compactSplitThread.requestSplit(region);
Otherwise, if a compaction is needed, request a system compaction:
server.compactSplitThread.requestSystemCompaction
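One detail that is easy to miss in the listing is the precedence: the split check comes first, and a compaction is requested only when no split is needed. A small sketch of just that decision follows; `PostFlushDecision` and `Action` are hypothetical names introduced for illustration and do not exist in HBase.

```java
/** Toy version of the post-flush decision; the names are illustrative only. */
class PostFlushDecision {
    enum Action { SPLIT, COMPACT, NONE }

    /** A split wins over a compaction; with neither flag set nothing is requested. */
    static Action decide(boolean shouldSplit, boolean shouldCompact) {
        if (shouldSplit) {
            return Action.SPLIT;
        } else if (shouldCompact) {
            return Action.COMPACT;
        }
        return Action.NONE;
    }

    public static void main(String[] args) {
        System.out.println(decide(true, true));   // SPLIT
        System.out.println(decide(false, true));  // COMPACT
        System.out.println(decide(false, false)); // NONE
    }
}
```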