美文网首页
canal.adapter 实现mysql数据库同步

canal.adapter 实现mysql数据库同步

作者: Herman7z | 来源:发表于2019-01-07 23:16 被阅读0次

    一、首先需要安装canal-server, 这里使用docker来运行canal-server

    1. 下载最近的release版本 https://github.com/alibaba/canal/releases,解压后拷贝 conf 目录到新的目录 /Users/huaan9527/Documents/docker/canal-server/conf
    2. 根据canal-server的文档

    修改 canal.properties :

    canal.serverMode = kafka   # tcp, kafka, RocketMQ
    canal.destinations = highso  # instance的名字  多个使用逗号分隔
    canal.mq.servers = 192.168.16.xxx:9092 #kafka集群的地址
    

    修改instance.properties

    canal.instance.master.address=192.168.16.xxx:3306  #需要监控的mysql数据库地址
    canal.instance.dbUsername=xxx
    canal.instance.dbPassword=xxx
    canal.instance.connectionCharset = UTF-8
    canal.instance.defaultDatabaseName =highso
    canal.instance.filter.regex=highso\\.crmtest,highso\\.test   #监控的数据库表
    
    canal.mq.topic=common_mq_highso  #kafka 中topic的名字
    
    1. 启动canal-server容器的脚本
    #!/bin/bash
    
    function usage() {
        echo "Usage:"
        echo "  run.sh [CONFIG]"
        echo "example:"
        echo "  run.sh -e canal.instance.master.address=127.0.0.1:3306 \\"
        echo "         -e canal.instance.dbUsername=canal \\"
        echo "         -e canal.instance.dbPassword=canal \\"
        echo "         -e canal.instance.connectionCharset=UTF-8 \\"
        echo "         -e canal.instance.tsdb.enable=true \\"
        echo "         -e canal.instance.gtidon=false \\"
        echo "         -e canal.instance.filter.regex=.*\\\\\\..* "
        exit
    }
    
    function check_port() {
        local port=$1
        local TL=$(which telnet)
        if [ -f $TL ]; then
            data=`echo quit | telnet 127.0.0.1 $port| grep -ic connected`
            echo $data
            return
        fi
    
        local NC=$(which nc)
        if [ -f $NC ]; then
            data=`nc -z -w 1 127.0.0.1 $port | grep -ic succeeded`
            echo $data
            return
        fi
        echo "0"
        return
    }
    
    function getMyIp() {
        case "`uname`" in
            Darwin)
             myip=`echo "show State:/Network/Global/IPv4" | scutil | grep PrimaryInterface | awk '{print $3}' | xargs ifconfig | grep inet | grep -v inet6 | awk '{print $2}'`
             ;;
            *)
             myip=`ip route get 1 | awk '{print $NF;exit}'`
             ;;
      esac
      echo $myip
    }
    
    NET_MODE=""
    case "`uname`" in
        Darwin)
            bin_abs_path=`cd $(dirname $0); pwd`
            ;;
        Linux)
            bin_abs_path=$(readlink -f $(dirname $0))
            NET_MODE="--net=host"
            ;;
        *)
            NET_MODE="--net=host"
            bin_abs_path=`cd $(dirname $0); pwd`
            ;;
    esac
    BASE=${bin_abs_path}
    if [ "$1" == "-h" ] ; then
        usage
    elif [ "$1" == "help" ] ; then
        usage
    fi
    
    # DATA="$BASE/data"
    # mkdir -p $DATA
    VOLUMNS="-v $BASE/conf:/home/admin/canal-server/conf -v $BASE/logs:/home/admin/canal-server/logs"
    PORTLIST="8000 2222 11111 11112"
    PORTS=""
    for PORT in $PORTLIST ; do
        #exist=`check_port $PORT`
        exist="0"
        if [ "$exist" == "0" ]; then
            PORTS="$PORTS -p $PORT:$PORT"
        else
            echo "port $PORT is used , pls check"
            exit 1
        fi
    done
    
    MEMORY="-m 2048m"
    LOCALHOST=`getMyIp`
    cmd="docker run -d -it -h $LOCALHOST --name=canal-server $VOLUMNS $NET_MODE $PORTS $MEMORY canal/canal-server:v1.1.2"
    echo $cmd
    eval $cmd
    

    二、配置canal.adapter 实现 mysql数据库3306 同步 到 mysql数据库3308

    1. 下载canal.adapter 解压 https://github.com/alibaba/canal/releases
    2. 编辑application.yml
    server:
      port: 8081
    logging:
      level:
        org.springframework: INFO
        com.alibaba.otter.canal.client.adapter.hbase: DEBUG
        com.alibaba.otter.canal.client.adapter.es: DEBUG
        com.alibaba.otter.canal.client.adapter.rdb: DEBUG
    spring:
      jackson:
        date-format: yyyy-MM-dd HH:mm:ss
        time-zone: GMT+8
        default-property-inclusion: non_null
    
    canal.conf:
      mqServers: 192.168.16.xxx:9092 #or rocketmq
      flatMessage: true
      batchSize: 500
      syncBatchSize: 1000
      retries: 0
      timeout:
      accessKey:
      secretKey:
      mode: kafka # tcp kafka rocketMQ
      srcDataSources:
        demoSrc:
          url: jdbc:mysql://192.168.16.xxx:3306/demo?useUnicode=true
          username: xxx
          password: xxx
      canalAdapters:
      - instance: common_mq_highso # canal instance Name or mq topic name
        groups:
        - groupId: g1
          outerAdapters:
          - name: logger
          - name: rdb
            key: demoDes
            properties:
              jdbc.driverClassName: com.mysql.jdbc.Driver
              jdbc.url: jdbc:mysql://192.168.16.xxx:3308/demo?useUnicode=true
              jdbc.username: xxx
              jdbc.password: xxx
    
    1. 编辑rdb目录下面表的映射文件,数据库demo, 表 test
    dataSourceKey: demoSrc
    destination: common_mq_highso
    outerAdapterKey: demoDes
    concurrent: true
    dbMapping:
      database: demo
      table: test
      targetTable: demo.test
      targetPk:
        id: id
      mapAll: true
    
    1. 启动canal.adapter : ./startup.sh

    三、测试

    1. 在3306数据库中插入一条数据库,检查是否在3308中同步
    2. 更新
    3. 删除
    4. 批量插入
    5. 批量更新
    6. 批量删除

    相关文章

      网友评论

          本文标题:canal.adapter 实现mysql数据库同步

          本文链接:https://www.haomeiwen.com/subject/socmrqtx.html