美文网首页Flink人生几何?Flink学习指南
Flink 使用之 CDC 自定义 Deserializatio

Flink 使用之 CDC 自定义 Deserializatio

作者: AlienPaul | 来源:发表于2021-09-15 14:24 被阅读0次

    Flink 使用介绍相关文档目录

    Flink 使用介绍相关文档目录

    背景

    本篇接Flink 使用之 MySQL CDC。在这篇博客,我们解析CDC数据的时候用的是StringDebeziumDeserializationSchema。实际上它仅仅调用接收到SourceRecordtoString()方法,将结果传递到下游。SourceRecord.toString()返回的内容过多,层级较为复杂,给下游的解析造成了很大的困难。

    为了解决这个问题,本篇为大家讲解如何自定义DeserializationSchema

    目标

    通常来说,我们关心的CDC内容包含如下:

    • 数据库名
    • 表名
    • 操作类型(增删改)
    • 变更前数据
    • 变更后数据
    • 其他字段(例如变更时间等,根据业务需要添加)

    下面例子中,我们手工将SourceRecord解析为一个Map类型。

    实现方式

    实现自己的DeserializationSchema,我们可以参考官方给出的StringDebeziumDeserializationSchema的源代码。如下所示:

    public class StringDebeziumDeserializationSchema implements DebeziumDeserializationSchema<String> {
        private static final long serialVersionUID = -3168848963265670603L;
    
        public StringDebeziumDeserializationSchema() {
        }
    
        public void deserialize(SourceRecord record, Collector<String> out) throws Exception {
            out.collect(record.toString());
        }
    
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
    

    它继承了DebeziumDeserializationSchema,实现了2个方法:

    • deserialize:反序列化的逻辑在此,CDC捕获到的原始数据为SourceRecord,我们将其解析之后,通过Collector收集,即可传递给下游。
    • getProducedType:返回解析之后的数据类型。

    我们自己的解析逻辑只需要实现这个接口,在deserialize完成自己的解析逻辑即可。当然,这个例子中仅仅是为了print看起来直观,将转换后的map转换为String后发往下游。实际生产中建议使用Java Bean,Scala Case Class或者JSON格式。

    完整的代码和关键点讲解如下所示:

    public class CustomDebeziumDeserializer implements DebeziumDeserializationSchema<String> {
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector) throws Exception {
            Map<String, Object> parsedObject = new HashMap<>();
    
            // 注意,SourceRecord中并没有获取操作类型的方法。获取操作类型需要这么写
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            parsedObject.put("operation", operation.toString());
    
            // topic返回的内容为:mysql_binlog_source.dbName.tableName,即数据源.数据库名.表名
            // 按照业务使用要求解析即可
            if (null != sourceRecord.topic()) {
                String[] splitTopic = sourceRecord.topic().split("\\.");
                if (splitTopic.length == 3) {
                    parsedObject.put("database", splitTopic[1]);
                    parsedObject.put("table", splitTopic[2]);
                }
            }
    
            // value返回sourceRecord中携带的数据,它是一个Struct类型
            // Struct的类型为org.apache.kafka.connect.data.Struct
            Struct value = (Struct) sourceRecord.value();
            // 变更前后的数据位于value这个Struct中,名称分别为before和after
            Struct before = value.getStruct("before");
            Struct after = value.getStruct("after");
    
            // 对于新增或删除的数据,before和after可能不存在,需要做null检查
            if (null != before) {
                Map<String, Object> beforeMap = new HashMap<>();
                // 获取Struct中包含所有字段名可以使用struct.schema().fields()方法,遍历即可
                Schema beforeSchema = before.schema();
                for (Field field : beforeSchema.fields()) {
                    beforeMap.put(field.name(), before.get(field));
                }
                parsedObject.put("before", beforeMap);
            }
    
            if (null != after) {
                Map<String, Object> afterMap = new HashMap<>();
                Schema afterSchema = after.schema();
                for (Field field : afterSchema.fields()) {
                    afterMap.put(field.name(), after.get(field));
                }
    
                parsedObject.put("after", afterMap);
            }
    
            // 调用collector的collect方法,将转换后的数据发往下游
            collector.collect(parsedObject.toString());
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
    

    最后和Flink 使用之 MySQL CDC类似。编写Flink主程序:

    val env = StreamExecutionEnvironment.getExecutionEnvironment
    
    // 使用MySQLSource创建数据源
    // 使用自己编写的CustomDebeziumDeserializer替换掉官方提供的StringDebeziumDeserializationSchema
    val sourceFunction = MySQLSource.builder().hostname("your-ip").port(3306)
        .databaseList("demo").username("root").password("123456")
        .deserializer(new CustomDebeziumDeserializer).build();
    
    // 单并行度打印,避免输出乱序
    env.addSource(sourceFunction).print.setParallelism(1)
    
    env.execute()
    

    相关文章

      网友评论

        本文标题:Flink 使用之 CDC 自定义 Deserializatio

        本文链接:https://www.haomeiwen.com/subject/jxdkgltx.html