美文网首页BigData技术学习
Hbase BulkLoad Snapshot On Table

Hbase BulkLoad Snapshot On Table

作者: sunTengSt | 来源:发表于2019-06-20 15:10 被阅读27次

    一:目的:

    主从集群replication双写下,为了不影响增量数据,将全量数据snapshot导入新集群表中

    二:方案:

    2.1 .cdh6以下hbase版本:

    方案一:clone_snapshot +copyTable(整表执行,操作次数少;操作时间长;copy Table会影响到正常读写,由于region的split)
    方案二:bulkload ,但是受region个数与列族个数影响,操作次数太多,不方便(优点就是快,因为没有写hbase那些流程,直接复制的HFILE文件)

    2.2 cdh6以上hbase版本

    方案:bulkload可以整表HFile导入,解决了CDH6以下的操作频繁问题;
    缺点就是:操作时,HFile文件个数受限制,默认最大32;需要修改参数:hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily,如果参数修改不起作用,就修改源码,重新打hbase-server-xxx.jar吧。

    三:CDH6以上如何支持整表bulkload操作:

    3.1 首先查看CDH5.9源码实现:

    类路径:hbase-server包下org.apache.hadoop.hbase.mapreduce下面

    @Override
    public int run(String[] args) throws Exception {
        //因为没有loadtable参数,所以不会有三个参数的判断
      if (args.length != 2) {
        usage();
        return -1;
      }
    
      initialize();
    //第一个参数是路径,也就是region的路径
      String dirPath = args[0];
      TableName tableName = TableName.valueOf(args[1]);
    
      boolean tableExists = this.doesTableExist(tableName);
      if (!tableExists) {
        if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
          this.createTable(tableName, dirPath);
        } else {
          String errorMsg = format("Table '%s' does not exist.", tableName);
          LOG.error(errorMsg);
          throw new TableNotFoundException(errorMsg);
        }
      }
    
      Path hfofDir = new Path(dirPath);
    
      try (Connection connection = ConnectionFactory.createConnection(getConf());
          HTable table = (HTable) connection.getTable(tableName);) {
        //执行操作
        doBulkLoad(hfofDir, table);
      }
      return 0;
    }
    

    3.2 CDH6.2判断源码实现:

    类路径:hbase-server包下面org.apache.hadoop.hbase.tool,旧路径代码已经建议弃用了类路径

    @Override
    public int run(String[] args) throws Exception {
    //参数判断
        if (args.length != 2 && args.length != 3) {
        usage();
        return -1;
      }
      //获取参数中的路径
      String dirPath = args[0];
      TableName tableName = TableName.valueOf(args[1]);
    
      //非loadtable
      if (args.length == 2) {
        return !run(dirPath, tableName).isEmpty() ? 0 : -1;
      } else {//loadtable
      //loadtable操作下一级目录region的遍历
        Map<byte[], List<Path>> family2Files = Maps.newHashMap();
        FileSystem fs = FileSystem.get(getConf());
        for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
          FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
            Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
            byte[] familyName = Bytes.toBytes(family);
            if (family2Files.containsKey(familyName)) {
              family2Files.get(familyName).add(path);
            } else {
              family2Files.put(familyName, Lists.newArrayList(path));
            }
          });
        }
        return !run(family2Files, tableName).isEmpty() ? 0 : -1;
      }
    }
    

    3.3 在低版本中支持loadtable功能:

    只能修改源码,加入三级目录判断。然后再遍历导入bulkload HFile.

    相关文章

      网友评论

        本文标题:Hbase BulkLoad Snapshot On Table

        本文链接:https://www.haomeiwen.com/subject/nyfjqctx.html