美文网首页
Canal+RabbitMQ数据异构填坑指南

Canal+RabbitMQ数据异构填坑指南

作者: DreamsonMa | 来源:发表于2021-02-04 14:14 被阅读0次

此文章主要补充Canal整合RabbitMQ时填过的坑。

1、完整数据处理流程

用户中心数据处理流程

这里核心处理点,在于Canal的部署和RabbitMQ的整合。

Canal主要模拟MySQL的Slave,实现主从复制,拉取Mysql的binlog进行解析转换成相应的数据,获取到数据后,将数据推送到RabbitMQ(默认支持的是RocketMQ和Kafka,新版加入对RabbitMQ的支持)。

Canal整合RabbitMQ主要采用的是 路由模式

2、Canal 配置及部署

2.1、Canal Admin 安装及配置

新版本Canal已经支持在线管理,我们可以先安装一个canal-admin。参考:Canal Admin QuickStart

canal-admin的核心模型主要有:

  1. instance,对应canal-server里的instance,一个最小的订阅mysql的队列
  2. server,对应canal-server,一个server里可以包含多个instance
  3. 集群,对应一组canal-server,组合在一起面向高可用HA的运维

简单解释:

  1. instance因为是最原始的业务订阅诉求,它会和 server/集群 这两个面向资源服务属性的进行关联,比如instance A绑定到server A上或者集群 A上,
  2. 有了任务和资源的绑定关系后,对应的资源服务就会接收到这个任务配置,在对应的资源上动态加载instance,并提供服务。动态加载的过程,有点类似于之前的autoScan机制,只不过基于canal-admin之后可就以变为远程的web操作,而不需要在机器上运维配置文件
  3. 将server抽象成资源之后,原本canal-server运行所需要的canal.properties/instance.properties配置文件就需要在web ui上进行统一运维,每个server只需要以最基本的启动配置 (比如知道一下canal-admin的manager地址,以及访问配置的账号、密码即可)

集群模式配置

  1. 先配置一个zk地址和集群名称。


    集群模式
  2. 配置canal.properties
    2.1. 选择主配置

    集群模式主配置
    2.2. 设置canal.properties,可以先载入模板,然后进行修改。
    canal.properties

canal.properties核心配置:

...
# register ip to zookeeper
canal.register.ip = 10.8.158.4
canal.port = 11111
canal.metrics.pull.port = 11112
# canal instance user/passwd
canal.user = canal
canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441

canal.zkServers = 10.8.158.4:2181
# flush data to zk
canal.zookeeper.flush.period = 1000
canal.withoutNetty = false
# tcp, kafka, RocketMQ
canal.serverMode = rabbitmq
...
# table meta tsdb info
canal.instance.tsdb.enable = false
...
# 一下几项均为 1.1.5 新版本新增支持 rabbitmq 的配置
rabbitmq.host = 10.224.45.9:25674
rabbitmq.virtual.host = /
# 指定 rabbitmq 上的 exchange 名称, "新建 `Exchange`" 步骤新建的名称
rabbitmq.exchange = usercenter
# 连接 rabbitmq 的用户名
rabbitmq.username = guest
# 连接 rabbitmq 的密码
rabbitmq.password = guest
...

3、 Canal Server配置及部署

canal-server接入canal-admin,参考:Canal Admin ServerGuide

3.1. 核心配置

root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5/conf# cat canal_local.properties 
# register ip
canal.register.ip = 10.8.158.4

# canal admin config
canal.admin.manager = 10.8.158.4:8089
canal.admin.port = 11110
canal.admin.user = admin
canal.admin.passwd = 4ACFE3202A5FF5CF467898FC58AAB1D615029441
# admin auto register
canal.admin.register.auto = true
canal.admin.register.cluster = usercenter

3.2. 启动canal-server,注册canal-admin

root@dreamson-QiTianM425-N000:/tmp/canal-1.1.5# bash bin/startup.sh local

注册后在admin中的展现。


server注册admin

4.Instance实例配置

4.1. 新增instance实例

Instance实例启动后,即可监听mysql的binlog,并将数据推到RabbitMQ。推送的规则参考:Canal Kafka/RocketMQ QuickStart
RabbitMQ的配置的topic,事实上是设置到routeKey上。

新增instance实例 instance列表

5. 最后我们调试一下

MQ监控处理

@RabbitListener(bindings = {
            @QueueBinding(
                    value = @Queue(value = "usercenter", durable = "true"),
                    exchange = @Exchange(value = "usercenter", type = ExchangeTypes.TOPIC),
                    key = "usercenter"
            )
    }, concurrency = "10")
    public void test(String message) {
        log.info("test接收到消息。message:{}", message);
    }

修改表数据

修改数据库数据

MQ监控输入日志如下

2021-02-04 11:34:53.037 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.18278394004589313","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.050 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.461750433278969","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_1","ts":1612409677905,"type":"INSERT"}
2021-02-04 11:34:53.051 INFO  [???-???-auth] com.???.???.???.auth.???.comsumer.AuthComsumer :: - test接收到消息。message:{"data":[{"id":"163","name":"0.12820304849017694","age":"0"}],"database":"user_center","es":1612409677000,"id":15,"isDdl":false,"mysqlType":{"id":"bigint(20)","name":"varchar(200)","age":"int(4)"},"old":null,"pkNames":["id"],"sql":"","sqlType":{"id":-5,"name":12,"age":4},"table":"test_2","ts":1612409677905,"type":"INSERT"}

相关文章

  • Canal+RabbitMQ数据异构填坑指南

    此文章主要补充Canal整合RabbitMQ时填过的坑。 Canal的简介和用法,请查看官网介绍:https://...

  • puppeteer填坑指南

    puppeteer填坑指南 前言 原文地址:https://zhangzippo.github.io/posts/...

  • carthage 填坑指南

    1.安装carthage可以参考这篇文章http://blog.csdn.net/Mazy_ma/article/...

  • Python填坑指南

    1。Linux环境下,Python是默认安装的,而且同时内置了2.7和3.5版本,这就导致了后面要讲的一些问题。 ...

  • RN填坑指南

    本地JS server要打开,这个不是很稳定,有时候能运行,有时候不能,不能时只能新建一个项目了运行adb rev...

  • Docker 填坑指南

    背景 之前团队中因为Node版本升级问题,项目整个瘫痪无法运行,搞得杰哥双休日折腾环境折腾了两天,老大遂提出以Do...

  • FREESWITCH 填坑指南

    1.查看网关注册状态 sofia status 2.桥接(未实践) http://wiki.freeswitch....

  • flutter填坑指南

    Flutter报错 Waiting for another flutter command to release ...

  • 2018-09-06--ActiveAndroid在8.0权限问

    ActiveAndroid填坑记录: 坑--------》使用ActiveAndroid数据库在系统8.0(华为、...

  • 开发工具总结(4)之Android Studio3.0填坑指南

    序言 上篇讲了: 全面总结Android Studio2.X的填坑指南 这篇讲一下AS3.0的坑。。 作为这个世界...

网友评论

      本文标题:Canal+RabbitMQ数据异构填坑指南

      本文链接:https://www.haomeiwen.com/subject/dpwvtltx.html