美文网首页Java玩转大数据flink
Flink 源码之数据写入HBase

Flink 源码之数据写入HBase

作者: AlienPaul | 来源:发表于2021-03-10 16:47 被阅读0次

    Flink源码分析系列文档目录

    请点击:Flink 源码分析系列文档目录

    前言

    近期有同事询问Flink写入数据到HBase的方法。借着这个机会分析下Flink 写入HBase相关部分的源代码。

    HBaseSinkFunction

    HBaseSinkFunction是Flink写入数据到HBase的官方功能实现。

    当然,作为一个数据落地端,HBaseSinkFunction毫无疑问需要实现SinkFunction接口。SinkFunction的使用参见:Flink 源码之两阶段提交

    作为一个有状态的数据落地端,HBaseSinkFunction继承自RichSinkFunction。我们从它的open方法开始,分析下Flink如何做到:

    • 如何初始化和关闭HBase连接
    • 如何高效率将数据插入HBase
    • 如何处理checkpoint问题

    open方法

    我们查看下它的open方法,如下所示:

    @Override
    public void open(Configuration parameters) throws Exception {
        LOG.info("start open ...");
        // 获取HBase的配置
        org.apache.hadoop.conf.Configuration config = prepareRuntimeConfiguration();
        try {
            // 初始化mutationConverter
            // mutationConverter负责转换Flink元素为HBase Mutation
            this.mutationConverter.open();
            // 记录尚未flush到HBase的数据条数
            this.numPendingRequests = new AtomicLong(0);
    
            // 创建HBase连接
            if (null == connection) {
                this.connection = ConnectionFactory.createConnection(config);
            }
            // create a parameter instance, set the table name and custom listener reference.
            // 创建BufferedMutator构建参数
            // 包含写入HBase的目标表名称和ExceptionListener
            BufferedMutatorParams params =
                new BufferedMutatorParams(TableName.valueOf(hTableName)).listener(this);
            // 设置BufferedMutator的写缓存字节数
            if (bufferFlushMaxSizeInBytes > 0) {
                params.writeBufferSize(bufferFlushMaxSizeInBytes);
            }
            // 根据前面的参数,创建一个BufferedMutator
            this.mutator = connection.getBufferedMutator(params);
    
            // 如果flush间隔大于0ms,且最大待flush数据条数不等于1时
            // 创建一个定时flush BufferedMutator的任务
            if (bufferFlushIntervalMillis > 0 && bufferFlushMaxMutations != 1) {
                this.executor =
                    Executors.newScheduledThreadPool(
                    1, new ExecutorThreadFactory("hbase-upsert-sink-flusher"));
                this.scheduledFuture =
                    this.executor.scheduleWithFixedDelay(
                    () -> {
                        if (closed) {
                            return;
                        }
                        try {
                            flush();
                        } catch (Exception e) {
                            // fail the sink and skip the rest of the items
                            // if the failure handler decides to throw an exception
                            // 保存发生的异常,等到检查的时候抛出
                            failureThrowable.compareAndSet(null, e);
                        }
                    },
                    bufferFlushIntervalMillis,
                    bufferFlushIntervalMillis,
                    TimeUnit.MILLISECONDS);
            }
        } catch (TableNotFoundException tnfe) {
            LOG.error("The table " + hTableName + " not found ", tnfe);
            throw new RuntimeException("HBase table '" + hTableName + "' not found.", tnfe);
        } catch (IOException ioe) {
            LOG.error("Exception while creating connection to HBase.", ioe);
            throw new RuntimeException("Cannot create connection to HBase.", ioe);
        }
        LOG.info("end open.");
    }
    

    由以上分析可知,open方法创建了HBase的BufferedMutator,用于缓存待插入的数据,以批量方式将这些数据插入。相比依次插入单条数据而言,极大的提升了操作HBase的效率。

    获取HBase配置的过程

    这部分逻辑位于prepareRuntimeConfiguration方法。

    获取HBase配置文件的逻辑如下列表所示。按照从上到下的顺序查找hbase-site.xml文件,无论找到与否,都会尝试整个列表。列表后者找到的配置项会覆盖前者。

    • 首先从classpath中查找hbase-site.xmlhbase-default.xml文件。
    • HBASE_HOME环境变量的conf目录查找hbase-site.xmlhbase-default.xml文件。
    • HBASE_CONF_DIR环境变量查找hbase-site.xmlhbase-default.xml文件。

    prepareRuntimeConfiguration方法代码如下所示:

    private org.apache.hadoop.conf.Configuration prepareRuntimeConfiguration() throws IOException {
        // create default configuration from current runtime env (`hbase-site.xml` in classpath)
        // first,
        // and overwrite configuration using serialized configuration from client-side env
        // (`hbase-site.xml` in classpath).
        // user params from client-side have the highest priority
        
        // 获取Configuration
        org.apache.hadoop.conf.Configuration runtimeConfig =
                HBaseConfigurationUtil.deserializeConfiguration(
                        serializedConfig, HBaseConfigurationUtil.getHBaseConfiguration());
    
        // do validation: check key option(s) in final runtime configuration
        // 检查获取到的HBase配置
        // 如果配置中不包含hbase.zookeeper.quorum配置项(HBase必须),抛出异常
        if (StringUtils.isNullOrWhitespaceOnly(runtimeConfig.get(HConstants.ZOOKEEPER_QUORUM))) {
            LOG.error(
                    "Can not connect to HBase without {} configuration",
                    HConstants.ZOOKEEPER_QUORUM);
            throw new IOException(
                    "Check HBase configuration failed, lost: '"
                            + HConstants.ZOOKEEPER_QUORUM
                            + "'!");
        }
    
        return runtimeConfig;
    }
    

    获取配置信息的具体逻辑位于HBaseConfigurationUtil.getHBaseConfiguration()。我们继续跟踪。

    public static Configuration getHBaseConfiguration() {
    
        // Instantiate an HBaseConfiguration to load the hbase-default.xml and hbase-site.xml from
        // the classpath.
        // 默认方式,从classpath读取hbase-default.xml和hbase-site.xml
        Configuration result = HBaseConfiguration.create();
        boolean foundHBaseConfiguration = false;
    
        // We need to load both hbase-default.xml and hbase-site.xml to the hbase configuration
        // The properties of a newly added resource will override the ones in previous resources, so
        // a configuration
        // file with higher priority should be added later.
    
        // Approach 1: HBASE_HOME environment variables
        String possibleHBaseConfPath = null;
    
        // 读取HBASE_HOME环境变量
        final String hbaseHome = System.getenv("HBASE_HOME");
        // 如果该环境变量存在,尝试从${HBASE_HOME}/conf读取配置
        if (hbaseHome != null) {
            LOG.debug("Searching HBase configuration files in HBASE_HOME: {}", hbaseHome);
            possibleHBaseConfPath = hbaseHome + "/conf";
        }
    
        if (possibleHBaseConfPath != null) {
            foundHBaseConfiguration = addHBaseConfIfFound(result, possibleHBaseConfPath);
        }
    
        // Approach 2: HBASE_CONF_DIR environment variable
        // 读取HBASE_CONF_DIR环境变量,如果该变量存在,尝试从该路径下读取配置
        String hbaseConfDir = System.getenv("HBASE_CONF_DIR");
        if (hbaseConfDir != null) {
            LOG.debug("Searching HBase configuration files in HBASE_CONF_DIR: {}", hbaseConfDir);
            foundHBaseConfiguration =
                addHBaseConfIfFound(result, hbaseConfDir) || foundHBaseConfiguration;
        }
    
        if (!foundHBaseConfiguration) {
            LOG.warn(
                "Could not find HBase configuration via any of the supported methods "
                + "(Flink configuration, environment variables).");
        }
    
        return result;
    }
    

    Invoke方法

    HBaseSinkFunction每次接收到一个数据,都会调用invoke方法。该方法负责将数据转换为HBase的mutation,放入到BufferedMutator中。当BufferedMutator累积的mutation数超过bufferFlushMaxMutations时候,强制flush到HBase中。

    @Override
    public void invoke(T value, Context context) throws Exception {
        // 检查是否有异常发生
        checkErrorAndRethrow();
    
        // 使用mutationConverter转换数据为HBase的mutation
        // mutationConverter在后面章节介绍
        // 把这个mutation加入到BufferedMutator中
        mutator.mutate(mutationConverter.convertToMutation(value));
    
        // flush when the buffer number of mutations greater than the configured max size.
        // numPendingRequests记录了未发送的写入请求个数
        // 如果numPendingRequests大于bufferFlushMaxMutations,强制flush到HBase
        if (bufferFlushMaxMutations > 0
            && numPendingRequests.incrementAndGet() >= bufferFlushMaxMutations) {
            flush();
        }
    }
    

    接下来分析下强制把数据写入HBase的flush方法。

    private void flush() throws IOException {
        // BufferedMutator is thread-safe
        // 调用BufferedMutator的flush方法
        mutator.flush();
        // 重置pendingRequest计数器为0
        numPendingRequests.set(0);
        // 检查是否有异常发生
        checkErrorAndRethrow();
    }
    

    到此为止,HBaseSinkFunction中有关RichSinkFunction的方法已经分析完毕。除了一个close方法,包含关闭BufferedMutatorConnection和周期性flush HBase的定时任务,不再详细分析。

    checkpoint相关方法实现

    HBaseSinkFunction在checkpoint的时候,如果有尚未写入HBase的数据,必须flush到HBase中。以防止从checkpoint恢复的时候数据丢失。

    @Override
    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // 如果pendingRequest不为0,即有缓存的数据
        // 一直执行flush操作,直到数据全存入HBase为止
        while (numPendingRequests.get() != 0) {
            flush();
        }
    }
    
    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        // 从checkpoint中恢复,什么都不用做
        // nothing to do.
    }
    

    MutationConverter

    MutationConverter用于将Flink的数据转换为HBase的Mutation

    该接口有两个方法:

    • open:初始化converter。
    • convertToMutation:将record转换为Mutation
    @Internal
    public interface HBaseMutationConverter<T> extends Serializable {
    
        /** Initialization method for the function. It is called once before conversion method. */
        void open();
    
        /**
         * Converts the input record into HBase {@link Mutation}. A mutation can be a {@link Put} or
         * {@link Delete}.
         */
        Mutation convertToMutation(T record);
    }
    

    HBaseMutationConverter有两个实现类:

    • LegacyMutationConverter:转换Tuple2<Boolean, Row>类型的数据。如果boolean为true,则创建Put操作,如果为false创建Delete操作。
    • RowDataToMutationConverter:转换RowData类型数据。通过RowKind来控制创建出Put操作还是Delete操作。

    除此之外,这两个类中大量的逻辑为从HBaseTableSchema获取列族,qualifier和charset等信息,然后从数据解析出每个cf:qualifier对应的value,构造出PutDelete对象。此处不再一一分析。

    本博客为作者原创,欢迎大家参与讨论和批评指正。如需转载请注明出处。

    相关文章

      网友评论

        本文标题:Flink 源码之数据写入HBase

        本文链接:https://www.haomeiwen.com/subject/cpwwqltx.html