下载安装canal
-
我下载的是最新稳定版本,下载地址:https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
-
解压缩
mkdir -p /usr/local/canal cp canal.deployer-1.1.3.tar.gz /usr/local/canal tar -zxvf canal.deployer-1.1.3.tar.gz cd canal.deployer-1.1.3
-
修改
instance
配置文件conf/example/instance.properties
#修改mysql数据库的连接信息. canal.instance.master.address=127.0.0.1:3306 canal.instance.dbUsername=root canal.instance.dbPassword=123456 #编码设置=utf-8即可,即使db_ljlj库是latin1,这里也设置utf-8 #否则乱码 canal.instance.connectionCharset = utf-8 #选择监控的数据库 canal.instance.defaultDatabaseName=db_ljlj # 白名单配置,指定数据库中的表进行同步 canal.instance.filter.regex=db_ljlj.product_conume #修改kafka队列配置 canal.mq.topic=hello #主题名 ##mysql库名.表名: 唯一主键,多个表之间用逗号分隔 canal.mq.partitionHash=db_ljlj.product_conume:consume_id^name,.*\\..*
-
修改
canal
配置文件/usr/local/canal/conf/canal.properties
# 可选项: tcp(默认), kafka, RocketMQ canal.serverMode = kafka # kafka/rocketmq 集群配置: canal.mq.servers = 127.0.0.1:9092,127.0.0.1:9093 canal.mq.retries = 0 # flagMessage模式下可以调大该值, 但不要超过MQ消息体大小上限 canal.mq.batchSize = 16384 canal.mq.maxRequestSize = 1048576 # flatMessage模式下请将该值改大, 建议50-200 canal.mq.lingerMs = 1 canal.mq.bufferMemory = 33554432 # Canal的batch size, 默认50K, 由于kafka最大消息体限制请勿超过1M(900K以下) canal.mq.canalBatchSize = 50 # Canal get数据的超时时间, 单位: 毫秒, 空为不限超时 canal.mq.canalGetTimeout = 100 # 是否为flat json格式对象 canal.mq.flatMessage = true canal.mq.compressionType = none canal.mq.acks = all # kafka消息投递是否使用事务 canal.mq.transaction = false
启动 和关闭 canal
- 启动
cd /usr/local/canal/ sh bin/startup.sh
- 查看日志
//a. 查看 logs/canal/canal.log vim logs/canal/canal.log //b. 查看instance的日志: vim logs/example/example.log
- 关闭
cd /usr/local/canal/ sh bin/stop.sh
- 新增product_conume表数据后,canal从binlog中获取新增到kafka中的数据格式:
{ "data": [ { "consume_id": "1886382", "consume_pact_id": "151201121", "consume_merchant_id": null, } ], "database": "db_ljlj", "es": 1585301895000, "id": 5, "isDdl": false, "mysqlType": { "consume_id": "int(10)", "consume_pact_id": "int(20) unsigned", "consume_merchant_id": "int(10)", }, "old": null, "pkNames": null, "sql": "", "sqlType": { "consume_id": 4, "consume_pact_id": 4, "consume_merchant_id": 4, }, "table": "product_conume", "ts": 1585301895411, "type": "INSERT" }
网友评论