美文网首页
HBase 源码阅读-put操作(region server 端

HBase 源码阅读-put操作(region server 端

作者: pcqlegend | 来源:发表于2018-12-10 21:17 被阅读0次

HBase的源代码还是比较长的,为了方便大家找到入口,这儿简单列列举client的调用栈,然后主要看server端的代码

client端

1 org.apache.hadoop.hbase.client.HTable#put(org.apache.hadoop.hbase.client.Put)
2 org.apache.hadoop.hbase.client.BufferedMutatorImpl#backgroundFlushCommits
3 org.apache.hadoop.hbase.client.AsyncProcess#submit
4 org.apache.hadoop.hbase.client.AsyncProcess#submitMultiActions
5 org.apache.hadoop.hbase.client.AsyncProcess#createAsyncRequestFuture
6 org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl#sendMultiAction
org.apache.hadoop.hbase.client.MultiServerCallable#call
7org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable#run
8 org.apache.hadoop.hbase.client.MultiServerCallable#call
9 org.apache.hadoop.hbase.client.RegionServerCallable#getStub
10 org.apache.hadoop.hbase.regionserver.RSRpcServices#mutate

server 端

11 org.apache.hadoop.hbase.regionserver.HRegion#put(org.apache.hadoop.hbase.client.Put)
首先看下put主要逻辑

  • 执行batch的mutate
  • 判断是否需要flush,如果需要则进行flush操作
 */
  OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
    boolean initialized = false;
    Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
    startRegionOperation(op);
    try {
      while (!batchOp.isDone()) {
        if (!batchOp.isInReplay()) {
          checkReadOnly();
        }
        checkResources();

        if (!initialized) {
          this.writeRequestsCount.add(batchOp.operations.length);
          if (!batchOp.isInReplay()) {
            doPreMutationHook(batchOp);
          }
          initialized = true;
        }
        doMiniBatchMutation(batchOp);
        long newSize = this.getMemstoreSize();
        if (isFlushSize(newSize)) {
          requestFlush();
        }
      }
    } finally {
      closeRegionOperation(op);
    }
    return batchOp.retCodeDetails;
  }

以下是主要的doMiniBatchMutation代码逻辑,代码很长,直接翻译的代码中的英文(代码注释还是很多的 ).

代码很长,这儿我只把源码中的几个步骤拿出来说一下,总共9步骤
STEP1 尽可能多的获取行锁,至少获取一个
STEP2 更新每个cell的时间戳
STEP3 创建WAL日志
STEP4 将最后的修改添加到wal中,但是不做同步
STEP5 写入memorystore
STEP6 释放行锁
STEP7 同步wal
STEP8 更新mvcc
STEP9 运行协处理器post hooks,必须在wal同步之后才能执行(协处理器参考https://blog.csdn.net/m0_37636453/article/details/79284138

然后再看下flush的request

  @Override
  public void requestFlush(Region r, boolean forceFlushAllStores) {
    synchronized (regionsInQueue) {
      if (!regionsInQueue.containsKey(r)) {
        // This entry has no delay so it will be added at the top of the flush
        // queue.  It'll come out near immediately.
        FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
        this.regionsInQueue.put(r, fqe);
        this.flushQueue.add(fqe);
      }
    }
  }

加入到flust队列,然后看下flush的线程org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushHandler#run

 @Override
    public void run() {
      while (!server.isStopped()) {
        FlushQueueEntry fqe = null;
        try {
          wakeupPending.set(false); // allow someone to wake us up again
          fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
          if (fqe == null || fqe instanceof WakeupFlushThread) {
            if (isAboveLowWaterMark()) {
              LOG.debug("Flush thread woke up because memory above low water="
                  + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
              if (!flushOneForGlobalPressure()) {
                // Wasn't able to flush any region, but we're above low water mark
                // This is unlikely to happen, but might happen when closing the
                // entire server - another thread is flushing regions. We'll just
                // sleep a little bit to avoid spinning, and then pretend that
                // we flushed one, so anyone blocked will check again
                Thread.sleep(1000);
                wakeUpIfBlocking();
              }
              // Enqueue another one of these tokens so we'll wake up again
              wakeupFlushThread();
            }
            continue;
          }
          FlushRegionEntry fre = (FlushRegionEntry) fqe;
          if (!flushRegion(fre)) {
            break;
          }
        } catch (InterruptedException ex) {
          continue;
        } catch (ConcurrentModificationException ex) {
          continue;
        } catch (Exception ex) {
          LOG.error("Cache flusher failed for entry " + fqe, ex);
          if (!server.checkFileSystem()) {
            break;
          }
        }
      }
      synchronized (regionsInQueue) {
        regionsInQueue.clear();
        flushQueue.clear();
      }

      // Signal anyone waiting, so they see the close flag
      wakeUpIfBlocking();
      LOG.info(getName() + " exiting");
    }
  }

然后调用了flushRegion方法
org.apache.hadoop.hbase.regionserver.MemStoreFlusher#flushRegion(org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry)
这会有两个逻辑,如果region下的store files文件太多,则会将flush加入队列,否则直接进行flush,

  private boolean flushRegion(final Region region, final boolean emergencyFlush,
      boolean forceFlushAllStores) {
    long startTime = 0;
    synchronized (this.regionsInQueue) {
      FlushRegionEntry fqe = this.regionsInQueue.remove(region);
      // Use the start time of the FlushRegionEntry if available
      if (fqe != null) {
        startTime = fqe.createTime;
      }
      if (fqe != null && emergencyFlush) {
        // Need to remove from region from delay queue.  When NOT an
        // emergencyFlush, then item was removed via a flushQueue.poll.
        flushQueue.remove(fqe);
     }
    }
    if (startTime == 0) {
      // Avoid getting the system time unless we don't have a FlushRegionEntry;
      // shame we can't capture the time also spent in the above synchronized
      // block
      startTime = EnvironmentEdgeManager.currentTime();
    }
    lock.readLock().lock();
    try {
      notifyFlushRequest(region, emergencyFlush);
      FlushResult flushResult = region.flush(forceFlushAllStores);
      boolean shouldCompact = flushResult.isCompactionNeeded();
      // We just want to check the size
      boolean shouldSplit = ((HRegion)region).checkSplit() != null;
      if (shouldSplit) {
        this.server.compactSplitThread.requestSplit(region);
      } else if (shouldCompact) {
        server.compactSplitThread.requestSystemCompaction(
            region, Thread.currentThread().getName());
      }
      if (flushResult.isFlushSucceeded()) {
        long endTime = EnvironmentEdgeManager.currentTime();
        server.metricsRegionServer.updateFlushTime(endTime - startTime);
      }
    } catch (DroppedSnapshotException ex) {
      // Cache flush can fail in a few places. If it fails in a critical
      // section, we get a DroppedSnapshotException and a replay of wal
      // is required. Currently the only way to do this is a restart of
      // the server. Abort because hdfs is probably bad (HBASE-644 is a case
      // where hdfs was bad but passed the hdfs check).
      server.abort("Replay of WAL required. Forcing server shutdown", ex);
      return false;
    } catch (IOException ex) {
      LOG.error("Cache flush failed" + (region != null ? (" for region " +
          Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
        RemoteExceptionHandler.checkIOException(ex));
      if (!server.checkFileSystem()) {
        return false;
      }
    } finally {
      lock.readLock().unlock();
      wakeUpIfBlocking();
    }
    return true;
  }
flush主要逻辑

首先获取memStore的读锁, lock.readLock().lock();
进行flush FlushResult flushResult = region.flush(forceFlushAllStores);
判断是否需要compaction boolean shouldCompact = flushResult.isCompactionNeeded();
判断是否需要split则调用split的线程的requestSplit方法 this.server.compactSplitThread.requestSplit(region);
如果是需要进行合并则调用合并算法
server.compactSplitThread.requestSystemCompaction

相关文章

  • HBase 源码阅读-put操作(region server 端

    HBase的源代码还是比较长的,为了方便大家找到入口,这儿简单列列举client的调用栈,然后主要看server端...

  • Hbase

    HBase存储架构图 HBase Master 为Region server分配region 负责Region s...

  • HBase学习 - HRegionServer启动

    本文基于hbase-1.3.0源码 1. 前言 本文主要介绍hbase里region server的创建和启动过程...

  • hbase-put流程剖析

    众所周知hbase是一个写性能非常优越的NOSQL,今天从源码中分析一下put操作到了region中是如何进行...

  • HBase 和 Phoenix 的结构

    HBase 结构 可以看到 HBase 集群由 Master、Region Server、ZooKeeper、HD...

  • hbase存储相关浅析

    ​Hbase存储相关介绍 Region Server: 不同Region数据互斥(Table+StartKey+T...

  • HBase 预分区 & Phoenix 加盐

    HBase 热点问题 刚创建 HBase 表的时候默认只有一个 Region 由一个 Region Server ...

  • Hbase

    预分区 在创建Hbase表的时候默认一张表只有一个region,所有的put操作都会往这一个region中填充数据...

  • Hbase_读写流程

    Hbase架构图 Hbase写操作 执行put命令,put 'namspace:table','rowkey',...

  • 07. HBase数据存取流程解析

    客户端数据存取流程 客户端与HBase系统的写入交互阶段 用户提交put请求后,HBase客户端会将put请求添加...

网友评论

      本文标题:HBase 源码阅读-put操作(region server 端

      本文链接:https://www.haomeiwen.com/subject/fcudhqtx.html