背景
flink cdc抽取cdb rac模式的oracle集群,抽取一直存在账号权限问题
前提条件
-
开启归档日志
-
rac 模式归档日志需要在共享存储中。如果配置在本地,会导致归档日志找不到
一、开启归档日志
位置要指定为共享存储,位置可以自行调整
alter system set db_recovery_file_dest='+DATA' scope=spfile sid='*';
alter system set db_recovery_file_dest_size=2048M scope=spfile sid='*';
重启数据库实例
备注: rac19cdb是数据库实例名称,rac19cdb1数据库节点名称,需自行调整,下同
su oracle
srvctl stop database -d rac19cdb
srvctl start instance -d rac19cdb -i rac19cdb1 -o mount
执行sql,开启归档
alter database archivelog;
启动另外一台数据库实例
srvctl start instance -d rac19cdb -i rac19cdb2 -o mount
二、 在cdb账号下创建cdb表空间
create tablespace logminer_tbs datafile '+DATA' size 100M reuse autoextend on next 100M maxsize unlimited
备注: +DATA是共享存储位置,大小,可自行调整
三、 切换到pdb
alter session set container=RACDBPDB
备注: RACDBPDB是pdb名称,需自行调整
四、 在pdb创建表空间
create tablespace logminer_tbs datafile '+DATA' size 100M reuse autoextend on next 100M maxsize unlimited
备注: +DATA是共享存储位置,可自行调整
五、在cdb下创建cdb用户并授权
CREATE USER c##flinkuser IDENTIFIED BY flinkpw DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs CONTAINER=ALL;
GRANT CREATE SESSION TO c##flinkuser CONTAINER=ALL;
GRANT SET CONTAINER TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$DATABASE to c##flinkuser CONTAINER=ALL;
GRANT FLASHBACK ANY TABLE TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ANY TABLE TO c##flinkuser CONTAINER=ALL;
GRANT SELECT_CATALOG_ROLE TO c##flinkuser CONTAINER=ALL;
GRANT EXECUTE_CATALOG_ROLE TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ANY TRANSACTION TO c##flinkuser CONTAINER=ALL;
GRANT LOGMINING TO c##flinkuser CONTAINER=ALL;
GRANT CREATE TABLE TO c##flinkuser CONTAINER=ALL;
GRANT LOCK ANY TABLE TO c##flinkuser CONTAINER=ALL;
GRANT CREATE SEQUENCE TO c##flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR TO c##flinkuser CONTAINER=ALL;
GRANT EXECUTE ON DBMS_LOGMNR_D TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOG_HISTORY TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_LOGS TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_CONTENTS TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGMNR_PARAMETERS TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$LOGFILE TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVED_LOG TO c##flinkuser CONTAINER=ALL;
GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO c##flinkuser CONTAINER=ALL;
备注:授权语句需要全量执行
六、 在pdb下创建pdb用户
create USER test IDENTIFIED BY test123 DEFAULT TABLESPACE logminer_tbs QUOTA UNLIMITED ON logminer_tbs
GRANT CREATE SESSION TO test;
GRANT CREATE TABLE TO test;
GRANT LOCK ANY TABLE TO test;
GRANT ALTER ANY TABLE TO test;
GRANT CREATE SEQUENCE TO test;
七、 在pdb账号下测试数据准备
1、 创建表
CREATE TABLE TEST."gx_fen" (
"id" NUMBER(11,0),
"name" VARCHAR2(255),
"price" VARCHAR2(20),
"description" VARCHAR2(255),
"inc_day" VARCHAR2(255),
"ts" TIMESTAMP,
"dt" TIMESTAMP WITH LOCAL TIME ZONE,
CONSTRAINT SYS_C007562 CHECK ("id" IS NOT NULL)
);
2、 表开启附加日志
ALTER TABLE TEST."gx_fen" ADD SUPPLEMENTAL LOG DATA (ALL) COLUMNS;
3、插入数据
INSERT INTO TEST."gx_fen" ("id", "name", "price", "description", "inc_day", "ts", "dt") VALUES(1, '螺', '10.00', 'luo si fen', '2022-05-25', TIMESTAMP '2022-05-25 13:07:25.000000', TIMESTAMP '2022-05-25 05:07:25.000000');
INSERT INTO TEST."gx_fen" ("id", "name", "price", "description", "inc_day", "ts", "dt") VALUES(2, '桂', '9.00', 'gui lin mi fen', '2022-05-25', TIMESTAMP '2022-05-25 13:07:37.000000', TIMESTAMP '2022-05-25 05:07:37.000000');
4、创建finksql任务并启动,观察日志,抽取到全量数据则为正常
CREATE TABLE products (
id INT,
name STRING
) WITH (
'connector' = 'oracle-cdc',
'hostname' = '1111111111',
'port' = '111111111',
'username' = 'c##flinkuser',
'password' = 'flinkpw',
'database-name' = 'racdb',
'schema-name' = 'test',
'table-name' = 'gx_fen',
'debezium.database.pdb.name' = 'racdbpdb'
)
5、在pdb账号下再次插入数据,等待日志更新,抽取到新增数据则为正常
INSERT INTO TEST."gx_fen" ("id", "name", "price", "description", "inc_day", "ts", "dt") VALUES(4, '老友', '13', '南宁', '2023-01-09', TIMESTAMP '2023-01-09 15:10:39.000000', TIMESTAMP '2023-01-09 07:10:39.000000');
INSERT INTO TEST."gx_fen" ("id", "name", "price", "description", "inc_day", "ts", "dt") VALUES(6, '老友1', '13', '老友', '2023-01-09', TIMESTAMP '2023-01-09 16:16:39.000000', TIMESTAMP '2023-01-09 09:16:39.000000');
INSERT INTO TEST."gx_fen" ("id", "name", "price", "description", "inc_day", "ts", "dt") VALUES(7, '老友2', '10', '老友', '2023-01-10', TIMESTAMP '2023-01-10 16:16:39.000000', TIMESTAMP '2023-01-10 08:16:39.000000');
网友评论