美文网首页
Canal 使用

Canal 使用

作者: 万事万物 | 来源:发表于2021-08-04 09:32 被阅读0次

前言

使用cannal之前,需要保证mysql支持binlog功能,以及保证canal已经配置好了。

启动canal

  • 进入canal的安装目录中。
[admin@hadoop102 canal]$ ll
总用量 4
drwxrwxr-x. 2 admin admin 76 7月  31 19:18 bin
drwxrwxr-x. 5 admin admin 93 7月  31 19:49 conf
drwxrwxr-x. 2 admin admin 4096 7月  31 19:18 lib
drwxrwxr-x. 2  admin admin 6 11月 26 2018 logs
  • 执行bin目录下的startup.sh
[admin@hadoop102 canal]$ bin/startup.sh
  • 查看启动进程中是否有CanalLauncher
[admin@hadoop102 canal]$ jps
2662 Jps
2586 CanalLauncher
  • 若启动时出现如下情况
[admin@hadoop102 canal]$ bin/startup.sh
found canal.pid , Please run stop.sh first ,then startup.sh

表示上次启动之后,异常退出导致canal.pid依旧存在,需要先执行stop.sh命令。

[admin@hadoop102 canal]$ bin/startup.sh
hadoop102: stopping canal 2586 ...

参考:https://github.com/alibaba/canal/wiki/AdminGuide

停止canal

  • 进入canal的安装目录中。
  • 进入canal的安装目录中。
[admin@hadoop102 canal]$ ll
总用量 4
drwxrwxr-x. 2 admin admin 76 7月  31 19:18 bin
drwxrwxr-x. 5 admin admin 93 7月  31 19:49 conf
drwxrwxr-x. 2 admin admin 4096 7月  31 19:18 lib
drwxrwxr-x. 2  admin admin 6 11月 26 2018 logs
  • 执行bin目录下的startup.sh
[admin@hadoop102 canal]$ bin/startup.sh

日志查看

cd logs 中,

  • canal:
    这是有关canal的日志,若启动canal失败,可以再次看看日志
  • example:
    这是有关mysql实例的日志

需求

监听mysql数据库写操作变化,从canal中获取该数据,并将数据推送到Kafka中。
案例:比如使用canal监听一张用户表,每新增一个用户,canal便会从user表中获取该用户信息,这样我们就可以直接获取新增的用户信息。

疑问

为什么需要需要使用canal?直接从mysql中取不信吗?答案:不行,获取数据的目的就是用于计算,假设mysql有一万条用户信息,我需要统计男女人数,并且用户数据还是不停累加的。比如平均以每分钟1000个用户量递增。
方案一:每新增一批,就统计一次(一万条数据时统一一次,一万一千条是再统计一次,以此类推)
方案二:先统计一万条数据中的男女人数,新增一千条时再从一千条中再统计一次,结果和上一个结果相加,以此类推。
显然第二种方案效率更高,canal的出现便更好的帮我们解决了该问题。

创建一个maven 项目

参考:https://github.com/alibaba/canal/wiki/ClientExample

  • 导入依赖
       <!-- canal 相关的APi -->
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.2</version>
        </dependency>
       <!-- kafka 相关的APi -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>2.4.1</version>
        </dependency>
  • 编写一个CanalClient程序
    便于从canal中获取数据

  • 步骤

  1. 创建一个客服端对象
    需要使用到CanalConnector,它时一个接口,所以需要指定它的子类
    CanalConnector子类
    -SimpleCanalConnector:单点模式
    --ClusterCanalConnector:集群模式,创建客户端,自带故障转移,要求canal server必须是一个HA的,如果当前连接的active的 cnanl server 挂了,自动重新连接。
    -CanalMQConnector:用于一些中间件的连接接口
    --KafkaCanalConnector:用于将数据写入Kafka中
    --RocketMQCanalConnector:用于将数据写入RocketMQ中

