环境信息
系统:win10
代码编辑器:IDEA
kylin:2.3.0
hadoop:2.7.1
本文介绍了kylin构建的第四个阶段,根据cuboid文件创建Hfile并且将cuboid Hfile加载到hbase之中,也就是BatchCubingJobBuilder2类中的build方法的第四个阶段。
outputSide.addStepPhase3_BuildCube(result);
@Override
public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
return new IMRBatchCubingOutputSide2() {
HBaseMRSteps steps = new HBaseMRSteps(seg);
@Override
public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
}
@Override
public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
}
@Override
public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
// nothing to do
}
@Override
public IMROutputFormat getOuputFormat() {
return new HBaseMROutputFormat();
}
};
}
用到的是HBaseMROutput2Transition类中的内部类,分为两个步骤:
- createConvertCuboidToHfileStep 加载前面生成的cuboid文件,并生成Hfile
- createBulkLoadStep 根据生成的Hfile,使用Hbase bulkload 将hfile快速加载进入到Htable中。
1.createConvertCuboidToHfileStep
2.createBulkLoadStep
下面分析一下bulk load的代码。首先入口是 BatchCubingJobBuilder2 类中createBulkLoadStep方法。
public HadoopShellExecutable createBulkLoadStep(String jobId) {
//实例化一个hadoop任务
HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
StringBuilder cmd = new StringBuilder();
//设置前面保存的hfile路径
appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
//设置htable name
appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
//设置cube name
appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
//设置cmd 参数
bulkLoadStep.setJobParams(cmd.toString());
//设置job 类
bulkLoadStep.setJobClass(BulkLoadJob.class);
return bulkLoadStep;
}
上面代码中生成cmd.toString 参数的实例如下:
-input hdfs://server1.fibo.com:8020/apps/kylin/kylin_metadata/kylin-c2974055-2ccf-4b06-a98b-6f14e946e1ca/unload/hfile
-htablename KYLIN_BYH4SABC2Y -cubename unload
进入到BulkLoadJob类中,主要的run方法如下:
@Override
public int run(String[] args) throws Exception {
Options options = new Options();
options.addOption(OPTION_INPUT_PATH);
options.addOption(OPTION_HTABLE_NAME);
options.addOption(OPTION_CUBE_NAME);
parseOptions(options, args);
String tableName = getOptionValue(OPTION_HTABLE_NAME);
// /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
// end with "/"
String input = getOptionValue(OPTION_INPUT_PATH);
Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
FsShell shell = new FsShell(conf);
int exitCode = -1;
int retryCount = 10;
// 给上面的hfile文件赋予读权限
while (exitCode != 0 && retryCount >= 1) {
exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
retryCount--;
Thread.sleep(5000);
}
if (exitCode != 0) {
logger.error("Failed to change the file permissions: " + input);
throw new IOException("Failed to change the file permissions: " + input);
}
String[] newArgs = new String[2];
newArgs[0] = input;
newArgs[1] = tableName;
logger.debug("Start to run LoadIncrementalHFiles");
//将Hfile输出格式的输出加载到现有表中的工具。
int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
logger.debug("End to run LoadIncrementalHFiles");
return ret;
}
可以看到,最后调用的是Hbase包的类LoadIncrementalHFiles,这是一个工具类,官方解释是“Tool to load the output of HFileOutputFormat into an existing table.” 。对于这个类的解释可以参考这个文章 HBase 写优化之 BulkLoad 实现数据快速入库
本文作者: 彭双宝
原文链接: http://blog.lovedata.net/5a9a3875.html
网友评论