Canal简介
Canal是阿里开源的一款基于Mysql数据库binlog的增量订阅和消费组件,通过它可以订阅数据库的binlog日志,然后进行一些数据消费,如数据镜像、数据异构、数据索引、缓存更新等。相对于消息队列,通过这种机制可以实现数据的有序化和一致性。
github地址:https://github.com/alibaba/canal
完整wiki地址:https://github.com/alibaba/canal/wiki
Canal工作原理
原理相对比较简单:
- 1、canal模拟mysql slave与mysql master的交互协议,伪装自己是一个mysql slave,向mysql master发送dump协议。
- 2、mysql master收到mysql slave(canal)发送的dump请求,开始推送binlog增量日志给slave(也就是canal)。
- 3、mysql slave(canal伪装的)收到binlog增量日志后,就可以对这部分日志进行解析,获取主库的结构及数据变更。
Mysql主从同步原理
canal工作原理其实也是基于mysql主从同步原理的,所以理解mysql主从同步原理是第一步
同步原理:
- 1、Master主库,启动Binlog机制,将变更数据写入Binlog文件。( binary log,其中记录叫做二进制日志事件 log events,可以通过 show binlog events 进行查看)
- 2、Slave(I/O thread),从Master主库拉取Binlog数据,将它拷贝到Slave的中继日志(relay log)中。
- 3、Slave(SQL thread),回放relay log,更新从库数据以此来达到数据一致。
启用Binlog注意以下几点:
- 1、Master主库一般会有多台Slave订阅,且Master主库要支持业务系统实时变更操作,服务器资源会有瓶颈。
- 2、需要同步的数据表一定要有主键。
Mysql的binlog
- 1、它记录了所有的DDL和DML(除了数据查询语句)语句,以事件形式记录,还包含语句所执行的消耗的时间。主要用来备份和数据同步。
- 2、binlog 有三种:STATEMENT、ROW、MIXED。
- STATEMENT:记录的是执行的sql语句。
- ROW:记录的是真实的行数据记录。
- MIXED:记录的是1+2,优先按照1的模式记录。
什么是中继日志?
- 从服务器I/O线程将主服务器的二进制日志读取过来记录到从服务器本地文件,然后从服务器SQL线程会读取relay-log日志的内容并应用到从服务器,从而使从服务器和主服务器的数据保持一致。
Canal架构
说明:
- 1、server代表一个canal运行实例,对应于一个jvm。
- 2、instance对应于一个数据队列。
instance模块:
- 1、eventParser (数据源接入,模拟slave协议和master进行交互,协议解析)
- 2、eventSink (Parser和Store链接器,进行数据过滤,加工,分发的工作)
- 3、eventStore (数据存储)
- 4、metaManager (增量订阅&消费信息管理器)
Canal-HA机制
canal是支持HA的,其实现机制也是依赖zookeeper来实现的,用到的特性有watcher和EPHEMERAL节点(和session生命周期绑定),与HDFS的HA类似。
canal的ha分为两部分,canal server和canal client分别有对应的ha实现:
- canal server:为了减少对mysql dump的请求,不同server上的instance(不同server上的相同instance)要求同一时间只能有一个处于running,其他的处于standby状态(standby是instance的状态)。
- canal client:为了保证有序性,一份instance同一时间只能由一个canal client进行get/ack/rollback操作,否则客户端接收无法保证有序。
server ha的架构图如下:
大致步骤:
- 1、canal server要启动某个canal instance时都先向zookeeper_进行一次尝试启动判断_(实现:创建EPHEMERAL节点,谁创建成功就允许谁启动)
- 2、创建zookeeper节点成功后,对应的canal server就启动对应的canal instance,没有创建成功的canal instance就会处于standby状态。
- 3、一旦zookeeper发现canal server A创建的instance节点消失后,立即通知其他的canal server再次进行步骤1的操作,重新选出一个canal server启动instance。
- 4、canal client每次进行connect时,会首先向zookeeper询问当前是谁启动了canal instance,然后和其建立链接,一旦链接不可用,会重新尝试connect。
Canal Client的方式和canal server方式类似,也是利用zookeeper的抢占EPHEMERAL节点的方式进行控制
Canal应用场景
1、同步缓存redis/全文搜索ES
canal一个常见应用场景是同步缓存/全文搜索,当数据库变更后通过binlog进行缓存/ES的增量更新。当缓存/ES更新出现问题时,应该回退binlog到过去某个位置进行重新同步,并提供全量刷新缓存/ES的方法,如下图所示:
2、下发任务
另一种常见应用场景是下发任务,当数据变更时需要通知其他依赖系统。其原理是任务系统监听数据库变更,然后将变更的数据写入MQ/kafka进行任务下发,比如商品数据变更后需要通知商品详情页、列表页、搜索页等相关系统。这种方式可以保证数据下发的精确性,通过MQ发送消息通知变更缓存是无法做到这一点的,而且业务系统中不会散落着各种下发MQ的代码,从而实现了下发归集,如下图所示。
3、数据异构
在大型网站架构中,DB都会采用分库分表来解决容量和性能问题,但分库分表之后带来的新问题。比如不同维度的查询或者聚合查询,此时就会非常棘手。一般我们会通过数据异构机制来解决此问题。
所谓的数据异构,那就是将需要join查询的多表按照某一个维度又聚合在一个DB中。让你去查询。canal就是实现数据异构的手段之一。
canal的HA集群模式部署
1、canal下载地址
-
下载安装包:
https://github.com/alibaba/canal/releases/download/canal-1.1.5/canal.deployer-1.1.5.tar.gz -
解压到安装目录
tar -zxf canal.deployer-1.1.5.tar.gz
解压出来的目录结构是这样的:
2、mysql开启binlog
MySQL,需要先开启 Binlog 写入功能,配置 binlog-format 为 ROW 模式,my.cnf 中配置如下:
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
3、mysql授权账号权限
授权 canal 链接 MySQL 账号具有作为 MySQL slave 的权限(repication权限),如果已有账户可直接 grant:
CREATE USER canal IDENTIFIED BY 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
-- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
FLUSH PRIVILEGES;
备注
可以通过在 mysql 终端中执行以下命令判断配置是否生效:
show variables like 'log_bin';
show variables like 'binlog_format';
4、修改配置
canal服务的部署其实特别简单,解压之后只需修改canal.properties、instance.properties这两个配置两个文件即可。
修改canal.properties中配置
//指定注册的zk集群地址
canal.zkServers = 192.168.135.27:2181,192.168.135.28:2181,192.168.135.29:2181
//HA模式必须使用该xml,需要将相关数据写入zookeeper,保证数据集群共享
canal.instance.global.spring.xml = classpath:spring/default-instance.xml
// 这个demo就是conf目录里的实例,如果要建别的实例'test'就建个test目录,
// 把example里面的instance.properties文件拷贝到test的实例目录下就好了,
// 然后在这里的配置就是canal.destinations = demo,test
canal.destinations = demo
修改配置文件instance.properties
## mysql serverId , v1.0.26+ will autoGen
# canal伪装的mysql slave的编号,不能与mysql数据库和其他的slave重复
canal.instance.mysql.slaveId=1003
# 按需修改成自己的数据库信息
# position info
canal.instance.master.address=10.200.*.109:3306
# username/password 数据库的用户名和密码
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
canal.instance.defaultDatabaseName = test
5、详细步骤
canal 的HA集群模式部署详细步骤,可以访问:https://blog.csdn.net/XDSXHDYY/article/details/97825508
然后cd到bin目录 启动和停止canal-server
启动
sh startup.sh
停止
sh stop.sh
验证启动状态,查看log文件
vim canal/log/canal/canal.log
2021-07-11 16:17:38.224 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## start the canal server.
2021-07-11 16:17:38.273 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[172.19.0.1:11111]
2021-07-11 16:17:38.569 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## the canal server is running now ......
上述日志信息显示启动canal成功
SpringBoot整合Canal
两种方式,官方提供的demo和springboot starter
1、官方提供的
<dependency>
<groupId>com.alibaba.otter</groupId>
<artifactId>canal.client</artifactId>
<version>1.1.5</version>
</dependency>
具体参考:https://blog.csdn.net/leilei1366615/article/details/108819651
2、springboot starter方式
- 引入Maven依赖
<dependency>
<groupId>top.javatool</groupId>
<artifactId>canal-spring-boot-starter</artifactId>
<version>1.2.1-RELEASE</version>
</dependency>
- 添加配置
canal:
server: 127.0.0.1:11111
destination: demo
- 实现EntryHandler接口
@Component
public class ADHandler implements EntryHandler<SeckillGoods> {
@Override
public void insert(SeckillGoods seckillGoods) {
// CanalModel可以得到当前这次的库名和表名
CanalModel canal = CanalContext.getModel();
//业务操作
//新增缓存操作等
}
@Override
public void update(SeckillGoods before, SeckillGoods after) {
//业务操作
//更新缓存操作等
}
@Override
public void delete(SeckillGoods seckillGoods) {
//业务操作
//删除缓存操作等
}
}
参考:
https://www.cnblogs.com/caoweixiong/p/11824423.html
https://www.cnblogs.com/huangxincheng/p/7456397.html
https://segmentfault.com/a/1190000023297973
网友评论