技术准备:
开通dts
开通datahub
开通kafka队列(0.10版),按时付费每小时2.35元
开通hbase serverless版,每小时0.01元
开通flink1.11全托管版
数据流程:
polardb->dts->datahub->flink->kafka->flink->hbase
为什么还需要datahub传输到kafka,而不直接通过dts到kafka,因为dts同步的时候多张表只能选择到一个topic,而datahub可以同步到多个topic。
注意:关于dts同步数据到datahub的说明如下图:
dts.png
将polardb的表rb_test2同步(当然也可以选择多张表),其表结构如下:
CREATE TABLE `rb_test2` (
`id` int(11) unsigned NOT NULL AUTO_INCREMENT,
`name` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
`password` varchar(20) COLLATE utf8mb4_unicode_ci NOT NULL DEFAULT '',
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=54 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
在Flink中关联datahub中的topic
create table dh_in_testk2 (
id INT,
name VARCHAR,
password VARCHAR,
new_dts_sync_dts_record_id VARCHAR,
new_dts_sync_dts_operation_flag VARCHAR,
new_dts_sync_dts_before_flag VARCHAR,
new_dts_sync_dts_after_flag VARCHAR
) WITH (
'connector' = 'datahub',
'endPoint' = 'http://dh-cn-shenzhen-int-vpc.aliyuncs.com',
'project' = '*****',
'topic' = 'rb_test2',
-- topic是datahub中的
'accessId' = '*****',
'accessKey' = '*****',
'subId' = '******'
--subId是在datahub服务中添加的订阅id
);
在Flink中关联kafka中的topic
create table kk_out_test2 (
id INT,
name VARCHAR,
password VARCHAR,
new_dts_sync_dts_record_id VARCHAR,
new_dts_sync_dts_operation_flag VARCHAR,
new_dts_sync_dts_before_flag VARCHAR,
new_dts_sync_dts_after_flag VARCHAR
) with (
'connector' = 'kafka',
'topic' = 'rb_test2',
'properties.bootstrap.servers' = '***:9092,***9092,***:9092',
'format' = 'json'
)
运行Flink作业,将datahub topic rb_test2数据实时写入kafka,如果是更新操作,只同步更新后的数据。
insert into
kk_out_test2
select
*
from
dh_in_testk2
where
new_dts_sync_dts_operation_flag <> 'U'
or new_dts_sync_dts_after_flag = 'Y'
在hbase中添加表
create 'test2','cf'
在Flink中关联hbase
CREATE TABLE hbase_test2 (
rowkey STRING,
cf ROW < id INT,
name STRING,
password STRING,
new_dts_sync_dts_operation_flag STRING>
) with (
'connector' = 'cloudhbase',
'table-name' = 'test2',
'zookeeper.quorum' = 'https://sh-***-hbase-serverless-in.hbase.rds.aliyuncs.com:443',
'userName'='***',
'password'='***'
);
运行Flink作业同步到hbase
insert into
hbase_test2
select
CONCAT(id,''),
ROW (id, name, password,new_dts_sync_dts_operation_flag)
from
kk_out_test2;
从polardb到hbase,时间大概会延迟2-3秒,如果中间转化过程比较多,那么这个时间会相应的增加。在hbase中获取值的时候,需要判断new_dts_sync_dts_operation_flag 为D的时候,这个值是被删除的。
网友评论