Using canal to incrementally sync MySQL data to ES

Author: Okami_ | Published 2019-06-14 17:19

    Environment

    • OS: CentOS release 6.5 (Final)
    • MySQL version: 10.0.33-MariaDB-wsrep (a MariaDB build)
    • JDK version: 1.8 (strictly required; otherwise ES and canal-adapter will not start)
    • Elasticsearch version: 6.8.0
    • canal version: 1.1.3
    • ZooKeeper

    Solution overview

    • Enable MySQL binary logging
    • Set the MySQL binary log format to ROW
    • canal-server acts as a slave of the MySQL cluster and pulls the master's binary log
    • canal-server pushes the binary log events it receives to canal-adapter
    • canal-server and canal-adapter are deployed on multiple nodes for availability
    • canal-adapter syncs the data into the ES cluster

    MySQL configuration

    • Enable binary logging on the master and set the format to ROW
    log-bin=mysql-bin #this line enables the binlog
    binlog-format=ROW #use ROW format
    server_id=1 #required for MySQL replication; must not clash with canal's slaveId
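    
    • A quick way to confirm the settings after restarting MySQL (a sketch; adjust the login to your environment):
    mysql -uroot -p -e "SHOW VARIABLES LIKE 'log_bin'; SHOW VARIABLES LIKE 'binlog_format';"
    # expected: log_bin = ON, binlog_format = ROW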
    
    • canal works by posing as a MySQL slave, so the canal account must be granted the corresponding replication privileges.
    CREATE USER canal IDENTIFIED BY 'canal';  
    GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO 'canal'@'%';
    -- GRANT ALL PRIVILEGES ON *.* TO 'canal'@'%' ;
    FLUSH PRIVILEGES;
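    
    • To confirm the account and grants work, log in as canal from a remote host (<master-host> is a placeholder for your master's address):
    mysql -ucanal -pcanal -h <master-host> -e "SHOW GRANTS;"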
    

    ES installation

    Download the package

    • Go to the Elasticsearch downloads page: https://www.elastic.co/cn/downloads/elasticsearch
    • If you don't want the latest version, you can pick a past release
    • This walkthrough uses version 6.8.0
    • Download the package
    wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-6.8.0.tar.gz
    

    Create a system user

    • Elasticsearch cannot be started as root, so create a dedicated user
    # create the user
    adduser es  
    # set its password
    passwd es
    # switch to the new user
    su es
    
    • Move the package to /home/es/elasticsearch
    mv elasticsearch-6.8.0.tar.gz /home/es/elasticsearch
    

    Extract the package

    cd /home/es/elasticsearch
    tar -xzvf elasticsearch-6.8.0.tar.gz
    

    Edit the configuration file

    vi config/elasticsearch.yml 
    
    #cluster name; must be the same on every node in the cluster
    cluster.name: okami-application
    #name of this node
    node.name: node-1
    #this node is eligible to be elected master
    node.master: true
    #this node can store data
    node.data: true
    #number of shards
    #index.number_of_shards: 5
    #number of replicas
    #index.number_of_replicas: 3
    #bind address, IPv4 or IPv6
    network.bind_host: 0.0.0.0
    #address other nodes use to reach this node
    network.publish_host: 192.168.10.1
    #sets both bind_host and publish_host at once
    network.host: 192.168.10.1
    #port for node-to-node transport
    transport.tcp.port: 9300
    #whether to compress tcp traffic between nodes
    transport.tcp.compress: true
    #http port for external clients
    http.port: 9200
    #maximum size of an http request body
    http.max_content_length: 100mb
    #whether to expose the http service
    http.enabled: true 
    #how many master-eligible nodes this node must see; the default is 1, but for N master-eligible nodes the safe quorum is N/2+1 (2 for the 3-node cluster built here)
    discovery.zen.minimum_master_nodes: 1
    #ping timeout for node discovery
    discovery.zen.ping_timeout: 120s
    #whether to use multicast discovery
    #discovery.zen.ping.multicast.enabled: true
    #initial list of master-eligible nodes, used to discover other nodes joining the cluster
    discovery.zen.ping.unicast.hosts: ["192.168.10.1:9300"]
    
    path.data: /usr/hdp/2.5.0.0-1245/esdata
    path.logs: /usr/hdp/2.5.0.0-1245/eslog
    
    http.cors.enabled: true
    http.cors.allow-origin: "*"
    #--------------------------------------------------------------------------------
    #index.analysis.analyzer.ik.type: "ik"
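    
    • Before starting, it can help to view only the active (non-comment) settings:
    grep -v '^#' config/elasticsearch.yml | grep -v '^$'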
    

    Start ES

    • ES requires at least Java 1.8, so check the Java version first and upgrade if it is too old
    [es@xxx elasticsearch-6.8.0]$ java -version
    java version "1.8.0_172"
    Java(TM) SE Runtime Environment (build 1.8.0_172-b11)
    Java HotSpot(TM) 64-Bit Server VM (build 25.172-b11, mixed mode)
    
    • Start ES (the -d flag runs it in the background)
    /home/es/elasticsearch/elasticsearch-6.8.0/bin/elasticsearch -d
    
    • Check that the ES node is up
    [es@xxx ~]$  curl http://127.0.0.1:9200
    {
      "name" : "node-1",
      "cluster_name" : "okami-application",
      "cluster_uuid" : "Q00-w01oQT6vsXx7E6KIeA",
      "version" : {
        "number" : "6.8.0",
        "build_flavor" : "default",
        "build_type" : "tar",
        "build_hash" : "65b6179",
        "build_date" : "2019-05-15T20:06:13.172855Z",
        "build_snapshot" : false,
        "lucene_version" : "7.7.0",
        "minimum_wire_compatibility_version" : "5.6.0",
        "minimum_index_compatibility_version" : "5.0.0"
      },
      "tagline" : "You Know, for Search"
    }
    

    Deploy the other hosts

    • Install ES on the other hosts in the same network segment, following the steps above

    Check the cluster

    [es@xxx ~]$  curl http://127.0.0.1:9200/_cluster/health
    {"cluster_name":"okami-application","status":"green","timed_out":false,"number_of_nodes":3,"number_of_data_nodes":3,"active_primary_shards":0,"active_shards":0,"relocating_shards":0,"initializing_shards":0,"unassigned_shards":0,"delayed_unassigned_shards":0,"number_of_pending_tasks":0,"number_of_in_flight_fetch":0,"task_max_waiting_in_queue_millis":0,"active_shards_percent_as_number":100.0}
    
    

    Problems encountered during installation

      1. max file descriptors [4096] for elasticsearch process is too low, increase to at least [65536]
      • The per-process limit on open files is too low; check the current hard and soft limits with these two commands
       ulimit -Hn
       ulimit -Sn
      
      • Edit /etc/security/limits.conf and add the lines below; they take effect after the user logs out and back in
        *               soft    nofile          65536
        *               hard    nofile          65536
        
      2. max number of threads [3818] for user [es] is too low, increase to at least [4096]
      • Same kind of problem: the maximum number of threads is too low. Add to /etc/security/limits.conf:
          *               soft    nproc           4096
          *               hard    nproc           4096
        
        Check the current limits with:
          ulimit -Hu
          ulimit -Su
        
      3. max virtual memory areas vm.max_map_count [65530] is too low, increase to at least [262144]
        • Edit /etc/sysctl.conf, add vm.max_map_count=262144, then reload:
          vi /etc/sysctl.conf
          sysctl -p
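        
        • To confirm the new value is active:
          sysctl vm.max_map_count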
        

    Installing canal-server

    Download canal

    • You can download the release package directly or build from source; here we download directly. If you already have the package, just copy it to the install directory.
    wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz
    
    • Move the downloaded file to your chosen install path
    mv canal.deployer-1.1.3.tar.gz /opt/app/canal
    

    Extract

    tar zxvf canal.deployer-1.1.3.tar.gz
    

    Edit the configuration files

    • vi /opt/app/canal/canal_server/conf/canal.properties
    canal.id = 1 # unique id of this canal server instance; currently has no real significance
    canal.ip = 192.111.112.103 # local IP the canal server binds to; if unset, a local IP is chosen automatically
    canal.port = 11111 # port for the canal server socket service
    canal.metrics.pull.port = 11112
    canal.zkServers = 192.168.1.111:2181 # zookeeper connection string for the canal server
    
    # flush data to zk
    canal.zookeeper.flush.period = 1000 # how often canal flushes state to zookeeper, in milliseconds
    canal.withoutNetty = false 
    # tcp, kafka, RocketMQ
    canal.serverMode = tcp
    # flush meta cursor/parse position to file
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    ## memory store RingBuffer size, should be Math.pow(2,n)
    canal.instance.memory.buffer.size = 16384
    ## memory store RingBuffer used memory unit size , default 1kb
    canal.instance.memory.buffer.memunit = 1024 
    ## memory store gets mode, MEMSIZE or ITEMSIZE
    canal.instance.memory.batch.mode = MEMSIZE
    canal.instance.memory.rawEntry = true
    
    ## detecting config
    canal.instance.detecting.enable = false
    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()
    canal.instance.detecting.sql = select 1
    canal.instance.detecting.interval.time = 3
    canal.instance.detecting.retry.threshold = 3
    canal.instance.detecting.heartbeatHaEnable = false
    
    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery
    canal.instance.transaction.size =  1024
    # mysql fallback connected to new master should fallback times
    canal.instance.fallbackIntervalInSeconds = 60
    
    # network config
    canal.instance.network.receiveBufferSize = 16384
    canal.instance.network.sendBufferSize = 16384
    canal.instance.network.soTimeout = 30
    
    # binlog filter config
    canal.instance.filter.druid.ddl = true
    canal.instance.filter.query.dcl = false
    canal.instance.filter.query.dml = false
    canal.instance.filter.query.ddl = false
    canal.instance.filter.table.error = false
    canal.instance.filter.rows = false
    canal.instance.filter.transaction.entry = false
    
    # binlog format/image check
    canal.instance.binlog.format = ROW,STATEMENT,MIXED 
    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB
    
    # binlog ddl isolation
    canal.instance.get.ddl.isolation = false
    
    # parallel parser config
    canal.instance.parser.parallel = true
    ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()
    #canal.instance.parser.parallelThreadSize = 16
    ## disruptor ringbuffer size, must be power of 2
    canal.instance.parser.parallelBufferSize = 256
    
    # table meta tsdb info
    canal.instance.tsdb.enable = false
    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}
    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;
    canal.instance.tsdb.dbUsername = canal
    canal.instance.tsdb.dbPassword = password
    # dump snapshot interval, default 24 hour
    canal.instance.tsdb.snapshot.interval = 24
    # purge snapshot expire , default 360 hour(15 days)
    canal.instance.tsdb.snapshot.expire = 360
    
    # aliyun ak/sk , support rds/mq
    canal.aliyun.accessKey =
    canal.aliyun.secretKey =
    
    #################################################
    #########               destinations            ############# 
    #################################################
    canal.destinations = example_01,example_02  # instances deployed on this server
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    
    canal.instance.global.mode = spring # how global instance config is loaded
    canal.instance.global.lazy = false
    #canal.instance.global.manager.address = 127.0.0.1:1099
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    ##################################################
    #########                    MQ                      #############
    ##################################################
    canal.mq.servers = 127.0.0.1:6667
    canal.mq.retries = 0
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    canal.mq.lingerMs = 100
    canal.mq.bufferMemory = 33554432
    canal.mq.canalBatchSize = 50
    canal.mq.canalGetTimeout = 100
    canal.mq.flatMessage = true
    canal.mq.compressionType = none
    canal.mq.acks = all
    # use transaction for kafka flatMessage batch produce
    canal.mq.transaction = false
    #canal.mq.properties. =
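    
    • Since default-instance.xml (selected above) persists positions to ZooKeeper, it's worth confirming the ensemble from canal.zkServers is reachable. ruok is a standard ZooKeeper four-letter command (on ZooKeeper 3.5+ it must be whitelisted):
    echo ruok | nc 192.168.1.111 2181
    # expected reply: imok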
    
    
    • Each configured destination needs a matching directory under conf
    mkdir conf/example_01
    mkdir conf/example_02
    
    • In each of those directories, write an instance.properties
    canal.instance.mysql.slaveId=99
    canal.instance.gtidon=false
    
    # position info
    canal.instance.master.address=
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=false
    
    # username/password
    canal.instance.dbUsername=username
    canal.instance.dbPassword=password
    canal.instance.defaultDatabaseName=dbName
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    
    # table regex
    canal.instance.filter.regex=.*\\..*
    
    # mq config
    canal.mq.topic=example
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=test.table:id^name,.*\\..*
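    
    • Note that canal.instance.master.address must be filled in with the source MySQL address for each instance. For example (hypothetical address; the same edit applies to example_02):
    # point example_01 at its MySQL master
    sed -i 's/^canal.instance.master.address=.*/canal.instance.master.address=192.168.1.100:3306/' conf/example_01/instance.properties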
    

    Configuration notes

    • Start position for the MySQL connection

    • canal.instance.master.journal.name + canal.instance.master.position: start from an exact binlog position

    • canal.instance.master.timestamp: give a timestamp; canal walks the MySQL binlog, finds the position matching that timestamp, and starts from there

    • If nothing is specified, canal starts from the database's current position by default

    • instance.xml configuration variants

      • memory-instance.xml: every component (parser, sink, store) runs in memory mode, including position tracking, so a restart goes back to the initial position and re-parses
      • default-instance.xml: the store is in memory, but position management for parser/sink is persisted, currently to zookeeper, so the state is shared across the cluster
      • group-instance.xml: mainly for multi-database merging; several physical instances are combined into one logical instance exposed to clients
    • Configuring multiple destinations

      • Set canal.destinations in canal.properties, comma-separated
      • Create the matching directories under conf and add an instance.properties to each
    • Rules for writing canal.instance.filter.regex

    1.  all tables: .*   or  .*\\..*
    2.  all tables in the canal schema: canal\\..*
    3.  tables in canal whose names start with "canal": canal\\.canal.*
    4.  one table in the canal schema: canal.test1
    5.  several rules combined: canal\\..*,mysql.test1,mysql.test2 (comma-separated)
    

    Start

    • The bin directory contains these files

      canal.pid     # PID of the running service
      restart.sh    # restart the service
      startup.sh    # start the service
      stop.sh       # stop the service
      
    • Run ./startup.sh to start the server; a quick sanity check is shown below
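
    • From the canal-server install root, confirm the service port (11111, configured above) is listening and check the startup log:

      netstat -lnt | grep 11111
      tail -n 50 logs/canal/canal.log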

    Check the logs

    • Server startup log: logs/canal/canal.log

    • Per-instance log: logs/<destination>/<destination>.log (e.g. logs/example_01/example_01.log)

    Installing canal-adapter

    Download the package

    wget https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.adapter-1.1.3.tar.gz
    

    Extract

    tar xzvf canal.adapter-1.1.3.tar.gz
    

    Edit the configuration files

    • Edit conf/application.yml
    server:
      port: 8081
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mode: tcp
      zookeeperHosts: 192.111.111.173:2181
    #  mqServers: 127.0.0.1:9092 #or rocketmq
    #  flatMessage: true
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      srcDataSources:
        defaultDS:
          url: jdbc:mysql://192.168.1.100:3306/test?useUnicode=true
          username: username
          password: password
        defaultDS2:
          url: jdbc:mysql://192.168.1.101:3306/test?useUnicode=true
          username: username
          password: password
      canalAdapters:
      - instance: example_01
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          - name: es
            hosts: 192.168.1.110:9300
            properties:
              cluster.name: okami-application
      - instance: example_02
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          - name: es
            hosts: 192.168.1.111:9300
            properties:
              cluster.name: okami-application
    
    • Add the configuration files example_01.yml and example_02.yml under conf/es/
    vi conf/es/example_01.yml
    
    dataSourceKey: defaultDS
    destination: example_01
    groupId: g1
    esMapping:
      _index: indexName
      _type: typeName
      _id: _id
      upsert: true
    #  pk: id
      sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
            a.c_time as _c_time from user a
            left join role b on b.id=a.role_id"
    #  objFields:
    #    _labels: array:;
    #  etlCondition: "where a.c_time>='{0}'"
      commitBatch: 3000
    
    vi conf/es/example_02.yml
    
    dataSourceKey: defaultDS2
    destination: example_02
    groupId: g1
    esMapping:
      _index: indexName
      _type: typeName
      _id: _id
      upsert: true
    #  pk: id
      sql: "select a.id as _id, a.name as _name, a.role_id as _role_id, b.role_name as _role_name,
            a.c_time as _c_time from user a
            left join role b on b.id=a.role_id"
    
    
    #  objFields:
    #    _labels: array:;
    #  etlCondition: "where a.c_time>='{0}'"
      commitBatch: 3000
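    
    • One caveat: canal-adapter does not create the target index; the index and mapping should already exist in ES before syncing starts. A minimal sketch (index/type names match the yml above; the field types are assumptions inferred from the SQL):
    curl -X PUT 'http://192.168.1.110:9200/indexName' -H 'Content-Type: application/json' -d '
    {
      "mappings": {
        "typeName": {
          "properties": {
            "_name":      { "type": "keyword" },
            "_role_id":   { "type": "long" },
            "_role_name": { "type": "keyword" },
            "_c_time":    { "type": "date", "format": "yyyy-MM-dd HH:mm:ss||strict_date_optional_time||epoch_millis" }
          }
        }
      }
    }'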
    

    Configuration notes

    • The same data can be consumed by several groups at once; groups run in parallel with one another, while the multiple outerAdapters inside a single group execute serially

    Start

    • The bin directory contains these files

      canal.pid     # PID of the running service
      restart.sh    # restart the service
      startup.sh    # start the service
      stop.sh       # stop the service
      
    • Run ./startup.sh to start the adapter

    Check the logs

    tail -f logs/adapter/adapter.log 
    

    Managing via HTTP requests

    • List all canal instances subscribed for sync: http://112.33.11.124:8081/destinations

      [
          {
              "destination": "example_01",
              "status": "on"
          },
          {
              "destination": "example_02",
              "status": "on"
          }
      ]
    
    • Query the sync switch state of an instance: http://112.33.11.124:8081/syncSwitch/example_02

    {
        "stauts": "off"
    }
    
    • Toggle the sync switch with a PUT request to http://112.33.11.124:8081/syncSwitch/example_01/on, which returns a success message (in Chinese):

    {
        "code": 20000,
        "message": "实例: example_01 开启同步成功"
    }
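    
    • For example, with curl (substitute your adapter host):
    curl -X PUT http://112.33.11.124:8081/syncSwitch/example_01/on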
    
