美文网首页
SIDDHI(西迪) CDC配置

SIDDHI(西迪) CDC配置

作者: 啊布多 | 来源:发表于2021-11-15 13:20 被阅读0次

    CDC介绍

    关于CDC的介绍可参考 cdc(程序CDC类)_百度百科 (baidu.com),CDC 是change data capture,即变化数据捕捉。是数据库进行备份的一种方式,常用于大量数据的备份工作。分为入侵式的和非入侵式的备份方法,入侵式的有基于触发器备份、基于时间戳备份、基于快照备份,非入侵式的备份方法是基于日志的备份。mysql 基于日志的CDC就是要开启mysql binary log。

    SIDDHI 关于CDC的配置参数

    SIDDHI支持“listening”和“polling”两种模式

    listening支持 'INSERT', 'UPDATE'和'DELETE'三种模式,可通过operation参数进行配置。只支持MYSQL数据库

    可使用的参数有:
    url:数据库连接字符串: mysql:jdbc:mysql://192.168.0.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8; oracle:jdbc:oracle:thin:@10.1.4.76:1521:ORCL
    mode:CDC模式(listening)
    username:数据库用户名,要有'SELECT', 'RELOAD', 'SHOW DATABASES', 'REPLICATION SLAVE'和'REPLICATION CLIENT' 权限
    GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON . TO 'user'@'%';
    password:数据库用户密码
    table.name:要监听的表名
    operation:支持 'INSERT', 'UPDATE'和'DELETE'
    pool.properties:数据库连接的池参数可以指定为键值对。
    connector.properties:在这里,您可以将Debezium连接器属性指定为逗号分隔的字符串。
    database.server.id加入MySQL数据库集群读取bin日志时使用的ID。这应该是一个介于1到2^32之间的唯一整数。
    database.server.name:标识数据库服务器并为其提供名称空间的逻辑名称

    polling 支持insert、update ; 支持多种数据库

    mode:CDC模式(polling )
    datasource.name:SIDDHI配置文件内配置的数据库源
    url:数据库连接字符串: mysql:jdbc:mysql://192.168.0.10:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8; oracle:jdbc:oracle:thin:@10.1.4.76:1521:ORCL
    username:数据库用户名,要有'SELECT' 权限
    password:数据库用户密码
    polling.column:要监听的字段
    polling.interval:呆监听的记录数,默认为1
    table.name:要监听的表名
    wait.on.missed.record:指示进程是否需要等待丢失/无序记录。
    当此标志设置为“true”时,一旦识别出丢失的记录,流程将被保留。缺少的recrod由polling.column值的顺序标识。这只能用于数字字段,不建议用于时间值,因为它不是连续的。
    仅当记录可能无序写入时(例如并发写入器),才应启用此功能,因为这会降低性能。
    missed.record.waiting.timeout:重试丢失/无序记录的超时时间(以秒为单位)。这应该与wait.on.missed.record参数一起使用。如果未设置参数,进程将无限期地等待丢失的记录。

    数据库CDC配置

    Mysql 5.7.X以后版本支持,默认不支持CDC,需要配置指定参数。8.X.X以后配置默认支持

    show VARIABLES like '%log%':查看日志配置参数


    image.png

    server_id:数据库ID,单机模式下设置 server_id=1,集群模式下不同节点的server_id必须不同
    log_bin:指定binlog文件名和储存位置,可设置为log_bin=mysql-bin
    binlog_format:binlog格式。有3个值可以选择:ROW:记录哪条数据被修改和修改之后的数据,会产生大量日志。STATEMENT:记录修改数据的SQL,日志量较小。MIXED:混合使用上述两个模式。CDC要求必须配置为ROW。binlog_format=row
    expire_logs_days:bin_log过期时间,超过该时间的log会自动删除。这里设置为7天,expire_logs_days=7
    binlog_do_db:binlog记录哪些数据库。如果需要配置多个库,如例子中配置多项。切勿使用逗号分隔。单例数据库可不设置,集群模式需要配置多条

    binlog_do_db=db_a
    binlog_do_db=db_b

    Oracle,默认是开启的。

    SIDDHI驱动程序,试了很多版本,建议使用siddhi-io-cdc-2.0.3.jar ,踩了很多坑。程序默认有2.0.5,2.0.10多多少少都有问题。

    mysql数据源配置实例

    oracle数据源配置实例

    • name: test
      description: The datasource used for test
      jndiConfig:
      name: jdbc/test
      useJndiReference: true
      definition:
      type: RDBMS
      configuration:
      jdbcUrl: 'jdbc:oracle:thin:@192.168.1.10:1521/ORCL'
      username: XXXXX
      password: XXXXX
      driverClassName: oracle.jdbc.driver.OracleDriver
      maxPoolSize: 20
      idleTimeout: 60000
      connectionTestQuery: SELECT 1 FROM DUAL
      validationTimeout: 30000
      isAutoCommit: false

    mysql程序开发实例

    @App:name("test_mysql_cdc")

    @App:description("Description of the plan")

    -- @source(type = 'cdc' , url = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8',
    -- username = 'XXXX', password = 'XXXX',
    -- table.name = 'person', operation = 'insert',
    -- @map(type='keyvalue', @attributes(id = 'ID', name = 'Name', age = 'Age')))
    -- define stream inputStream (id int, name string, age int);

    @source(type = 'cdc', mode='polling', polling.column = 'ID',
    polling.interval = '1',
    jdbc.driver.name = 'com.mysql.jdbc.Driver', url = 'jdbc:mysql://127.0.0.1:3306/test?useUnicode=true&characterEncoding=utf8&zeroDateTimeBehavior=convertToNull&useSSL=false&serverTimezone=GMT%2B8',
    username = 'admin', password = '123456',
    table.name = 'person',
    @map(type='keyvalue'))
    define stream pollingStream (id string, name string, age string);

    -- from inputStream#log()
    -- select *
    -- insert into testString;

    from pollingStream#log()
    select *
    insert into testPolling;

    oracle程序开发实例

    @source(type = 'cdc',
    datasource.name = 'test',
    url = '',
    mode = 'polling',
    polling.column = 'name',
    polling.interval = '1',
    username = '',
    password = '',
    table.name = 'user',
    @map(type = 'keyvalue',fail.on.missing.attribute="false" ))

    相关文章

      网友评论

          本文标题:SIDDHI(西迪) CDC配置

          本文链接:https://www.haomeiwen.com/subject/ijfqtrtx.html