美文网首页
88,分布式数据同步一致性方案-mysql与缓存双写一致

88,分布式数据同步一致性方案-mysql与缓存双写一致

作者: 滔滔逐浪 | 来源:发表于2021-04-06 22:17 被阅读0次

    1 缓存一致性协议产生的背景

    2,多级缓存框架设计方案

    3,Mysql与redis缓存一致性原理
    mysql 与redis/es/mongdb都是相同的
    第一次查询:
    1,查询二级缓存redis,redis如果没有该缓存数据,则开始查询mysql;
    2, 在查询mysql,如果mysql中存在数据的情况下,就将该数据缓存到Redis中。
    第二次查询:
    1,查询到redis中如果存在该数据的情况下,则不会查询mysql
    能够减轻数据库访问压力;
    重点解决方案:
    1,对mysql做写的操作时候,都会同步到redis中
    如果insert db ,
    如果 update。将redis中该key删除。--懒加载
    如果update,直接修改redis --增量同步
    如果delete,将redis中该key删除。---增量同步。

    如果数据库发生变化,如何同步给redis

    1,直接清除redis缓存:(适合小项目)
    2,基于mq形式异步同步,(适合中小项目)
    3,基于cannel+mq异步同步(推荐)

    4,如果数据库发生变化,如何同步给redis

    5,cannal 同步数据一致性原理

    6,缓存双写一致性方案之旁路缓存策略
    7,缓存双写一致性方案之延迟双删策略。

    基于cannel同步的原理:
    1,cannel 服务器端伪装成一个mysql从节点,订阅mysql主节点的binlog二进制文件。
    2,cannel 服务器端收到binlog文件,就会转换成json的格式发送给Cannel客户端。
    3,cannel 客户端会将数据同步给nosql缓存 redis。
    配置Mysql服务器
    select @@datadir;


    image.png

    D:\Program Files\mysql-8.0.20-winx64

    1. 配置MySQL的 my.ini/my.cnf 开启允许基于binlog文件主从同步
      log-bin=mysql-bin #添加这一行就ok
      binlog-format=ROW #选择row模式
      server_id=1 #配置mysql replaction需要定义,不能和canal的slaveId重复

    配置该文件后,重启mysql服务器即可

    show variables like 'log_bin';

    没有开启log_bin的值是OFF,开启之后是ON

    image.png
    1. 添加cannl的账号 或者直接使用自己的root账号

    手动创建cannl账号或者直接使用root账号

    drop user 'canal'@'%';

    create user 'canal'@'%' identified by 'canal'

    grant all privileges on . to 'canal'@'%' with grant option;

    FLUSH PRIVILEGES;

    select * from mysql.user where user='canal'; 查询账号权限
    一定要检查mysql user 权限为y


    image.png

    配置CanalService
    修改 \conf\example下的instance.properties 配置文件内容
    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal

    启动startup.bat 查看 \logs\example example.log日志文件


    image.png

    创建CanalClient
    Maven依赖

    <dependencies>
    
        <dependency>
            <groupId>com.alibaba.otter</groupId>
            <artifactId>canal.client</artifactId>
            <version>1.1.0</version>
        </dependency>
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    </dependencies>
    
    
    

    同步代码

    package com.mayikt.canal;
    
    import com.alibaba.fastjson.JSONObject;
    import com.alibaba.otter.canal.client.CanalConnector;
    import com.alibaba.otter.canal.client.CanalConnectors;
    import com.alibaba.otter.canal.protocol.CanalEntry.*;
    import com.alibaba.otter.canal.protocol.Message;
    
    import java.net.InetSocketAddress;
    import java.util.List;
    
    
    /**
     * CanalClient
     */
    public class CanalClient {
    
        public static void main(String args[]) {
            CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("127.0.0.1",
                    11111), "example", "", "");
            int batchSize = 100;
            try {
                connector.connect();
                // 配置同步db信息
                connector.subscribe("test.users");
                connector.rollback();
                while (true) {
                    // 获取指定数量的数据
                    Message message = connector.getWithoutAck(batchSize);
                    long batchId = message.getId();
                    int size = message.getEntries().size();
                    System.out.println("batchId = " + batchId);
                    System.out.println("size = " + size);
                    if (batchId == -1 || size == 0) {
                        try {
                            Thread.sleep(1000);
                        } catch (InterruptedException e) {
                            e.printStackTrace();
                        }
                    } else {
                        printEntry(message.getEntries());
                    }
                    // 提交确认
                    connector.ack(batchId);
                    // connector.rollback(batchId); // 处理失败, 回滚数据
                }
            } finally {
                connector.disconnect();
            }
        }
    
        private static void printEntry(List<Entry> entrys) {
            for (Entry entry : entrys) {
                if (entry.getEntryType() == EntryType.TRANSACTIONBEGIN || entry.getEntryType() == EntryType.TRANSACTIONEND) {
                    continue;
                }
                RowChange rowChage = null;
                try {
                    rowChage = RowChange.parseFrom(entry.getStoreValue());
                } catch (Exception e) {
                    throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
                            e);
                }
                EventType eventType = rowChage.getEventType();
                System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
                        entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
                        entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
                        eventType));
    
                for (RowData rowData : rowChage.getRowDatasList()) {
                    if (eventType == EventType.DELETE) {
                        redisDelete(rowData.getBeforeColumnsList());
                    } else if (eventType == EventType.INSERT) {
                        redisInsert(rowData.getAfterColumnsList());
                    } else {
                        System.out.println("-------> before");
                        printColumn(rowData.getBeforeColumnsList());
                        System.out.println("-------> after");
                        redisUpdate(rowData.getAfterColumnsList());
                    }
                }
            }
        }
    
        private static void printColumn(List<Column> columns) {
            for (Column column : columns) {
                System.out.println(column.getName() + " : " + column.getValue() + "    update=" + column.getUpdated());
            }
        }
    
        private static void redisInsert(List<Column> columns) {
            JSONObject json = new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if (columns.size() > 0) {
                RedisUtil.stringSet(columns.get(0).getValue(), json.toJSONString());
            }
        }
    
        private static void redisUpdate(List<Column> columns) {
            JSONObject json = new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if (columns.size() > 0) {
                RedisUtil.stringSet(columns.get(0).getValue()+columns.get(0).getValue(), json.toJSONString());
            }
        }
    
        private static void redisDelete(List<Column> columns) {
            JSONObject json = new JSONObject();
            for (Column column : columns) {
                json.put(column.getName(), column.getValue());
            }
            if (columns.size() > 0) {
                RedisUtil.delKey(columns.get(0).getValue());
            }
        }
    }
    
    
    
    package com.mayikt.canal;
    
    import redis.clients.jedis.Jedis;
    
    public class RedisUtil {
    
        private static Jedis jedis = null;
    
        public static synchronized Jedis getJedis() {
            if (jedis == null) {
                jedis = new Jedis("127.0.0.1", 6379);
            }
            return jedis;
        }
    
        public static boolean existKey(String key) {
            return getJedis().exists(key);
        }
    
        public static void delKey(String key) {
            getJedis().del(key);
        }
    
        public static String stringGet(String key) {
            return getJedis().get(key);
        }
    
        public static String stringSet(String key, String value) {
            return getJedis().set(key, value);
        }
    
        public static void hashSet(String key, String field, String value) {
            getJedis().hset(key, field, value);
        }
    }
    

    整合Kafka

    Kafka环境

    1. 先安装zookeeper
      zoo_sample.cfg 修改为 zoo.cfg
      修改 zoo.cfg 中的 dataDir=E:\zkkafka\zookeeper-3.4.14\data

    新增环境变量:
    ZOOKEEPER_HOME: E:\zkkafka\zookeeper-3.4.14 (zookeeper目录)
    Path: 在现有的值后面添加 ";%ZOOKEEPER_HOME%\bin;"

    运行zk zkServer.cmd


    image.png
    1. 安装kafka

    解压 kafka_2.13-2.4.0 改名为 kafka

    修改 server.properties中的配置

    log.dirs=E:\zkkafka\kafka\logs
    Cmd 进入到该目录:
    cd E:\zkkafka\kafka
    .\bin\windows\kafka-server-start.bat .\config\server.properties


    image.png

    Kafka启动成功
    Canal配置更改

    1.修改 example/instance.properties
    canal.mq.topic=maikt-topic
    2.修改 canal.properties

    tcp, kafka, RocketMQ

    canal.serverMode = kafka
    canal.mq.servers = 127.0.0.1:9092
    SpringBoot项目整合kafka
    Maven依赖
    同步代码

    package com.mayikt;
    
    import com.alibaba.fastjson.JSONArray;
    import com.alibaba.fastjson.JSONObject;
    import com.mayikt.utils.RedisUtils;
    import lombok.extern.slf4j.Slf4j;
    import org.apache.kafka.clients.consumer.ConsumerRecord;
    
    import org.springframework.beans.factory.annotation.Autowired;
    import org.springframework.boot.SpringApplication;
    import org.springframework.boot.autoconfigure.SpringBootApplication;
    import org.springframework.kafka.annotation.KafkaListener;
    import org.springframework.kafka.core.KafkaTemplate;
    import org.springframework.web.bind.annotation.RequestMapping;
    import org.springframework.web.bind.annotation.RestController;
    
    /**
     * @author 余胜军
     */
    @RestController
    @Slf4j
    @SpringBootApplication
    public class KafkaController {
    
        /**
         * 注入kafkaTemplate
         */
        @Autowired
        private KafkaTemplate<String, String> kafkaTemplate;
    
        @Autowired
        private RedisUtils redisUtils;
    
        /**
         * 发送消息的方法
         *
         * @param key  推送数据的key
         * @param data 推送数据的data
         */
        private void send(String key, String data) {
            // topic 名称 key data 消息数据
            kafkaTemplate.send("mayikt", key, data);
    
        }
        // test 主题 1 my_test 3
    
        @RequestMapping("/kafka")
        public String testKafka() {
            int iMax = 6;
            for (int i = 1; i < iMax; i++) {
                send("key" + i, "data" + i);
            }
            return "success";
        }
    
        public static void main(String[] args) {
            SpringApplication.run(KafkaController.class, args);
        }
    
        /**
         * 消费者使用日志打印消息
         */
    
        @KafkaListener(topics = "maikt-topic")
        public void receive(ConsumerRecord<?, ?> consumer) {
            String index = consumer.offset() + "" + consumer.value();
            log.info(">topic名称:{},,key:{},分区位置:{},, 下标{}<", consumer.topic(), consumer.key(), consumer.partition(), index);
            String json = (String) consumer.value();
            JSONObject jsonObject = JSONObject.parseObject(json);
            String sqlType = jsonObject.getString("type");
            JSONArray data = jsonObject.getJSONArray("data");
            JSONObject userObject = data.getJSONObject(0);
            String id = userObject.getString("id");
            String database = jsonObject.getString("database");
            String table = jsonObject.getString("table");
            String key = database + "_" + table + "_" + id;
            if ("UPDATE".equals(sqlType) || "INSERT".equals(sqlType)) {
                redisUtils.setString(key, userObject.toJSONString());
                return;
            }
            if ("DELETE".equals(sqlType)) {
                redisUtils.deleteKey(key);
            }
    
    
        }
    
    }
    
    
    
    

    YML

    # kafka
    spring:
      kafka:
        # kafka服务器地址(可以多个)
        bootstrap-servers: 127.0.0.1:9092
        consumer:
          # 指定一个默认的组名
          group-id: kafka2
          # earliest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,从头开始消费
          # latest:当各分区下有已提交的offset时,从提交的offset开始消费;无提交的offset时,消费新产生的该分区下的数据
          # none:topic各分区都存在已提交的offset时,从offset后开始消费;只要有一个分区不存在已提交的offset,则抛出异常
          auto-offset-reset: earliest
          # key/value的反序列化
          key-deserializer: org.apache.kafka.common.serialization.StringDeserializer
          value-deserializer: org.apache.kafka.common.serialization.StringDeserializer
        producer:
          # key/value的序列化
          key-serializer: org.apache.kafka.common.serialization.StringSerializer
          value-serializer: org.apache.kafka.common.serialization.StringSerializer
          # 批量抓取
          batch-size: 65536
          # 缓存容量
          buffer-memory: 524288
      redis:
        host: 127.0.0.1
    #    password:
        port: 6379
        database: 0
    
    
    
    ···
    
    pom
    

    <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
    xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>
    <groupId>com.itmayiedu</groupId>
    <artifactId>springboot2.0_kafka</artifactId>
    <version>0.0.1-SNAPSHOT</version>
    <parent>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-parent</artifactId>
    <version>2.0.1.RELEASE</version>
    </parent>
    <dependencies>

    <dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
    </dependency>

    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-web</artifactId>
    </dependency>
    <dependency>
    <groupId>com.alibaba</groupId>
    <artifactId>fastjson</artifactId>
    <version>1.2.62</version>
    </dependency>
    <dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-redis</artifactId>
    </dependency>
    <dependency>
    <groupId>org.projectlombok</groupId>
    <artifactId>lombok</artifactId>
    <version>1.18.12</version>
    </dependency>
    </dependencies>

    </project>

    相关文章

      网友评论

          本文标题:88,分布式数据同步一致性方案-mysql与缓存双写一致

          本文链接:https://www.haomeiwen.com/subject/abgekltx.html