美文网首页
flink cdc oceanbase 数据自定义格式序列化

flink cdc oceanbase 数据自定义格式序列化

作者: 负二贷 | 来源:发表于2023-08-08 18:38 被阅读0次
import com.alibaba.fastjson2.JSON;
import com.alibaba.fastjson2.JSONObject;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseDeserializationSchema;
import com.ververica.cdc.connectors.oceanbase.table.OceanBaseRecord;
import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
import io.debezium.data.Envelope;
import org.apache.commons.lang3.ObjectUtils;
import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.shaded.netty4.io.netty.util.internal.ObjectUtil;
import org.apache.flink.util.Collector;
import org.apache.flink.util.StringUtils;
import org.apache.kafka.connect.data.Field;
import org.apache.kafka.connect.data.Struct;
import org.apache.kafka.connect.source.SourceRecord;

import java.util.Map;
import java.util.Objects;


/**
 * Deserialization schema that turns each {@link OceanBaseRecord} into a JSON string.
 *
 * <p>Every emitted string is a JSON object with the keys {@code database}, {@code tableName},
 * {@code after}, {@code before}, {@code type} and {@code ts}. {@code after} and {@code before}
 * are always present as JSON objects; they are empty when the record carries no such fields.
 *
 * @author majiajue
 * @date 2023/8/9 15:16
 */
public class OceanBaseDeserializer implements OceanBaseDeserializationSchema<String> {

    /**
     * Operation type reported for records that carry no operation of their own.
     *
     * <p>Kept as a literal rather than {@code "insert".toUpperCase()}, whose result depends on
     * the JVM default locale (for example it is not {@code "INSERT"} under a Turkish locale).
     */
    private static final String DEFAULT_OPERATION_TYPE = "INSERT";

    /**
     * Returns the type information of the produced elements.
     *
     * @return the type information for {@link String}
     */
    @Override
    public TypeInformation<String> getProducedType() {
        return BasicTypeInfo.STRING_TYPE_INFO;
    }

    /**
     * Builds the JSON representation of one record and hands it to the collector.
     *
     * @param oceanBaseRecord the record to convert
     * @param collector receives exactly one JSON string per call
     * @throws Exception declared by the implemented interface; this method does not throw any
     *     checked exception itself
     */
    @Override
    public void deserialize(OceanBaseRecord oceanBaseRecord, Collector<String> collector) throws Exception {
        // Database and table the record belongs to.
        String database = oceanBaseRecord.getSourceInfo().getDatabase();
        String tableName = oceanBaseRecord.getSourceInfo().getTable();

        // Row images taken from the log message; either map may be absent.
        JSONObject afterJson = new JSONObject();
        copyFields(oceanBaseRecord.getLogMessageFieldsAfter(), afterJson);
        JSONObject beforeJson = new JSONObject();
        copyFields(oceanBaseRecord.getLogMessageFieldsBefore(), beforeJson);

        String type;
        if (oceanBaseRecord.getOpt() == null) {
            // No operation on the record: per the original author's note this adapts to the
            // "initial" mode, so the record is reported as an insert and, when there is no
            // "after" log image, the JDBC fields are used as the "after" image instead.
            type = DEFAULT_OPERATION_TYPE;
            if (oceanBaseRecord.getJdbcFields() != null
                    && oceanBaseRecord.getLogMessageFieldsAfter() == null) {
                copyFields(oceanBaseRecord.getJdbcFields(), afterJson);
            }
        } else {
            type = oceanBaseRecord.getOpt().name();
        }

        // Assemble the output object; keys are inserted in this fixed order.
        JSONObject jsonObject = new JSONObject();
        jsonObject.put("database", database);
        jsonObject.put("tableName", tableName);
        jsonObject.put("after", afterJson);
        jsonObject.put("before", beforeJson);
        jsonObject.put("type", type);
        jsonObject.put("ts", oceanBaseRecord.getSourceInfo().getTimestampS());

        collector.collect(jsonObject.toJSONString());
    }

    /**
     * Copies every entry of {@code source} into {@code target}; does nothing when
     * {@code source} is {@code null}.
     *
     * @param source the field map to read, may be {@code null}
     * @param target the JSON object that receives the entries
     */
    private static void copyFields(Map<String, ?> source, JSONObject target) {
        if (source == null) {
            return;
        }
        source.forEach(target::put);
    }
}

使用时通过 .deserializer(new OceanBaseDeserializer()) 指定该反序列化器,这样每条记录都会以 JSON 字符串的形式输出,方便后续逻辑处理。

相关文章

网友评论

      本文标题:flink cdc oceanbase 数据自定义格式序列化

      本文链接:https://www.haomeiwen.com/subject/vzznpdtx.html