1. canal
阿里巴巴的产品,主要用途是基于 MySQL 数据库增量日志解析,提供增量数据订阅和消费。当前的 canal 支持源端 MySQL 版本包括 5.1.x , 5.5.x , 5.6.x , 5.7.x , 8.0.x。
安装包下载地址,以及官方文档地址:官方github主页
2. canal 工作原理
- 模拟 MySQL slave 的交互协议,伪装自己为 MySQL slave ,向 MySQL master 发送dump 协议
- MySQL master 收到 dump 请求,开始推送 binary log 给 slave (即 canal )
- canal 解析 binary log 对象(原始为 byte 流)
3. 修改mysql binlog配置
修改mysql my.conf文件
log-bin=mysql-bin
binlog_format=row
server-id=1
参考:MaxWell+kafka解析mysql binlog
4. 在mysql中,给canal添加权限
mysql> create user canal identified by ‘canal’;
mysql> grant select,replication slave,replication client on *.* to ‘canal’@’%’;
mysql> flush privileges;
5. 修改canal的配置文件
# conf/example/instance.properties
canal.instance.master.address=127.0.0.1:3306 #mysql的地址端口
canal.instance.dbUsername=canal #mysql中赋予权限的用户
canal.instance.dbPassword=canal #mysql中赋予权限的密码
canal.instance.connectionCharset = UTF-8
canal.instance.filter.regex=.*\\..* #过滤模式,默认不过滤
canal.mq.topic=example
# conf/canal.properties
canal.serverMode = kafka
canal.mq.servers = node01:9092,node02:9092.node03:9093
canal.mq.producerGroup = test
6. 测试
- 启动canal
sh bin/startup.sh
- 启动kafka消费者
代码参见:MaxWell+kafka解析mysql binlog 将其中 topic 改为 ‘example’ 后运行即可。 - 在mysql中新增数据
- 结果:
# idea 控制台打印如下:
消费的数据为:{"data":[{"id":"9","name":"fak","money":"30.0"}],"database":"db01","es":1575958431000,"id":4,"isDdl":false,"mysqlType":{"id":"int(11)","name":"varchar(20)","money":"double"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":4,"name":12,"money":8},"table":"account2","ts":1575958431511,"type":"INSERT"}
网友评论