此文章主要补充Canal整合RabbitMQ时填过的坑。
- Canal的简介和用法,请查看官网介绍:https://github.com/alibaba/canal/wiki/%E7%AE%80%E4%BB%8B;
- RabbitMQ的用法请查看官网:http://www.rabbitmq.com/
1、完整数据处理流程
用户中心数据处理流程这里核心处理点,在于Canal的部署和RabbitMQ的整合。
Canal主要模拟MySQL的Slave,实现主从复制,拉取Mysql的binlog进行解析转换成相应的数据,获取到数据后,将数据推送到RabbitMQ(默认支持的是RocketMQ和Kafka,新版加入对RabbitMQ的支持)。
Canal整合RabbitMQ主要采用的是 路由模式。
2、Canal 配置及部署
2.1、Canal Admin 安装及配置
新版本Canal已经支持在线管理,我们可以先安装一个canal-admin。参考:Canal Admin QuickStart
canal-admin的核心模型主要有:
- instance,对应canal-server里的instance,一个最小的订阅mysql的队列
- server,对应canal-server,一个server里可以包含多个instance
- 集群,对应一组canal-server,组合在一起面向高可用HA的运维
简单解释:
- instance因为是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上,
- 有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务。动态加载的过程,有点类似于之前的autoScan机制,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件
- 将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可)
集群模式配置
-
先配置一个zk地址和集群名称。
集群模式 -
配置canal.properties
集群模式主配置
2.1. 选择主配置
2.2. 设置canal.properties,可以先载入模板,然后进行修改。
canal.properties
canal.properties核心配置:
...
# register ip to zookeeper
canal.register.ip = 10.8.158.4
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458
# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
canal.zkServers = 10.8.158.4:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = rabbitmq
...
# table meta tsdb info
canal.instance.tsdb.enable = false
...
# 一下几项均为 1.1.5 新版本新增支持 rabbitmq 的配置
rabbitmq.host = 10.224.45.9:25674
rabbitmq.virtual.host = /
# 指定 rabbitmq 上的 exchange 名称, "新建 `Exchange`" 步骤新建的名称
rabbitmq.exchange = usercenter
# 连接 rabbitmq 的用户名
rabbitmq.username = guest
# 连接 rabbitmq 的密码
rabbitmq.password = guest
...
3、 Canal Server配置及部署
canal-server接入canal-admin,参考:Canal Admin ServerGuide
3.1. 核心配置
root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5/conf# cat canal_local.properties
# register ip
canal.register.ip = 10.8.158.4
# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = usercenter
3.2. 启动canal-server,注册canal-admin
root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5# bash bin/startup.sh local
注册后在admin中的展现。
server注册admin
4.Instance实例配置
4.1. 新增instance实例
Instance实例启动后,即可监听mysql的binlog,并将数据推到RabbitMQ。推送的规则参考:Canal Kafka/RocketMQ QuickStart
RabbitMQ的配置的topic,事实上是设置到routeKey上。
5. 最后我们调试一下
MQ监控处理
@RabbitListener(bindings = {
@QueueBinding(
value = @Queue(value = "usercenter", durable = "true"),
exchange = @Exchange(value = "usercenter", type = ExchangeTypes.TOPIC),
key = "usercenter"
)
}, concurrency = "10")
public void test(String message) {
log.info("test接收到消息。message:{}", message);
}
修改表数据
修改数据库数据MQ监控输入日志如下
2021-02-04 11:34:53.037 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.18278394004589313","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.050 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.461750433278969","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_1","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.051 INFO [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.12820304849017694","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_2","ts":1612409677905,"type":"INSERT"}
网友评论