Flink-CDC (Change Data Capture): Specifying

Author: 小胡子哥灬 | Published: 2020-09-04 00:09

    Preface:

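    Since version 1.11, Flink has supported consuming binlog data from canal or debezium to synchronize data. However, both canal and debezium require you to set up the corresponding service and create Kafka topics beforehand, which makes for a long pipeline. Can a Flink source read the binlog directly? Flink-CDC is such a connector: it relies on the debezium engine to capture data changes, and it currently supports MySQL and PostgreSQL. For a detailed introduction and usage, see the wiki on GitHub: https://github.com/ververica/flink-cdc-connectors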

    Exactly-Once

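    By default, Flink-CDC performs one full snapshot at startup and emits every existing row as an INSERT change. As of version 1.1 you can also set snapshot.mode to schema_only to disable this initial snapshot. Starting consumption from a specified binlog offset is not supported yet. However, when Flink-CDC takes a checkpoint, it stores the offset in state to support exactly-once semantics. Seen from that angle, consuming from a specified offset is similar to restoring from a savepoint, so the idea is feasible, but the implementation is one step short. The reason lies in how debezium works. Taking the MySQL connector as an example, debezium keeps the schema of the MySQL tables so that it can track DDL changes, and it stores that data wrapped in a DatabaseHistory object. Restoring an offset therefore requires restoring the DatabaseHistory first, so when Flink stores the offset at checkpoint time, it also stores the database history in state. The relevant code: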

    // DebeziumSourceFunction.java
    // Stores the engine instance name first, then every history record.
    private void snapshotHistoryRecordsState() throws Exception {
        historyRecordsState.clear();

        if (engineInstanceName != null) {
            historyRecordsState.add(engineInstanceName);
            ConcurrentLinkedQueue<HistoryRecord> historyRecords =
                    FlinkDatabaseHistory.getRegisteredHistoryRecord(engineInstanceName);
            if (historyRecords != null) {
                DocumentWriter writer = DocumentWriter.defaultWriter();
                for (HistoryRecord record : historyRecords) {
                    // Each record is serialized before it is added to state
                    historyRecordsState.add(writer.write(record.document()));
                }
            }
        }
    }
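
The state layout used by the method above (instance name first, then one entry per record) can be modeled without any Flink or debezium dependency. The following is only a sketch: plain strings stand in for debezium's HistoryRecord, and the class name HistoryStateSketch and the method restoreRecords are hypothetical, not taken from Flink-CDC.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;

/** Stand-in model of the state layout; plain strings replace debezium's HistoryRecord. */
final class HistoryStateSketch {

    /** Flattens (instanceName, records) into one list: name first, then each record. */
    static List<String> snapshot(String instanceName, Queue<String> records) {
        List<String> state = new ArrayList<>();
        if (instanceName != null) {
            state.add(instanceName);
            if (records != null) {
                state.addAll(records);
            }
        }
        return state;
    }

    /** Reverses snapshot(): skips the leading instance name and returns the records. */
    static Queue<String> restoreRecords(List<String> state) {
        Queue<String> records = new ConcurrentLinkedQueue<>();
        for (int i = 1; i < state.size(); i++) {
            records.add(state.get(i));
        }
        return records;
    }

    public static void main(String[] args) {
        Queue<String> records = new ConcurrentLinkedQueue<>();
        records.add("serialized-history-record-1"); // hypothetical payload
        List<String> state = snapshot("engine-1", records);
        System.out.println("instance name: " + state.get(0));
        System.out.println("restored records: " + restoreRecords(state));
    }
}
```

The point of the layout is that a restore can tell which engine instance the records belong to before it replays them.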
    
    Restoring from a Specified Offset

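    Now for the implementation of consuming the binlog from a specified offset. So far only MySQL is implemented; PostgreSQL should be much the same. By debugging the code, I found that the MySQL offset is stored as JSON, with mainly the following content: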

    {
        "sourcePartition":{"server":"mysql-binlog-source"},
        "sourceOffset":{
            "file":"mysql-binlog-000001",
            "pos":2344,
            "ts_sec":1599147495831
        }
    }
    

    In my testing so far, MySQL can restore the read position only from file and pos. So my steps are as follows:

    1. Add two options that specify the binlog file name and the position, respectively:

      private static final ConfigOption<String> SOURCE_OFFSET_FILE = ConfigOptions.key("source-offset-file")
              .stringType()
              .noDefaultValue()
              .withDescription("File Name of the MySQL binlog.");

      private static final ConfigOption<Integer> SOURCE_OFFSET_POSITION = ConfigOptions.key("source-offset-pos")
              .intType()
              .noDefaultValue()
              .withDescription("Position of the MySQL binlog.");
      
    2. Assemble the offset JSON:

      // MysqlSource.java#build()
      if (sourceOffsetFile != null && sourceOffsetPosition != null) {
          // When restoring from a specified offset, snapshot.mode must be schema_only_recovery
          // so that debezium rebuilds the database history and obtains the table schemas
          props.setProperty("snapshot.mode", "schema_only_recovery");

          DebeziumState debeziumState = new DebeziumState();
          Map<String, String> sourcePartition = new HashMap<>();
          sourcePartition.put("server", props.getProperty("database.server.name"));
          debeziumState.setSourcePartition(sourcePartition);

          Map<String, Object> sourceOffset = new HashMap<>();
          sourceOffset.put("file", sourceOffsetFile);
          sourceOffset.put("pos", sourceOffsetPosition);
          debeziumState.setSourceOffset(sourceOffset);

          try {
              ObjectMapper objectMapper = new ObjectMapper();
              String offsetJson = objectMapper.writeValueAsString(debeziumState);
              // Override OFFSET_STATE_VALUE so that debezium starts consuming from this offset
              props.setProperty(FlinkOffsetBackingStore.OFFSET_STATE_VALUE, offsetJson);
              // Mark the database history as non-existent. The original implementation always
              // returned true, so debezium could not rebuild the history even in
              // schema_only_recovery mode
              props.setProperty("database.history.exists", "false");
          } catch (IOException e) {
              throw new RuntimeException("Can't serialize debezium offset state from Object: " + debeziumState, e);
          }
      }
      
    3. Override the exists method of the database history:

      // FlinkDatabaseHistory.java
      public void configure(Configuration config, HistoryRecordComparator comparator, DatabaseHistoryListener listener, boolean useCatalogBeforeSchema) {
          super.configure(config, comparator, listener, useCatalogBeforeSchema);
          this.instanceName = config.getString(DATABASE_HISTORY_INSTANCE_NAME);
          this.records = getRegisteredHistoryRecord(instanceName);
          if (records == null) {
              throw new IllegalStateException(
                  String.format("Couldn't find engine instance %s in the global records.", instanceName));
          }
          this.databaseexists = config.getBoolean("database.history.exists", true);
      }

      @Override
      public boolean exists() {
          return databaseexists;
      }
      

      The above is the code that lets Flink-CDC consume the MySQL binlog from a specified offset. It has been submitted as a pull request on GitHub: https://github.com/ververica/flink-cdc-connectors/pull/39
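
Step 2 applies the offset only when both options are set, so a configuration with just one of them is silently ignored. A possible up-front check is sketched below. It is not part of the pull request: the class OffsetOptionsCheck and the method hasStartOffset are hypothetical names, and the lower bound of 4 relies on the fact that a MySQL binlog file begins with a 4-byte magic header.

```java
/** Hypothetical pre-check for the two offset options; not part of the pull request. */
final class OffsetOptionsCheck {

    /** A binlog file starts with a 4-byte magic header, so no event begins before position 4. */
    static final long FIRST_EVENT_POSITION = 4L;

    /**
     * Returns true when a start offset was requested and false when neither option is set.
     * Throws if only one option is set or if the position cannot be a real event position.
     */
    static boolean hasStartOffset(String file, Integer pos) {
        if (file == null && pos == null) {
            return false;
        }
        if (file == null || pos == null) {
            throw new IllegalArgumentException(
                    "source-offset-file and source-offset-pos must be set together");
        }
        if (file.trim().isEmpty()) {
            throw new IllegalArgumentException("source-offset-file must not be blank");
        }
        if (pos < FIRST_EVENT_POSITION) {
            throw new IllegalArgumentException(
                    "source-offset-pos must be >= " + FIRST_EVENT_POSITION + ", got " + pos);
        }
        return true;
    }

    public static void main(String[] args) {
        System.out.println(hasStartOffset("mysql-bin.000003", 1682));
        System.out.println(hasStartOffset(null, null));
    }
}
```

Failing fast here would surface a half-configured offset as a clear error instead of a job that quietly runs a full snapshot.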

    Testing
    1. First start:

      CREATE TABLE orders (
        order_id INT,
        order_date TIMESTAMP(0),
        customer_name STRING,
        price DECIMAL(10, 5),
        product_id INT,
        order_status BOOLEAN
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'flink',
        'table-name' = 'orders'
      );
      
      CREATE TABLE print_test (
        order_id INT,
        order_date TIMESTAMP(0),
        customer_name STRING,
        price DECIMAL(10, 5),
        product_id INT,
        order_status BOOLEAN
      ) WITH (
        'connector' = 'print'
      );
      
      INSERT INTO print_test
      SELECT * FROM orders;
      

      Output:

      +I(1,2020-09-03T00:29:30,shizc,12.00000,1111,true)
      +I(2,2020-09-03T23:52:23,shizc1233,22.00000,1123,true)
      +I(3,2020-09-03T23:56:17,2333,3213.00000,11,false)
      +I(4,2020-09-03T23:56:43,shiz343,222.00000,11,true)
      
    2. Specifying the offset: as for how to obtain the offset, I currently get it by running show binlog events in 'mysql-bin.000003' in MySQL.

      CREATE TABLE orders (
        order_id INT,
        order_date TIMESTAMP(0),
        customer_name STRING,
        price DECIMAL(10, 5),
        product_id INT,
        order_status BOOLEAN
      ) WITH (
        'connector' = 'mysql-cdc',
        'hostname' = 'localhost',
        'port' = '3306',
        'username' = 'root',
        'password' = '123456',
        'database-name' = 'flink',
        'table-name' = 'orders',
        'source-offset-file' = 'mysql-bin.000003',
        'source-offset-pos' = '1682'
      );
      

      Output:

      +I(3,2020-09-03T23:56:17,2333,3213.00000,11,false)
      +I(4,2020-09-03T23:56:43,shiz343,222.00000,11,true)
      -U(3,2020-09-03T23:56:17,2333,3213.00000,11,false)
      +U(3,2020-09-03T23:56:17,2333,3213.00000,11,false)
      

      The output shows that, after the specified offset, two rows were inserted and one row was updated, which matches my test data.
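
To make the change markers in that output concrete, here is a small sketch that replays printed lines into the resulting table contents. The markers +I, -U, +U and -D are Flink's row kinds (insert, update-before, update-after, delete). Treating the first column, order_id, as the key is an assumption, since the DDL above declares no primary key, and the class ChangelogReplay is hypothetical.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper that folds printed changelog lines into a keyed table. */
final class ChangelogReplay {

    /** Applies each line in order; the key is the first column (assumed to be order_id). */
    static Map<Integer, String> replay(List<String> lines) {
        Map<Integer, String> table = new TreeMap<>();
        for (String line : lines) {
            String kind = line.substring(0, 2);
            String row = line.substring(line.indexOf('(') + 1, line.lastIndexOf(')'));
            int key = Integer.parseInt(row.substring(0, row.indexOf(',')));
            switch (kind) {
                case "+I": // insert
                case "+U": // new image of an updated row
                    table.put(key, row);
                    break;
                case "-U": // old image of an updated row
                case "-D": // delete
                    table.remove(key);
                    break;
                default:
                    throw new IllegalArgumentException("Unknown change kind: " + kind);
            }
        }
        return table;
    }

    public static void main(String[] args) {
        List<String> output = Arrays.asList(
                "+I(3,2020-09-03T23:56:17,2333,3213.00000,11,false)",
                "+I(4,2020-09-03T23:56:43,shiz343,222.00000,11,true)",
                "-U(3,2020-09-03T23:56:17,2333,3213.00000,11,false)",
                "+U(3,2020-09-03T23:56:17,2333,3213.00000,11,false)");
        replay(output).forEach((id, row) -> System.out.println(id + " -> " + row));
    }
}
```

Replaying the four lines above leaves two rows, for order_id 3 and 4: the -U/+U pair retracts and re-adds row 3 rather than creating a third row.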

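    PS: Many readers may assume that the binlog offset can be set to any rough value within range. It cannot. The offset must be a valid value, that is, one that show binlog events in 'xxx' returns; otherwise a series of exceptions is thrown, for example: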

    org.apache.kafka.connect.errors.ConnectException: binlog truncated in the middle of event; consider out of disk space on master; the first event 'mysql-bin.000003' at 1681, the last event read from '.\mysql-bin.000003' at 123, the last byte read from '.\mysql-bin.000003' at 1700. Error code: 1236; SQLSTATE: HY000.
        at io.debezium.connector.mysql.AbstractReader.wrap(AbstractReader.java:230)
        at io.debezium.connector.mysql.AbstractReader.failed(AbstractReader.java:196)
        at io.debezium.connector.mysql.BinlogReader$ReaderThreadLifecycleListener.onCommunicationFailure(BinlogReader.java:1125)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:985)
        at com.github.shyiko.mysql.binlog.BinaryLogClient.connect(BinaryLogClient.java:581)
        at com.github.shyiko.mysql.binlog.BinaryLogClient$7.run(BinaryLogClient.java:860)
        at java.lang.Thread.run(Thread.java:748)
    Caused by: com.github.shyiko.mysql.binlog.network.ServerException: binlog truncated in the middle of event; consider out of disk space on master; the first event 'mysql-bin.000003' at 1681, the last event read from '.\mysql-bin.000003' at 123, the last byte read from '.\mysql-bin.000003' at 1700.
        at com.github.shyiko.mysql.binlog.BinaryLogClient.listenForEventPackets(BinaryLogClient.java:949)
        ... 3 common frames omitted
    

    I have not found an effective solution for this yet. If you have a good one, please point it out.
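
One possible mitigation, offered only as a sketch and not as a tested fix, is to check the requested position against the event start positions (the Pos column of show binlog events) before starting the job. The class BinlogOffsetCheck is hypothetical, and the positions in main are made-up sample values, not real server output.

```java
import java.util.Arrays;
import java.util.TreeSet;

/** Hypothetical check of a requested binlog position against known event start positions. */
final class BinlogOffsetCheck {

    /** True only when pos is exactly the start position of some event. */
    static boolean isEventBoundary(TreeSet<Long> eventStarts, long pos) {
        return eventStarts.contains(pos);
    }

    /** Largest event start that is <= pos, or null when pos precedes every event. */
    static Long nearestBoundaryAtOrBefore(TreeSet<Long> eventStarts, long pos) {
        return eventStarts.floor(pos);
    }

    public static void main(String[] args) {
        // Made-up sample values standing in for the Pos column of show binlog events
        TreeSet<Long> eventStarts = new TreeSet<>(Arrays.asList(4L, 123L, 1682L));
        long requested = 1681L;
        if (!isEventBoundary(eventStarts, requested)) {
            System.out.println(requested + " is not an event boundary; nearest earlier boundary: "
                    + nearestBoundaryAtOrBefore(eventStarts, requested));
        }
    }
}
```

Falling back to an earlier boundary replays the events in between, and whether starting in the middle of a transaction is safe is not addressed here, so rejecting an invalid offset outright may be the more cautious choice.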


    The End
