HBase 1.2.0 Source Code Analysis: Compact

Author: Alex90 | Published 2018-06-25 12:19

    Compact refers to merging some or all of the HFiles under a column family of an HRegion in an HBase table. As data is continuously written, the MemStore reaches a threshold and is flushed to disk, producing many small files. If nothing is done about them, these files seriously degrade HBase read performance. So HBase periodically, when certain conditions are met, or when triggered manually, merges these many files into a single large file; this process is called a compaction (Compact).
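
    For reference, this whole flow can be kicked off from client code through the Admin API. Below is a minimal sketch (the table name is hypothetical); such a request ends up as the compactRegion RPC analyzed next:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class TriggerCompaction {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
          TableName table = TableName.valueOf("test_table");   // hypothetical table name
          admin.compact(table);        // request a (minor) compaction of every region of the table
          admin.majorCompact(table);   // request a major compaction; the call is asynchronous
        }
      }
    }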

    1. A compaction is triggered through an RPC call into RSRpcServices (the RegionServer RPC service), org.apache.hadoop.hbase.regionserver.RSRpcServices
    @QosPriority(priority=HConstants.ADMIN_QOS)     // relative priority of the call; provides a basic notion of quality of service (QOS)
    public CompactRegionResponse compactRegion(final RpcController controller,
        final CompactRegionRequest request) throws ServiceException {
        
      try {
        checkOpen();        // check the RegionServer state: isOnline, isAborted, isStopped, fsOk
        Region region = getRegion(request.getRegion());     // get the Region to operate on
        ...
        boolean major = false;      // whether to run a major compaction
        byte [] family = null;      // column family from the request, if any
        Store store = null;         // the Store for that column family
        if (request.hasFamily()) {  // look up the Store that holds the column family
          family = request.getFamily().toByteArray();
          store = region.getStore(family);
          if (store == null) {
            throw new ServiceException(...);
          }
        }
    
        if (request.hasMajor()) { 
          major = request.getMajor();
        }
    
        if (major) {        // if a column family was given, trigger on that Store; otherwise on the whole Region
        // nothing is compacted here yet; this only sets this.forceMajor = true
          if (family != null) {
            store.triggerMajorCompaction(); 
          } else {
            region.triggerMajorCompaction();
          }
        }
    
        // the only difference is whether a specific Store is passed in
        if(family != null) {
          regionServer.compactSplitThread.requestCompaction(region, store, log, Store.PRIORITY_USER, null, RpcServer.getRequestUser());
        } else {
          regionServer.compactSplitThread.requestCompaction(region, log, Store.PRIORITY_USER, null, RpcServer.getRequestUser());
        }
        
        return CompactRegionResponse.newBuilder().build(); 
      } catch (IOException ie) {
        throw new ServiceException(ie);
      }
    }
    

    Getting the Region

    protected Region getRegionByEncodedName(byte[] regionName, String encodedRegionName)
      throws NotServingRegionException {
      Region region = this.onlineRegions.get(encodedRegionName);    // look it up in the map of online regions
      if (region == null) {
        throw new NotServingRegionException(....);
        // compacting a region that is not online fails here
        // the exception message depends on the region's state, e.g. a region that is currently being moved
      }
      return region;
    }
    

    Regions that were closed because of a move operation are tracked in a Map:

    protected Map<String, MovedRegionInfo> movedRegions;
    
    2. Entering the compaction logic
      org.apache.hadoop.hbase.regionserver.CompactSplitThread

    If no column family was passed in, iterate over all Stores of the Region and call requestCompactionInternal for each:

    private List<CompactionRequest> requestCompactionInternal(final Region r, final String why,
        int p, List<Pair<CompactionRequest, Store>> requests, boolean selectNow, User user)
            throws IOException {
        
        for (Store s : r.getStores()) {
          CompactionRequest cr = requestCompactionInternal(r, s, why, p, null, selectNow, user);
            ...
        }
    
    }
    
    • HStore handles the requestCompaction call
      org.apache.hadoop.hbase.regionserver.HStore
    @Override
    public CompactionContext requestCompaction(int priority, final CompactionRequest baseRequest,
        User user) throws IOException {
      if (!this.areWritesEnabled()) {   // if writes are disabled, do not compact
        return null;
      }
      
      removeUnneededFiles();    // before compacting, try to drop unneeded files to simplify things
      // hbase.store.delete.expired.storefile controls whether expired store files get deleted
    
      final CompactionContext compaction = storeEngine.createCompaction();
      CompactionRequest request = null;
      this.lock.readLock().lock();  // read lock
      try {
        synchronized (filesCompacting) {
          final Store thisStore = this;
          if (this.getCoprocessorHost() != null) {
            // Coprocessors, introduced in 0.92, are hooks used to implement features such as secondary indexes, complex filters and access control; this logic is skipped here
          }
    
          // the common case
          if (!compaction.hasSelection()) {         // hasSelection(): this.request != null
            boolean isUserCompaction = priority == Store.PRIORITY_USER;     // true for this code path
            boolean mayUseOffPeak = offPeakHours.isOffPeakHour() &&
                offPeakCompactionTracker.compareAndSet(false, true);        // check whether we are currently in off-peak hours
            try {
              compaction.select(this.filesCompacting, isUserCompaction,
                mayUseOffPeak, forceMajor && filesCompacting.isEmpty());    // pick the files to compact
            } catch (IOException e) {
                ...
              throw e;
            }
            ...
          }
    
          if (baseRequest != null) {
            // if baseRequest is not null, compare baseRequest with the selected request to decide which files need compacting
            ...
          }
    
          // get the resulting list of files
          request = compaction.getRequest();
          final Collection<StoreFile> selectedFiles = request.getFiles();
          if (selectedFiles.isEmpty()) {
            return null;
          }
    
          addToCompactingFiles(selectedFiles);      // add them to filesCompacting
    
          // clear forceMajor once the selected request is a major compaction
          this.forceMajor = this.forceMajor && !request.isMajor();
    
          // set common request properties: priority and description
          request.setPriority((priority != Store.NO_PRIORITY) ? priority : getCompactPriority());   
          request.setDescription(getRegionInfo().getRegionNameAsString(), getColumnFamilyName());
        }
      } finally {
        this.lock.readLock().unlock();
      }
      this.region.reportCompactionRequestStart(request.isMajor());      // update counters/metrics
      return compaction;
    }
    

    storeEngine is created from the hbase.hstore.engine.class configuration and defaults to DefaultStoreEngine,
    so by default compaction is the DefaultCompactionContext implementation nested in org.apache.hadoop.hbase.regionserver.DefaultStoreEngine.

    this.storeEngine = createStoreEngine(this, this.conf, this.comparator);
    CompactionContext compaction = storeEngine.createCompaction();
    

    compactionPolicy is created from hbase.hstore.defaultengine.compactionpolicy.class and defaults to ExploringCompactionPolicy.
    selectCompaction is implemented in the parent class org.apache.hadoop.hbase.regionserver.compactions.RatioBasedCompactionPolicy and is not covered in detail here.

    request = compactionPolicy.selectCompaction(storeFileManager.getStorefiles(),
            filesCompacting, isUserCompaction, mayUseOffPeak, forceMajor);
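
    Both class names come from the configuration. A minimal sketch of overriding them programmatically (the values shown are just the defaults mentioned above; normally these keys would live in hbase-site.xml):

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;

    public class CompactionEngineConfig {
      public static Configuration build() {
        Configuration conf = HBaseConfiguration.create();
        // which StoreEngine implementation HStore instantiates
        conf.set("hbase.hstore.engine.class",
            "org.apache.hadoop.hbase.regionserver.DefaultStoreEngine");
        // which compaction policy DefaultStoreEngine plugs in
        conf.set("hbase.hstore.defaultengine.compactionpolicy.class",
            "org.apache.hadoop.hbase.regionserver.compactions.ExploringCompactionPolicy");
        return conf;
      }
    }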
    
    • requestCompactionInternal
    // request = null, selectNow = true
    private synchronized CompactionRequest requestCompactionInternal(final Region r, final Store s,
        final String why, int priority, CompactionRequest request, boolean selectNow, User user)
            throws IOException {
            
      if (this.server.isStopped()
          || (r.getTableDesc() != null && !r.getTableDesc().isCompactionEnabled())) {
        // if the table this Region belongs to has COMPACTION_ENABLED=false, no compaction runs at all
        return null;
      }
    
      CompactionContext compaction = null;
      if (selectNow) {
        compaction = selectCompaction(r, s, priority, request, user);   // build a CompactionContext for the Store, containing the files to compact
        // CompactionContext is the compaction context class; it holds all the "physical" details needed to run a compaction
        if (compaction == null) return null; 
      }
    
      // assume most compactions are small, so system compactions go into the small pool and are moved to the large pool only when necessary
      // throttleCompaction() checks compactionSize > comConf.getThrottlePoint()
      // the throttle point is set by hbase.regionserver.thread.compaction.throttle
      ThreadPoolExecutor pool = (selectNow && s.throttleCompaction(compaction.getRequest().getSize()))
        ? longCompactions : shortCompactions;
      pool.execute(new CompactionRunner(s, r, compaction, pool, user));    // the compaction runs on a thread pool; the execution logic lives in CompactionRunner
      return selectNow ? compaction.getRequest() : null;
    }
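
    To make the long/short pool choice above concrete, here is a small self-contained sketch of the same routing decision; the threshold and pool sizes are made up for illustration and do not come from the HBase source:

    import java.util.concurrent.Executors;
    import java.util.concurrent.ThreadPoolExecutor;

    public class CompactionPoolRouting {
      // stand-ins for CompactSplitThread's longCompactions / shortCompactions pools
      static final ThreadPoolExecutor LONG_POOL =
          (ThreadPoolExecutor) Executors.newFixedThreadPool(1);
      static final ThreadPoolExecutor SHORT_POOL =
          (ThreadPoolExecutor) Executors.newFixedThreadPool(3);

      // stand-in for comConf.getThrottlePoint(), i.e. hbase.regionserver.thread.compaction.throttle
      static final long THROTTLE_POINT = 256L * 1024 * 1024;   // made-up 256 MB threshold

      // same idea as throttleCompaction(): requests above the throttle point go to the long pool
      static ThreadPoolExecutor choosePool(long compactionSize) {
        return compactionSize > THROTTLE_POINT ? LONG_POOL : SHORT_POOL;
      }

      public static void main(String[] args) {
        System.out.println(choosePool(64L * 1024 * 1024) == SHORT_POOL);    // small request -> short pool
        System.out.println(choosePool(1024L * 1024 * 1024) == LONG_POOL);   // large request -> long pool
        LONG_POOL.shutdown();
        SHORT_POOL.shutdown();
      }
    }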
    
    3. Running the compaction on a worker thread
    private class CompactionRunner implements Runnable, Comparable<CompactionRunner> {
      @Override
      public void run() {
        if (server.isStopped()
            || (region.getTableDesc() != null && !region.getTableDesc().isCompactionEnabled())) {
          return;         // check the RegionServer state and whether the table has compactions enabled
        }
        doCompaction(user);
      }

      private void doCompaction(User user) {
        // common case: a system compaction that was queued without file selection
        if (this.compaction == null) {
          // check here whether the Store's priority has changed, to avoid blocking potentially higher-priority work
          ...
          this.compaction = selectCompaction(this.region, this.store, queuedPriority, null, user);
          // for a user compaction (selectNow = true) the selection already happened in
          // requestCompactionInternal; this deferred selection only runs for system compactions
        }

        // now the compaction can be executed
        this.compaction.getRequest().beforeExecute();     // currently a no-op
        try {
          boolean completed = region.compact(compaction, store, compactionThroughputController, user);
          if (completed) {
            if (store.getCompactPriority() <= 0) {
              // degenerate case: re-enqueue via requestSystemCompaction
              requestSystemCompaction(region, store, "Recursive enqueue");
            } else {
              // check whether the compaction pushed the region past the maximum region size and a split is needed; see the Split write-up
              requestSplit(region);
            }
          }
        } catch (Exception ex) {
          ...
        } finally {
          LOG.debug("CompactSplitThread Status: " + CompactSplitThread.this);
        }
        this.compaction.getRequest().afterExecute();   // currently a no-op
      }
    }
    
    • The actual compact logic
      org.apache.hadoop.hbase.regionserver.HRegion
    public boolean compact(CompactionContext compaction, Store store,
        CompactionThroughputController throughputController, User user) throws IOException {
      if (this.closing.get() || this.closed.get()) {    // check whether the Region is closing or closed
        store.cancelRequestedCompaction(compaction);
        return false;
      }
    
      MonitoredTask status = null;
      boolean requestNeedsCancellation = true;
      lock.readLock().lock();       
      try {
        byte[] cf = Bytes.toBytes(store.getColumnFamilyName());
        if (stores.get(cf) != store) {  // if, for whatever reason, the Store looked up by column family is no longer the Store we were given, bail out of the compaction
          return false;
        }
    
        if (this.closed.get()) {        // re-check whether the Region has been closed
          return false;
        }
    
          try {
            // Store#compact cleans up the request unconditionally, so it no longer
            // needs to be cancelled in the finally block below
            requestNeedsCancellation = false;
            store.compact(compaction, throughputController, user);      // the Store performs the actual compaction
          } catch (InterruptedIOException iioe) {
            ...
          }
       
        return true;
      } finally {
        try {
          if (requestNeedsCancellation) store.cancelRequestedCompaction(compaction);
          if (status != null) status.cleanup();
        } finally {
          lock.readLock().unlock();
        }
      }
    }
    
    • Compacting the files
      org.apache.hadoop.hbase.regionserver.HStore
    @Override
    public List<StoreFile> compact(CompactionContext compaction,
      CompactionThroughputController throughputController, User user) throws IOException {
      List<StoreFile> sfs = null;
      CompactionRequest cr = compaction.getRequest();
      try {
        // Do all sanity checking here while we still have a valid compaction request, because we need to clean it up in the finally block below
        Collection<StoreFile> filesToCompact = cr.getFiles();
        synchronized (filesCompacting) {
          // sanity check: the files to compact are all currently marked as compacting for this Store
          Preconditions.checkArgument(filesCompacting.containsAll(filesToCompact));
        }
    
        // run the compaction
        List<Path> newFiles = compaction.compact(throughputController, user);
    
        long outputBytes = 0L;
        if (!this.conf.getBoolean("hbase.hstore.compaction.complete", true)) {
            // handle the case where hbase.hstore.compaction.complete is false (the compaction output is not committed)
            ...
        }
    
        // steps required to complete the compaction
        sfs = moveCompactedFilesIntoPlace(cr, newFiles, user);
        writeCompactionWalRecord(filesToCompact, sfs);
        replaceStoreFiles(filesToCompact, sfs);
        ...
        // from this point on the Store uses all of the new files
        completeCompaction(filesToCompact, true);       // archive the old files and update the store size
    
        if (region.getRegionServerServices() != null
            && region.getRegionServerServices().getMetrics() != null) {
          region.getRegionServerServices().getMetrics().updateCompaction(cr.isMajor(),
            now - compactionStartTime, cr.getFiles().size(), newFiles.size(), cr.getSize(),
            outputBytes);
        }
    
        return sfs;
      } finally {
        finishCompactionRequest(cr);
      }
    }
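
    Once a request has gone through this whole path, its progress can be observed from the client side. A minimal sketch, assuming the same hypothetical table name as above, using Admin.getCompactionState:

    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.HBaseConfiguration;
    import org.apache.hadoop.hbase.TableName;
    import org.apache.hadoop.hbase.client.Admin;
    import org.apache.hadoop.hbase.client.Connection;
    import org.apache.hadoop.hbase.client.ConnectionFactory;

    public class CheckCompactionState {
      public static void main(String[] args) throws Exception {
        Configuration conf = HBaseConfiguration.create();
        try (Connection conn = ConnectionFactory.createConnection(conf);
             Admin admin = conn.getAdmin()) {
          TableName table = TableName.valueOf("test_table");   // hypothetical table name
          // prints the aggregate compaction state of the table's regions,
          // e.g. NONE, MINOR, MAJOR or MAJOR_AND_MINOR
          System.out.println(admin.getCompactionState(table));
        }
      }
    }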
    
