美文网首页
在阿里云用Flink Sql同步polardb数据到hbase

在阿里云用Flink Sql同步polardb数据到hbase

作者: lodestar | 来源:发表于2020-09-27 22:26 被阅读0次

    技术准备:
    开通dts
    开通datahub
    开通kafka队列(0.10版),按时付费每小时2.35元
    开通hbase serverless版,每小时0.01元
    开通flink1.11全托管版

    数据流程:
    polardb->dts->datahub->flink->kafka->flink->hbase

    为什么还需要datahub传输到kafka,而不直接通过dts到kafka,因为dts同步的时候多张表只能选择到一个topic,而datahub可以同步到多个topic。

    注意:关于dts同步数据到datahub的说明如下图:


    dts.png

    将polardb的表rb_test2同步(当然也可以选择多张表),其表结构如下:

    CREATE TABLE `rb_test2` (
      `id` int(11) unsigned NOT NULL AUTO_INCREMENT,
      `name` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
      `password` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB AUTO_INCREMENT=54 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    

    在Flink中关联datahub中的topic

    create table dh_in_testk2 (
        id INT,
        name VARCHAR,
        password VARCHAR,
        new_dts_sync_dts_record_id VARCHAR,
        new_dts_sync_dts_operation_flag VARCHAR,
        new_dts_sync_dts_before_flag VARCHAR,
        new_dts_sync_dts_after_flag VARCHAR
    ) WITH (
        'connector' = 'datahub',
        'endPoint' = 'http://dh-cn-shenzhen-int-vpc.aliyuncs.com',
        'project' = '*****',
        'topic' = 'rb_test2',
        -- topic是datahub中的
        'accessId' = '*****',
        'accessKey' = '*****',
        'subId' = '******'
        --subId是在datahub服务中添加的订阅id
    );
    

    在Flink中关联kafka中的topic

    create table kk_out_test2 (
        id INT,
        name VARCHAR,
        password VARCHAR,
        new_dts_sync_dts_record_id VARCHAR,
        new_dts_sync_dts_operation_flag VARCHAR,
        new_dts_sync_dts_before_flag VARCHAR,
        new_dts_sync_dts_after_flag VARCHAR
    ) with (
        'connector' = 'kafka',
        'topic' = 'rb_test2',
        'properties.bootstrap.servers' = '***:9092,***9092,***:9092',
        'format' = 'json'
    ) 
    

    运行Flink作业,将datahub topic rb_test2数据实时写入kafka,如果是更新操作,只同步更新后的数据。

    insert into
        kk_out_test2
    select
        *
    from
        dh_in_testk2
    where
        new_dts_sync_dts_operation_flag <> 'U'
        or new_dts_sync_dts_after_flag = 'Y'
    

    在hbase中添加表

    create 'test2','cf'
    

    在Flink中关联hbase

    CREATE TABLE hbase_test2 (
        rowkey STRING,
        cf ROW < id INT,
        name STRING,
        password STRING,
        new_dts_sync_dts_operation_flag STRING>
    ) with (
        'connector' = 'cloudhbase',
        'table-name' = 'test2',
        'zookeeper.quorum' = 'https://sh-***-hbase-serverless-in.hbase.rds.aliyuncs.com:443',
        'userName'='***',
        'password'='***'
    );
    

    运行Flink作业同步到hbase

    insert into
        hbase_test2
    select
        CONCAT(id,''),
        ROW (id, name, password,new_dts_sync_dts_operation_flag)
    from
        kk_out_test2;
    

    从polardb到hbase,时间大概会延迟2-3秒,如果中间转化过程比较多,那么这个时间会相应的增加。在hbase中获取值的时候,需要判断new_dts_sync_dts_operation_flag 为D的时候,这个值是被删除的。

    相关文章

      网友评论

          本文标题:在阿里云用Flink Sql同步polardb数据到hbase

          本文链接:https://www.haomeiwen.com/subject/jhahuktx.html