美文网首页
数据传输 | 如何使用 DTLE 将 Oracle 数据同步到

数据传输 | 如何使用 DTLE 将 Oracle 数据同步到

作者: 爱可生开源社区 | 来源:发表于2022-03-07 14:00 被阅读0次

    作者:刘安

    爱可生测试团队成员,主要负责 DTLE 开源项目相关测试任务,擅长 Python 自动化测试开发。

    本文来源:原创投稿

    *爱可生开源社区出品,原创内容未经授权不得随意使用,转载请联系小编并注明来源。


    前言:过年前 DTLE 发布了 4.22.01.0 版本,该版本最重要的特性是支持 Oracle-MySQL 增量数据同步。今天我就来给大家介绍一下这个功能。

    一、现状

    1. 目前只支持增量同步

    a. 可以根据 SCN 节点开启增量复制
    b. 从任务启动时间开启增量复制

    2. 类型映射

    a. 已支持类型
    Oracle MySQL 限制
    BINARY_DOUBLE float mysql 不支持Inf/-Inf/Nan数据,用NULL来存储
    CHAR(n), CHARACTER(n) CHAR(n), CHARACTER(n)
    DATE datetime
    DECIMAL(p,s), DEC(p,s) DECIMAL(p,s), DEC(p,s)
    DOUBLE PRECISION DOUBLE PRECISION
    FLOAT(p) DOUBLE
    INTEGER, INT INT 极值问题 (https://github.com/actiontech/dtle/issues/825)
    INTERVAL DAY(p) TO SECOND(s) VARCHAR(30)
    INTERVAL YEAR(p) TO MONTH VARCHAR(30)
    NCHAR VARYING(n) NCHAR VARYING(n)
    NCHAR(n) NCHAR(n)/NVARCHAR(n)
    NUMBER(p,0), NUMBER(p) TINYINT/SMALLINT/INT/BIGINT/DECIMAL(p)
    NUMBER(p,s) DECIMAL(p,s)
    NUMBER, NUMBER(*) DOUBLE
    NUMERIC(p,s) NUMERIC(p,s)
    NVARCHAR2(n) NVARCHAR(n)
    RAW(n) VARBINARY(n)
    REAL DOUBLE
    ROWID CHAR(100)
    SMALLINT DECIMAL(38)
    TIMESTAMP(p) datetime
    VARCHAR2(n) VARCHAR(n)
    VARCHAR(n) VARCHAR(n)

    b. 待支持类型

    Oracle MySQL 当前不支持原因
    BINARY_FLOAT float MySQL不支持Inf/-Inf/Nan数据, MySQL float类型无法精确匹配,导致更新失败
    BLOB BLOB 当前实现逻辑,无法从redoSQL获取足够的值
    CLOB CLOB 当前实现逻辑,无法从redoSQL获取足够的值
    LONG LONGTEXT 只支持insert
    LONG RAW LONGBLOB 只支持insert
    NCLOB TEXT 无法从redoSQL获取足够的值
    TIMESTAMP(p) WITH TIME ZONE datetime 时区问题未处理

    c. 不支持类型

    Oracle MySQL 不支持原因
    BFILE VARCHAR(255) logminer不支持
    UROWID(n) VARCHAR(n) logminer读取的数据不足以构造新SQL
    XMLTYPE VARCHAR(30) logminer不支持

    3. DML支持度

    a. DML类型

    DML类型 Oracle SQL MySQL SQL
    INSERT INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (0, NULL) replace into ACTION_DB.CHAR_255_COLUMNS (COL1,COL2`) values ('0', NULL)
    UPDATE UPDATE ACTION_DB.CHAR_255_COLUMNS SET COL2='abcdefghijklmnopqrstuvwxyz' WHERE COL1=0 update ACTION_DB.CHAR_255_COLUMNS set COL1='0', COL2='abcdefghijklmnopqrstuvwxyz' where ((COL1 = '0') and (COL2 is NULL)) limit 1
    DELETE DELETE FROM ACTION_DB.CHARACTER_255_COLUMNS WHERE COL1=0 delete from ACTION_DB.CHAR_255_COLUMNS where ((COL1 = '0') and (COL2 = 'abcdefghijklmnopqrstuvwxyz')) limit 1

    b. DML函数支持度

    函数名 是否支持 其他
    CURRENT_TIMESTAMP
    DATE
    EMPTY_BLOB 函数支持解析为NULL
    EMPTY_CLOB 函数支持解析为NULL
    HEXTORAW
    LOCALTIMESTAMP
    RAWTOHEX
    RAWTOHEX(CHR())
    SYSTIMESTAMP
    TO_DATE
    TO_DSINTERVAL
    TO_TIMESTAMP
    TO_YMINTERVAL
    UNISTR

    4. DDL支持度

    DDL Target Option
    CREATE 表 TABLE DEFAULT CREATE
    ALTER 表 TABLE 增加字段
    删除字段
    重命名字段 (当前仅支持MySQL 8.0语法)
    变更字段类型
    DROP 表 TABLE DEFAULT DROP

    二、环境准备

    1. Oracle数据库开启归档日志

    shell> su oracle
    shell> mkdir /u01/app/oracle/oradata/archive_log
    shell> sqlplus sys/oracle as sysdba
    
    SQL> alter system set log_archive_dest_1='location=/u01/app/oracle/oradata/archive_log' scope=spfile;
    SQL> alter system set db_recovery_file_dest_size = 10G;
    SQL> shutdown immediate;
    SQL> startup mount;
    SQL> alter database add logfile group 3 '/u01/app/oracle/fast_recovery_area/XE/onlinelog/redo01.log' size 500m;
    SQL> alter database add logfile group 4 '/u01/app/oracle/fast_recovery_area/XE/onlinelog/redo02.log' size 500m;
    SQL> alter database add logfile group 5 '/u01/app/oracle/fast_recovery_area/XE/onlinelog/redo03.log' size 500m;
    SQL> alter database archivelog;
    SQL> alter database add supplemental log data (all) columns;
    SQL> alter database open;
    SQL> archive log list;
    # Archive Mode表示已开启归档模式,Archive destination表示归档日志储存路径
    

    2. 安装 LogMiner 工具(Oracle 安装时默认安装)

    可查看系统中是否存在运行 LogMiner 所需要的 dbms_logmnr 、dbms_logmnr_d 包,如果没有安装 LogMiner 工具需要的包,需要运行下面两个命令:

    shell> cat $ORACLE_HOME/rdbms/admin/dbmslm.sql | sqlplus sys/oracle as sysdba
    shell> cat $ORACLE_HOME/rdbms/admin/dbmslmd.sql | sqlplus sys/oracle as sysdba
    

    3. 创建 logminer 需要角色权限

    shell> su oracle
    shell> sqlplus sys/oracle as sysdba
    
    SQL> create user roma_logminer identified by oracle default tablespace users;
    
    SQL> GRANT CREATE SESSION TO roma_logminer;
    SQL> GRANT SET CONTAINER TO roma_logminer;
    SQL> GRANT SELECT ON V_$DATABASE TO roma_logminer;
    SQL> GRANT FLASHBACK ANY TABLE TO roma_logminer;
    SQL> GRANT SELECT ANY TABLE TO roma_logminer;
    SQL> GRANT SELECT_CATALOG_ROLE TO roma_logminer;
    SQL> GRANT EXECUTE_CATALOG_ROLE TO roma_logminer;
    SQL> GRANT SELECT ANY TRANSACTION TO roma_logminer;
    SQL> GRANT CREATE TABLE TO roma_logminer;
    SQL> GRANT LOCK ANY TABLE TO roma_logminer;
    SQL> GRANT CREATE SEQUENCE TO roma_logminer;
    SQL> GRANT EXECUTE ON DBMS_LOGMNR TO roma_logminer;
    SQL> GRANT EXECUTE ON DBMS_LOGMNR_D TO roma_logminer;
    
    SQL> GRANT SELECT ON V_$LOG TO roma_logminer;
    SQL> GRANT SELECT ON V_$LOG_HISTORY TO roma_logminer;
    SQL> GRANT SELECT ON V_$LOGMNR_LOGS TO roma_logminer;
    SQL> GRANT SELECT ON V_$LOGMNR_CONTENTS TO roma_logminer;
    SQL> GRANT SELECT ON V_$LOGMNR_PARAMETERS TO roma_logminer;
    SQL> GRANT SELECT ON V_$LOGFILE TO roma_logminer;
    SQL> GRANT SELECT ON V_$ARCHIVED_LOG TO roma_logminer;
    SQL> GRANT SELECT ON V_$ARCHIVE_DEST_STATUS TO roma_logminer;
    
    SQL> alter user roma_logminer quota unlimited ON users;
    

    4. 部署一个单节点的 4.22.01.0版本 DTLE

    https://github.com/actiontech/dtle/releases/download/v4.22.01.0/dtle-ce-4.22.01.0.x86_64.rpm

    三、创建 Oracle-MySQL 任务

    1. 获取 Token

    shell> curl -s -X POST "http://172.100.9.11:8190/v2/loginWithoutVerifyCode" -H "accept: application/json" -H "Content-Type: application/json" -d "{ \"password\": \"admin\", \"tenant\": \"platform\", \"username\": \"admin\"}" | jq
    {
      "message": "ok",
      "data": {
        "token": "eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NDYxOTE2MzksImdyb3VwIjoicGxhdGZvcm0iLCJuYW1lIjoiYWRtaW4ifQ.-c_jVkxd_vP5Ka3gocdfGZIudWPujutdHpQYx8srX00"
      }
    }
    

    2. 准备 job 文件

    shell> cat job.json
    {
      "job_id": "test_oracle",
      "src_task": {
        "connection_config": {
          "database_type": "Oracle",
          "host": "172.100.9.31",
          "port": 1521,
          "user": "roma_logminer",
          "password": "oracle",
          "service_name": "XE"
        },
        "node_id": "96d28881-f91b-19f4-1614-4d8a0e718e2f",
        "binlog_relay": false,
        "repl_chan_buffer_size": 120,
        "group_max_size": 1,
        "group_timeout": 100,
        "oracle_src_task_config": {
          "scn": 0
        },
        "task_name": "src",
        "replicate_do_db": [
          {
            "table_schema": "ACTION_DB"
          }
        ]
      },
      "is_password_encrypted": false,
      "dest_task": {
        "connection_config": {
          "database_type": "MySQL",
          "host": "172.100.9.1",
          "port": 3306,
          "user": "test_dest",
          "password": "test_dest"
        },
        "node_id": "96d28881-f91b-19f4-1614-4d8a0e718e2f",
        "task_name": "dest",
        "mysql_dest_task_config": {}
      },
      "task_step_name": "job_stage_full",
      "failover": true,
      "retry": 2
    }
    

    3. 创建 Oracle-MySQL job

    shell> curl -s -X POST "http://172.100.9.11:8190/v2/job/migration/create" -H "accept: application/json" -H "Authorization: eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJleHAiOjE2NDYxOTE2MzksImdyb3VwIjoicGxhdGZvcm0iLCJuYW1lIjoiYWRtaW4ifQ.-c_jVkxd_vP5Ka3gocdfGZIudWPujutdHpQYx8srX00" -H "Content-Type: application/json" -d @job.json | jq
    {
      "job": {
        "job_id": "test_oracle-migration",
        "task_step_name": "job_stage_full",
        "reverse": false,
        "failover": true,
        "is_password_encrypted": false,
        "src_task": {
          "task_name": "src",
          "node_id": "96d28881-f91b-19f4-1614-4d8a0e718e2f",
          "replicate_do_db": [
            {
              "table_schema": "ACTION_DB",
              "table_schema_regex": "",
              "table_schema_rename": "",
              "tables": null
            }
          ],
          "replicate_ignore_db": null,
          "skip_create_db_table": false,
          "drop_table_if_exists": false,
          "mysql_src_task_config": null,
          "oracle_src_task_config": {
            "scn": 0
          },
          "group_max_size": 1,
          "group_timeout": 100,
          "repl_chan_buffer_size": 120,
          "chunk_size": 2000,
          "connection_config": {
            "host": "172.100.9.31",
            "port": 1521,
            "user": "roma_logminer",
            "password": "*",
            "service_name": "XE",
            "database_type": "Oracle"
          }
        },
        "dest_task": {
          "task_name": "dest",
          "node_id": "96d28881-f91b-19f4-1614-4d8a0e718e2f",
          "database_type": "",
          "mysql_dest_task_config": {
            "parallel_workers": 1,
            "use_my_sql_dependency": false,
            "dependency_history_size": 2500
          },
          "connection_config": {
            "host": "172.100.9.1",
            "port": 3306,
            "user": "test_dest",
            "password": "*",
            "service_name": "",
            "database_type": "MySQL"
          }
        },
        "retry": 2
      },
      "eval_create_index": 12,
      "job_modify_index": 12,
      "message": "ok"
    }
    

    4. 源端 Oracle 写入数据

    SQL> create tablespace ACTION_DB datafile 'ACTION_DB.dbf' size 100M;
    SQL> create user ACTION_DB identified by ACTION_DB default tablespace ACTION_DB;
    SQL> grant unlimited tablespace to ACTION_DB;
    SQL> CREATE TABLE ACTION_DB.CHAR_255_COLUMNS(col1 INT, col2 CHAR(255));
    SQL> INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (0, NULL);
    SQL> INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (1, 'abcdefghijklmnopqrstuvwxyz');
    SQL> INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (2, 'ABCDEFGHIJKLMNOPQRSTUVWXYZ');
    SQL> INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (3, '1234567890');
    SQL> INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (4, 1234567890);
    SQL> INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (5, '~`!@#$%^&*()-=_+{}[]|\:;<>,.?/');
    SQL> INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (6, '中文测试abc');
    SQL> INSERT INTO ACTION_DB.CHAR_255_COLUMNS VALUES (7, '·~!@#¥%……&*()-=——+{}【】、|;‘:“,。/《》?');
    

    5. 目标端 MySQL 检查同步情况

    mysql> SHOW CREATE TABLE ACTION_DB.CHAR_255_COLUMNS\G
    *************************** 1. row ***************************
           Table: CHAR_255_COLUMNS
    Create Table: CREATE TABLE `CHAR_255_COLUMNS` (
      `COL1` int(11) DEFAULT NULL,
      `COL2` char(255) DEFAULT NULL
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4
    1 row in set (0.00 sec)
    
    mysql> SELECT * FROM ACTION_DB.CHAR_255_COLUMNS;
    +------+-----------------------------------------------------------------------------+
    | COL1 | COL2                                                                        |
    +------+-----------------------------------------------------------------------------+
    |    0 | NULL                                                                        |
    |    1 | abcdefghijklmnopqrstuvwxyz                                                  |
    |    2 | ABCDEFGHIJKLMNOPQRSTUVWXYZ                                                  |
    |    3 | 1234567890                                                                  |
    |    4 | 1234567890                                                                  |
    |    5 | ~`!@#$%^&*()-=_+{}[]|\:;<>,.?/                                              |
    |    6 | 中文测试abc                                                                  |
    |    7 | ·~!@#¥%……&*()-=——+{}【】、|;‘:“,。/《》?                                 |
    +------+-----------------------------------------------------------------------------+
    8 rows in set (0.01 sec)
    

    四、使用限制

    因为 Oracle 和 MySQL 是异构数据库,所以在源端 Oracle 能执行的 Oracle SQL 语句通过 DTLE 转换到目标端的 MySQL SQL 语句后有可能无法正确执行。比如 Oracle 支持数值范围 MySQL 不支持,Oracle 的 DATE 类型支持公元前的年份而 MySQL 的 datetime 只能为公元后的年份等等。遇到这些情况,DTLE 的默认行为是报错并且停止同步。

    如果需要跳过这种阻塞情况,可以修改 DTLE 的环境变量然后重新启动 DTLE 服务。

    shell> vi /etc/systemd/system/multi-user.target.wants/dtle-nomad.service
    # 添加
    [Service]
    Environment="SkipErr=true"
    
    shell> systemctl daemon-reload
    shell> systemctl restart dtle-nomad
    

    如果在使用 DTLE 时发现了任何问题,请及时联系我们。

    相关文章

      网友评论

          本文标题:数据传输 | 如何使用 DTLE 将 Oracle 数据同步到

          本文链接:https://www.haomeiwen.com/subject/micurrtx.html