美文网首页
HBase Bulkload 迁移数据及问题思考

HBase Bulkload 迁移数据及问题思考

作者: 六层楼那么高 | 来源:发表于2021-06-10 16:13 被阅读0次

最近进行 HBase 表跨集群迁移,使用组内同事给的方案 : bulkload,但是 bulkload 完之后出现了一系列预料之外的问题,记录如下:

hbase 表跨集群迁移步骤

  1. 目标集群创建 HBase 表,未进行预分区;
  2. 源集群 hbase 表对应的 hdfs 路径创建 snaphost ,注意此 snapshot 是 hdfs snaphost;
  3. distcp 拷贝 hdfs 快照数据到目标集群
  4. 目标集群遍历拷贝数据目录,逐个 bulkload 到 hbase

问题

bulkload 结束后,这张 700 亿行的大表,只有一个 region,花了近一周的时间 minor compaction,region split 好;

原先以为,拷贝的数据里面除了列族数据,还包含 region 信息(/region_id/.regioninfo),本来以为 bulkload 会自动处理 region,通过了解了一番源码发现,事情并非如此。

源码分析 bulkload 过程

1 初始化一个线程池,线程池 corePoolSize 来源于参数配置 hbase.loadincremental.threads.max,如果未配置,默认取 jvm 可以用到的处理器的个数(Runtime.getRuntime().availableProcessors())。

2 遍历搜索过滤出 HFile 文件:遍历目录,搜索 hfile。途中经过一系列校验,判断是否有 families,family name 是否合法性,跳过非 hfile 文件,例如 _ 开头的文件,引用,HFileLink,最后判断 HFile 格式的有效性。

扫描过程中会检查 HFile 文件的大小是否超出 region 大小的阈值(hbase.hregion.max.filesize,未配置的话默认是 10G),如果超出阈值,会打印提示这可能会导致出现 oversplitting 的问题。

将遍历后的 hfile 以对象 LoadQueueItem(byte[] family, Path hfilePath) 的方式放入队列 :Deque。

这一步就把 .regioninfo 就排除掉了,所以这个拷贝过来的 region 信息对于 bulkload 是无用了。

famliy 存在性校验:再经过一次筛选,判断是否有获取到的 family 是否是即将导入 HBase 表中的 family。

3 groupOrSplitPhase 阶段

这个阶段判断 hfile 判断应该写到表的哪个 region,如果跨 region 了,需要进行 split。
获取当前表所有 region 的 startkeys endkeys

final Pair<byte[][], byte[][]> startEndKeys = regionLocator.getStartEndKeys();

拿到即将导入的 hfile 的 startkey,通过二分查找算法在 startkey 列表里面搜索,如果搜到匹配的 startkey 直接返回数组索引值,没搜到,返回插入点,插入点是第一个大于值的索引位置。

int idx = Arrays.binarySearch(startEndKeys.getFirst(), first,
        Bytes.BYTES_COMPARATOR);
    if (idx < 0) {
      // not on boundary, returns -(insertion index).  Calculate region it
      // would be in.
      idx = -(idx + 1) - 1;
    }
final int indexForCallable = idx;

通过这个索引值,判断是否有跨 region 的 hfile,有的话需要 split。怎么判断是否不需要 split,也就是有合适的 region,满足其中两个条件之一即可:

  1. hfile 的 endkey 小于表 region 的 endkey
  2. 表 region endeky 为空,说明是最后一个 region ,理所当然可以写入
boolean lastKeyInRange =
      Bytes.compareTo(last, startEndKeys.getSecond()[idx]) < 0 ||
      Bytes.equals(startEndKeys.getSecond()[idx], HConstants.EMPTY_BYTE_ARRAY);
    if (!lastKeyInRange) {
      // split key 即为匹配 region 的 endkey
      List<LoadQueueItem> lqis = splitStoreFile(item, table,
          startEndKeys.getFirst()[indexForCallable],
          startEndKeys.getSecond()[indexForCallable]);
      return lqis;
    }

    // group regions.
    regionGroups.put(ByteBuffer.wrap(startEndKeys.getFirst()[idx]), item);
LOG.info("HFile at " + hfilePath + " no longer fits inside a single " +
"region. Splitting...");

拆分是把当前 HFile 拆分成两半,top 和 bottom 两部分,保留元数据,重建 bloom 过滤等,生成新的 HFile ,拆分策略是:根据匹配 region 的 endkey 的位置拆分成两个。

