HBase Read/Write Source Code

Author: 羊吃白菜 | Published: 2018-12-30 22:43

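    Write path, using put as the example
    Client-side flow
    (1) After the user submits a put, the HBase client adds it to a local buffer; once certain conditions are met, the buffer is submitted asynchronously in batches through AsyncProcess. By default HBase sets autoflush=true, so each put is sent to the server immediately. Users can set autoflush=false, in which case puts are first held in the local buffer and are submitted only once the buffer size exceeds a threshold (2 MB by default, configurable in the configuration file). The latter is a group-commit mechanism and can greatly improve write throughput, but it has no protection: if the client crashes, the buffered puts that have not yet been submitted are lost.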

    // The put is first added to the buffer (doMutate below is where the buffering happens)
      @Override
      public void put(final Put put) throws InterruptedIOException,
          RetriesExhaustedWithDetailsException {
        getBufferedMutator().mutate(put);
        if (autoFlush) {
          flushCommits();
        }
      }
      private void doMutate(Mutation m) throws InterruptedIOException,
          RetriesExhaustedWithDetailsException {
        if (closed) {
          throw new IllegalStateException("Cannot put when the BufferedMutator is closed.");
        }
        if (!(m instanceof Put) && !(m instanceof Delete)) {
          throw new IllegalArgumentException("Pass a Delete or a Put");
        }
    
        // This behavior is highly non-intuitive... it does not protect us against
        // 94-incompatible behavior, which is a timing issue because hasError, the below code
        // and setter of hasError are not synchronized. Perhaps it should be removed.
        if (ap.hasError()) {
          writeAsyncBuffer.add(m);
          backgroundFlushCommits(true);
        }
    
        if (m instanceof Put) {
          validatePut((Put) m);
        }
    
        currentWriteBufferSize += m.heapSize();
        writeAsyncBuffer.add(m);
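        // While the buffer is over the threshold, flush it in the background (synchronous=false);
        // the synchronous flush above is taken only after an earlier error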
        while (currentWriteBufferSize > writeBufferSize) {
          backgroundFlushCommits(false);
        }
      }
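
The buffering behaviour described in step (1) can be illustrated with a minimal sketch. This is not HBase's BufferedMutator: the class `WriteBufferSketch`, its fields, and the string rows are all hypothetical stand-ins, and the "server" is just a list of sent batches. It only shows how autoflush and the size threshold decide when a batch leaves the client.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for the client-side write buffer; not HBase's BufferedMutator.
final class WriteBufferSketch {
    private final long thresholdBytes;
    private final boolean autoFlush;
    private final List<String> pending = new ArrayList<>();
    private long pendingBytes = 0;
    // Each inner list models one batch that was sent to the server.
    final List<List<String>> sentBatches = new ArrayList<>();

    WriteBufferSketch(long thresholdBytes, boolean autoFlush) {
        this.thresholdBytes = thresholdBytes;
        this.autoFlush = autoFlush;
    }

    void put(String row, long heapSize) {
        pending.add(row);
        pendingBytes += heapSize;
        // autoFlush=true sends every put at once; otherwise wait until the threshold is exceeded.
        if (autoFlush || pendingBytes > thresholdBytes) {
            flush();
        }
    }

    void flush() {
        if (pending.isEmpty()) {
            return;
        }
        sentBatches.add(new ArrayList<>(pending));
        pending.clear();
        pendingBytes = 0;
    }

    // Rows still held on the client; these are what a client crash would lose.
    int unsent() {
        return pending.size();
    }

    public static void main(String[] args) {
        WriteBufferSketch buffered = new WriteBufferSketch(100, false);
        buffered.put("r1", 40);
        buffered.put("r2", 40);
        buffered.put("r3", 40); // 120 > 100, so one batch of three rows goes out
        System.out.println(buffered.sentBatches);
    }
}
```

With autoFlush disabled, anything counted by `unsent()` exists only in client memory, which is the durability trade-off the text mentions.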
    

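    (2) Before submitting, HBase looks up in the metadata table (.META., named hbase:meta since HBase 0.96) which region server each rowkey belongs to; this lookup is done through HConnection's locateRegion method. For a batch request, the rowkeys are also grouped by HRegionLocation, and each group corresponds to one RPC request.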

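    (3) HBase builds one remote RPC request, a MultiServerCallable<Row>, for each HRegionLocation and executes it through rpcCallerFactory.<MultiResponse> newCaller(). Leaving aside resubmission of failures and error handling, the client-side submission ends here.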
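
The locate-and-group step in (2) can be sketched as follows. This is a simplified illustration, not the HConnection code: `RegionGroupingSketch` is a hypothetical class, rowkeys are plain strings, and each location is reduced to a server name. It assumes a region covers every rowkey from its start key up to the next region's start key, so the owning region is the one with the greatest start key that is not larger than the rowkey.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of "locate each rowkey, then group by location".
final class RegionGroupingSketch {
    // Region start key -> location (server names here are made up).
    private final TreeMap<String, String> startKeyToLocation = new TreeMap<>();

    void addRegion(String startKey, String location) {
        startKeyToLocation.put(startKey, location);
    }

    // The owning region has the greatest start key <= rowKey.
    String locate(String rowKey) {
        Map.Entry<String, String> entry = startKeyToLocation.floorEntry(rowKey);
        if (entry == null) {
            throw new IllegalStateException("no region covers " + rowKey);
        }
        return entry.getValue();
    }

    // Each resulting group would become one RPC request.
    Map<String, List<String>> groupByLocation(List<String> rowKeys) {
        Map<String, List<String>> groups = new LinkedHashMap<>();
        for (String row : rowKeys) {
            groups.computeIfAbsent(locate(row), k -> new ArrayList<>()).add(row);
        }
        return groups;
    }

    public static void main(String[] args) {
        RegionGroupingSketch meta = new RegionGroupingSketch();
        meta.addRegion("", "rs1");  // first region starts at the empty key
        meta.addRegion("m", "rs2");
        System.out.println(meta.groupByLocation(Arrays.asList("a", "z", "b")));
    }
}
```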

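    Server side
    First, a brief introduction to the locks in HRegion on the server side:
    (1) scannerReadPointsLock: an increment needs the smallest readPoint among all scanners to guarantee atomicity; this lock controls concurrency between new RegionScannerImpl and getSmallestReadPoint
    (2) lock: ensures the region closes cleanly
    (3) updatesLock: taken when updating shared resources such as mvcc/memstore/wal
    (4) RowLockContext: the row lock; a put must lock the row it writes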


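    Server-side write flow

    HRegion contains many checks on replay. Replay is the action of re-applying WAL data to the region server during region server failover and log splitting; since many operations were already completed before the WAL entry was written, they do not need to be repeated during replay.
    The main work happens in HRegion.doMiniBatchMutation. Following the steps in the source:
    (1) Acquire the row lock first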

    // A read lock is taken here: mvcc already guarantees consistency, so a write lock is unnecessary. increment/append/batch need atomicity and therefore take the write lock
    rowLock = getRowLock(mutation.getRow(), true);
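
As a rough illustration of a per-row read/write lock, the sketch below keeps one ReentrantReadWriteLock per row. It is not HRegion's RowLockContext: the class `RowLockSketch` and its helper methods are hypothetical, and lock cleanup is omitted. It shows why plain puts can share a row lock while an operation that takes the write lock has to wait for them.

```java
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical per-row lock table; not HBase's RowLockContext.
final class RowLockSketch {
    private final ConcurrentHashMap<String, ReentrantReadWriteLock> locks =
            new ConcurrentHashMap<>();

    private ReentrantReadWriteLock lockFor(String row) {
        return locks.computeIfAbsent(row, r -> new ReentrantReadWriteLock());
    }

    // readLock=true models a plain put; readLock=false models increment/append.
    Lock getRowLock(String row, boolean readLock) {
        ReentrantReadWriteLock rw = lockFor(row);
        Lock lock = readLock ? rw.readLock() : rw.writeLock();
        lock.lock();
        return lock;
    }

    // Non-blocking attempt at the exclusive lock; fails while any read lock is held.
    boolean tryWriteLock(String row) {
        return lockFor(row).writeLock().tryLock();
    }

    int readHolds(String row) {
        return lockFor(row).getReadLockCount();
    }

    public static void main(String[] args) {
        RowLockSketch rowLocks = new RowLockSketch();
        Lock first = rowLocks.getRowLock("row1", true);
        Lock second = rowLocks.getRowLock("row1", true); // shared, does not block
        System.out.println("read holds: " + rowLocks.readHolds("row1"));
        first.unlock();
        second.unlock();
    }
}
```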
    

    (2) Update the cell timestamps: a cell that already carries an explicit timestamp is left alone; otherwise it is set to the current time

      public static boolean updateLatestStamp(Cell cell, byte[] ts, int tsOffset) throws IOException {
        if (cell.getTimestamp() == HConstants.LATEST_TIMESTAMP) {
          setTimestamp(cell, ts, tsOffset);
          return true;
        }
        return false;
      }
    

    (3) Collect the changes into a WAL edit held in an in-memory buffer

          // A read lock on updatesLock is taken here
          walEdit = new WALEdit(cellCount, isInReplay);
          lock(this.updatesLock.readLock(), numReadyToWrite);
          .......
          // Add the cells to the in-memory WALEdit
          addFamilyMapToWALEdit(familyMaps[i], walEdit);
    

    (4) Append the last edit to the WAL buffer, without persisting it yet

              // Build the WAL key, which carries the sequenceID and the mvcc WriteEntry
              walKey = new HLogKey(this.getRegionInfo().getEncodedNameAsBytes(),
                  this.htableDescriptor.getTableName(), WALKey.NO_SEQUENCE_ID, now,
                  mutation.getClusterIds(), currentNonceGroup, currentNonce, mvcc, this.getReplicationScope());
              // Important: txid is a transaction ID used later to sync the WAL; the HDFS write model is covered in a separate chapter
              txid =
                  this.wal.append(this.getRegionInfo(), walKey, walEdit, true);
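
The split between appending to the WAL and syncing it later can be sketched with a toy log. This is an assumption-laden model, not HBase's WAL implementation: `WalSketch` and its in-memory lists are hypothetical, and "durable" is only a second list. The point is that append returns a txid immediately, and a later sync(txid) makes everything up to that txid durable in one go.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical in-memory model of "append now, sync later"; not HBase's WAL.
final class WalSketch {
    private final List<String> unsynced = new ArrayList<>();
    private final List<String> durable = new ArrayList<>();
    private long lastAppendedTxid = 0;
    private long lastSyncedTxid = 0;

    // Buffers the edit and hands back a transaction id; nothing is durable yet.
    synchronized long append(String edit) {
        unsynced.add(edit);
        return ++lastAppendedTxid;
    }

    // Makes every buffered edit durable, unless an earlier sync already covered txid.
    synchronized void sync(long txid) {
        if (txid <= lastSyncedTxid) {
            return;
        }
        durable.addAll(unsynced);
        unsynced.clear();
        lastSyncedTxid = lastAppendedTxid;
    }

    synchronized int durableCount() {
        return durable.size();
    }

    synchronized long lastSyncedTxid() {
        return lastSyncedTxid;
    }

    public static void main(String[] args) {
        WalSketch wal = new WalSketch();
        long t1 = wal.append("put r1");
        long t2 = wal.append("put r2");
        wal.sync(t1); // also persists the edit behind t2 in this simplified model
        wal.sync(t2); // no-op, already covered
        System.out.println("durable edits: " + wal.durableCount());
    }
}
```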
    

    (5) Write the data to the memstore. Because the WAL has not been synced yet, the write is not complete, and mvcc keeps the data invisible to readers

            applyFamilyMapToMemstore(familyMaps[i], memstoreSize);
    

    (6) Release the row locks and the updates lock

         if (locked) {
            this.updatesLock.readLock().unlock();
            locked = false;
          }
          releaseRowLocks(acquiredRowLocks);
    

    (7) Sync the WAL
    (8) Advance the mvcc version number, which makes the write visible to readers

            mvcc.completeMemstoreInsert(writeEntry);
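
How mvcc hides a write until it completes can be shown with a small model. This is a sketch under stated assumptions, not HBase's MultiVersionConcurrencyControl: the class `MvccSketch` and its methods are hypothetical. Each write gets an increasing write number, and the read point only advances over the contiguous prefix of completed writes, so a finished write stays invisible while an earlier one is still in flight.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Hypothetical model of write numbers and a read point; not HBase's mvcc class.
final class MvccSketch {
    static final class WriteEntry {
        final long writeNumber;
        boolean completed;

        WriteEntry(long writeNumber) {
            this.writeNumber = writeNumber;
        }
    }

    private long nextWriteNumber = 1;
    private long readPoint = 0;
    private final Deque<WriteEntry> writeQueue = new ArrayDeque<>();

    synchronized WriteEntry begin() {
        WriteEntry entry = new WriteEntry(nextWriteNumber++);
        writeQueue.addLast(entry);
        return entry;
    }

    // Marks the entry done and advances the read point over completed entries at the head.
    synchronized void complete(WriteEntry entry) {
        entry.completed = true;
        while (!writeQueue.isEmpty() && writeQueue.peekFirst().completed) {
            readPoint = writeQueue.pollFirst().writeNumber;
        }
    }

    synchronized long readPoint() {
        return readPoint;
    }

    // A reader sees a cell only if its write number is at or below the read point.
    synchronized boolean isVisible(long writeNumber) {
        return writeNumber <= readPoint;
    }

    public static void main(String[] args) {
        MvccSketch mvcc = new MvccSketch();
        WriteEntry w1 = mvcc.begin();
        WriteEntry w2 = mvcc.begin();
        mvcc.complete(w2); // w1 is still in flight, so the read point cannot move
        System.out.println("read point after w2: " + mvcc.readPoint());
        mvcc.complete(w1);
        System.out.println("read point after w1: " + mvcc.readPoint());
    }
}
```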
    

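    (9) Some follow-up coprocessor work
    After the put completes, a flush request is placed on the task queue if certain conditions are met; a separate thread executes the request and eventually calls HRegion.flush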

    flush:
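    When memory usage exceeds a threshold, or a compaction is needed, the in-memory data is flushed to disk. Flush also uses mvcc: data in the memstore may not have reached the WAL yet, so a transaction is inserted to ensure that all earlier transactions have completed before the flush. This guarantees that everything flushed to disk has already been fully written to the WAL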

    // In code, flush runs in two stages: prepare, then flush-and-commit
      protected FlushResult internalFlushcache(final WAL wal, final long myseqid,
          final Collection<Store> storesToFlush, MonitoredTask status, boolean writeFlushWalMarker)
              throws IOException {
        PrepareFlushResult result
          = internalPrepareFlushCache(wal, myseqid, storesToFlush, status, writeFlushWalMarker);
        if (result.result == null) {
          return internalFlushCacheAndCommit(wal, status, result, storesToFlush);
        } else {
          return result.result; // early exit due to failure from prepare stage
        }
      }
    
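    // First, the structure of the memstore: it holds two segments, a MutableSegment that takes writes and an ImmutableSegment used for flushing. Each flush turns the mutable segment into the immutable one, which is then flushed, and creates a fresh MutableSegment for incoming writes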
    public abstract class AbstractMemStore implements MemStore {
      // active segment absorbs write operations
      protected volatile MutableSegment active;
      // Snapshot of memstore.  Made for flusher.
      protected volatile ImmutableSegment snapshot;
    ......................
    }
    // Each column family has one HStore, and each HStore has one StoreFlushContext; the prepare phase calls StoreFlushContext.prepare()
        @Override
        public void prepare() {
          // passing the current sequence number of the wal - to allow bookkeeping in the memstore
          this.snapshot = memstore.snapshot();
          this.cacheFlushCount = snapshot.getCellsCount();
          this.cacheFlushSize = snapshot.getDataSize();
          committedFiles = new ArrayList<Path>(1);
        }
    // The memstore snapshot code is shown below; it turns the current data into an ImmutableSegment and returns it:
      @Override
      public MemStoreSnapshot snapshot() {
        // If snapshot currently has entries, then flusher failed or didn't call
        // cleanup.  Log a warning.
        if (!this.snapshot.isEmpty()) {
          LOG.warn("Snapshot called again without clearing previous. " +
              "Doing nothing. Another ongoing flush or did we fail last attempt?");
        } else {
          this.snapshotId = EnvironmentEdgeManager.currentTime();
          if (!this.active.isEmpty()) {
            ImmutableSegment immutableSegment = SegmentFactory.instance().
                createImmutableSegment(this.active);
            this.snapshot = immutableSegment;
            resetActive();
          }
        }
        return new MemStoreSnapshot(this.snapshotId, this.snapshot);
      }
    // The prepare phase holds updatesLock.writeLock(), so writes are blocked while it runs
    

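    (1) prepare phase: iterate over all Memstores in the current Region, take a snapshot of each Memstore's current data set (kvset), and create a new kvset. All subsequent writes go to the new kvset, while reads during the whole flush first scan both kvset and snapshot and go to the HFiles only if nothing is found there. The prepare phase takes updatesLock to block write requests and releases it when it finishes. No time-consuming work happens in this phase, so the lock is held only briefly.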
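
The prepare phase can be sketched as a pointer swap under a write lock. This is a simplified model, not AbstractMemStore: `MemStoreSnapshotSketch`, its string keys and values, and the method names are hypothetical. Writers share the read side of the lock, the snapshot swap takes the write side briefly, and reads consult the active set first and then the snapshot.

```java
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical model of the active/snapshot swap; not HBase's memstore classes.
final class MemStoreSnapshotSketch {
    private final ReentrantReadWriteLock updatesLock = new ReentrantReadWriteLock();
    private volatile ConcurrentSkipListMap<String, String> active = new ConcurrentSkipListMap<>();
    private volatile ConcurrentSkipListMap<String, String> snapshot = new ConcurrentSkipListMap<>();

    // Writers take the read lock, so many of them can write concurrently.
    void put(String key, String value) {
        updatesLock.readLock().lock();
        try {
            active.put(key, value);
        } finally {
            updatesLock.readLock().unlock();
        }
    }

    // Prepare: swap under the write lock. Only references change, so the lock is held briefly.
    boolean prepareFlush() {
        updatesLock.writeLock().lock();
        try {
            if (!snapshot.isEmpty()) {
                return false; // the previous snapshot has not been cleared yet
            }
            snapshot = active;
            active = new ConcurrentSkipListMap<>();
            return true;
        } finally {
            updatesLock.writeLock().unlock();
        }
    }

    // Reads check the active set first, then the snapshot (store files are not modeled).
    String get(String key) {
        String value = active.get(key);
        return value != null ? value : snapshot.get(key);
    }

    // Commit: the snapshot has been written out, so drop it.
    void clearSnapshot() {
        snapshot = new ConcurrentSkipListMap<>();
    }

    int activeSize() {
        return active.size();
    }

    int snapshotSize() {
        return snapshot.size();
    }

    public static void main(String[] args) {
        MemStoreSnapshotSketch memstore = new MemStoreSnapshotSketch();
        memstore.put("r1", "v1");
        memstore.prepareFlush();
        memstore.put("r2", "v2"); // lands in the new active set
        System.out.println(memstore.get("r1") + " " + memstore.get("r2"));
    }
}
```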

           // Call flush on each HStore in turn
          for (StoreFlushContext flush : storeFlushCtxs.values()) {
            flush.flushCache(status);
          }
    // There are two implementations, DefaultStoreFlusher and StripeStoreFlusher; DefaultStoreFlusher is shown here
      @Override
      public List<Path> flushSnapshot(MemStoreSnapshot snapshot, long cacheFlushId,
          MonitoredTask status, ThroughputController throughputController) throws IOException {
        ArrayList<Path> result = new ArrayList<Path>();
        int cellsCount = snapshot.getCellsCount();
        if (cellsCount == 0) return result; // don't flush if there are no entries
    
        // Use a store scanner to find which rows to flush.
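        // The smallest readPoint among all current scanners is used here, so the file flushed to disk is readable by every scanner. The flush itself scans KVs out of the snapshot and writes them to a temporary HFile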
        long smallestReadPoint = store.getSmallestReadPoint();
        InternalScanner scanner = createScanner(snapshot.getScanner(), smallestReadPoint);
        if (scanner == null) {
          return result; // NULL scanner returned from coprocessor hooks means skip normal processing
        }
    
        StoreFile.Writer writer;
        try {
          // TODO:  We can fail in the below block before we complete adding this flush to
          //        list of store files.  Add cleanup of anything put on filesystem if we fail.
          synchronized (flushLock) {
            status.setStatus("Flushing " + store + ": creating writer");
            // Write the map out to the disk
            writer = store.createWriterInTmp(cellsCount, store.getFamily().getCompression(),
                /* isCompaction = */ false,
                /* includeMVCCReadpoint = */ true,
                /* includesTags = */ snapshot.isTagsPresent(),
                /* shouldDropBehind = */ false,
                snapshot.getTimeRangeTracker());
            IOException e = null;
            try {
              performFlush(scanner, writer, smallestReadPoint, throughputController);
            } catch (IOException ioe) {
              e = ioe;
              // throw the exception out
              throw ioe;
            } finally {
              if (e != null) {
                writer.close();
              } else {
                finalizeWriter(writer, cacheFlushId, status);
              }
            }
          }
        } finally {
          scanner.close();
        }
        LOG.info("Flushed, sequenceid=" + cacheFlushId +", memsize="
            + StringUtils.humanReadableInt(snapshot.getDataSize()) +
            ", hasBloomFilter=" + writer.hasGeneralBloom() +
            ", into tmp file " + writer.getPath());
        result.add(writer.getPath());
        return result;
      }
    

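    (2) flush phase: iterate over all Memstores and persist the snapshot produced in the prepare phase as temporary files, which are all placed under the .tmp directory. This phase involves disk I/O and is therefore relatively slow.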

     /*
       * Change storeFiles adding into place the Reader produced by this new flush.
       * @param sfs Store files
       * @param snapshotId
       * @throws IOException
       * @return Whether compaction is required.
       */
      private boolean updateStorefiles(final List<StoreFile> sfs, final long snapshotId)
          throws IOException {
        this.lock.writeLock().lock();
        try {
          this.storeEngine.getStoreFileManager().insertNewFiles(sfs);
          if (snapshotId > 0) {
            this.memstore.clearSnapshot(snapshotId);
          }
        } finally {
          // We need the lock, as long as we are updating the storeFiles
          // or changing the memstore. Let us release it before calling
          // notifyChangeReadersObservers. See HBASE-4485 for a possible
          // deadlock scenario that could have happened if continue to hold
          // the lock.
          this.lock.writeLock().unlock();
        }
    
        // Tell listeners of the change in readers.
        notifyChangedReadersObservers();
    
        if (LOG.isTraceEnabled()) {
          long totalSize = 0;
          for (StoreFile sf : sfs) {
            totalSize += sf.getReader().length();
          }
          String traceMessage = "FLUSH time,count,size,store size,store files ["
              + EnvironmentEdgeManager.currentTime() + "," + sfs.size() + "," + totalSize
              + "," + storeSize + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
          LOG.trace(traceMessage);
        }
    // After the files have been moved, check whether a compaction is needed
        return needsCompaction();
      }
    

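    (3) commit phase: iterate over all Memstores, move the temporary files produced in the flush phase into the corresponding ColumnFamily directory, create a storefile and Reader for each HFile, add the storefile to the HStore's storefiles list, and finally clear the snapshot produced in the prepare phase.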
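
The "write to .tmp, then move into the column family directory" pattern from the flush and commit phases can be sketched on the local file system. This is only an analogy: HBase writes HFiles to HDFS, and `FlushCommitSketch`, the file names, and the one-cell-per-line text format are all hypothetical. The sketch assumes both directories are on the same file system so the move can be atomic.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.StandardCopyOption;
import java.util.Arrays;
import java.util.List;

// Hypothetical local-file-system analogy; HBase itself works against HDFS.
final class FlushCommitSketch {
    // Flush phase: write the sorted snapshot cells to <regionDir>/.tmp/<fileName>.
    static Path flushToTmp(Path regionDir, String fileName, List<String> sortedCells) {
        try {
            Path tmpDir = Files.createDirectories(regionDir.resolve(".tmp"));
            return Files.write(tmpDir.resolve(fileName), sortedCells, StandardCharsets.UTF_8);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Commit phase: move the finished file into the column family directory.
    static Path commit(Path regionDir, String family, Path tmpFile) {
        try {
            Path familyDir = Files.createDirectories(regionDir.resolve(family));
            return Files.move(tmpFile, familyDir.resolve(tmpFile.getFileName()),
                    StandardCopyOption.ATOMIC_MOVE);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    // Helper for the demo: a throwaway directory standing in for a region directory.
    static Path newRegionDir() {
        try {
            return Files.createTempDirectory("region-sketch");
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    public static void main(String[] args) {
        Path region = newRegionDir();
        Path tmp = flushToTmp(region, "hfile-1", Arrays.asList("r1/cf:a=1", "r2/cf:a=2"));
        Path committed = commit(region, "cf", tmp);
        System.out.println("committed to " + committed);
    }
}
```

Readers never observe a half-written file in the family directory, because the file only appears there after it is complete.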

    Read path, using scan as the example

      protected RegionScanner instantiateRegionScanner(Scan scan,
          List<KeyValueScanner> additionalScanners) throws IOException {
        // Reverse scan
        if (scan.isReversed()) {
          if (scan.getFilter() != null) {
            scan.getFilter().setReversed(true);
          }
    // Reverse scans are supported; the rest of this walkthrough follows the forward scan
          return new ReversedRegionScannerImpl(scan, additionalScanners, this);
        }
        return new RegionScannerImpl(scan, additionalScanners, this);
      }
    
    // In the RegionScannerImpl constructor: a region has several stores, and each store gets its own KeyValueScanner
          try {
            for (Map.Entry<byte[], NavigableSet<byte[]>> entry : scan.getFamilyMap().entrySet()) {
              Store store = stores.get(entry.getKey());
              KeyValueScanner scanner = store.getScanner(scan, entry.getValue(), this.readPt);
              instantiatedScanners.add(scanner);
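              // Scan this family up front when: 1. there is no filter, 2. on-demand column family loading is off, or 3. the filter marks this family as essential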
              if (this.filter == null || !scan.doLoadColumnFamiliesOnDemand()
                  || this.filter.isFamilyEssential(entry.getKey())) {
                scanners.add(scanner);
              } else {
                joinedScanners.add(scanner);
              }
            }
    
    ......
       // Return the next row
        @Override
        public boolean nextRaw(List<Cell> outResults, ScannerContext scannerContext)
            throws IOException {
          if (storeHeap == null) {
            // scanner is closed
            throw new UnknownScannerException("Scanner was closed");
          }
          boolean moreValues = false;
          if (outResults.isEmpty()) {
            // Usually outResults is empty. This is true when next is called
            // to handle scan or get operation.
            moreValues = nextInternal(outResults, scannerContext);
          } else {
            List<Cell> tmpList = new ArrayList<Cell>();
            moreValues = nextInternal(tmpList, scannerContext);
            outResults.addAll(tmpList);
          }
    
          // If the size limit was reached it means a partial Result is being
          // returned. Returning a
          // partial Result means that we should not reset the filters; filters
          // should only be reset in
          // between rows
          if (!scannerContext.midRowResultFormed())
            resetFilters();
    
          if (isFilterDoneInternal()) {
            moreValues = false;
          }
          return moreValues;
        }
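
nextRaw reads from storeHeap, which combines the per-store scanners into one sorted stream. The idea of merging several sorted sources through a heap can be sketched as below. This is an illustrative k-way merge, not HBase's heap of KeyValueScanners: `ScannerHeapSketch` is a hypothetical class, keys are plain strings, and filters, versions, and read points are left out.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Comparator;
import java.util.Iterator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical k-way merge over sorted sources; not HBase's scanner heap.
final class ScannerHeapSketch {
    // One sorted source (for example a memstore segment or a store file) with a current head.
    private static final class Source {
        final Iterator<String> iterator;
        String head;

        Source(Iterator<String> iterator) {
            this.iterator = iterator;
            this.head = iterator.next();
        }

        // Moves to the next key; returns false when the source is exhausted.
        boolean advance() {
            if (iterator.hasNext()) {
                head = iterator.next();
                return true;
            }
            return false;
        }
    }

    static List<String> mergeSorted(List<List<String>> sortedSources) {
        PriorityQueue<Source> heap =
                new PriorityQueue<>(Comparator.comparing((Source s) -> s.head));
        for (List<String> source : sortedSources) {
            if (!source.isEmpty()) {
                heap.add(new Source(source.iterator()));
            }
        }
        List<String> merged = new ArrayList<>();
        while (!heap.isEmpty()) {
            Source smallest = heap.poll(); // source whose head is the smallest key
            merged.add(smallest.head);
            if (smallest.advance()) {
                heap.add(smallest);        // re-insert with its new head
            }
        }
        return merged;
    }

    public static void main(String[] args) {
        List<List<String>> sources = Arrays.asList(
                Arrays.asList("r1", "r4"),
                Arrays.asList("r2", "r3"));
        System.out.println(mergeSorted(sources));
    }
}
```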
    

    Article link: https://www.haomeiwen.com/subject/gmmblqtx.html