如何创建连接?canal提供一个工具类com.alibaba.otter.canal.client.CanalConnectors,专门用于创建CanalConnector客户端

        // 创建链接
        String hostname="hadoop102"; //主机名 参考 canal.properties中的 canal.ip
        int port=11111;// 端口号 canal.properties中的 canal.port
        InetSocketAddress socketAddress = new InetSocketAddress(hostname,port);

        /**
         * SocketAddress address,  连接 canal server的主机名和端口号,参考 canal.properties
         * String destination,参考 canal.properties  中的 canal.destinations,
         *        example :是指instance.properties 的目录名就是 example
         * String username, 不需要写
         * String password 不需要写
         *  官方解释: https://github.com/alibaba/canal/wiki/AdminGuide
         *  canal.user   canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启 无
         *  canal.passwd canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启
         */
        CanalConnector connector = CanalConnectors.newSingleConnector(socketAddress, "example", null, null);
  1. 使用客服端对象连接canal,调用connector.connect()方法即可,没有返回值
        // 使用客服端对象连接`canal`
        connector.connect();
  1. 订阅,调用connector.subscribe("库名.表名");
        // 订阅数据表
        connector.subscribe("gmallrealtime0323.order_info");
  1. 拉取数据,调用connector.get()
    Message get(int batchSize):获取数据,自动进行确认,该方法返回的条件:尝试拿batchSize条记录,有多少取多少,不会阻塞等待
    Message get(int batchSize, Long timeout, TimeUnit unit):获取数据,自动进行确认,该方法返回的条件:
    a. 拿够batchSize条记录或者超过timeout时间
    b. 如果timeout=0,则阻塞至拿到batchSize记录才返回

这里使用第一种,拉取一批次数据

// 拉取数据
Message message = connector.get(100);

当前完整代码

public static void main(String[] args) {
        // 创建链接
        String hostname="hadoop102"; //主机名 参考 canal.properties中的 canal.ip
        int port=11111;// 端口号 canal.properties中的 canal.port
        InetSocketAddress socketAddress = new InetSocketAddress(hostname,port);

        /**
         * SocketAddress address,  连接 canal server的主机名和端口号,参考 canal.properties
         * String destination,参考 canal.properties  中的 canal.destinations,
         *        example :是指instance.properties 的目录名就是 example
         * String username, 不需要写
         * String password 不需要写
         *  官方解释: https://github.com/alibaba/canal/wiki/AdminGuide
         *  canal.user   canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启 无
         *  canal.passwd canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启
         */
        CanalConnector connector = CanalConnectors.newSingleConnector(socketAddress, "example", null, null);

        // 使用客服端对象连接`canal`
        connector.connect();

        // 订阅数据表
        connector.subscribe("gmallrealtime0323.order_info");


        // 拉取数据
        Message message = connector.get(100);
        System.out.println(message);


    }
  • 运行结果(消息太大,你忍一下),就不贴出来了。

  • 接下里看看如何理解Message

  1. Message 代表我们拉取的一批数据。
  2. Message 的一些属性
private long id;
private List<Entry> entries = new ArrayList(); //Entry 代表一条写操作的sql,造成的数据变化
private boolean raw = true;
private List<ByteString> rawEntries = new ArrayList();
  1. Entry :记录写操作的sql,造成的数据变化
    所谓写操作就是除了select 语句外的sql语句。如(insert、update、delete)。
    比如 insert 影响了10行数据,update影响了10行数据,那么Message会将该两个操作(insert、update)分别包装到Entry中,也就是说List<Entry>中会存有两个Entry
    查看下面的图


    Message:上面介绍过,一次canal从日志中抓取的信息,一个message可以包含多个sql执行的结果。
    Entry:上面介绍过,对应一个sql命令,一个sql可能会对多行记录造成影响。
    Tablename:执行sql影响到的表
    EntryType:表示当前sql是什么类型的lsql语句,insert语句或update语句等。注意 bigin(开启事务)、commit(提交事务)也是属于写操作。
    StoreValue:若当前EntryTypeROWDATA,表示会影响数据的变化,那么会影响那么数据变化呢?它会采用StoreValue进行记录。
    RowChange:StoreValue是一个序列化对象,不能直接使用,所以我们需要使用RowChange进行反序列化,RowChange对象就表示一条sql引起的数据变化。
    EevntType:用于判断当前sql是什么类型的。
    RowDataList:一条sql可能会引发多行数据变化的,RowDataList便存储了引发数据变化的多行。
    RowData:每个RowData就代表一行数据。
    column:一行数据有多个列。
  2. 如何知道当前的EntryType是什么呢?canal 定义了没有枚举类

