一:目的:
主从集群replication双写下,为了不影响增量数据,将全量数据snapshot导入新集群表中
二:方案:
2.1 .cdh6以下hbase版本:
方案一:clone_snapshot +copyTable(整表执行,操作次数少;操作时间长;copy Table会影响到正常读写,由于region的split)
方案二:bulkload ,但是受region个数与列族个数影响,操作次数太多,不方便(优点就是快,因为没有写hbase那些流程,直接复制的HFILE文件)
2.2 cdh6以上hbase版本
方案:bulkload可以整表HFile导入,解决了CDH6以下的操作频繁问题;
缺点就是:操作时,HFile文件个数受限制,默认最大32;需要修改参数:hbase.mapreduce.bulkload.max.hfiles.perRegion.perFamily,如果参数修改不起作用,就修改源码,重新打hbase-server-xxx.jar吧。
三:CDH6以上如何支持整表bulkload操作:
3.1 首先查看CDH5.9源码实现:
类路径:hbase-server包下org.apache.hadoop.hbase.mapreduce下面
@Override
public int run(String[] args) throws Exception {
//因为没有loadtable参数,所以不会有三个参数的判断
if (args.length != 2) {
usage();
return -1;
}
initialize();
//第一个参数是路径,也就是region的路径
String dirPath = args[0];
TableName tableName = TableName.valueOf(args[1]);
boolean tableExists = this.doesTableExist(tableName);
if (!tableExists) {
if ("yes".equalsIgnoreCase(getConf().get(CREATE_TABLE_CONF_KEY, "yes"))) {
this.createTable(tableName, dirPath);
} else {
String errorMsg = format("Table '%s' does not exist.", tableName);
LOG.error(errorMsg);
throw new TableNotFoundException(errorMsg);
}
}
Path hfofDir = new Path(dirPath);
try (Connection connection = ConnectionFactory.createConnection(getConf());
HTable table = (HTable) connection.getTable(tableName);) {
//执行操作
doBulkLoad(hfofDir, table);
}
return 0;
}
3.2 CDH6.2判断源码实现:
类路径:hbase-server包下面org.apache.hadoop.hbase.tool,旧路径代码已经建议弃用了类路径
@Override
public int run(String[] args) throws Exception {
//参数判断
if (args.length != 2 && args.length != 3) {
usage();
return -1;
}
//获取参数中的路径
String dirPath = args[0];
TableName tableName = TableName.valueOf(args[1]);
//非loadtable
if (args.length == 2) {
return !run(dirPath, tableName).isEmpty() ? 0 : -1;
} else {//loadtable
//loadtable操作下一级目录region的遍历
Map<byte[], List<Path>> family2Files = Maps.newHashMap();
FileSystem fs = FileSystem.get(getConf());
for (FileStatus regionDir : fs.listStatus(new Path(dirPath))) {
FSVisitor.visitRegionStoreFiles(fs, regionDir.getPath(), (region, family, hfileName) -> {
Path path = new Path(regionDir.getPath(), new Path(family, hfileName));
byte[] familyName = Bytes.toBytes(family);
if (family2Files.containsKey(familyName)) {
family2Files.get(familyName).add(path);
} else {
family2Files.put(familyName, Lists.newArrayList(path));
}
});
}
return !run(family2Files, tableName).isEmpty() ? 0 : -1;
}
}
3.3 在低版本中支持loadtable功能:
只能修改源码,加入三级目录判断。然后再遍历导入bulkload HFile.
网友评论