美文网首页
97 基于Binlog实现MySQL与Redis数据一致性问题

97 基于Binlog实现MySQL与Redis数据一致性问题

作者: 滔滔逐浪 | 来源:发表于2021-09-27 08:35 被阅读0次

    mysql 与Redis 数据一致性问题 直接将Redis清空
    中间件 canal框架 基于 docker环境构建


    image.png

    canal 框架 原理:

    <u>https://gitee.com/mirrors/canal?utm_source=alading&utm_campaign=repo</u>

    canal 框架原理
    1,canal伪装成mysql从节点 订阅mysql 主节点的binlog文件
    2,当我们的mysql 主节点 binlog 文件发生了变化,则将binlog 文件发送给canal服务器端
    3,canal 服务器端将该binlog 文件二进制转换成json格式给canal客户端
    4,canal客户端在将改数据同步到Redis/ES
    基于Binlog 开启方式
    1.mysql 开启binlog 文件配置
    windows 配置
    查询 my.ini配置文件位置
    C:\ProgramData\MySQL\MySQL Server 5.7
    2, linux mysql

    docker cp mysql_slave:/etc/mysql/my.cnf /usr/local/mysql/slave/my.cnf
    cd  /usr/local/mysql/slave
    Vi my.conf
    log-bin=mysql-bin
    server-id=2
    
    

    安装canal

    
    docker pull canal/canal-server:latest
    docker run -p 11111:11111 --name canal -id canal/canal-server
    

    进入容器

    docker exec -it canal /bin/bash
    
    

    编辑配置文件

    vi /home/admin/canal-server/conf/canal.properties
    # 修改canal.id  不能与之前的mysql配置id相同
    
    
    image.png
    vi /home/admin/canal-server/conf/example/instance.properties
    
    image.png

    重启canal

    docker restart canal
    # 可以设置开机启动 适用于所有容器
    docker update --restart=always canal
    
    

    Docker-compose 构建canal

    version: '3'
    services: 
      canal-server: 
        image: canal/canal-server:v1.1.4
        container_name: canal-server
        ports: 
          - 11111:11111
        environment: 
          - canal.instance.mysql.slaveId=3
          - canal.auto.scan=false
          - canal.destinations=mayikt-commodity
          - canal.instance.master.address=192.168.75.144:3306
          - canal.instance.dbUsername=canal
          - canal.instance.dbPassword=canal
    
    
    

    canal.instance.mysql.slaveId:slaveId不能与mysql的serverId一样
    canal.instance.master.address:mysql地址
    canal.instance.dbUsername:mysql账号
    canal.instance.dbPassword:mysql密码

    -f 指定使用的 Compose 模板文件,默认为 docker-compose.yml,可以多次指定。

    docker-compose -f docker-compose.yml up -d 
    
    
    package com.mayikt.canal.demo;
    
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry;
    import com.alibaba.otter.canal.protocol.Message;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    
    public class ClientSample {
    
    
        /**
         * 不推荐使用tcp模式
         * canaltcp 模式
         * 高并发的情况下 建议整合canal+kafka (效率是非常)
         * 需要注意 消费者消费顺序一致性的问题
         *
         * @param args
         */
        public static void main(String args[]) {
            // 创建链接
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.75.144",
                    11111), "mayikt_canal_test", "canal", "canal");
            int batchSize = 1000;
            int emptyCount = 0;
            try {
                connector.connect();
                connector.subscribe(".*\\..*"); // 监听所有的表结构
                connector.rollback();
                int totalEmptyCount = 120;
                while (emptyCount < totalEmptyCount) {
                    Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    if (batchId == -1 || size == 0) {
    //                    emptyCount++;
    //                    System.out.println("empty count : " + emptyCount);
    //                    try {
    //                        Thread.sleep(1000);
    //                    } catch (InterruptedException e) {
    //                    }
                    } else {
                        emptyCount = 0;
                        // System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
                        printEntry(message.getEntries());
                    }
    
                    connector.ack(batchId); // 提交确认
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
    
                System.out.println("empty too many times, exit");
            } catch (Exception e) {
                System.out.println(e.getMessage());
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<CanalEntry.Entry> entrys) {
            for (CanalEntry.Entry entry : entrys) {
                if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
                    continue;
                }
    
                CanalEntry.RowChange rowChage = null;
                try {
                    rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
    
                CanalEntry.EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================&gt; binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
    
                for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
                    // 延迟的概率非常大  kafka  对消费者集群 Redis 需要注意 消费 消费顺序一致性问题
                    switch (eventType) {
                        case DELETE:
                            delRedisUser(rowData);
                            break;
                        case INSERT:
                        default:
                            updateRedisUser(rowData);
                    }
    //                if (eventType == CanalEntry.EventType.DELETE) {
    //
    //                } else if (eventType == CanalEntry.EventType.INSERT) {
    //                    printColumn(rowData.getAfterColumnsList());
    //                    // 同步 Redis  调用Redis api 插入 一条key
    //                } else {
    //                    // 同步 Redis  调用Redis api update
    //                    System.out.println("-------&gt; before");
    //                    printColumn(rowData.getBeforeColumnsList());
    //                    System.out.println("-------&gt; after");
    //                    printColumn(rowData.getAfterColumnsList());
    //                }
                }
            }
        }
    
        private static void printColumn(List<CanalEntry.Column> columns) {
            for (CanalEntry.Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    
        /**
         * 1.在Redis key  都是为 mysql表中的 主键id;
         * 2.Redis key  mysql表中  该主键id  对应的一行数据
         */
        private static void updateRedisUser(CanalEntry.RowData rowData) {
            // id列、name 列
            CanalEntry.Column idColumns = rowData.getAfterColumns(0);
            String idValue = idColumns.getValue();
            System.out.println(idValue);
            // name列
            CanalEntry.Column nameColumns = rowData.getAfterColumns(1);
            String nameValue = nameColumns.getValue();
            System.out.println(nameValue);
            JedisUtils.getJedis().set(idValue, nameValue);// 同步es
            //canal 整合kafka-----kafka消费则拿到数据json 直接同步到Redis----整合kafka api封装
        }
    
        private static void delRedisUser(CanalEntry.RowData rowData) {
            // id列、name 列
            CanalEntry.Column idColumns = rowData.getBeforeColumns(0);
            rowData.getBeforeColumns(0);// 获取删除之前的数据---该行数据是存在的
            rowData.getAfterColumns(0);// 获取 index  越界
            String idValue = idColumns.getValue();
            JedisUtils.getJedis().del(idValue); // 同步es
            // es mongdb
        }
    }
    
    
    package com.mayikt.canal.demo;
    
    import redis.clients.jedis.Jedis;
    
    public class JedisUtils {
        private static Jedis jedis = new Jedis("127.0.0.1", 6379);
    
        public static  Jedis getJedis() {
            return jedis;
        }
    }
    

    相关文章

      网友评论

          本文标题:97 基于Binlog实现MySQL与Redis数据一致性问题

          本文链接:https://www.haomeiwen.com/subject/ibmzgltx.html