美文网首页
canal实现从mysql同步数据到es

canal实现从mysql同步数据到es

作者: 走在冷风中吧 | 来源:发表于2021-08-26 15:46 被阅读0次

    es建立索引:

    PUT 索引库名称
    {
          "settings": {
              "number_of_shards": 3,
              "number_of_replicas": 2
            }
      }
    
    GET /索引库/_mapping/表名
    {
      "properties":
      {
        "custNo":
        {
          "type":"text"  
        },
        "ifEver":
        {
          "type":"text"  
        },
        "ifApp":
        {
          "type":"text"  
        }
      }
    }
    

    canal-server:

    1. conf/canal.properties

    这里主要配置消费模式为tcp(canal.serverMode = tcp)

    #################################################
    #########       common argument     ############# 
    #################################################
    #canal.manager.jdbc.url=jdbc:mysql://127.0.0.1:3306/canal_manager?useUnicode=true&characterEncoding=UTF-8
    #canal.manager.jdbc.username=root
    #canal.manager.jdbc.password=121212
    canal.id = 1
    canal.ip = 172.16.2.74
    canal.port = 11111
    canal.metrics.pull.port = 11112
    canal.zkServers = 172.16.2.74:2181
    # flush data to zk
    canal.zookeeper.flush.period = 1000
    canal.withoutNetty = false
    # tcp, kafka, RocketMQ
    canal.serverMode = tcp
    # flush meta cursor/parse position to file
    canal.file.data.dir = ${canal.conf.dir}
    canal.file.flush.period = 1000
    
    
    #################################################
    #########       destinations        ############# 
    #################################################
    canal.destinations = examplesss
    # conf root dir
    canal.conf.dir = ../conf
    # auto scan instance dir add/remove and start/stop instance
    canal.auto.scan = true
    canal.auto.scan.interval = 5
    
    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml
    
    canal.instance.global.mode = spring
    canal.instance.global.lazy = false
    #canal.instance.global.manager.address = 127.0.0.1:1099
    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml
    canal.instance.global.spring.xml = classpath:spring/file-instance.xml
    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml
    
    ##################################################
    #########            MQ              #############
    ##################################################
    canal.mq.servers = 127.0.0.1:6667
    canal.mq.retries = 0
    canal.mq.batchSize = 16384
    canal.mq.maxRequestSize = 1048576
    canal.mq.lingerMs = 100
    canal.mq.bufferMemory = 33554432
    canal.mq.canalBatchSize = 50
    canal.mq.canalGetTimeout = 100
    canal.mq.flatMessage = true
    canal.mq.compressionType = none
    canal.mq.acks = all
    # use transaction for kafka flatMessage batch produce
    canal.mq.transaction = false
    #canal.mq.properties. =
    
    1. /conf/xxxx/instance.properties xxx为实例名称, 后面adapter配置属性文件时需要对应。我这里叫做example2

    这里主要配置canal要监听哪个数据库(canal.instance.master.address)及用户名密码,

    对数据库中的哪张表数据进行过滤(canal.instance.filter.regex)

    #################################################
    ## mysql serverId , v1.0.26+ will autoGen
    # canal.instance.mysql.slaveId=0
    
    # enable gtid use true/false
    canal.instance.gtidon=false
    
    # position info
    canal.instance.master.address=172.16.2.207:3306
    canal.instance.master.journal.name=
    canal.instance.master.position=
    canal.instance.master.timestamp=
    canal.instance.master.gtid=
    
    # rds oss binlog
    canal.instance.rds.accesskey=
    canal.instance.rds.secretkey=
    canal.instance.rds.instanceId=
    
    # table meta tsdb info
    canal.instance.tsdb.enable=true
    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb
    #canal.instance.tsdb.dbUsername=canal
    #canal.instance.tsdb.dbPassword=canal
    
    #canal.instance.standby.address =
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position =
    #canal.instance.standby.timestamp =
    #canal.instance.standby.gtid=
    
    # username/password
    canal.instance.dbUsername=root
    canal.instance.dbPassword=byxf1qaz
    canal.instance.connectionCharset = UTF-8
    # enable druid Decrypt database password
    canal.instance.enableDruid=false
    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==
    
    # table regex
    canal.instance.filter.regex=dcp.dcp_db_config
    #canal.instance.filter.regex=.*\\..*
    # table black regex
    canal.instance.filter.black.regex=
    
    # mq config
    canal.mq.topic=example
    # dynamic topic route by schema or table regex
    #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*
    canal.mq.partition=0
    # hash partition config
    #canal.mq.partitionsNum=3
    #canal.mq.partitionHash=test.table:id^name,.*\\..*
    #################################################
    

    Canal-adapter:

    1. Conf/application.yml

      1.1 主要配置要处理的数据源信息(srcDataSources)。

      1.2 配置canal-server中的instance实例与具体“消费者”adapter之间的关联关系(canalAdapters), 这里可以配置多个关联关系。

       example2是我在admin-server上起的一个实例, 有消费组g1中的es消费。
      

      1.3 outAdapters下可以配置多个“消费者”,

      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      srcDataSources:
        defaultDS: 
          url: jdbc:mysql://172.16.2.207:3306/dcp?useUnicode=true
          username: root
          password: byxf1qaz
      canalAdapters:
      - instance: example2
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          - name: es
            hosts: 172.16.2.74:9300
            properties:
              cluster.name: okami-application #es集群名称okami-application
              
      #  - instance: example # canal instance Name or mq topic name
    #    groups:
    #    - groupId: g1
    #      outerAdapters:
    #      - name: logger
    #      - name: rdb
    #        key: mysql1
    #        properties:
    #          jdbc.driverClassName: com.mysql.jdbc.Driver
    #          jdbc.url: jdbc:mysql://127.0.0.1:3306/mytest2?useUnicode=true
    #          jdbc.username: root
    #          jdbc.password: 121212
    #      - name: rdb
    #        key: oracle1
    #        properties:
    #          jdbc.driverClassName: oracle.jdbc.OracleDriver
    #          jdbc.url: jdbc:oracle:thin:@localhost:49161:XE
    #          jdbc.username: mytest
    #          jdbc.password: m121212
    #      - name: rdb
    #        key: postgres1
    #        properties:
    #          jdbc.driverClassName: org.postgresql.Driver
    #          jdbc.url: jdbc:postgresql://localhost:5432/postgres
    #          jdbc.username: postgres
    #          jdbc.password: 121212
    #          threads: 1
    #          commitSize: 3000
    #      - name: hbase
    #        properties:
    #          hbase.zookeeper.quorum: 127.0.0.1
    #          hbase.zookeeper.property.clientPort: 2181
    #          zookeeper.znode.parent: /hbase
    #      - name: es
    #        hosts: 127.0.0.1:9300
    #        properties:
    #          cluster.name: elasticsearch
    
    1. conf/es/xx.yml xxx名字可以随意,这里不做匹配
    dataSourceKey: defaultDS
    destination: example2
    groupId: g1
    esMapping:
      _index: canal   //es索引库名
      _type: testTable  //es中的表名
      _id: custNo
      sql: "select db_code as custNo, db_name as ifApp, db_user as ifEver from dcp.dcp_db_config"
    

    相关文章

      网友评论

          本文标题:canal实现从mysql同步数据到es

          本文链接:https://www.haomeiwen.com/subject/yyduiltx.html