Canal是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。当您需要将MySQL中的增量数据同步至ES时,可通过Canal来实现。
准备环境(以下几个的版本是我用的,不是强制使用):
- JDK(1.8)
- Mysql (5.7.30)
- ElasticSearch (7.7.0)
- Canal Server(1.1.5)
- Canal-adapter(1.1.5)(由于使用的es7,1.1.4及以下不支持es7)
1 mysql
1.1 开启mysql的binlog写入功能
执行show master staus;如果显示为空,则是没有配置主从,修改my.cnf并重启mysql,https://github.com/alibaba/canal/wiki/AdminGuide:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
image.png
image.png
image.png
1.2 创建数据库es_test
1.3 创建表user
CREATE TABLE `user` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(256) COLLATE utf8mb4_unicode_ci NOT NULL,
`detail` text COLLATE utf8mb4_unicode_ci NOT NULL,
`age` int(3) NOT NULL,
PRIMARY KEY (`id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
2 elasticsearch
2.1 创建索引
PUT user
{
"acknowledged" : true,
"shards_acknowledged" : true,
"index" : "user"
}
2.2 创建mapping
PUT user
{
"settings": {
"index": {
"number_of_shards": "5",
"number_of_replicas": "1"
}
},
"mappings": {
"properties": {
"age": {
"type": "integer"
},
"id": {
"type": "long"
},
"name": {
"type": "text",
"analyzer": "ik_smart"
},
"detail": {
"type": "text",
"analyzer": "ik_max_word"
}
}
}
}
执行GET /user,查看创建的索引:
GET /user
{
"user" : {
"aliases" : { },
"mappings" : {
"properties" : {
"age" : {
"type" : "integer"
},
"detail" : {
"type" : "text",
"analyzer" : "ik_max_word"
},
"id" : {
"type" : "long"
},
"name" : {
"type" : "text",
"analyzer" : "ik_smart"
}
}
},
"settings" : {
"index" : {
"creation_date" : "1591092311266",
"number_of_shards" : "5",
"number_of_replicas" : "1",
"uuid" : "ZSl8PnqcSriSPtNNyS8qdA",
"version" : {
"created" : "7070099"
},
"provided_name" : "user"
}
}
}
}
3 Canal-server
3.1 下载Canal-deployer,本文使用Canal-deployer 1.1.5版本。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz
3.2 解压canal.deployer-1.1.5-SNAPSHOT.tar.gz。
mkdir canal.deployer-1.1.5
tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C ./canal.deployer-1.1.5/
3.3 修改conf/example/instance.properties文件。
vim conf/example/instance.properties
配置mysql的相关信息:
canal.instance.master.address=127.0.0.1:3306
canal.instance.dbUsername=canal
canal.instance.dbPassword=canal
#table regex
canal.instance.filter.regex=es_test\\..*
数据表的过滤条件https://github.com/alibaba/canal/wiki/%E5%B8%B8%E8%A7%81%E9%97%AE%E9%A2%98%E8%A7%A3%E7%AD%94
使用:wq命令保存文件并退出vim模式。
3.4 启动Canal-server,并查看日志。
./bin/startup.sh
cat logs/canal/canal.log
image.png
4 Canal-adapter
4.1 下载Canal-adapter,本文使用Canal-adapter1.1.5版本。
wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.adapter-1.1.5-SNAPSHOT.tar.gz
4.2 解压canal.adapter-1.1.5-SNAPSHOT.tar.gz。
mkdir canal.adapter-1.1.5
tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C ./canal.adapter-1.1.5/
4.3 修改conf/application.yml文件。
vim conf/application.yml
修改以下配置:
srcDataSources:
defaultDS:
url: jdbc:mysql://127.0.0.1:3306/es_test?useUnicode=true
username: x
password: x123
- name: logger
key: logger1
- name: es7
key: es71 #这个很重要,必须设置唯一,不然有坑
hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
properties:
mode: rest # or rest
security.auth: elastic:123456 # only used for rest mode
cluster.name: xzp-application
配置项 | 说明 |
---|---|
canal.conf.canalServerHost | canalDeployer访问地址。保持默认(127.0.0.1:11111)即可。 |
canal.conf.srcDataSources.defaultDS.url | jdbc:mysql://<MySQL地址>:<端口>/<数据库名称>?useUnicode=true |
canal.conf.srcDataSources.defaultDS.username | MySQL数据库的账号名称 |
canal.conf.srcDataSources.defaultDS.password | MySQL数据库的密码 |
canal.conf.canalAdapters.groups.outerAdapters.hosts | 定位到name:es的位置,将hosts替换为<ES的地址>:<端口> |
canal.conf.canalAdapters.groups.outerAdapters.mode | 必须设置为rest。 |
canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth | <ES的账号>:<密码>。例如elastic:es_password。 |
canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name | es集群的名字 |
使用:wq命令保存文件并退出vim模式。
4.4 在conf/es/目录下新建user.yml,定义MySQL数据到ES数据的映射字段。
vim user.yml
dataSourceKey: defaultDS
outerAdapterKey: es71
destination: example
groupId: g1
esMapping:
_index: user
_type: _doc
_id: _id
upsert: true
# pk: id
sql: "select id as _id,id,name,detail,age from user"
etlCondition: "where id<='{0}'" #etl的条件参数,可以将之前没能同步的数据同步,数据量大的话可以用logstash
commitBatch: 3000
使用:wq命令保存文件并退出vim模式。
配置项 | 说明 |
---|---|
esMapping._index | 创建的索引的名称,user |
esMapping._type | 默认_doc即可 |
esMapping._id | 需要同步到ES实例的文档的id,可自定义。本文使用_id。 |
esMapping.sql | SQL语句,用来查询需要同步到ES中的字段。本文使用select id as _id,id,name,detail,age from user |
4.5 启动Canal-adapter服务,并查看日志。
./bin/startup.sh
cat logs/adapter/adapter.log
image.png
5 adapter管理REST接口
5.1 查询所有订阅同步的canal instance或MQ topic
http://ip:8081/destinations
[{"destination":"example","status":"on"}]
5.2 查询数据同步开关状态
查看指定 canal instance/MQ topic 的数据同步开关状态
http://ip:8081/syncSwitch/example
{"stauts":"on"}
5.3 打开数据同步开关
针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启
注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关
PUT http://ip:8081/syncSwitch/example/on
{
"code": 20000,
"message": "实例: example 开启同步成功"
}
改为/syncSwitch/example/off是关闭
{
"code": 20000,
"message": "实例: example 关闭同步成功"
}
5.4 手动ETL
导入数据到指定类型的库, 如果params参数为空则全表导入, 参数对应的查询条件在配置中的etlCondition指定
#/etl/es7/{key}/user.yml
#curl http://ip:8081/etl/es7/es71/user.yml -X POST -d "params=50"
curl http://ip:8081/etl/es7/es71/user.yml -X POST
{
"succeeded": true,
"resultMessage": "导入ES 数据:59 条"
}
6 出现的问题
6.1 数据没同步
image.png执行show master staus;如果显示为空,则是没有配置主从,修改my.cnf并重启mysql,https://github.com/alibaba/canal/wiki/AdminGuide:
[mysqld]
log-bin=mysql-bin
binlog-format=ROW
server-id=1
image.png
image.png
image.png
网友评论