/**
   * Split a storefile into a top and bottom half, maintaining
   * the metadata, recreating bloom filters, etc.
   */
  static void splitStoreFile(
      Configuration conf, Path inFile,
      HColumnDescriptor familyDesc, byte[] splitKey,
      Path bottomOut, Path topOut) throws IOException
  {
    // Open reader with no block cache, and not in-memory
    Reference topReference = Reference.createTopReference(splitKey);
    Reference bottomReference = Reference.createBottomReference(splitKey);

    copyHFileHalf(conf, inFile, topOut, topReference, familyDesc);
    copyHFileHalf(conf, inFile, bottomOut, bottomReference, familyDesc);
  }

bulkLoadPhase:bulkload 阶段

计算出 region 信息之后,就是正式的 load 阶段,最终定位到 HStore 里面的 bulkLoadFile 方法

通过 StoreFile reader 读取 StoreFile ,获取写锁,往 storefile 中新增数据。

 private void bulkLoadHFile(StoreFile sf) throws IOException {
    StoreFile.Reader r = sf.getReader();
    this.storeSize += r.length();
    this.totalUncompressedBytes += r.getTotalUncompressedBytes();

    // Append the new storefile into the list
    this.lock.writeLock().lock();
    try {
      this.storeEngine.getStoreFileManager().insertNewFiles(Lists.newArrayList(sf));
    } finally {
      // We need the lock, as long as we are updating the storeFiles
      // or changing the memstore. Let us release it before calling
      // notifyChangeReadersObservers. See HBASE-4485 for a possible
      // deadlock scenario that could have happened if continue to hold
      // the lock.
      this.lock.writeLock().unlock();
    }
    notifyChangedReadersObservers();
    LOG.info("Loaded HFile " + sf.getFileInfo() + " into store '" + getColumnFamilyName());
    if (LOG.isTraceEnabled()) {
      String traceMessage = "BULK LOAD time,size,store size,store files ["
          + EnvironmentEdgeManager.currentTime() + "," + r.length() + "," + storeSize
          + "," + storeEngine.getStoreFileManager().getStorefileCount() + "]";
      LOG.trace(traceMessage);
    }
  }

结论

对于待迁移的 HBase 大表, bulkload 前尽可能在建表时做好预分区

相关文章

  • HBase Bulkload 迁移数据及问题思考

    最近进行 HBase 表跨集群迁移,使用组内同事给的方案 : bulkload,但是 bulkload 完之后出现...

  • Hbase Compression的配置与生效

    0. 问题 最近碰到一个问题: hbase使用bulkload方式, 两次把全量数据load到hbase表,并且手...

  • HBase数据导入-Bulkload问题

    使用bulkload导入,执行以下命令 最终执行速度特别慢,300G数据需要半个小时左右,原因是生成的hfile数...

  • Hbase BulkLoad方式导入百亿级数据

    众所周知Hbase的BulkLoad是最快导入数据的方式,在导入历史数据的时候,我们一般会选择使用BulkLoad...

  • 使用BulkLoad 向 HBase 中批量导入数据

    1 使用 BulkLoad 向 HBase 中批量导入数据 2 背景介绍 2.1 概述 我们经常面临向 HBase...

  • Hbase BulkLoad用法

    要导入大量数据,Hbase的BulkLoad是必不可少的,在导入历史数据的时候,我们一般会选择使用BulkLoad...

  • Hbase BulkLoad用法

    要导入大量数据,Hbase的BulkLoad是必不可少的,在导入历史数据的时候,我们一般会选择使用BulkLoad...

  • Hbase BulkLoad用法

    要导入大量数据,Hbase的BulkLoad是必不可少的,在导入历史数据的时候,我们一般会选择使用BulkLoad...

  • Hbase BulkLoad用法

    要导入大量数据,Hbase的BulkLoad是必不可少的,在导入历史数据的时候,我们一般会选择使用BulkLoad...

  • Hbase BulkLoad用法

    要导入大量数据,Hbase的BulkLoad是必不可少的,在导入历史数据的时候,我们一般会选择使用BulkLoad...

网友评论

      本文标题:HBase Bulkload 迁移数据及问题思考

      本文链接:https://www.haomeiwen.com/subject/aegueltx.html