HBase source code reading: the put operation (region server side)

    Author: pcqlegend | Published 2018-12-10 21:17

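    HBase's source code is fairly long. To make the entry point easier to find, here is a short client-side call stack. The rest of the article focuses on the server-side code.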

    Client side

    1 org.apache.hadoop.hbase.client.HTable#put(org.apache.hadoop.hbase.client.Put)
    2 org.apache.hadoop.hbase.client.BufferedMutatorImpl#backgroundFlushCommits
    3 org.apache.hadoop.hbase.client.AsyncProcess#submit
    4 org.apache.hadoop.hbase.client.AsyncProcess#submitMultiActions
    5 org.apache.hadoop.hbase.client.AsyncProcess#createAsyncRequestFuture
    6 org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl#sendMultiAction
    7 org.apache.hadoop.hbase.client.AsyncProcess.AsyncRequestFutureImpl.SingleServerRequestRunnable#run
    8 org.apache.hadoop.hbase.client.MultiServerCallable#call
    9 org.apache.hadoop.hbase.client.RegionServerCallable#getStub
    10 org.apache.hadoop.hbase.regionserver.RSRpcServices#mutate
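The Java sketch below makes one idea behind AsyncProcess#submit and submitMultiActions concrete. Buffered puts are grouped by the region server that hosts each row, so each server receives a single multi request. This is a simplified model, not HBase code. ClientBatchSketch, SimplePut, RegionLocation and the start-key lookup are hypothetical names. Real clients find regions through cached locations backed by the hbase:meta table.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: group buffered puts by the server hosting each row,
// so one batch (one multi request) can be sent per region server.
final class ClientBatchSketch {

    // A put reduced to a row key and a value; real Put objects carry much more.
    static final class SimplePut {
        final String row;
        final String value;
        SimplePut(String row, String value) { this.row = row; this.value = value; }
    }

    // Hypothetical location: the region starting at startKey is served by server.
    static final class RegionLocation {
        final String startKey;
        final String server;
        RegionLocation(String startKey, String server) { this.startKey = startKey; this.server = server; }
    }

    // regions must be sorted by startKey, and the first must start at "" (covers every row).
    static String locate(List<RegionLocation> regions, String row) {
        String server = regions.get(0).server;
        for (RegionLocation loc : regions) {
            if (loc.startKey.compareTo(row) <= 0) {
                server = loc.server; // last region whose start key is <= row
            }
        }
        return server;
    }

    // One batch per server, keeping puts in submission order inside each batch.
    static Map<String, List<SimplePut>> groupByServer(List<RegionLocation> regions,
                                                      List<SimplePut> buffer) {
        Map<String, List<SimplePut>> perServer = new LinkedHashMap<>();
        for (SimplePut put : buffer) {
            perServer.computeIfAbsent(locate(regions, put.row), k -> new ArrayList<>()).add(put);
        }
        return perServer;
    }
}
```

With regions starting at "" (on rs1) and "m" (on rs2), the puts for "apple" and "banana" land in the rs1 batch and "zebra" lands in the rs2 batch.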

    Server side

    11 org.apache.hadoop.hbase.regionserver.HRegion#put(org.apache.hadoop.hbase.client.Put)
    First, the main logic of put (HRegion#batchMutate):

    • Run the batch mutation
    • Check whether a flush is needed and, if so, request one
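      OperationStatus[] batchMutate(BatchOperationInProgress<?> batchOp) throws IOException {
        boolean initialized = false;
        // Replayed edits are tracked as a separate operation type
        Operation op = batchOp.isInReplay() ? Operation.REPLAY_BATCH_MUTATE : Operation.BATCH_MUTATE;
        startRegionOperation(op);
        try {
          // Each pass handles one mini-batch; loop until every operation has a result
          while (!batchOp.isDone()) {
            if (!batchOp.isInReplay()) {
              checkReadOnly();
            }
            // Rejects the write (RegionTooBusyException) if the memstore is above its blocking size
            checkResources();
    
            if (!initialized) {
              // Count the requests and run the coprocessor pre-hooks once per batch
              this.writeRequestsCount.add(batchOp.operations.length);
              if (!batchOp.isInReplay()) {
                doPreMutationHook(batchOp);
              }
              initialized = true;
            }
            // Lock rows, then write the WAL and the memstore for as many operations as possible
            doMiniBatchMutation(batchOp);
            // Only *request* a flush; MemStoreFlusher performs it asynchronously
            long newSize = this.getMemstoreSize();
            if (isFlushSize(newSize)) {
              requestFlush();
            }
          }
        } finally {
          closeRegionOperation(op);
        }
        // One status per operation in the batch
        return batchOp.retCodeDetails;
      }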
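The control flow of batchMutate fits in a few lines of Java. The sketch below is a hypothetical, single-threaded stand-in and makes three simplifications:

- Each operation is reduced to the number of bytes it adds to the memstore.
- miniBatchSize is an invented knob for how many operations one pass handles.
- A flush is only counted as requested, because requestFlush() hands the work to MemStoreFlusher instead of flushing inline.

```java
// Hypothetical sketch of the HRegion#batchMutate loop; names are illustrative, not HBase APIs.
final class BatchMutateSketch {
    private final long flushSize;      // stands in for the region's memstore flush size
    private final int miniBatchSize;   // how many operations one pass manages to handle
    private long memstoreSize = 0;
    private int flushRequests = 0;

    BatchMutateSketch(long flushSize, int miniBatchSize) {
        if (miniBatchSize <= 0) {
            throw new IllegalArgumentException("miniBatchSize must be positive");
        }
        this.flushSize = flushSize;
        this.miniBatchSize = miniBatchSize;
    }

    // Each operation is modelled only by the bytes it adds to the memstore.
    void batchMutate(long[] operationSizes) {
        int next = 0;                                  // first operation not yet applied
        while (next < operationSizes.length) {         // like !batchOp.isDone()
            int end = Math.min(next + miniBatchSize, operationSizes.length);
            for (int i = next; i < end; i++) {         // like doMiniBatchMutation(batchOp)
                memstoreSize += operationSizes[i];
            }
            next = end;
            if (memstoreSize > flushSize) {            // like isFlushSize(newSize)
                flushRequests++;                       // like requestFlush(): a request, not a flush
            }
        }
    }

    long memstoreSize() { return memstoreSize; }
    int flushRequests() { return flushRequests; }
}
```

Consider `new BatchMutateSketch(100, 2).batchMutate(new long[] {40, 40, 40, 40})`. It makes two passes and records one flush request, because the size is checked after each mini-batch rather than after each operation.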
    

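    The core of the write path is doMiniBatchMutation. The method is long but well commented, so instead of walking through all of it, here are the nine steps marked in the source, paraphrasing its comments: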
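    STEP 1: Acquire as many row locks as possible (at least one)
    STEP 2: Update cell timestamps (cells still carrying LATEST_TIMESTAMP get the current server time)
    STEP 3: Build the WAL edit
    STEP 4: Append the final edit to the WAL, but do not sync it yet
    STEP 5: Write to the memstore
    STEP 6: Release the row locks
    STEP 7: Sync the WAL
    STEP 8: Advance the MVCC, making the new writes visible to readers
    STEP 9: Run the coprocessor post hooks; they must run only after the WAL has been synced (for more on coprocessors, see https://blog.csdn.net/m0_37636453/article/details/79284138)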
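The ordering of these steps is the key point. The memstore is written (STEP 5) before the WAL is synced (STEP 7), yet readers cannot see the new cells until the MVCC read point advances in STEP 8, which comes after the sync.

The following Java sketch is a hypothetical, single-threaded model of that ordering only. MiniBatchSketch, Cell and the trace list are invented for illustration. The sketch ignores lock contention, durability settings and failure handling, and its memstore keeps just one cell per row.

```java
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical sketch of the step order in doMiniBatchMutation (ordering and visibility only).
final class MiniBatchSketch {
    static final long LATEST_TIMESTAMP = Long.MAX_VALUE; // same value as HConstants.LATEST_TIMESTAMP

    static final class Cell {
        final String row;
        final String value;
        long timestamp;
        long writeNumber;   // MVCC write number assigned when written to the memstore
        Cell(String row, String value, long timestamp) {
            this.row = row; this.value = value; this.timestamp = timestamp;
        }
    }

    final Set<String> lockedRows = new HashSet<>();
    final List<String> walPending = new ArrayList<>();  // appended but not yet synced
    final List<String> walSynced = new ArrayList<>();   // durable WAL entries
    final Map<String, Cell> memstore = new TreeMap<>(); // one cell per row in this model
    final List<String> trace = new ArrayList<>();       // order in which the steps ran
    long readPoint = 0;                                 // highest write number readers may see

    void miniBatch(List<Cell> cells, long now) {
        long writeNumber = readPoint + 1;                           // begin an MVCC write
        for (Cell c : cells) lockedRows.add(c.row);                 // STEP 1 (no contention here)
        trace.add("lockRows");
        for (Cell c : cells) {                                      // STEP 2
            if (c.timestamp == LATEST_TIMESTAMP) c.timestamp = now;
        }
        trace.add("updateTimestamps");
        StringBuilder walEdit = new StringBuilder();                // STEP 3: one edit per mini-batch
        for (Cell c : cells) walEdit.append(c.row).append('=').append(c.value).append(';');
        trace.add("buildWalEdit");
        walPending.add(walEdit.toString());                         // STEP 4: append, no sync
        trace.add("appendWal");
        for (Cell c : cells) {                                      // STEP 5: not visible yet
            c.writeNumber = writeNumber;
            memstore.put(c.row, c);
        }
        trace.add("writeMemstore");
        for (Cell c : cells) lockedRows.remove(c.row);              // STEP 6
        trace.add("releaseRowLocks");
        walSynced.addAll(walPending);                               // STEP 7: sync the WAL
        walPending.clear();
        trace.add("syncWal");
        readPoint = writeNumber;                                    // STEP 8: make writes visible
        trace.add("advanceMvcc");
        trace.add("postHooks");                                     // STEP 9: coprocessor post hooks
    }

    // A reader only sees cells whose write number is at or below the read point.
    String get(String row) {
        Cell c = memstore.get(row);
        return (c != null && c.writeNumber <= readPoint) ? c.value : null;
    }
}
```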

    Next, let's look at how a flush is requested (MemStoreFlusher#requestFlush):

      @Override
      public void requestFlush(Region r, boolean forceFlushAllStores) {
        synchronized (regionsInQueue) {
          if (!regionsInQueue.containsKey(r)) {
            // This entry has no delay so it will be added at the top of the flush
            // queue.  It'll come out near immediately.
            FlushRegionEntry fqe = new FlushRegionEntry(r, forceFlushAllStores);
            this.regionsInQueue.put(r, fqe);
            this.flushQueue.add(fqe);
          }
        }
      }
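The regionsInQueue map exists for de-duplication: a region that is already waiting for a flush is not queued a second time. Below is a minimal Java sketch of that guard.

- FlushRequestSketch and takeNext() are hypothetical names.
- HBase 1.x uses a DelayQueue for flushQueue; this sketch uses a plain ArrayDeque because it needs no delays.
- In the real code the entry is removed from the map inside flushRegion; here takeNext() does it.

```java
import java.util.ArrayDeque;
import java.util.HashMap;
import java.util.Map;
import java.util.Queue;

// Hypothetical sketch of the de-duplication guard in MemStoreFlusher#requestFlush.
final class FlushRequestSketch {
    static final class Entry {
        final String region;
        final boolean forceFlushAllStores;
        Entry(String region, boolean forceFlushAllStores) {
            this.region = region;
            this.forceFlushAllStores = forceFlushAllStores;
        }
    }

    private final Map<String, Entry> regionsInQueue = new HashMap<>();
    private final Queue<Entry> flushQueue = new ArrayDeque<>();

    void requestFlush(String region, boolean forceFlushAllStores) {
        synchronized (regionsInQueue) {
            if (!regionsInQueue.containsKey(region)) {   // already pending: ignore the request
                Entry entry = new Entry(region, forceFlushAllStores);
                regionsInQueue.put(region, entry);
                flushQueue.add(entry);
            }
        }
    }

    // Take the next entry and forget it, so later requests for that region are accepted again.
    Entry takeNext() {
        synchronized (regionsInQueue) {
            Entry entry = flushQueue.poll();
            if (entry != null) {
                regionsInQueue.remove(entry.region);
            }
            return entry;
        }
    }

    int queued() {
        synchronized (regionsInQueue) {
            return flushQueue.size();
        }
    }
}
```

The original code shares one consequence with this sketch. While a region is queued, a second request with forceFlushAllStores = true is dropped, so the pending entry keeps the flag it was created with.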
    

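    The region is added to the flush queue, at most once because regionsInQueue filters out duplicates. Now look at the flush thread, org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushHandler#run: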

        @Override
        public void run() {
          while (!server.isStopped()) {
            FlushQueueEntry fqe = null;
            try {
              wakeupPending.set(false); // allow someone to wake us up again
              fqe = flushQueue.poll(threadWakeFrequency, TimeUnit.MILLISECONDS);
              if (fqe == null || fqe instanceof WakeupFlushThread) {
                if (isAboveLowWaterMark()) {
                  LOG.debug("Flush thread woke up because memory above low water="
                      + TraditionalBinaryPrefix.long2String(globalMemStoreLimitLowMark, "", 1));
                  if (!flushOneForGlobalPressure()) {
                    // Wasn't able to flush any region, but we're above low water mark
                    // This is unlikely to happen, but might happen when closing the
                    // entire server - another thread is flushing regions. We'll just
                    // sleep a little bit to avoid spinning, and then pretend that
                    // we flushed one, so anyone blocked will check again
                    Thread.sleep(1000);
                    wakeUpIfBlocking();
                  }
                  // Enqueue another one of these tokens so we'll wake up again
                  wakeupFlushThread();
                }
                continue;
              }
              FlushRegionEntry fre = (FlushRegionEntry) fqe;
              if (!flushRegion(fre)) {
                break;
              }
            } catch (InterruptedException ex) {
              continue;
            } catch (ConcurrentModificationException ex) {
              continue;
            } catch (Exception ex) {
              LOG.error("Cache flusher failed for entry " + fqe, ex);
              if (!server.checkFileSystem()) {
                break;
              }
            }
          }
          synchronized (regionsInQueue) {
            regionsInQueue.clear();
            flushQueue.clear();
          }
    
          // Signal anyone waiting, so they see the close flag
          wakeUpIfBlocking();
          LOG.info(getName() + " exiting");
        }
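Without the logging and the global-memory logic, the handler is a poll-with-timeout loop. The Java sketch below models that shape with a LinkedBlockingQueue. FlushHandlerSketch, the WAKEUP string and the pressureChecks counter are hypothetical stand-ins for FlushQueueEntry, WakeupFlushThread and flushOneForGlobalPressure().

```java
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch of the FlushHandler#run loop shape.
final class FlushHandlerSketch implements Runnable {
    static final String WAKEUP = "<wakeup>";                 // stands in for WakeupFlushThread
    final BlockingQueue<String> flushQueue = new LinkedBlockingQueue<>();
    final AtomicBoolean stopped = new AtomicBoolean(false);  // like server.isStopped()
    final List<String> flushed = new CopyOnWriteArrayList<>();
    final AtomicInteger pressureChecks = new AtomicInteger();
    private final long threadWakeFrequencyMs;

    FlushHandlerSketch(long threadWakeFrequencyMs) {
        this.threadWakeFrequencyMs = threadWakeFrequencyMs;
    }

    @Override
    public void run() {
        while (!stopped.get()) {
            try {
                String entry = flushQueue.poll(threadWakeFrequencyMs, TimeUnit.MILLISECONDS);
                if (entry == null || WAKEUP.equals(entry)) {
                    // Real code: if above the low water mark, flush one region for global pressure
                    pressureChecks.incrementAndGet();
                    continue;
                }
                flushed.add(entry);   // real code: if (!flushRegion(fre)) break;
            } catch (InterruptedException e) {
                // Like the original: ignore the interrupt and re-check the stop flag
                continue;
            }
        }
        flushQueue.clear();           // like clearing regionsInQueue and flushQueue on exit
    }
}
```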
    

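    It then calls flushRegion:
    org.apache.hadoop.hbase.regionserver.MemStoreFlusher#flushRegion(org.apache.hadoop.hbase.regionserver.MemStoreFlusher.FlushRegionEntry)
    That method has two paths. If the region has too many store files, the entry is put back on the flush queue with a delay. Otherwise, it calls the overload below, which performs the flush: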

      private boolean flushRegion(final Region region, final boolean emergencyFlush,
          boolean forceFlushAllStores) {
        long startTime = 0;
        synchronized (this.regionsInQueue) {
          FlushRegionEntry fqe = this.regionsInQueue.remove(region);
          // Use the start time of the FlushRegionEntry if available
          if (fqe != null) {
            startTime = fqe.createTime;
          }
          if (fqe != null && emergencyFlush) {
            // Need to remove from region from delay queue.  When NOT an
            // emergencyFlush, then item was removed via a flushQueue.poll.
            flushQueue.remove(fqe);
          }
        }
        if (startTime == 0) {
          // Avoid getting the system time unless we don't have a FlushRegionEntry;
          // shame we can't capture the time also spent in the above synchronized
          // block
          startTime = EnvironmentEdgeManager.currentTime();
        }
        lock.readLock().lock();
        try {
          notifyFlushRequest(region, emergencyFlush);
          FlushResult flushResult = region.flush(forceFlushAllStores);
          boolean shouldCompact = flushResult.isCompactionNeeded();
          // We just want to check the size
          boolean shouldSplit = ((HRegion)region).checkSplit() != null;
          if (shouldSplit) {
            this.server.compactSplitThread.requestSplit(region);
          } else if (shouldCompact) {
            server.compactSplitThread.requestSystemCompaction(
                region, Thread.currentThread().getName());
          }
          if (flushResult.isFlushSucceeded()) {
            long endTime = EnvironmentEdgeManager.currentTime();
            server.metricsRegionServer.updateFlushTime(endTime - startTime);
          }
        } catch (DroppedSnapshotException ex) {
          // Cache flush can fail in a few places. If it fails in a critical
          // section, we get a DroppedSnapshotException and a replay of wal
          // is required. Currently the only way to do this is a restart of
          // the server. Abort because hdfs is probably bad (HBASE-644 is a case
          // where hdfs was bad but passed the hdfs check).
          server.abort("Replay of WAL required. Forcing server shutdown", ex);
          return false;
        } catch (IOException ex) {
          LOG.error("Cache flush failed" + (region != null ? (" for region " +
              Bytes.toStringBinary(region.getRegionInfo().getRegionName())) : ""),
            RemoteExceptionHandler.checkIOException(ex));
          if (!server.checkFileSystem()) {
            return false;
          }
        } finally {
          lock.readLock().unlock();
          wakeUpIfBlocking();
        }
        return true;
      }
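The two paths described before the code (requeue when the region has too many store files, otherwise flush) can be sketched with java.util.concurrent.DelayQueue, whose poll() only returns entries whose delay has expired. The sketch also flushes anyway once an entry has waited blockingWaitMs, modelled on the maximum blocking wait HBase applies. All names and values are hypothetical: RequeueSketch, blockingStoreFiles, blockingWaitMs and requeueDelayMs.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of "requeue with a delay if too many store files, else flush".
final class RequeueSketch {
    static final class Entry implements Delayed {
        final String region;
        final long createTime;        // when the flush was first requested (ms)
        private long whenToExpire;    // the queue releases the entry after this time
        int requeueCount = 0;

        Entry(String region, long now) {
            this.region = region;
            this.createTime = now;
            this.whenToExpire = now;  // no delay: available immediately
        }

        Entry requeue(long now, long delayMs) {
            whenToExpire = now + delayMs;
            requeueCount++;
            return this;
        }

        @Override
        public long getDelay(TimeUnit unit) {
            return unit.convert(whenToExpire - System.currentTimeMillis(), TimeUnit.MILLISECONDS);
        }

        @Override
        public int compareTo(Delayed other) {
            return Long.compare(getDelay(TimeUnit.MILLISECONDS), other.getDelay(TimeUnit.MILLISECONDS));
        }
    }

    final DelayQueue<Entry> flushQueue = new DelayQueue<>();
    private final int blockingStoreFiles;   // hypothetical "too many store files" threshold
    private final long blockingWaitMs;      // after waiting this long, flush anyway
    private final long requeueDelayMs;
    int flushes = 0;

    RequeueSketch(int blockingStoreFiles, long blockingWaitMs, long requeueDelayMs) {
        this.blockingStoreFiles = blockingStoreFiles;
        this.blockingWaitMs = blockingWaitMs;
        this.requeueDelayMs = requeueDelayMs;
    }

    // Returns true if the region was flushed, false if the entry was requeued.
    boolean flushRegion(Entry entry, int storeFileCount, long now) {
        boolean tooManyStoreFiles = storeFileCount > blockingStoreFiles;
        boolean waitedLongEnough = now - entry.createTime >= blockingWaitMs;
        if (tooManyStoreFiles && !waitedLongEnough) {
            flushQueue.add(entry.requeue(now, requeueDelayMs));
            return false;
        }
        flushes++;                           // stands in for region.flush(...)
        return true;
    }
}
```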
    
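    Main flush logic:

    • First take the read lock of MemStoreFlusher's lock: lock.readLock().lock();
    • Flush the region: FlushResult flushResult = region.flush(forceFlushAllStores);
    • Check whether a compaction is needed: boolean shouldCompact = flushResult.isCompactionNeeded();
    • Check whether a split is needed; if so, call requestSplit on the server's compact/split thread: this.server.compactSplitThread.requestSplit(region);
    • Otherwise, if a compaction is needed, request a system compaction: server.compactSplitThread.requestSystemCompaction
    • Finally, release the read lock and wake up any threads waiting on the flusher (wakeUpIfBlocking())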
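The follow-up decision after a flush is a simple priority rule: because of the else-if, a split wins over a compaction. The hypothetical Java sketch below applies that rule inside the same read-lock pattern. PostFlushSketch and Followup are invented names. compactionNeeded and splitPoint stand in for FlushResult#isCompactionNeeded() and ((HRegion) region).checkSplit().

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical sketch of the decision made after region.flush(...) in flushRegion.
final class PostFlushSketch {
    enum Followup { SPLIT, COMPACT, NONE }

    // The read lock is shared, so several flush handlers may hold it at once.
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

    Followup afterFlush(boolean compactionNeeded, byte[] splitPoint) {
        lock.readLock().lock();
        try {
            if (splitPoint != null) {
                return Followup.SPLIT;       // like compactSplitThread.requestSplit(region)
            } else if (compactionNeeded) {
                return Followup.COMPACT;     // like requestSystemCompaction(region, ...)
            }
            return Followup.NONE;
        } finally {
            lock.readLock().unlock();        // always released, as in the original finally block
        }
    }

    int readHolds() { return lock.getReadLockCount(); }
}
```

Because the read lock is shared, many flush handlers can hold it simultaneously. Only a thread requesting the write lock of the same ReentrantReadWriteLock must wait until every handler has released it.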

    Article link: https://www.haomeiwen.com/subject/fcudhqtx.html