canal学习 (Canal study notes)

Author: 笨手笨脚越 | Published 2022-03-07 14:37

    [toc]

    What is canal

    Canal is middleware that masquerades as a MySQL slave, subscribes to the MySQL binlog, and uses it to replicate data changes to other systems.

    Chinese documentation: https://www.wenjiangs.com/doc/canal-introduction

    Project home: https://github.com/alibaba/canal

    How it works

    [Figure: canal working-principle diagram]

    1. canal emulates the MySQL slave interaction protocol: it pretends to be a MySQL slave and sends a dump request to the MySQL master.
    2. The MySQL master receives the dump request and starts pushing the binary log to the slave (that is, to canal).
    3. canal parses the binary log objects (originally a byte stream).

    Architecture

    [Figure: canal server/instance architecture diagram]

    Notes:

    • A server is one running canal instance, corresponding to one JVM.
    • An instance corresponds to one data queue (one server hosts 1..n instances).

    Instance modules:

    • eventParser (connects to the data source, emulates the slave protocol against the master, parses the protocol)
    • eventSink (links Parser and Store; filters, transforms, and routes the data)
    • eventStore (stores the data)
    • metaManager (manages incremental subscription & consumption metadata)

    Installation

    1. Download the package

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz
    
    # the deployer tarball has no top-level directory, so extract into a dedicated one
    mkdir -p /root/canal
    tar -xvf canal.deployer-1.1.5.tar.gz -C /root/canal
    

    2. Enable binlog in MySQL

    Edit my.cnf

    [mysqld]
    pid-file        = /var/run/mysqld/mysqld.pid
    socket          = /var/run/mysqld/mysqld.sock
    datadir         = /var/lib/mysql
    secure-file-priv= NULL
    log-bin=mysql-bin 
    binlog-format=ROW 
    server_id=1 
    

    The binlog must use ROW format.

    After restarting MySQL, verify with the SQL statement show variables like '%log_bin%'
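    Both switches can be checked right after the restart:

```sql
SHOW VARIABLES LIKE 'log_bin';        -- ON when binary logging is enabled
SHOW VARIABLES LIKE 'binlog_format';  -- ROW is what canal expects
```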

    [Figure: output of show variables like '%log_bin%']

    3. Create a canal user in MySQL

    mysql> CREATE USER 'canal'@'localhost' IDENTIFIED BY 'canal';
    Query OK, 0 rows affected (0.00 sec)
    mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'localhost' WITH GRANT OPTION;
    Query OK, 0 rows affected (0.01 sec)
    mysql> CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
    Query OK, 0 rows affected (0.00 sec)
    mysql> GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' WITH GRANT OPTION;
    Query OK, 0 rows affected (0.00 sec)
    mysql> flush privileges;
    Query OK, 0 rows affected (0.00 sec)
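
    The grants above use ALL PRIVILEGES for simplicity. canal itself only needs to read data and the replication stream, so a tighter grant (per canal's quick-start; adjust to your own security policy) would be:

```sql
CREATE USER 'canal'@'%' IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
FLUSH PRIVILEGES;
```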
    

    4. Edit the canal instance config

    The directory /root/canal/conf/ contains an example folder; each such folder corresponds to one instance.

    vi /root/canal/conf/example/instance.properties
    #################################################
    # MySQL slave id used by canal (must differ from the MySQL server_id)
    canal.instance.mysql.slaveId=1234
    # database ip:port
    canal.instance.master.address=192.168.10.27:3306
    # database username/password
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    #################################################
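
    By default the instance parses every table (and the sample client below subscribes with `.*\..*`). If only specific tables matter, the same instance.properties also accepts filter regexes; a sketch with a hypothetical table:

```properties
# comma-separated schema.table regexes to parse
canal.instance.filter.regex=tcm\\.t_patient
# tables to exclude
canal.instance.filter.black.regex=mysql\\.slave_.*
```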
    

    5. Start, stop, and restart canal

    cd /root/canal/bin
    sh startup.sh 
    sh stop.sh
    sh restart.sh  
    

    6. Log files

    /root/canal/logs/canal/canal.log
    /root/canal/logs/example/example.log
    

    Reading the binlog from Java and syncing it to Redis

    1. Add the dependency (the client version here is 1.1.0 while the server above is 1.1.5; a matching client version is generally safer)

    <dependency>
        <groupId>com.alibaba.otter</groupId>
        <artifactId>canal.client</artifactId>
        <version>1.1.0</version>
    </dependency>
    

    2.RedisUtil

    package com.wangyue.study.canal;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class RedisUtil {
    
        // Redis server IP
        private static String ADDR = "192.168.10.27";
    
        // Redis port
        private static int PORT = 6379;
    
        // Redis password
        private static String AUTH = "hxcx123!@#";
    
        // Maximum number of pooled connections (default 8).
        // -1 means unlimited; once maxActive instances are handed out, the pool is exhausted.
        private static int MAX_ACTIVE = 1024;
    
        // Maximum number of idle jedis instances in the pool (default 8).
        private static int MAX_IDLE = 200;
    
        // Maximum time to wait for a connection, in ms (default -1 = wait forever).
        // When exceeded, a JedisConnectionException is thrown.
        private static int MAX_WAIT = 10000;
    
        // Key expiry in seconds: one day (the original 660 * 660 * 24 looks like a typo)
        protected static int expireTime = 60 * 60 * 24;
    
        // Connection pool
        protected static JedisPool pool;
    
        /**
         * Static initializer, runs once on first use of the class.
         */
        static {
            JedisPoolConfig config = new JedisPoolConfig();
            // maximum total connections
            config.setMaxTotal(MAX_ACTIVE);
            // maximum idle connections
            config.setMaxIdle(MAX_IDLE);
            // maximum wait time
            config.setMaxWaitMillis(MAX_WAIT);
            // skip connection validation on borrow
            config.setTestOnBorrow(false);
            // note: DB index 3 is passed here, though every method below calls select(0)
            pool = new JedisPool(config, ADDR, PORT, 1000, AUTH, 3);
        }
    
        /**
         * Borrow a jedis instance from the pool.
         */
        protected static synchronized Jedis getJedis() {
            Jedis jedis = null;
            try {
                jedis = pool.getResource();
            } catch (Exception e) {
                e.printStackTrace();
            }
            return jedis;
        }
    
        /**
         * Return the jedis connection to the pool.
         *
         * @param jedis
         * @param isBroken
         */
        protected static void closeResource(Jedis jedis, boolean isBroken) {
            // the original body was empty, which leaks pooled connections;
            // in Jedis 3.x, close() returns the connection to the pool
            if (jedis != null) {
                jedis.close();
            }
        }
    
        /**
         * Check whether a key exists.
         *
         * @param key
         */
        public static boolean existKey(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            try {
                jedis = getJedis();
                jedis.select(0);
                return jedis.exists(key);
            } catch (Exception e) {
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return false;
        }
    
        /**
         * Delete a key.
         *
         * @param key
         */
        public static void delKey(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            try {
                jedis = getJedis();
                jedis.select(0);
                jedis.del(key);
            } catch (Exception e) {
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
        }
    
        /**
         * Get the value of a key (and refresh its expiry).
         *
         * @param key
         */
        public static String stringGet(String key) {
            Jedis jedis = null;
            boolean isBroken = false;
            String lastVal = null;
            try {
                jedis = getJedis();
                jedis.select(0);
                lastVal = jedis.get(key);
                jedis.expire(key, expireTime);
            } catch (Exception e) {
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return lastVal;
        }
    
        /**
         * Set a string value.
         *
         * @param key
         * @param value
         */
        public static String stringSet(String key, String value) {
            Jedis jedis = null;
            boolean isBroken = false;
            String lastVal = null;
            try {
                jedis = getJedis();
                jedis.select(0);
                lastVal = jedis.set(key, value);
                jedis.expire(key, expireTime);
            } catch (Exception e) {
                e.printStackTrace();
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
            return lastVal;
        }
    
        /**
         * Set a hash field.
         *
         * @param key
         * @param field
         * @param value
         */
        public static void hashSet(String key, String field, String value) {
            boolean isBroken = false;
            Jedis jedis = null;
            try {
                jedis = getJedis();
                if (jedis != null) {
                    jedis.select(0);
                    jedis.hset(key, field, value);
                    jedis.expire(key, expireTime);
                }
            } catch (Exception e) {
                isBroken = true;
            } finally {
                closeResource(jedis, isBroken);
            }
        }
    
    }
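
    Since `Jedis` implements `AutoCloseable`, the borrow/return pair (`getJedis` / `closeResource`) can also be written with try-with-resources, which guarantees the connection goes back to the pool even on exceptions. A minimal sketch with a stand-in resource class (`FakeConnection` is hypothetical, not part of the Jedis API):

```java
// Sketch of the borrow/return pattern used by RedisUtil,
// with a stand-in resource instead of a real pooled Jedis connection.
public class PoolPatternSketch {

    // Hypothetical stand-in for a pooled connection.
    static class FakeConnection implements AutoCloseable {
        boolean closed = false;
        String get(String key) { return "value-of-" + key; }
        @Override public void close() { closed = true; } // would return to the pool
    }

    static String stringGet(FakeConnection conn, String key) {
        // try-with-resources calls close() even if get() throws
        try (FakeConnection c = conn) {
            return c.get(key);
        }
    }

    public static void main(String[] args) {
        FakeConnection conn = new FakeConnection();
        System.out.println(stringGet(conn, "user:17") + " closed=" + conn.closed);
    }
}
```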
    

    3.CanalTest

    package com.wangyue.study.canal;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    
    public class CanalTest {
    
    
        public static void main(String args[]) {
            // create the connection
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.10.27", 11111),
                    "example", "canal", "canal");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*");
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // fetch up to batchSize entries without auto-ack
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
                        emptyCount++;
                        System.out.println("empty count : " + emptyCount);
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                        }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }
    
                connector.ack(batchId); // acknowledge the batch
                // connector.rollback(batchId); // on processing failure, roll back
                }
    
                System.out.println("empty too many times, exit");
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
    
            RowChange rowChange = null;
            try {
                rowChange = RowChange.parseFrom(entry.getStoreValue());
            } catch (Exception e) {
                throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                        e);
            }

            EventType eventType = rowChange.getEventType();
            System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                    entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                    entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                    eventType));

            for (RowData rowData : rowChange.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        redisDelete(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        redisInsert(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        redisUpdate(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    
        private static void redisInsert( List<Column> columns){
            JSONObject json=new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if(columns.size()>0){
                RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
            }
        }
    
        private static  void redisUpdate( List<Column> columns){
            JSONObject json=new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if(columns.size()>0){
                RedisUtil.stringSet("user:"+ columns.get(0).getValue(),json.toJSONString());
            }
        }
    
        private static  void redisDelete( List<Column> columns){
            JSONObject json=new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if(columns.size()>0){
                RedisUtil.delKey("user:"+ columns.get(0).getValue());
            }
        }
    
    }
    
    
    

    With the program running, modify and save some data in the MySQL database.

    Console output:

    [Figure: console output showing the parsed binlog events]

    Setting up a canal cluster

    Install ZooKeeper

    !! Note: avoid very recent ZooKeeper versions here; they may fail to start (Starting zookeeper ... FAILED TO START).

    1. Download and extract

    wget https://archive.apache.org/dist/zookeeper/zookeeper-3.4.9/zookeeper-3.4.9.tar.gz
    
    tar -xvf zookeeper-3.4.9.tar.gz 
    
    

    2. Edit the config

    cd zookeeper-3.4.9/conf
    
    cp zoo_sample.cfg zoo.cfg
    
    vi zoo.cfg
    ############################
    # data directory
    dataDir=/root/zookeeper/data
    ############################
    
    3. Start / restart / stop
    ./zkServer.sh start
    
    ./zkServer.sh restart
    
    ./zkServer.sh stop
    
    
    4. Check status
     ./zkServer.sh status
     
    ZooKeeper JMX enabled by default
    Using config: /root/zookeeper/zookeeper-3.4.9/bin/../conf/zoo.cfg
    Mode: standalone
    
    
    5. Connect with the client
    # 2181 is the default zk port
    ./zkCli.sh -server localhost:2181
    

    Cluster deployment

    canal currently supports cluster deployment only in HA form: ZooKeeper is used for leader election, with one active node and the rest on standby.

    Edit the canal config

    vi canal/conf/canal.properties
    
    # register ip to zookeeper
    canal.register.ip = 192.168.10.27
    # zk addresses; separate multiple nodes with commas and no spaces,
    # e.g. 10.105.10.123:2181,10.105.10.124:2181,10.105.10.125:2181
    canal.zkServers = 192.168.10.27:2181
    
    # store instance meta in zookeeper so nodes can share state
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    

    Deploy the standby canal node

    Copy the primary node's canal directory to another machine, then edit the instance config:

    vi /root/canal/conf/example/instance.properties
    
    # set a slaveId; it just needs to differ from the other node's
    canal.instance.mysql.slaveId=1235
    

    Edit the canal config

    vi /root/canal/conf/canal.properties
    # register ip to zookeeper
    canal.register.ip = 192.168.10.26
    

    All other settings are the same as on the primary node; then start canal on both nodes.

    Check canal's state in ZooKeeper

    ./zkCli.sh -server localhost:2181
    
    [zk: localhost:2181(CONNECTED) 0] get /otter/canal/destinations/example/running
    {"active":true,"address":"192.168.10.26:11111"}
    cZxid = 0x86
    ctime = Fri Mar 04 01:14:55 EST 2022
    mZxid = 0x86
    mtime = Fri Mar 04 01:14:55 EST 2022
    pZxid = 0x86
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x17f53863740000a
    dataLength = 47
    numChildren = 0
    

    Deployment status:

    • 192.168.10.27:11111 and 192.168.10.26:11111 are both deployed
    • 192.168.10.26:11111 is currently active; 192.168.10.27:11111 is standby
    • If canal is shut down on 192.168.10.26, 192.168.10.27 can be seen taking over as active:
    [zk: localhost:2181(CONNECTED) 3] get /otter/canal/destinations/example/running
    {"active":true,"address":"192.168.10.27:11111"}
    cZxid = 0x9f
    ctime = Fri Mar 04 03:06:18 EST 2022
    mZxid = 0x9f
    mtime = Fri Mar 04 03:06:18 EST 2022
    pZxid = 0x9f
    cversion = 0
    dataVersion = 0
    aclVersion = 0
    ephemeralOwner = 0x17f53863740000b
    dataLength = 47
    numChildren = 0
    

    Java client code

    Only the CanalConnector needs changing: use newClusterConnector with the ZooKeeper ip:port instead.

    CanalConnector connector = CanalConnectors.newClusterConnector("192.168.10.27:2181","example", "canal", "canal");
    

    Canal Admin

    canal-admin is designed to provide ops-oriented features for canal, such as centralized configuration management and node operations, through a relatively friendly WebUI, so that more users can operate canal quickly and safely.

    Deployment

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz
    
    mkdir /tmp/canal-admin
    
    tar zxvf canal.admin-1.1.4.tar.gz -C /tmp/canal-admin
    

    Edit the config file conf/application.yml to set the MySQL address.

    [Figure: application.yml showing the MySQL connection settings]

    Run the initialization script conf/canal_manager.sql against MySQL.

    Start

    sh bin/startup.sh
    

    Once it has started, the UI is available at http://127.0.0.1:8089/ with default credentials admin/123456.

    [Figure: canal-admin login page]

    Database sync with canal + Kafka

    For high availability and higher throughput, we run multiple canal clients as a cluster that parses the binlog and syncs it into the new database. This raises an important question: how do we guarantee that the canal-client cluster consumes binlog events in order?

    Our binlog is in ROW mode, so every write produces one binlog entry. A simple example: insert record a, then immediately update it. Two messages are sent to the canal clients; if, due to the network or other causes, the update is processed before the insert, the update targets a row that does not exist yet, so its effect is lost.

    canal can therefore be combined with a message queue (Kafka, RabbitMQ, and RocketMQ are all supported), and ordering is enforced at the message-queue layer.
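
    Concretely, canal's MQ settings let you choose between a single partition (one global order) and hashing rows across partitions by primary key (per-row ordering with more parallelism, since Kafka only guarantees order within a partition). A sketch of the relevant instance.properties keys as named in canal 1.1.x; the table name is hypothetical, so verify against your version:

```properties
# topic this instance publishes to
canal.mq.topic=example
# fixed partition: strict global order, no parallelism
canal.mq.partition=0
# alternative: spread across 3 partitions, hashed by primary key,
# so all changes to one row land in the same partition
#canal.mq.partitionsNum=3
#canal.mq.partitionHash=tcm.t_patient:id
```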

    [Figure: canal publishing binlog events to a message queue consumed by canal clients]

    Installing Kafka

    Deploy

    wget https://archive.apache.org/dist/kafka/1.1.1/kafka_2.11-1.1.1.tgz
    

    Edit the config file

    vim /usr/local/kafka/kafka_2.11-1.1.1/config/server.properties and change:

    # zookeeper address
    zookeeper.connect=192.168.10.27:2181
    listeners=PLAINTEXT://:9092
    # address advertised to clients
    advertised.listeners=PLAINTEXT://192.168.10.27:9092 
    

    Start the server

    # start the broker as a daemon
    bin/kafka-server-start.sh -daemon config/server.properties
    
    # list all topics
    bin/kafka-topics.sh --list --zookeeper 192.168.10.27:2181
    
    # consume a topic from the beginning
    bin/kafka-console-consumer.sh --bootstrap-server 192.168.10.27:9092 --from-beginning --topic example_t
    

    Edit the canal config

    Edit canal.properties:

    # tcp, kafka, rocketMQ, rabbitMQ
    canal.serverMode = kafka
    # kafka address
    kafka.bootstrap.servers = 192.168.10.27:9092
    

    Then restart canal; the example topic now appears:

    [root@bogon kafka_2.11-1.1.1]# sh bin/kafka-topics.sh --list --zookeeper 192.168.10.27:2181
    example
    

    Java code

    pom.xml

    <dependency>
        <groupId>org.springframework.kafka</groupId>
        <artifactId>spring-kafka</artifactId>
    </dependency>
    
    package com.wangyue.study.canal;
    
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import org.apache.commons.lang3.StringUtils;
    import org.apache.kafka.clients.consumer.ConsumerConfig;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    import org.apache.kafka.clients.consumer.ConsumerRecords;
    import org.apache.kafka.clients.consumer.KafkaConsumer;
    import org.apache.kafka.common.serialization.StringDeserializer;
    
    import java.time.Duration;
    import java.util.Collections;
    import java.util.Properties;
    
    public class CanalKafkaConsumer {
    
    
        public static void main(String[] args) {
            /* Three consumer properties are mandatory: broker list, key deserializer, value deserializer */
            Properties properties = new Properties();
            properties.put("bootstrap.servers", "192.168.10.27:9092");
            properties.put("key.deserializer", StringDeserializer.class);
            properties.put("value.deserializer", StringDeserializer.class);
            // The group id is not strictly required. Within one topic, each record
            // is delivered to only one consumer of a given group.
            properties.put(ConsumerConfig.GROUP_ID_CONFIG, "consumer-group1");
            KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(properties);
            try {
                // subscribe to one or more topics
                consumer.subscribe(Collections.singletonList("example"));
                while (true) {
                    ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(500));
                    for (ConsumerRecord<String, String> record : records) {
                    System.out.println(String.format("topic:%s, partition:%d, offset:%d, key:%s, value:%s",
                            record.topic(), record.partition(), record.offset(), record.key(), record.value()));
    
                        JSONObject valueJson = JSONObject.parseObject(record.value());
                        JSONArray data = valueJson.getJSONArray("data");
                        String type = valueJson.getString("type");
                        String table = valueJson.getString("table");
    
                        if (StringUtils.equalsIgnoreCase(type, "delete")) {
                            redisDelete(data, table);
                        } else if (StringUtils.equalsIgnoreCase(type, "insert")) {
                            redisInsert(data, table);
                        } else if (StringUtils.equalsIgnoreCase(type, "update")) {
                            redisUpdate(data, table);
                        }
    
                    }
                }
    
            // to break out of the loop, call consumer.wakeup() from another thread
            } finally {
                consumer.close();
            }
    
    
        }
    
    
        private static void redisInsert(JSONArray data, String tableName) {
            for (int i = 0; i < data.size(); i++) {
                JSONObject rowData = data.getJSONObject(i);
                String key = tableName + ":" + rowData.getString("id");
                RedisUtil.stringSet(key, rowData.toJSONString());
            }
        }
    
        private static void redisUpdate(JSONArray data, String tableName) {
            for (int i = 0; i < data.size(); i++) {
                JSONObject rowData = data.getJSONObject(i);
                String key = tableName + ":" + rowData.getString("id");
                RedisUtil.stringSet(key, rowData.toJSONString());
            }
        }
    
        private static void redisDelete(JSONArray data, String tableName) {
            for (int i = 0; i < data.size(); i++) {
                JSONObject rowData = data.getJSONObject(i);
                String key = tableName + ":" + rowData.getString("id");
                RedisUtil.delKey(key);
            }
        }
    }
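
    The consumer above derives the Redis key as table + ":" + id and routes on the message's type field. That pure-string logic can be isolated and unit-tested without Kafka or Redis; a sketch with illustrative helper names (not part of canal's API):

```java
import java.util.Locale;

public class CanalRouting {

    // Redis key scheme used by the consumer: "<table>:<primary key>"
    static String redisKey(String table, String id) {
        return table + ":" + id;
    }

    // Maps a canal "type" field to the action taken on Redis.
    static String actionFor(String type) {
        switch (type.toUpperCase(Locale.ROOT)) {
            case "INSERT":
            case "UPDATE":
                return "SET";    // write the row JSON
            case "DELETE":
                return "DEL";    // drop the key
            default:
                return "IGNORE"; // e.g. DDL or unknown types
        }
    }

    public static void main(String[] args) {
        System.out.println(redisKey("t_patient", "17") + " -> " + actionFor("UPDATE"));
    }
}
```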
    
    

    Example of the message canal writes to Kafka:

    {
        "data": [{
            "id": "17",
            "doctorId": "15",
            "name": "来二楼3",
            "birthday": "2013-02-28",
            "sex": "0",
            "telephone": "15632554566",
            "province": "重庆市",
            "city": "重庆城区",
            "area": "万州区",
            "address": "AP库珀热热蓉蓉",
            "createTime": "2022-02-28 17:29:43",
            "updateTime": "2022-03-03 14:48:31",
            "createBy": null,
            "updateBy": "18960862122",
            "isRemove": "1"
        }],
        "database": "tcm",
        "es": 1646631848000,
        "id": 9,
        "isDdl": false,
        "mysqlType": {
            "id": "int",
            "doctorId": "int",
            "name": "varchar(200)",
            "birthday": "varchar(100)",
            "sex": "int",
            "telephone": "varchar(20)",
            "province": "varchar(50)",
            "city": "varchar(50)",
            "area": "varchar(50)",
            "address": "varchar(255)",
            "createTime": "varchar(50)",
            "updateTime": "varchar(50)",
            "createBy": "varchar(50)",
            "updateBy": "varchar(50)",
            "isRemove": "bit(1)"
        },
        "old": [{
            "name": "来二楼"
        }],
        "pkNames": ["id"],
        "sql": "",
        "sqlType": {
            "id": 4,
            "doctorId": 4,
            "name": 12,
            "birthday": 12,
            "sex": 4,
            "telephone": 12,
            "province": 12,
            "city": 12,
            "area": 12,
            "address": 12,
            "createTime": 12,
            "updateTime": 12,
            "createBy": 12,
            "updateBy": 12,
            "isRemove": -7
        },
        "table": "t_patient",
        "ts": 1646631848288,
        "type": "UPDATE"
    }
    
    

    Result stored in Redis:

    [Figure: the synced row as seen in Redis]

    Original post: https://www.haomeiwen.com/subject/ishnrrtx.html