mysql 与Redis 数据一致性问题 直接将Redis清空
中间件 canal框架 基于 docker环境构建
image.png
canal 框架 原理:
<u>https://gitee.com/mirrors/canal?utm_source=alading&utm_campaign=repo</u>
canal 框架原理
1,canal伪装成mysql从节点 订阅mysql 主节点的binlog文件
2,当我们的mysql 主节点 binlog 文件发生了变化,则将binlog 文件发送给canal服务器端
3,canal 服务器端将该binlog 文件二进制转换成json格式给canal客户端
4,canal客户端在将改数据同步到Redis/ES
基于Binlog 开启方式
1.mysql 开启binlog 文件配置
windows 配置
查询 my.ini配置文件位置
C:\ProgramData\MySQL\MySQL Server 5.7
2, linux mysql
docker cp mysql_slave:/etc/mysql/my.cnf /usr/local/mysql/slave/my.cnf
cd /usr/local/mysql/slave
Vi my.conf
log-bin=mysql-bin
server-id=2
安装canal
docker pull canal/canal-server:latest
docker run -p 11111:11111 --name canal -id canal/canal-server
进入容器
docker exec -it canal /bin/bash
编辑配置文件
vi /home/admin/canal-server/conf/canal.properties
# 修改canal.id 不能与之前的mysql配置id相同
image.png
vi /home/admin/canal-server/conf/example/instance.properties
image.png
重启canal
docker restart canal
# 可以设置开机启动 适用于所有容器
docker update --restart=always canal
Docker-compose 构建canal
version: '3'
services:
canal-server:
image: canal/canal-server:v1.1.4
container_name: canal-server
ports:
- 11111:11111
environment:
- canal.instance.mysql.slaveId=3
- canal.auto.scan=false
- canal.destinations=mayikt-commodity
- canal.instance.master.address=192.168.75.144:3306
- canal.instance.dbUsername=canal
- canal.instance.dbPassword=canal
canal.instance.mysql.slaveId:slaveId不能与mysql的serverId一样
canal.instance.master.address:mysql地址
canal.instance.dbUsername:mysql账号
canal.instance.dbPassword:mysql密码
-f 指定使用的 Compose 模板文件,默认为 docker-compose.yml,可以多次指定。
docker-compose -f docker-compose.yml up -d
package com.mayikt.canal.demo;
import com.alibaba.otter.canal.client.CanalConnector;
import com.alibaba.otter.canal.client.CanalConnectors;
import com.alibaba.otter.canal.protocol.CanalEntry;
import com.alibaba.otter.canal.protocol.Message;
import java.net.InetSocketAddress;
import java.util.List;
public class ClientSample {
/**
* 不推荐使用tcp模式
* canaltcp 模式
* 高并发的情况下 建议整合canal+kafka (效率是非常)
* 需要注意 消费者消费顺序一致性的问题
*
* @param args
*/
public static void main(String args[]) {
// 创建链接
CanalConnector connector = CanalConnectors.newSingleConnector(new InetSocketAddress("192.168.75.144",
11111), "mayikt_canal_test", "canal", "canal");
int batchSize = 1000;
int emptyCount = 0;
try {
connector.connect();
connector.subscribe(".*\\..*"); // 监听所有的表结构
connector.rollback();
int totalEmptyCount = 120;
while (emptyCount < totalEmptyCount) {
Message message = connector.getWithoutAck(batchSize); // 获取指定数量的数据
long batchId = message.getId();
int size = message.getEntries().size();
if (batchId == -1 || size == 0) {
// emptyCount++;
// System.out.println("empty count : " + emptyCount);
// try {
// Thread.sleep(1000);
// } catch (InterruptedException e) {
// }
} else {
emptyCount = 0;
// System.out.printf("message[batchId=%s,size=%s] \n", batchId, size);
printEntry(message.getEntries());
}
connector.ack(batchId); // 提交确认
// connector.rollback(batchId); // 处理失败, 回滚数据
}
System.out.println("empty too many times, exit");
} catch (Exception e) {
System.out.println(e.getMessage());
} finally {
connector.disconnect();
}
}
private static void printEntry(List<CanalEntry.Entry> entrys) {
for (CanalEntry.Entry entry : entrys) {
if (entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONBEGIN || entry.getEntryType() == CanalEntry.EntryType.TRANSACTIONEND) {
continue;
}
CanalEntry.RowChange rowChage = null;
try {
rowChage = CanalEntry.RowChange.parseFrom(entry.getStoreValue());
} catch (Exception e) {
throw new RuntimeException("ERROR ## parser of eromanga-event has an error , data:" + entry.toString(),
e);
}
CanalEntry.EventType eventType = rowChage.getEventType();
System.out.println(String.format("================> binlog[%s:%s] , name[%s,%s] , eventType : %s",
entry.getHeader().getLogfileName(), entry.getHeader().getLogfileOffset(),
entry.getHeader().getSchemaName(), entry.getHeader().getTableName(),
eventType));
for (CanalEntry.RowData rowData : rowChage.getRowDatasList()) {
// 延迟的概率非常大 kafka 对消费者集群 Redis 需要注意 消费 消费顺序一致性问题
switch (eventType) {
case DELETE:
delRedisUser(rowData);
break;
case INSERT:
default:
updateRedisUser(rowData);
}
// if (eventType == CanalEntry.EventType.DELETE) {
//
// } else if (eventType == CanalEntry.EventType.INSERT) {
// printColumn(rowData.getAfterColumnsList());
// // 同步 Redis 调用Redis api 插入 一条key
// } else {
// // 同步 Redis 调用Redis api update
// System.out.println("-------> before");
// printColumn(rowData.getBeforeColumnsList());
// System.out.println("-------> after");
// printColumn(rowData.getAfterColumnsList());
// }
}
}
}
private static void printColumn(List<CanalEntry.Column> columns) {
for (CanalEntry.Column column : columns) {
System.out.println(column.getName() + " : " + column.getValue() + " update=" + column.getUpdated());
}
}
/**
* 1.在Redis key 都是为 mysql表中的 主键id;
* 2.Redis key mysql表中 该主键id 对应的一行数据
*/
private static void updateRedisUser(CanalEntry.RowData rowData) {
// id列、name 列
CanalEntry.Column idColumns = rowData.getAfterColumns(0);
String idValue = idColumns.getValue();
System.out.println(idValue);
// name列
CanalEntry.Column nameColumns = rowData.getAfterColumns(1);
String nameValue = nameColumns.getValue();
System.out.println(nameValue);
JedisUtils.getJedis().set(idValue, nameValue);// 同步es
//canal 整合kafka-----kafka消费则拿到数据json 直接同步到Redis----整合kafka api封装
}
private static void delRedisUser(CanalEntry.RowData rowData) {
// id列、name 列
CanalEntry.Column idColumns = rowData.getBeforeColumns(0);
rowData.getBeforeColumns(0);// 获取删除之前的数据---该行数据是存在的
rowData.getAfterColumns(0);// 获取 index 越界
String idValue = idColumns.getValue();
JedisUtils.getJedis().del(idValue); // 同步es
// es mongdb
}
}
package com.mayikt.canal.demo;
import redis.clients.jedis.Jedis;
public class JedisUtils {
private static Jedis jedis = new Jedis("127.0.0.1", 6379);
public static Jedis getJedis() {
return jedis;
}
}
网友评论