Syncing binlog data to Kafka with canal


Author: liangxifeng833 | Published 2020-08-12 12:10

Download and install canal

  • I downloaded the latest stable release at the time; download link: https://github.com/alibaba/canal/releases/download/canal-1.1.3/canal.deployer-1.1.3.tar.gz

  • Extract it (the deployer tarball has no top-level directory, so extract inside /usr/local/canal)

      mkdir -p /usr/local/canal
      cp canal.deployer-1.1.3.tar.gz /usr/local/canal
      cd /usr/local/canal
      tar -zxvf canal.deployer-1.1.3.tar.gz
    
  • Edit the instance config file conf/example/instance.properties

      # MySQL connection settings
      canal.instance.master.address=127.0.0.1:3306
      canal.instance.dbUsername=root
      canal.instance.dbPassword=123456
      # Set the connection charset to utf-8 even though the db_ljlj database is latin1;
      # otherwise the data comes through garbled
      canal.instance.connectionCharset = utf-8
      # Database to monitor
      canal.instance.defaultDatabaseName=db_ljlj
      # Table whitelist: only the listed tables are synced
      canal.instance.filter.regex=db_ljlj.product_conume
    
      # Kafka settings
      # topic name (.properties files do not support trailing comments, so keep comments on their own line)
      canal.mq.topic=hello
      ## mysql database.table: unique key columns; separate multiple tables with commas
      canal.mq.partitionHash=db_ljlj.product_conume:consume_id^name,.*\\..*
    
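The `canal.mq.partitionHash` rule routes every event for a given key to the same Kafka partition, so per-row ordering survives partitioned consumption. A minimal Python sketch of the idea (illustrative only; this is not canal's exact hash function, only the same key-to-partition property, with the column names taken from the config above):

```python
import hashlib

def pick_partition(row, key_columns, num_partitions):
    """Route a row to a partition by hashing its key columns.

    Illustrative only -- canal's real implementation differs, but the
    guarantee is the same: identical keys always map to one partition.
    """
    key = "^".join(str(row[c]) for c in key_columns)  # e.g. consume_id^name
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()
    return int(digest, 16) % num_partitions

row = {"consume_id": 1886382, "name": "demo"}
# The same row always lands on the same partition
assert pick_partition(row, ["consume_id", "name"], 3) == \
       pick_partition(row, ["consume_id", "name"], 3)
```

Different keys may still share a partition; the only guarantee is that one key never spans two partitions.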
  • Edit the canal config file /usr/local/canal/conf/canal.properties

        # Options: tcp (default), kafka, RocketMQ
        canal.serverMode = kafka
        # kafka/rocketmq cluster addresses
        canal.mq.servers = 127.0.0.1:9092,127.0.0.1:9093
        canal.mq.retries = 0
        # In flatMessage mode this can be raised, but keep it below the MQ message-size limit
        canal.mq.batchSize = 16384
        canal.mq.maxRequestSize = 1048576
        # In flatMessage mode raise this value; 50-200 is recommended
        canal.mq.lingerMs = 1
        canal.mq.bufferMemory = 33554432
        # Canal batch size, default 50K; because of Kafka's max message size, keep it under 1M (below 900K)
        canal.mq.canalBatchSize = 50
        # Timeout for canal get, in milliseconds; empty means no timeout
        canal.mq.canalGetTimeout = 100
        # Whether messages are flat JSON objects
        canal.mq.flatMessage = true
        canal.mq.compressionType = none
        canal.mq.acks = all
        # Whether Kafka delivery uses transactions
        canal.mq.transaction = false
    

Starting and stopping canal

  • Start
      cd /usr/local/canal/
      sh bin/startup.sh
    
  • Check the logs
      # a. server log
      vim logs/canal/canal.log
      # b. instance log
      vim logs/example/example.log
    
  • Stop
      cd /usr/local/canal/
      sh bin/stop.sh
    
  • After inserting a row into the product_conume table, canal picks it up from the binlog and delivers a message to Kafka in the following format:
          {
              "data": [
                {
                    "consume_id": "1886382", 
                    "consume_pact_id": "151201121", 
                    "consume_merchant_id": null
                }
              ], 
              "database": "db_ljlj", 
              "es": 1585301895000, 
              "id": 5, 
              "isDdl": false, 
              "mysqlType": {
                  "consume_id": "int(10)", 
                  "consume_pact_id": "int(20) unsigned", 
                  "consume_merchant_id": "int(10)"
               }, 
              "old": null, 
              "pkNames": null, 
              "sql": "", 
              "sqlType": {
                "consume_id": 4, 
                "consume_pact_id": 4, 
                "consume_merchant_id": 4
              }, 
              "table": "product_conume", 
              "ts": 1585301895411, 
              "type": "INSERT"
          }  
    
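Because canal.mq.flatMessage = true, each Kafka record is a plain JSON string like the one above, so a consumer only needs to parse it and dispatch on the event type. A minimal parsing sketch (the Kafka transport itself is omitted; the payload is a trimmed copy of the example above):

```python
import json

# A trimmed copy of the flatMessage payload shown above
payload = '''{
    "data": [{"consume_id": "1886382",
              "consume_pact_id": "151201121",
              "consume_merchant_id": null}],
    "database": "db_ljlj",
    "isDdl": false,
    "table": "product_conume",
    "ts": 1585301895411,
    "type": "INSERT"
}'''

event = json.loads(payload)

# Skip DDL events; "data" holds one dict per changed row
if not event["isDdl"] and event["type"] == "INSERT":
    for row in event["data"]:
        # prints db_ljlj.product_conume: consume_id=1886382
        print(f'{event["database"]}.{event["table"]}: consume_id={row["consume_id"]}')
```

Note that column values arrive as strings (or null) regardless of their MySQL type; the `mysqlType`/`sqlType` maps in the full payload describe how to interpret them.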


      Original title: canal同步binlog数据到kafka

      Original link: https://www.haomeiwen.com/subject/qhlruhtx.html