美文网首页大数据大数据,机器学习,人工智能玩转大数据
Kylin构建流程分析-加载HFile到Hbase中(Load

Kylin构建流程分析-加载HFile到Hbase中(Load

作者: b00d1f0f0afd | 来源:发表于2018-04-11 21:18 被阅读271次
    麒麟出没,必有祥瑞

    环境信息
    系统:win10
    代码编辑器:IDEA
    kylin:2.3.0
    hadoop:2.7.1

    本文介绍了kylin构建的第四个阶段,根据cuboid文件创建Hfile并且将cuboid Hfile加载到hbase之中,也就是BatchCubingJobBuilder2类中的build方法的第四个阶段。

    outputSide.addStepPhase3_BuildCube(result);
    
    @Override
        public IMRBatchCubingOutputSide2 getBatchCubingOutputSide(final CubeSegment seg) {
            return new IMRBatchCubingOutputSide2() {
                HBaseMRSteps steps = new HBaseMRSteps(seg);
    
                @Override
                public void addStepPhase2_BuildDictionary(DefaultChainedExecutable jobFlow) {
                    jobFlow.addTask(steps.createCreateHTableStepWithStats(jobFlow.getId()));
                }
    
                @Override
                public void addStepPhase3_BuildCube(DefaultChainedExecutable jobFlow) {
                    jobFlow.addTask(steps.createConvertCuboidToHfileStep(jobFlow.getId()));
                    jobFlow.addTask(steps.createBulkLoadStep(jobFlow.getId()));
                }
    
                @Override
                public void addStepPhase4_Cleanup(DefaultChainedExecutable jobFlow) {
                    // nothing to do
                }
    
                @Override
                public IMROutputFormat getOuputFormat() {
                    return new HBaseMROutputFormat();
                }
            };
        }
    

    用到的是HBaseMROutput2Transition类中的内部类,分为两个步骤:

    • createConvertCuboidToHfileStep 加载前面生成的cuboid文件,并生成Hfile
    • createBulkLoadStep 根据生成的Hfile,使用Hbase bulkload 将hfile快速加载进入到Htable中。

    1.createConvertCuboidToHfileStep

    2.createBulkLoadStep

    下面分析一下bulk load的代码。首先入口是 BatchCubingJobBuilder2 类中createBulkLoadStep方法。

    public HadoopShellExecutable createBulkLoadStep(String jobId) {
            //实例化一个hadoop任务
            HadoopShellExecutable bulkLoadStep = new HadoopShellExecutable();
            bulkLoadStep.setName(ExecutableConstants.STEP_NAME_BULK_LOAD_HFILE);
    
            StringBuilder cmd = new StringBuilder();
            //设置前面保存的hfile路径
            appendExecCmdParameters(cmd, BatchConstants.ARG_INPUT, getHFilePath(jobId));
            //设置htable name
            appendExecCmdParameters(cmd, BatchConstants.ARG_HTABLE_NAME, seg.getStorageLocationIdentifier());
            //设置cube name
            appendExecCmdParameters(cmd, BatchConstants.ARG_CUBE_NAME, seg.getRealization().getName());
            //设置cmd 参数
            bulkLoadStep.setJobParams(cmd.toString());
            //设置job 类
            bulkLoadStep.setJobClass(BulkLoadJob.class);
            return bulkLoadStep;
        }
    

    上面代码中生成cmd.toString 参数的实例如下:

     -input hdfs://server1.fibo.com:8020/apps/kylin/kylin_metadata/kylin-c2974055-2ccf-4b06-a98b-6f14e946e1ca/unload/hfile
     -htablename KYLIN_BYH4SABC2Y -cubename unload
    

    进入到BulkLoadJob类中,主要的run方法如下:

       @Override
        public int run(String[] args) throws Exception {
            Options options = new Options();
    
            options.addOption(OPTION_INPUT_PATH);
            options.addOption(OPTION_HTABLE_NAME);
            options.addOption(OPTION_CUBE_NAME);
            parseOptions(options, args);
            String tableName = getOptionValue(OPTION_HTABLE_NAME);
            // /tmp/kylin-3f150b00-3332-41ca-9d3d-652f67f044d7/test_kylin_cube_with_slr_ready_2_segments/hfile/
            // end with "/"
            String input = getOptionValue(OPTION_INPUT_PATH);
            Configuration conf = HBaseConnection.getCurrentHBaseConfiguration();
            FsShell shell = new FsShell(conf);
            int exitCode = -1;
            int retryCount = 10;
            // 给上面的hfile文件赋予读权限
            while (exitCode != 0 && retryCount >= 1) {
                exitCode = shell.run(new String[] { "-chmod", "-R", "777", input });
                retryCount--;
                Thread.sleep(5000);
            } 
            if (exitCode != 0) {
                logger.error("Failed to change the file permissions: " + input);
                throw new IOException("Failed to change the file permissions: " + input);
            }
            String[] newArgs = new String[2];
            newArgs[0] = input;
            newArgs[1] = tableName;
            logger.debug("Start to run LoadIncrementalHFiles");
            //将Hfile输出格式的输出加载到现有表中的工具。
            int ret = ToolRunner.run(new LoadIncrementalHFiles(conf), newArgs);
            logger.debug("End to run LoadIncrementalHFiles");
            return ret;
        }
    

    可以看到,最后调用的是Hbase包的类LoadIncrementalHFiles,这是一个工具类,官方解释是“Tool to load the output of HFileOutputFormat into an existing table.” 。对于这个类的解释可以参考这个文章 HBase 写优化之 BulkLoad 实现数据快速入库

    本文作者: 彭双宝
    原文链接: http://blog.lovedata.net/5a9a3875.html

    相关文章

      网友评论

        本文标题:Kylin构建流程分析-加载HFile到Hbase中(Load

        本文链接:https://www.haomeiwen.com/subject/jmrzhftx.html