
Reading MySQL data with flink-cdc

Author: 远行的夜色 | Published 2022-07-01 13:37

    To read MySQL data through the flink-cdc connector and write it to other systems or databases, you first need to enable MySQL's binlog.
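
    A quick way to confirm the binlog is ready (Flink CDC needs log_bin = ON and binlog_format = ROW) is to query the server over JDBC. The sketch below is illustrative only; it reuses the connection settings of the job in section 2 and the MySQL driver from the dependency list in section 1.

    import java.sql.Connection;
    import java.sql.DriverManager;
    import java.sql.ResultSet;
    import java.sql.Statement;

    public class BinlogCheck {

        public static void main(String[] args) throws Exception {
            // connection parameters match the MySqlSource configuration used later
            try (Connection conn = DriverManager.getConnection(
                    "jdbc:mysql://127.0.0.1:3306", "root", "password");
                 Statement stmt = conn.createStatement();
                 ResultSet rs = stmt.executeQuery(
                     "SHOW VARIABLES WHERE Variable_name IN ('log_bin', 'binlog_format')")) {
                while (rs.next()) {
                    // expect: log_bin = ON, binlog_format = ROW
                    System.out.println(rs.getString(1) + " = " + rs.getString(2));
                }
            }
        }
    }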

    1. Add the Maven dependencies

     <properties>
       <!-- version of the slf4j artifacts below; adjust to your project (1.7.x pairs with slf4j-log4j12) -->
       <slf4j.version>1.7.30</slf4j.version>
     </properties>

     <dependencies>
        <dependency>
          <groupId>mysql</groupId>
          <artifactId>mysql-connector-java</artifactId>
          <version>8.0.19</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba</groupId>
          <artifactId>fastjson</artifactId>
          <version>2.0.6</version>
        </dependency>
        <!-- logging dependencies -->
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-api</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
        <dependency>
          <groupId>org.slf4j</groupId>
          <artifactId>slf4j-log4j12</artifactId>
          <version>${slf4j.version}</version>
        </dependency>
        <dependency>
          <groupId>org.apache.logging.log4j</groupId>
          <artifactId>log4j-to-slf4j</artifactId>
          <version>2.14.0</version>
        </dependency>
        <dependency>
          <groupId>org.projectlombok</groupId>
          <artifactId>lombok</artifactId>
          <version>1.16.18</version>
        </dependency>
        <dependency>
          <groupId>com.alibaba.otter</groupId>
          <artifactId>canal.client</artifactId>
          <version>1.1.6</version>
        </dependency>
        <dependency>
          <groupId>com.ververica</groupId>
          <artifactId>flink-connector-mysql-cdc</artifactId>
          <version>2.2.1</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-streaming-java_2.12</artifactId>
          <version>1.13.6</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-clients_2.12</artifactId>
          <version>1.13.6</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-java</artifactId>
          <version>1.13.6</version>
        </dependency>
        <dependency>
          <groupId>org.apache.flink</groupId>
          <artifactId>flink-table-planner-blink_2.12</artifactId>
          <version>1.13.6</version>
          <type>test-jar</type>
        </dependency>
      </dependencies>
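
    Note: the flink-connector-mysql-cdc version must line up with the Flink version; 2.2.1 works with the Flink 1.13.6 artifacts used here, and mixing incompatible versions typically fails at runtime with linkage errors.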
    

    2. Create the Flink CDC test class

    import com.ververica.cdc.connectors.mysql.source.MySqlSource;
    import com.ververica.cdc.connectors.mysql.table.StartupOptions;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.commons.lang3.ArrayUtils;
    import org.apache.flink.api.common.eventtime.WatermarkStrategy;
    import org.apache.flink.configuration.Configuration;
    import org.apache.flink.runtime.state.filesystem.FsStateBackend;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    
    @Slf4j
    public class FlinkCDC {
    
    
        public static void main(String[] args) throws Exception {
            MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("127.0.0.1")
                .port(3306)
                .databaseList("user") // set captured database
                .tableList("user.log_info") // set captured table
                .username("root")
                .password("password")
                 // custom deserialization (class defined in section 3)
                .deserializer(new CustomDeserialization())
                 // StartupOptions.initial(): take a full snapshot first, then switch to incremental binlog reads
    //            .startupOptions(StartupOptions.initial())
                 // StartupOptions.latest(): incremental only, starting from the latest binlog position
                .startupOptions(StartupOptions.latest())
                .build();
    
    
            Configuration config = new Configuration();
    
    //        config.setString("execution.savepoint.path", "file:///D:\\flink\\checkpoints\\cc52b93fd24977e5388f0a19a30d49d2\\chk-87");
            // set at startup: pass the latest checkpoint path as the first program argument
            if (ArrayUtils.isNotEmpty(args)) {
                String lastCheckpointPath = args[0];
                // e.g. D:\flink\checkpoints\8bdf5d49bb1b4cda56aaa0a590fc2cef\chk-55
                // on restart, point at the newest checkpoint so reading resumes from that position
                config.setString("execution.savepoint.path", lastCheckpointPath);
            }
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(config);
    
            // enable checkpointing every 3 seconds
            env.enableCheckpointing(3000);
    //      env.getCheckpointConfig().setCheckpointStorage("file:///D:\\flink\\checkpoints");
            // checkpoint storage location, here the local filesystem
            // (FsStateBackend is deprecated since Flink 1.13; the commented setCheckpointStorage call above is the newer API)
            env.setStateBackend(new FsStateBackend("file:///D:\\flink\\checkpoints"));
            env
                .fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                // set 4 parallel source tasks
                .setParallelism(4)
                .addSink(new CustomSink()).setParallelism(1);
    //            .print().setParallelism(1); // use parallelism 1 for sink to keep message ordering
    
            env.execute("flinkcdc");
        }
    }
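
    When restarting after a failure, pass the newest chk-* directory under D:\flink\checkpoints (as in the comment above) as the first program argument so the job resumes from that checkpoint. When submitting to a cluster instead of running in the IDE, the CLI flag flink run -s <checkpointPath> achieves the same restore.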
    
    

    3. Custom deserialization class

    import com.alibaba.fastjson2.JSONObject;
    import com.ververica.cdc.debezium.DebeziumDeserializationSchema;
    import io.debezium.data.Envelope;
    import java.util.List;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.util.Collector;
    import org.apache.kafka.connect.data.Field;
    import org.apache.kafka.connect.data.Schema;
    import org.apache.kafka.connect.data.Struct;
    import org.apache.kafka.connect.source.SourceRecord;
    
    /**
     * Custom deserializer: converts Debezium SourceRecords into JSON strings.
     */
    public class CustomDeserialization implements DebeziumDeserializationSchema<String> {
    
    
    
        @Override
        public void deserialize(SourceRecord sourceRecord, Collector<String> collector)
            throws Exception {
    
            JSONObject res = new JSONObject();
    
        // the topic has the form "<serverName>.<database>.<table>"; extract the database and table names
            String topic = sourceRecord.topic();
            String[] fields = topic.split("\\.");
            String database = fields[1];
            String tableName = fields[2];
    
            Struct value = (Struct) sourceRecord.value();
        // "before" image of the row (null for inserts)
            Struct before = value.getStruct("before");
            JSONObject beforeJson = new JSONObject();
            if (before != null) {
                Schema beforeSchema = before.schema();
                List<Field> beforeFields = beforeSchema.fields();
                for (Field field : beforeFields) {
                    Object beforeValue = before.get(field);
                    beforeJson.put(field.name(), beforeValue);
                }
            }
    
        // "after" image of the row (null for deletes)
            Struct after = value.getStruct("after");
            JSONObject afterJson = new JSONObject();
            if (after != null) {
                Schema afterSchema = after.schema();
                List<Field> afterFields = afterSchema.fields();
                for (Field field : afterFields) {
                    Object afterValue = after.get(field);
                    afterJson.put(field.name(), afterValue);
                }
            }
    
        // operation type (READ / CREATE / UPDATE / DELETE); report CREATE as "insert"
            Envelope.Operation operation = Envelope.operationFor(sourceRecord);
            String type = operation.toString().toLowerCase();
            if ("create".equals(type)) {
                type = "insert";
            }
    
        // assemble the fields into the output JSON object
            res.put("database", database);
            res.put("tableName", tableName);
            res.put("before", beforeJson);
            res.put("after", afterJson);
            res.put("type", type);
    
        // emit the serialized record downstream
            collector.collect(res.toString());
        }
    
        @Override
        public TypeInformation<String> getProducedType() {
            return BasicTypeInfo.STRING_TYPE_INFO;
        }
    }
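
    For example, an UPDATE on user.log_info yields a string shaped like this (field names and values are illustrative):

    {
      "database": "user",
      "tableName": "log_info",
      "before": {"id": 1, "message": "old text"},
      "after": {"id": 1, "message": "new text"},
      "type": "update"
    }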
    

    4. Custom sink output

    import com.alibaba.fastjson2.JSON;
    import com.alibaba.fastjson2.TypeReference;
    import org.apache.flink.streaming.api.functions.sink.RichSinkFunction;

    public class CustomSink extends RichSinkFunction<String> {

        @Override
        public void invoke(String value, Context context) throws Exception {
            // parse the JSON produced by CustomDeserialization into a typed wrapper;
            // LogInfo is the entity class mapping the captured log_info table
            TableData<LogInfo> tableData = JSON.parseObject(value,
                new TypeReference<TableData<LogInfo>>() {});
            System.out.println(tableData.toString());
            // TODO persist to another system, middleware (e.g. a message queue), or another database
            // send to RabbitMQ or Kafka for further processing:
            // rabbitmqTemplate.send(tableData);
            // or save to a database:
            // testMapper.insert(tableData.getAfter());
        }
    }
    
    
    import lombok.AllArgsConstructor;
    import lombok.Data;
    import lombok.NoArgsConstructor;

    @Data
    @NoArgsConstructor
    @AllArgsConstructor
    public class TableData<T> {

        private String database;

        private String tableName;

        // operation type written by CustomDeserialization: insert / update / delete / read
        private String type;

        private T before;

        private T after;
    }
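
    The LogInfo entity referenced by the sink is not shown in the original post. A minimal sketch, assuming the log_info table has only an id and a message column; replace the fields with your actual schema:

    import lombok.Data;

    @Data
    public class LogInfo {

        // hypothetical columns of user.log_info
        private Long id;

        private String message;
    }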
    
    
