美文网首页
通过Canal将MySQL数据同步到ElasticSearch

通过Canal将MySQL数据同步到ElasticSearch

作者: 爱的旋转体 | 来源:发表于2020-06-06 11:52 被阅读0次

    Canal是阿里巴巴集团提供的一个开源产品,能够通过解析数据库的增量日志,提供增量数据的订阅和消费功能。当您需要将MySQL中的增量数据同步至ES时,可通过Canal来实现。

    准备环境(以下几个的版本是我用的,不是强制使用):

    • JDK(1.8)
    • Mysql (5.7.30)
    • ElasticSearch (7.7.0)
    • Canal Server(1.1.5)
    • Canal-adapter(1.1.5)(由于使用的es7,1.1.4及以下不支持es7)

    1 mysql

    1.1 开启mysql的binlog写入功能

    执行show master staus;如果显示为空,则是没有配置主从,修改my.cnf并重启mysql,https://github.com/alibaba/canal/wiki/AdminGuide

    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server-id=1
    
    image.png
    image.png
    image.png

    1.2 创建数据库es_test

    1.3 创建表user

    CREATE TABLE `user` (
      `id` bigint(20) NOT NULL AUTO_INCREMENT,
      `name` varchar(256) COLLATE utf8mb4_unicode_ci NOT NULL,
      `detail` text COLLATE utf8mb4_unicode_ci NOT NULL,
      `age` int(3) NOT NULL,
      PRIMARY KEY (`id`)
    ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
    

    2 elasticsearch

    2.1 创建索引

    PUT user
    
    {
      "acknowledged" : true,
      "shards_acknowledged" : true,
      "index" : "user"
    }
    

    2.2 创建mapping

    PUT user
    {
      "settings": {
        "index": {
          "number_of_shards": "5",
          "number_of_replicas": "1"
        }
      },
      "mappings": {
        "properties": {
          "age": {
            "type": "integer"
          },
          "id": {
            "type": "long"
          },
          "name": {
            "type": "text",
            "analyzer": "ik_smart"
          },
          "detail": {
            "type": "text",
            "analyzer": "ik_max_word"
          }
        }
      }
    }
    

    执行GET /user,查看创建的索引:

    GET /user
    
    {
      "user" : {
        "aliases" : { },
        "mappings" : {
          "properties" : {
            "age" : {
              "type" : "integer"
            },
            "detail" : {
              "type" : "text",
              "analyzer" : "ik_max_word"
            },
            "id" : {
              "type" : "long"
            },
            "name" : {
              "type" : "text",
              "analyzer" : "ik_smart"
            }
          }
        },
        "settings" : {
          "index" : {
            "creation_date" : "1591092311266",
            "number_of_shards" : "5",
            "number_of_replicas" : "1",
            "uuid" : "ZSl8PnqcSriSPtNNyS8qdA",
            "version" : {
              "created" : "7070099"
            },
            "provided_name" : "user"
          }
        }
      }
    }
    

    3 Canal-server

    3.1 下载Canal-deployer,本文使用Canal-deployer 1.1.5版本。

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.deployer-1.1.5-SNAPSHOT.tar.gz
    

    3.2 解压canal.deployer-1.1.5-SNAPSHOT.tar.gz。

    mkdir canal.deployer-1.1.5
    tar -zxvf canal.deployer-1.1.5-SNAPSHOT.tar.gz -C ./canal.deployer-1.1.5/
    

    3.3 修改conf/example/instance.properties文件。

    vim conf/example/instance.properties
    

    配置mysql的相关信息:

    canal.instance.master.address=127.0.0.1:3306
    canal.instance.dbUsername=canal
    canal.instance.dbPassword=canal
    #table regex
    canal.instance.filter.regex=es_test\\..*
    

    数据表的过滤条件https://github.com/alibaba/canal/wiki/%E5%B8%B8%E8%A7%81%E9%97%AE%E9%A2%98%E8%A7%A3%E7%AD%94
    使用:wq命令保存文件并退出vim模式。

    image.png

    3.4 启动Canal-server,并查看日志。

    ./bin/startup.sh
    cat logs/canal/canal.log
    
    image.png

    4 Canal-adapter

    4.1 下载Canal-adapter,本文使用Canal-adapter1.1.5版本。

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.5-alpha-1/canal.adapter-1.1.5-SNAPSHOT.tar.gz
    

    4.2 解压canal.adapter-1.1.5-SNAPSHOT.tar.gz。

    mkdir canal.adapter-1.1.5
    tar -zxvf canal.adapter-1.1.5-SNAPSHOT.tar.gz -C ./canal.adapter-1.1.5/
    

    4.3 修改conf/application.yml文件。

    vim conf/application.yml
    

    修改以下配置:

      srcDataSources:
        defaultDS:
          url: jdbc:mysql://127.0.0.1:3306/es_test?useUnicode=true
          username: x
          password: x123
    
          - name: logger
            key: logger1
    
          - name: es7
            key: es71 #这个很重要,必须设置唯一,不然有坑
            hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode
            properties:
              mode: rest # or rest
              security.auth: elastic:123456 #  only used for rest mode
              cluster.name: xzp-application
    
    配置项 说明
    canal.conf.canalServerHost canalDeployer访问地址。保持默认(127.0.0.1:11111)即可。
    canal.conf.srcDataSources.defaultDS.url jdbc:mysql://<MySQL地址>:<端口>/<数据库名称>?useUnicode=true
    canal.conf.srcDataSources.defaultDS.username MySQL数据库的账号名称
    canal.conf.srcDataSources.defaultDS.password MySQL数据库的密码
    canal.conf.canalAdapters.groups.outerAdapters.hosts 定位到name:es的位置,将hosts替换为<ES的地址>:<端口>
    canal.conf.canalAdapters.groups.outerAdapters.mode 必须设置为rest。
    canal.conf.canalAdapters.groups.outerAdapters.properties.security.auth <ES的账号>:<密码>。例如elastic:es_password。
    canal.conf.canalAdapters.groups.outerAdapters.properties.cluster.name es集群的名字

    使用:wq命令保存文件并退出vim模式。

    4.4 在conf/es/目录下新建user.yml,定义MySQL数据到ES数据的映射字段。

    vim user.yml
    
    dataSourceKey: defaultDS
    outerAdapterKey: es71
    destination: example
    groupId: g1
    esMapping:
      _index: user
      _type: _doc
      _id: _id
      upsert: true
    #  pk: id
      sql: "select id as _id,id,name,detail,age from user"
      etlCondition: "where id<='{0}'" #etl的条件参数,可以将之前没能同步的数据同步,数据量大的话可以用logstash
      commitBatch: 3000
    

    使用:wq命令保存文件并退出vim模式。

    配置项 说明
    esMapping._index 创建的索引的名称,user
    esMapping._type 默认_doc即可
    esMapping._id 需要同步到ES实例的文档的id,可自定义。本文使用_id。
    esMapping.sql SQL语句,用来查询需要同步到ES中的字段。本文使用select id as _id,id,name,detail,age from user

    4.5 启动Canal-adapter服务,并查看日志。

    ./bin/startup.sh
    cat logs/adapter/adapter.log
    
    image.png

    5 adapter管理REST接口

    https://github.com/alibaba/canal/wiki/ClientAdapter#32-adapter%E7%AE%A1%E7%90%86rest%E6%8E%A5%E5%8F%A3

    5.1 查询所有订阅同步的canal instance或MQ topic

    http://ip:8081/destinations
    
    [{"destination":"example","status":"on"}]
    

    5.2 查询数据同步开关状态

    查看指定 canal instance/MQ topic 的数据同步开关状态

    http://ip:8081/syncSwitch/example
    
    {"stauts":"on"}
    

    5.3 打开数据同步开关

    针对 example 这个canal instance/MQ topic 进行开关操作. off代表关闭, instance/topic下的同步将阻塞或者断开连接不再接收数据, on代表开启

    注: 如果在配置文件中配置了 zookeeperHosts 项, 则会使用分布式锁来控制HA中的数据同步开关, 如果是单机模式则使用本地锁来控制开关

    PUT http://ip:8081/syncSwitch/example/on
    
    {
        "code": 20000,
        "message": "实例: example 开启同步成功"
    }
    

    改为/syncSwitch/example/off是关闭

    {
        "code": 20000,
        "message": "实例: example 关闭同步成功"
    }
    

    5.4 手动ETL

    导入数据到指定类型的库, 如果params参数为空则全表导入, 参数对应的查询条件在配置中的etlCondition指定

    #/etl/es7/{key}/user.yml
    #curl http://ip:8081/etl/es7/es71/user.yml -X POST -d "params=50"
    curl http://ip:8081/etl/es7/es71/user.yml -X POST
    
    {
        "succeeded": true,
        "resultMessage": "导入ES 数据:59 条"
    }
    

    6 出现的问题

    6.1 数据没同步

    image.png
    执行show master staus;如果显示为空,则是没有配置主从,修改my.cnf并重启mysql,https://github.com/alibaba/canal/wiki/AdminGuide
    [mysqld]
    log-bin=mysql-bin
    binlog-format=ROW
    server-id=1
    
    image.png
    image.png
    image.png

    相关文章

      网友评论

          本文标题:通过Canal将MySQL数据同步到ElasticSearch

          本文链接:https://www.haomeiwen.com/subject/dohezhtx.html