com.alibaba.otter.canal.protocol.CanalEntry.EntryType

他就是说明哪些类型,分别代表什么意思

TRANSACTIONBEGIN : 表示 bigin  开启事务
ROWDATA:普通的写操作(insert、update、delete 统称为 ROWDATA,是会引起数据变化的sql)
TRANSACTIONEND:commit 提交事务
HEARTBEAT 心跳连接请求
GTIDLOG(4, 5);
  1. CanalEntry.Header 会记录表名信息

  2. ByteString storeValue_:记录当前ROWDATA 类型的sql,引起的数据变化。
    ByteString 是一个二进制类型,需要转换成RowChange类型。

  3. 剩下的概念就在代码中介绍吧,我们接着Message往下写。

当然我们从Message打印中也能看到以上说的信息

// 有数据 id =1,没数据 id=-1 
Message[id=1,entries=[header { 
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 219
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1627740174000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 68
}
entryType: TRANSACTIONBEGIN  // 开启事务
storeValue: " \005"
, header {
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 352
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1627740174000
  sourceType: MYSQL
  schemaName: "demo"
  tableName: "employees" # 表名
  eventLength: 95
  eventType: UPDATE
  props {
    key: "rowsCount"
    value: "1"
  }
}
entryType: ROWDATA // insert、update、delete 语句
storeValue: "\bl\020\002P\000b\235\003\n\032\b\000\020\004\032\002id" // 存储的值,是一个二进制的,太长了,我就清空了‘
, header {
  version: 1
  logfileName: "mysql-bin.000001"
  logfileOffset: 447
  serverId: 1
  serverenCode: "UTF-8"
  executeTime: 1627740174000
  sourceType: MYSQL
  schemaName: ""
  tableName: ""
  eventLength: 31
}

完善

整理一下:

  1. 创建一个maven项目
  2. 获取canal server 服务
  3. 通过``canal server获取CanalConnector`对象
  4. 通过CanalConnector成功获取Message并打印
  5. 介绍了Message 的组成架构。

接下来的工作就是从Message 获取我们先要的数据。我们需要一直监控着canal拉取数据,所以需要不停的请求

 // 不停的拉取数据
        while (true){
            Message message = connector.get(100);
            System.out.println(message);
        }

若拉取到没有数据时(如下)。

Message[id=-1,entries=[],raw=false,rawEntries=[]]
Message[id=-1,entries=[],raw=false,rawEntries=[]]

应该暂停一下,等待一会再请求,所以使用Thread.sleep(3000);休眠3秒钟再继续

        // 不停的拉取数据
        while (true){
            Message message = connector.get(100);
            if (message.getId()==-1){
                try {
                    Thread.sleep(3000);
                    // 终止本次循环,执行下次循环
                    continue;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            System.out.println(message);
        }

接下来:开始解析数据,我们只要ROWDATA类型的数据,使用 java8 stram 进行过滤。

//解析 只要 ROWDATA 类型的数据
 List<CanalEntry.Entry> entryList = message.getEntries()
                    .stream()
                    .filter(entry -> entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA))
                    .collect(Collectors.toList());

接下里我们就应该从ROWDATA数据类型中获取StoreValue

 entryList.stream().forEach(entry -> {
    // 获取 storeValue
    ByteString storeValue = entry.getStoreValue();

    //解析json 并将其转换成 List<JSONObject>
    List<JSONObject>  list= analysisStoreValue(storeValue);
    System.out.println(list);
 });

通过StoreValue 将数据转成成json串。单独封装成了一个方法analysisStoreValue

    /**
     * 解析 storeValue  封装为 json
     * @param storeValue
     */
    public static List<JSONObject>  analysisStoreValue(ByteString storeValue){

        try {
            // storeValue 是一个二进制序列化值,所以需要将其反序列化成 RowChange
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

            // 这里只需要获取insert sql的语句数据
            if(rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){
                // 获取表中所有的行
                List<CanalEntry.RowData> rows = rowChange.getRowDatasList();

                // 通过行获取所有的列
               return rows.stream().map(row->{
                    List<CanalEntry.Column> columns = row.getAfterColumnsList();
                    // 将每行数据包装成json
                    JSONObject result=new JSONObject();
                    columns.forEach(column -> result.put(column.getName(),column.getValue()));
                    return result;
                }).collect(Collectors.toList());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }

        return null;
    }

最后输出结果,这就是每个sql的执行结果(5个insertsql语句)

[{"payment_way":"2","delivery_address":"JYSrezirkbAySRFAOqpA","consignee":"OVnBSI","create_time":"2021-08-01 13:42:20","order_comment":"IPaJfJeuLtNHWObRhhrk","expire_time":"","order_status":"1","out_trade_no":"9397945080","tracking_no":"","total_amount":"498.0","user_id":"1","img_url":"","province_id":"6","consignee_tel":"13792449126","trade_body":"","id":"1","parent_order_id":"","operate_time":""}]
[{"payment_way":"1","delivery_address":"sFXYagEEgSWfKkVXbptY","consignee":"FRuwYB","create_time":"2021-08-01 06:48:37","order_comment":"MRUarJvbSmFsUvwUjnJK","expire_time":"","order_status":"1","out_trade_no":"4961868146","tracking_no":"","total_amount":"438.0","user_id":"2","img_url":"","province_id":"5","consignee_tel":"13728675675","trade_body":"","id":"2","parent_order_id":"","operate_time":""}]
[{"payment_way":"1","delivery_address":"drzwLnhVjjrgHPdXCTNg","consignee":"HahHQg","create_time":"2021-08-01 18:57:58","order_comment":"suWwwQPBnKLAwDCaxjCl","expire_time":"","order_status":"2","out_trade_no":"4720570830","tracking_no":"","total_amount":"251.0","user_id":"1","img_url":"","province_id":"8","consignee_tel":"13164210619","trade_body":"","id":"3","parent_order_id":"","operate_time":"2021-08-01 19:22:34"}]
[{"payment_way":"1","delivery_address":"uLwYDUQvCDjjuwXzFAOt","consignee":"wOFLME","create_time":"2021-08-01 04:06:48","order_comment":"zsrFakbCIJtShgkHJkga","expire_time":"","order_status":"1","out_trade_no":"6377533572","tracking_no":"","total_amount":"578.0","user_id":"1","img_url":"","province_id":"7","consignee_tel":"13657270001","trade_body":"","id":"4","parent_order_id":"","operate_time":""}]
[{"payment_way":"1","delivery_address":"cnhZyvFRqhLhJJqJwflP","consignee":"MtJZdv","create_time":"2021-08-01 20:40:49","order_comment":"txgpgKgzfdbUvrAyUcCD","expire_time":"","order_status":"1","out_trade_no":"2390154344","tracking_no":"","total_amount":"469.0","user_id":"2","img_url":"","province_id":"8","consignee_tel":"13887020238","trade_body":"","id":"5","parent_order_id":"","operate_time":""}]

该数据最终是要推送到Kafka中,所有,最好的方式就是将json集合合成一个集合中。

// 拆分所有集合
List<JSONObject> result = entryList.stream()
       .flatMap(entry -> analysisStoreValue(entry.getStoreValue()).stream())
       .filter(e-> e!=null && !e.isEmpty())
       .collect(Collectors.toList());

       System.out.println(result);

数据结果

[{"payment_way":"1","delivery_address":"PVgWoDbxivHYXQkCgYAI","consignee":"bhFEia","create_time":"2021-08-01 00:19:29","order_comment":"MMygkJALdkNmTIKztsKv","expire_time":"","order_status":"1","out_trade_no":"9606170154","tracking_no":"","total_amount":"576.0","user_id":"1","img_url":"","province_id":"1","consignee_tel":"13567887322","trade_body":"","id":"1","parent_order_id":"","operate_time":""}, 
{"payment_way":"2","delivery_address":"ZEZjWgPIVCbxkHIsNNzj","consignee":"yqkdMz","create_time":"2021-08-01 13:21:03","order_comment":"yOzghuykDnCbyqfEHvhz","expire_time":"","order_status":"1","out_trade_no":"0773742642","tracking_no":"","total_amount":"170.0","user_id":"2","img_url":"","province_id":"4","consignee_tel":"13265798120","trade_body":"","id":"2","parent_order_id":"","operate_time":""},
{"payment_way":"2","delivery_address":"MJWLPUatUpPfiAcKsKwc","consignee":"QYuemX","create_time":"2021-08-01 19:13:23","order_comment":"VwxWsWEgVkoPhpcsHiUX","expire_time":"","order_status":"1","out_trade_no":"0550616794","tracking_no":"","total_amount":"971.0","user_id":"2","img_url":"","province_id":"1","consignee_tel":"13049382676","trade_body":"","id":"3","parent_order_id":"","operate_time":""}, 
{"payment_way":"1","delivery_address":"oiWkmEjjtooEdDFuepkf","consignee":"sUwybN","create_time":"2021-08-01 06:41:31","order_comment":"XwqpDVTFvmZlheXBQyWk","expire_time":"","order_status":"2","out_trade_no":"2591907530","tracking_no":"","total_amount":"229.0","user_id":"2","img_url":"","province_id":"1","consignee_tel":"13736224357","trade_body":"","id":"4","parent_order_id":"","operate_time":"2021-08-01 07:18:40"}, 
{"payment_way":"1","delivery_address":"MzlHFbtPTWdCAWiXrOTe","consignee":"pOaLAx","create_time":"2021-08-01 21:04:55","order_comment":"PLlYhQITtmgTYlmCYmpN","expire_time":"","order_status":"2","out_trade_no":"9277374270","tracking_no":"","total_amount":"542.0","user_id":"2","img_url":"","province_id":"5","consignee_tel":"13603620396","trade_body":"","id":"5","parent_order_id":"","operate_time":"2021-08-01 21:20:07"}]

**当前完整代码 **

public class CanalClient {

    public static void main(String[] args) {
        // 创建链接
        String hostname="hadoop102"; //主机名 参考 canal.properties中的 canal.ip
        int port=11111;// 端口号 canal.properties中的 canal.port
        InetSocketAddress socketAddress = new InetSocketAddress(hostname,port);

        /**
         * SocketAddress address,  连接 canal server的主机名和端口号,参考 canal.properties
         * String destination,参考 canal.properties  中的 canal.destinations,
         *        example :是指instance.properties 的目录名就是 example
         * String username, 不需要写
         * String password 不需要写
         *  官方解释: https://github.com/alibaba/canal/wiki/AdminGuide
         *  canal.user   canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启 无
         *  canal.passwd canal数据端口订阅的ACL配置 (v1.1.4新增) 如果为空,代表不开启
         */
        CanalConnector connector = CanalConnectors.newSingleConnector(socketAddress, "example", null, null);

        // 使用客服端对象连接`canal`
        connector.connect();

        // 订阅数据表
        connector.subscribe("gmallrealtime0323.order_info");


        // 不停的拉取数据
        while (true){
            Message message = connector.get(100);
            if (message.getId()==-1){
                try {
                    Thread.sleep(3000);
                    // 终止本次循环,执行下次循环
                    continue;
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }

            //解析 只要 ROWDATA 类型的数据
            List<CanalEntry.Entry> entryList = message.getEntries()
                    .stream()
                    .filter(entry -> entry.getEntryType().equals(CanalEntry.EntryType.ROWDATA))
                    .collect(Collectors.toList());


            // 拆分所有集合
            List<JSONObject> result = entryList.stream()
                    .flatMap(entry -> analysisStoreValue(entry.getStoreValue()).stream())
                    .filter(e-> e!=null && !e.isEmpty())
                    .collect(Collectors.toList());


            System.out.println(result);
        }



    }

    /**
     * 解析 storeValue  封装为 json
     * @param storeValue
     */
    public static List<JSONObject>  analysisStoreValue(ByteString storeValue){

        try {
            // storeValue 是一个二进制序列化值,所以需要将其反序列化成 RowChange
            CanalEntry.RowChange rowChange = CanalEntry.RowChange.parseFrom(storeValue);

            // 这里只需要获取insert sql的语句数据
            if(rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){
                // 获取表中所有的行
                List<CanalEntry.RowData> rows = rowChange.getRowDatasList();

                // 通过行获取所有的列
               return rows.stream().map(row->{
                    List<CanalEntry.Column> columns = row.getAfterColumnsList();
                    // 将每行数据包装成json
                    JSONObject result=new JSONObject();
                    columns.forEach(column -> result.put(column.getName(),column.getValue()));
                    return result;
                }).collect(Collectors.toList());
            }
        } catch (InvalidProtocolBufferException e) {
            e.printStackTrace();
        }
        List<JSONObject> result = new ArrayList<>();
        return result;
    }
}

写入Kafka

  • 创建topic
 bin]$ sh kafka-topics.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --create --partitions 3 --replication-factor 2 --topic gmall-order-info

--bootstrap-server Kafka集群
--create 表名是这是创建 Kafka
--partitions 分区
--replication-factor 副本
--topic topic名称

  • 创建一个Produce
/**
 * Kafka客户端
 * @author admin
 * @date 2021/8/1
 */
public class KafkaClient {

    private static Producer producer;

    static {
         producer = getProducer();
    }

    /**
     * 创建生产者
     * @return
     */
    public static Producer getProducer(){
        // 配置,具体要配置什么 参考  org.apache.kafka.clients.producer.ProducerConfig
        Properties properties=new Properties();
        // kafka集群地址
        properties.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG,"hadoop102:9092,hadoop103:9092,hadoop104:9092");
        // 字符序列化方式
        String stringSerializer="org.apache.kafka.common.serialization.StringSerializer";
        // key序列化类型
        properties.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,stringSerializer);
        // value序列化类型
        properties.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,stringSerializer);

        // 创建
        KafkaProducer<String, String> kafkaProducer = new KafkaProducer(properties);

        //返回
        return kafkaProducer;
    }

    /**
     * 生产数据
     * @param topic 主题
     * @param value 参数值
     */
    public static Future sendData( String topic, JSONObject value){
        ProducerRecord<String,String> record=new ProducerRecord(topic,value.toJSONString());
        return producer.send(record);
    }

}
  • 将数据写入 fafka
   /**
     * 将数据写入Kafka
     * @param results
     */
    public static void sendToKafka(List<JSONObject> results){
        results.forEach(json->KafkaClient.sendData(CustomConstant.GMALL_ORDER_INFO,json));
    }
  • 启动消费者监听gmall-order-info主题
[admin@hadoop102 bin]$ sh kafka-console-consumer.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic gmall-order-info
  • order_info表新增数据,并查看消费情况
[admin@hadoop102 bin]$ sh kafka-console-consumer.sh --bootstrap-server hadoop102:9092 hadoop103:9092 hadoop104:9092 --topic gmall-order-info
{"payment_way":"1","delivery_address":"daQbNIdsFVPkJzEwxWpJ","consignee":"kKXIve","create_time":"2021-08-01 22:53:19","order_comment":"DNgrqBLaTtnhYudhAeXy","expire_time":"","order_status":"1","out_trade_no":"5279649775","tracking_no":"","total_amount":"285.0","user_id":"1","img_url":"","province_id":"4","consignee_tel":"13168096420","trade_body":"","id":"1","parent_order_id":"","operate_time":""}
{"payment_way":"1","delivery_address":"vScIgIXOdWANmWXYZdqu","consignee":"owrvbW","create_time":"2021-08-01 08:13:40","order_comment":"ccdmZnqNVmAOuJYYXPjz","expire_time":"","order_status":"2","out_trade_no":"9194355258","tracking_no":"","total_amount":"122.0","user_id":"2","img_url":"","province_id":"9","consignee_tel":"13505537925","trade_body":"","id":"2","parent_order_id":"","operate_time":"2021-08-01 09:12:56"}
{"payment_way":"2","delivery_address":"KJdxaMHafCsiLervaSmH","consignee":"uMDFpI","create_time":"2021-08-01 06:35:35","order_comment":"yxRPzfbPTcFVKNJiOtAy","expire_time":"","order_status":"1","out_trade_no":"8849195680","tracking_no":"","total_amount":"423.0","user_id":"1","img_url":"","province_id":"9","consignee_tel":"13413107389","trade_body":"","id":"3","parent_order_id":"","operate_time":""}
{"payment_way":"2","delivery_address":"fqliirkavbTrbhFHtXJo","consignee":"KqxstS","create_time":"2021-08-01 01:59:45","order_comment":"zJKESzTXhQJYQirmidSe","expire_time":"","order_status":"2","out_trade_no":"8940133587","tracking_no":"","total_amount":"768.0","user_id":"1","img_url":"","province_id":"2","consignee_tel":"13113356625","trade_body":"","id":"4","parent_order_id":"","operate_time":"2021-08-01 02:43:59"}
{"payment_way":"2","delivery_address":"JsMELTjoUEnBUSJBQxRP","consignee":"wdZOXs","create_time":"2021-08-01 18:13:50","order_comment":"AfcUqXPhqgIVHyvJiOuH","expire_time":"","order_status":"1","out_trade_no":"0155110810","tracking_no":"","total_amount":"464.0","user_id":"2","img_url":"","province_id":"2","consignee_tel":"13982863828","trade_body":"","id":"5","parent_order_id":"","operate_time":""}

最后

关于Canal的实战项目就完成了,也许看起来比较乱,所以大家还是挑重要点看吧。

补充

 // 通过行获取所有的列
               return rows.stream().map(row->{
                    List<CanalEntry.Column> columns = row.getAfterColumnsList();
                    // 将每行数据包装成json
                    JSONObject result=new JSONObject();
                    columns.forEach(column -> result.put(column.getName(),column.getValue()));
                    return result;
                }).collect(Collectors.toList());

除了使用row.getAfterColumnsList(); 获取List<CanalEntry.Column> 还可以通过row.getBeforeColumnsList()方式获取

row.getAfterColumnsList() 与 row.getBeforeColumnsList() 有什么区别呢?
row.getAfterColumnsList():表示用于获取数据执行变化后的结果,案例中提到,当前主要用于获取insert语句的新增数据(如下

if(rowChange.getEventType().equals(CanalEntry.EventType.INSERT)){

所以无法获取新增前的数据结果,也就无法使用row.getBeforeColumnsList()的方式。
row.getBeforeColumnsList():主要用于updatedelete语句,比如获取修改前的数据,就可以用row.getBeforeColumnsList(),获取修改后的数据则使用row.getAfterColumnsList(),当然delete只能使用row.getBeforeColumnsList()

相关文章

网友评论

      本文标题:Canal 使用

      本文链接:https://www.haomeiwen.com/subject/ldfrvltx.html