HBase Compaction-(5) Minor Compaction

Author: AlstonWilliams | Published 2019-01-18 10:26

    For a long time I held a misconception: I assumed that calling Admin.compact(table) would always run a Minor Compaction, and likewise that calling Admin.majorCompact(table) would always run a Major Compaction.

    It turns out that neither assumption is true.

    Environment

    HBase rel-2.1.0

    Analysis

    Let's start with the class diagram:

    [Class diagram omitted; it showed the CompactionPolicy hierarchy, covering RatioBasedCompactionPolicy, ExploringCompactionPolicy, FIFOCompactionPolicy, and DateTieredCompactionPolicy.]
    Of these, RatioBasedCompactionPolicy, ExploringCompactionPolicy, and FIFOCompactionPolicy all share the same logic for deciding whether to attempt a major compaction, while DateTieredCompactionPolicy is more involved. I haven't fully worked through DateTieredCompactionPolicy's logic yet, so this article does not cover it; once I understand it, I will write a dedicated series of articles about it.

    When HBase performs a compaction, a minor compaction is converted into a major compaction if both of the following conditions are met (see the sketch after this list):

    • The minor compaction selected all of the HStoreFiles. (This is the precondition: even if you call Admin.majorCompact(table), a minor compaction is executed instead whenever this condition is not met.)
    • The minTimestamp of the selected HStoreFiles (the smallest timestamp among them) is more than hbase.hregion.majorcompaction ago. (The default is 7 days; many articles claim 24 hours, which is incorrect, at least in rel-2.1.0.)
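
    For reference, here is a minimal sketch of checking the effective period on a plain Configuration. The key name comes straight from the discussion above, while the fallback value (7 days in milliseconds) is my reading of the rel-2.1.0 default and worth verifying against your own version:

    // Sketch: inspect the effective major-compaction period (milliseconds).
    Configuration conf = HBaseConfiguration.create();
    long majorPeriodMs = conf.getLong("hbase.hregion.majorcompaction",
        1000L * 60 * 60 * 24 * 7); // assumed rel-2.1.0 default: 7 days
    System.out.println("Major compaction period: " + majorPeriodMs + " ms");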

    Let's go straight to the code.

    @Override
    public ScanType getScanType(CompactionRequestImpl request) {
        return request.isAllFiles() ? COMPACT_DROP_DELETES : COMPACT_RETAIN_DELETES;
    }
    

    This code lives in Compactor. Note what it implies: delete markers are only dropped (COMPACT_DROP_DELETES) when the compaction covers all files. request.isAllFiles() is defined as:

    @Override
    public boolean isAllFiles() {
      return this.isMajor == DisplayCompactionType.MAJOR
          || this.isMajor == DisplayCompactionType.ALL_FILES;
    }
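
    For context, isMajor here is not a plain boolean: as far as I can tell from the rel-2.1.0 source, CompactionRequestImpl tracks it with a three-valued enum, roughly:

    private enum DisplayCompactionType { MINOR, ALL_FILES, MAJOR }
    private DisplayCompactionType isMajor = DisplayCompactionType.MINOR;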
    

    The conditions under which CompactionRequestImpl.isMajor gets set are quite strict. Have a look at this code:

    ArrayList<HStoreFile> filesToCompact = Lists.newArrayList(result.getFiles());
    removeExcessFiles(filesToCompact, isUserCompaction, isTryingMajor);
    result.updateFiles(filesToCompact);
    
    isAllFiles = (candidateFiles.size() == filesToCompact.size());
    result.setOffPeak(!filesToCompact.isEmpty() && !isAllFiles && mayUseOffPeak);
    result.setIsMajor(isTryingMajor && isAllFiles, isAllFiles);
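
    One more detail worth flagging on the removeExcessFiles() line above: as I read the rel-2.1.0 code, it trims the selection down to hbase.hstore.compaction.max unless this is a user-requested major compaction, which is yet another way isAllFiles can end up false here.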
    

    Now let's step into CompactionRequestImpl.setIsMajor():

    /**
     * Specify if this compaction should be a major compaction based on the state of the store
     * @param isMajor <tt>true</tt> if the system determines that this compaction should be a major
     *          compaction
     */
    public void setIsMajor(boolean isMajor, boolean isAllFiles) {
      assert isAllFiles || !isMajor;
      this.isMajor = !isAllFiles ? DisplayCompactionType.MINOR
          : (isMajor ? DisplayCompactionType.MAJOR : DisplayCompactionType.ALL_FILES);
    }
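
    To make the mapping concrete, here is a walkthrough of the input combinations (my own annotation, not code from the source):

    // setIsMajor(isMajor, isAllFiles)  ->  resulting DisplayCompactionType
    // (true,  true)   ->  MAJOR       all files selected and major criteria met
    // (false, true)   ->  ALL_FILES   all files selected, but not a major
    // (false, false)  ->  MINOR       only a subset of files selected
    // (true,  false)  ->  rejected by the assert: a major must cover all files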
    

    As we can see, if the selection does not cover every HStoreFile, then sorry, the compaction cannot be a major compaction.

    So when can all of the HStoreFiles be selected?

    The relevant code is in SortedCompactionPolicy.getCurrentEligibleFiles():

    protected ArrayList<HStoreFile> getCurrentEligibleFiles(ArrayList<HStoreFile> candidateFiles,
                                                            final List<HStoreFile> filesCompacting) {
      // candidates = all storefiles not already in compaction queue
      if (!filesCompacting.isEmpty()) {
        // exclude all files older than the newest file we're currently
        // compacting. this allows us to preserve contiguity (HBASE-2856)
        HStoreFile last = filesCompacting.get(filesCompacting.size() - 1);
        int idx = candidateFiles.indexOf(last);
        Preconditions.checkArgument(idx != -1);
        candidateFiles.subList(0, idx + 1).clear();
      }
      return candidateFiles;
    }
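
    A small hypothetical example of what this exclusion does (the file names are made up for illustration):

    // candidateFiles  = [f1, f2, f3, f4, f5]   sorted oldest-first
    // filesCompacting = [f2, f3]               a compaction is in flight
    // last = f3, idx = 2                       -> subList(0, 3).clear()
    // eligible        = [f4, f5]               -> can never be "all files"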
    

    As you can see, once this HStore has a compaction in progress, the selection can no longer include all of the HStoreFiles.

    So, in short: as long as the HStore has a compaction already running, we cannot run a major compaction, only a minor one.

    That is the first condition.

    What about the second condition, namely that the minTimestamp of the selected HStoreFiles (the smallest timestamp among them) is more than hbase.hregion.majorcompaction ago?

    Again, straight to the code.

    // Try a major compaction if this is a user-requested major compaction,
    // or if we do not have too many files to compact and this was requested as a major compaction
    boolean isTryingMajor = (forceMajor && isAllFiles && isUserCompaction)
            || (((forceMajor && isAllFiles) || shouldPerformMajorCompaction(candidateSelection))
            && (candidateSelection.size() < comConf.getMaxFilesToCompact()));
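
    To unpack that expression (my own annotation of the two branches):

    // (a) forceMajor && isAllFiles && isUserCompaction
    //     an explicit Admin.majorCompact() that selected every file;
    //     note this branch has no file-count cap.
    // (b) ((forceMajor && isAllFiles) || shouldPerformMajorCompaction(...))
    //       && candidateSelection.size() < comConf.getMaxFilesToCompact()
    //     a system-triggered major, or, the interesting case, a minor
    //     compaction that gets upgraded because its files are old enough.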
    

    As we can see, for a minor compaction the decisive piece is shouldPerformMajorCompaction(candidateSelection): this is where a plain minor compaction can qualify for the upgrade.

    RatioBasedCompactionPolicy.shouldPerformMajorCompaction(candidateSelection) looks like this:

    /*
     * @param filesToCompact Files to compact. Can be null.
     * @return True if we should run a major compaction.
     */
    @Override
    public boolean shouldPerformMajorCompaction(Collection<HStoreFile> filesToCompact)
      throws IOException {
      boolean result = false;
      long mcTime = getNextMajorCompactTime(filesToCompact);
      if (filesToCompact == null || filesToCompact.isEmpty() || mcTime == 0) {
        return result;
      }
      // TODO: Use better method for determining stamp of last major (HBASE-2990)
      long lowTimestamp = StoreUtils.getLowestTimestamp(filesToCompact);
      long now = EnvironmentEdgeManager.currentTime();
      if (lowTimestamp > 0L && lowTimestamp < (now - mcTime)) {
        String regionInfo;
        if (this.storeConfigInfo != null && this.storeConfigInfo instanceof HStore) {
          regionInfo = ((HStore)this.storeConfigInfo).getRegionInfo().getRegionNameAsString();
        } else {
          regionInfo = this.toString();
        }
        // Major compaction time has elapsed.
        long cfTTL = HConstants.FOREVER;
        if (this.storeConfigInfo != null) {
           cfTTL = this.storeConfigInfo.getStoreFileTtl();
        }
        if (filesToCompact.size() == 1) {
          // Single file
          HStoreFile sf = filesToCompact.iterator().next();
          OptionalLong minTimestamp = sf.getMinimumTimestamp();
          long oldest = minTimestamp.isPresent() ? now - minTimestamp.getAsLong() : Long.MIN_VALUE;
          /**
           * Reading: the single file is already the result of a major
           * compaction, and its data has not expired yet (TTL not reached),
           * so a fresh major compaction is only worthwhile if it improves
           * HDFS block locality.
           */
          if (sf.isMajorCompactionResult() && (cfTTL == Long.MAX_VALUE || oldest < cfTTL)) {
            float blockLocalityIndex =
              sf.getHDFSBlockDistribution().getBlockLocalityIndex(
              RSRpcServices.getHostname(comConf.conf, false));
            if (blockLocalityIndex < comConf.getMinLocalityToForceCompact()) {
              LOG.debug("Major compaction triggered on only store " + regionInfo
                + "; to make hdfs blocks local, current blockLocalityIndex is "
                + blockLocalityIndex + " (min " + comConf.getMinLocalityToForceCompact() + ")");
              result = true;
            } else {
              LOG.debug("Skipping major compaction of " + regionInfo
                + " because one (major) compacted file only, oldestTime " + oldest
                + "ms is < TTL=" + cfTTL + " and blockLocalityIndex is " + blockLocalityIndex
                + " (min " + comConf.getMinLocalityToForceCompact() + ")");
            }
          } else if (cfTTL != HConstants.FOREVER && oldest > cfTTL) {
            LOG.debug("Major compaction triggered on store " + regionInfo
              + ", because keyvalues outdated; time since last major compaction "
              + (now - lowTimestamp) + "ms");
            result = true;
          }
        } else {
          LOG.debug("Major compaction triggered on store " + regionInfo
            + "; time since last major compaction " + (now - lowTimestamp) + "ms");
          result = true;
        }
      }
      return result;
    }
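
    One caveat: getNextMajorCompactTime() does not return hbase.hregion.majorcompaction verbatim. In rel-2.1.0 it spreads the period using hbase.hregion.majorcompaction.jitter (default 0.5), so the effective mcTime varies per store. A sketch of the idea, not the exact code:

    // The effective period lands somewhere around
    //   [period * (1 - jitter), period * (1 + jitter)]
    long period = conf.getLong("hbase.hregion.majorcompaction",
        1000L * 60 * 60 * 24 * 7);
    float jitter = conf.getFloat("hbase.hregion.majorcompaction.jitter", 0.5F);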
    

    The logic is straightforward: check whether hbase.hregion.majorcompaction has elapsed since the oldest file's timestamp. If it hasn't, nothing more to say, no major compaction. If it has, and there is only a single HStoreFile, then HDFS block locality (together with the column family's TTL) decides whether a major compaction is worth it. If there is more than one HStoreFile, no further questions, just go all in.

    Test code

    package org.apache.hadoop.hbase.regionserver.compactions;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.hbase.*;
    import org.apache.hadoop.hbase.client.*;
    import org.apache.hadoop.hbase.regionserver.*;
    import org.apache.hadoop.hbase.testclassification.MediumTests;
    import org.apache.hadoop.hbase.testclassification.RegionServerTests;
    import org.apache.hadoop.hbase.util.*;
    import org.junit.BeforeClass;
    import org.junit.ClassRule;
    import org.junit.Rule;
    import org.junit.Test;
    import org.junit.experimental.categories.Category;
    import org.junit.rules.ExpectedException;
    
    import java.io.IOException;
    import java.util.ArrayList;
    import java.util.Collection;
    import java.util.List;
    import java.util.concurrent.ThreadLocalRandom;
    
    @Category({RegionServerTests.class, MediumTests.class})
    public class TestMinorCompactionUpgradeToMajorCompaction {
    
        @ClassRule
        public static final HBaseClassTestRule CLASS_RULE =
                HBaseClassTestRule.forClass(TestMinorCompactionUpgradeToMajorCompaction.class);
    
        private static final HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
    
        private final byte[] family = Bytes.toBytes("f");
    
        private final byte[] qualifier = Bytes.toBytes("q");
    
        @Rule
        public ExpectedException error = ExpectedException.none();
    
        public HStore getStoreWithName(TableName tableName) {
            MiniHBaseCluster cluster = TEST_UTIL.getMiniHBaseCluster();
            List<JVMClusterUtil.RegionServerThread> rsts = cluster.getRegionServerThreads();
            for (int i = 0; i < cluster.getRegionServerThreads().size(); i++) {
                HRegionServer hrs = rsts.get(i).getRegionServer();
                for (HRegion region : hrs.getRegions(tableName)) {
                    return region.getStores().iterator().next();
                }
            }
            return null;
        }
    
        private HStore prepareData(TableName tableName, boolean usedForMajor2Minor) throws IOException {
            Admin admin = TEST_UTIL.getAdmin();
            TableDescriptor desc = TableDescriptorBuilder.newBuilder(tableName)
                    .setValue(DefaultStoreEngine.DEFAULT_COMPACTION_POLICY_CLASS_KEY,
                            RatioBasedCompactionPolicy.class.getName())
                    .setValue(HConstants.HBASE_REGION_SPLIT_POLICY_KEY,
                            DisabledRegionSplitPolicy.class.getName())
                    .setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(family).build())
                    .build();
            admin.createTable(desc);
            Table table = TEST_UTIL.getConnection().getTable(tableName);
            TimeOffsetEnvironmentEdge edge =
                    (TimeOffsetEnvironmentEdge) EnvironmentEdgeManager.getDelegate();
    
            int[] sizes;
    
            if (usedForMajor2Minor) {
                sizes = new int[]{65159, 55159, 45163, 35163, 25167, 15167, 5167};
            } else {
                sizes = new int[]{65159, 55159, 45163, 35163, 25167};
            }
    
        for (int size : sizes) {
            byte[] value = new byte[size];
            ThreadLocalRandom.current().nextBytes(value);
            table.put(new Put(Bytes.toBytes(size)).addColumn(family, qualifier, value));
            admin.flush(tableName);
            // Advance the injected clock past hbase.hregion.majorcompaction (1000 ms),
            // so every flushed file looks "old enough" for a major compaction.
            edge.increment(1001);
        }
            return getStoreWithName(tableName);
        }
    
        @BeforeClass
        public static void setEnvironmentEdge() throws Exception {
            EnvironmentEdge ee = new TimeOffsetEnvironmentEdge();
            EnvironmentEdgeManager.injectEdge(ee);
        Configuration conf = TEST_UTIL.getConfiguration();
        // Raise the blocking-store-files threshold so flushes never block the test.
        conf.setInt(HStore.BLOCKING_STOREFILES_KEY, 10000);
        conf.set("hbase.hstore.compaction.ratio", "1.0");
        conf.set("hbase.hstore.compaction.min", "3");
        conf.set("hbase.hstore.compaction.max", "5");
        // Shrink the major-compaction period (normally 7 days) to 1000 ms.
        conf.set("hbase.hregion.majorcompaction", "1000");
            TEST_UTIL.startMiniCluster(1);
        }
    
        @Test
        public void testMinorCompactionWithoutDelete() throws Exception {
            TableName tableName = TableName.valueOf("1");
            HStore store = prepareData(tableName, false);
    
            // >>>>>>>>>>>>>>>>>>>
            System.out.println("-------- Before compaction");
            for (HStoreFile hStoreFile : store.getStorefiles()) {
                System.out.println("--------- " + hStoreFile);
                StoreFileReader storeFileReader = hStoreFile.getReader();
                System.out.println("--------- size: " + storeFileReader.length());
            }
            // <<<<<<<<<<<<<<<<<<<
    
            Thread.sleep(10 * 1000);
    
            TEST_UTIL.getAdmin().compact(tableName);
    
            Thread.sleep(10 * 1000);
    
            // >>>>>>>>>>>>>>>>>>>
            System.out.println("-------- After compaction");
            for (HStoreFile hStoreFile : store.getStorefiles()) {
                System.out.println("--------- " + hStoreFile);
                StoreFileReader storeFileReader = hStoreFile.getReader();
                System.out.println("--------- size: " + storeFileReader.length());
            }
            // <<<<<<<<<<<<<<<<<<<
    
        }
    
    // Test that even in a minor compaction, if all files are chosen to compact,
    // deleted data is removed as well.
        @Test
        public void testMinorCompactionWithDelete() throws Exception {
    
            TableName tableName = TableName.valueOf("2");
    
            HStore store = prepareData(tableName, false);
    
            // >>>>>>>>>>>>>>>>>>>
            System.out.println("-------- Before compaction");
            for (HStoreFile hStoreFile : store.getStorefiles()) {
                System.out.println("--------- " + hStoreFile);
                StoreFileReader storeFileReader = hStoreFile.getReader();
                System.out.println("--------- size: " + storeFileReader.length());
            }
            // <<<<<<<<<<<<<<<<<<<
    
            Table table = TEST_UTIL.getConnection().getTable(tableName);
    
    
            Delete delete = new Delete(Bytes.toBytes(25167));
            delete.addColumns(family, qualifier);
            table.delete(delete);
    
            Admin admin = TEST_UTIL.getAdmin();
        // Don't forget to flush the table; otherwise the row is only deleted in the MemStore, not in the HStoreFiles.
            admin.flush(tableName);
    
            TEST_UTIL.getAdmin().compact(tableName);
    
            Thread.sleep(10 * 1000);
    
            // >>>>>>>>>>>>>>>>>>>
            System.out.println("-------- After compaction");
            for (HStoreFile hStoreFile : store.getStorefiles()) {
                System.out.println("--------- " + hStoreFile);
                StoreFileReader storeFileReader = hStoreFile.getReader();
                System.out.println("--------- size: " + storeFileReader.length());
            }
            // <<<<<<<<<<<<<<<<<<<
    
        }
    
    // Test that if a compaction is already in progress and a major compaction is issued,
    // it is executed as a minor compaction instead.
    // Add a breakpoint in {@link SortedCompactionPolicy#selectCompaction} to see whether a major or a minor compaction runs.
        @Test
        public void testMajorCompaction2MinorCompaction() throws Exception {
            TableName tableName = TableName.valueOf("3");
            HStore store = prepareData(tableName, true);
    
        List<HStoreFile> filesCompacting = new ArrayList<>();

        // >>>>>>>>>>>>>>>>>>>
        System.out.println("-------- Before compaction");
        // Copy instead of casting: getStorefiles() only promises a Collection.
        List<HStoreFile> allStoreFiles = new ArrayList<>(store.getStorefiles());
        for (int i = 0; i < allStoreFiles.size(); i++) {
            HStoreFile hStoreFile = allStoreFiles.get(i);
            System.out.println("--------- " + hStoreFile);
            StoreFileReader storeFileReader = hStoreFile.getReader();
            System.out.println("--------- size: " + storeFileReader.length());
            // Mark the first three files as already being compacted.
            if (i < 3) {
                filesCompacting.add(hStoreFile);
            }
        }
        // <<<<<<<<<<<<<<<<<<<
            // <<<<<<<<<<<<<<<<<<<
    
            store.addToCompactingFiles(filesCompacting);
            TEST_UTIL.getAdmin().majorCompact(tableName);
    
            Thread.sleep(5 * 1000);
    
            // >>>>>>>>>>>>>>>>>>>
            System.out.println("-------- After compaction");
            for (HStoreFile hStoreFile : store.getStorefiles()) {
                System.out.println("--------- " + hStoreFile);
                StoreFileReader storeFileReader = hStoreFile.getReader();
                System.out.println("--------- size: " + storeFileReader.length());
            }
            // <<<<<<<<<<<<<<<<<<<
    
        }
    }
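
    A note on running this: the class sits in the hbase-server test sources (package org.apache.hadoop.hbase.regionserver.compactions) because it depends on test-only utilities such as HBaseTestingUtility and TimeOffsetEnvironmentEdge, so it has to be built inside the HBase source tree; from there it can be run with Maven's surefire filter, e.g. -Dtest=TestMinorCompactionUpgradeToMajorCompaction. Note also that store.addToCompactingFiles(...) may require widening that method's visibility in HStore, depending on your checkout. To see which compaction type actually ran, watch the DEBUG lines emitted by shouldPerformMajorCompaction() or set the breakpoint mentioned above.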
    
