美文网首页
Canal安装

Canal安装

作者: 淡淡的小番茄 | 来源:发表于2021-03-29 09:40 被阅读0次

    一、安装canal_admin

    从官方下载canal.admin-1.1.4.tar.gz文件。

    1、脚本执行

    连接mysql数据库,执行canal_manager.sql脚本。

    执行成功后,效果如下:

    2、canal.admin安装

    1)解压文件canal.admin-1.1.4.tar.gz

    2)修改配置文件application.yml

    注:红色部分修改为实际的配置。

    3)配置集群及主配置

    通过界面配置集群信息:zookeeper的地址和主配置信息。

    canal.properties,载入主配置,然后按需修改:

    #################################################

    ######### common argument #############

    #################################################

    # tcp bind ip

    canal.ip = 172.30.xxx.xxx

    # register ip to zookeeper

    canal.register.ip = 172.30.xxx.xxx

    canal.port = 11111

    canal.metrics.pull.port = 11112

    # canal instance user/passwd

    canal.user = canal

    canal.passwd = E3619321C1A937C46A0D8BD1DAC39F93B27D4458

    # canal admin config

    canal.admin.manager = 172.30.xxx.xxx:8090

    canal.admin.port = 11110

    canal.admin.user = admin

    canal.admin.passwd = 29C41D42A39BA0FD07C298463639112C9A190F9A

    ....

    # tcp, kafka, RocketMQ

    canal.serverMode = tcp

    canal.zkServers = 172.30.xxx.xxx:2181,172.30.xxx.yxx:2181,172.30.xxx.uux:2181

    # flush data to zk

    canal.zookeeper.flush.period = 1000

    canal.withoutNetty = false

    # tcp, kafka, RocketMQ

    canal.serverMode = tcp

    # flush meta cursor/parse position to file

    canal.file.data.dir = ${canal.conf.dir}

    canal.file.flush.period = 1000

    ## memory store RingBuffer size, should be Math.pow(2,n)

    canal.instance.memory.buffer.size = 16384

    ## memory store RingBuffer used memory unit size , default 1kb

    canal.instance.memory.buffer.memunit = 1024

    ## meory store gets mode used MEMSIZE or ITEMSIZE

    canal.instance.memory.batch.mode = MEMSIZE

    canal.instance.memory.rawEntry = true

    ## detecing config

    canal.instance.detecting.enable = false

    #canal.instance.detecting.sql = insert into retl.xdual values(1,now()) on duplicate key update x=now()

    canal.instance.detecting.sql = select 1

    canal.instance.detecting.interval.time = 3

    canal.instance.detecting.retry.threshold = 3

    canal.instance.detecting.heartbeatHaEnable = false

    # support maximum transaction size, more than the size of the transaction will be cut into multiple transactions delivery

    canal.instance.transaction.size =  1024

    # mysql fallback connected to new master should fallback times

    canal.instance.fallbackIntervalInSeconds = 60

    # network config

    canal.instance.network.receiveBufferSize = 16384

    canal.instance.network.sendBufferSize = 16384

    canal.instance.network.soTimeout = 30

    # binlog filter config

    canal.instance.filter.druid.ddl = true

    canal.instance.filter.query.dcl = false

    canal.instance.filter.query.dml = false

    canal.instance.filter.query.ddl = false

    canal.instance.filter.table.error = false

    canal.instance.filter.rows = false

    canal.instance.filter.transaction.entry = false

    # binlog format/image check

    canal.instance.binlog.format = ROW,STATEMENT,MIXED

    canal.instance.binlog.image = FULL,MINIMAL,NOBLOB

    # binlog ddl isolation

    canal.instance.get.ddl.isolation = false

    # parallel parser config

    canal.instance.parser.parallel = true

    ## concurrent thread number, default 60% available processors, suggest not to exceed Runtime.getRuntime().availableProcessors()

    #canal.instance.parser.parallelThreadSize = 16

    ## disruptor ringbuffer size, must be power of 2

    canal.instance.parser.parallelBufferSize = 256

    # table meta tsdb info

    canal.instance.tsdb.enable = true

    canal.instance.tsdb.dir = ${canal.file.data.dir:../conf}/${canal.instance.destination:}

    canal.instance.tsdb.url = jdbc:h2:${canal.instance.tsdb.dir}/h2;CACHE_SIZE=1000;MODE=MYSQL;

    canal.instance.tsdb.dbUsername = canal

    canal.instance.tsdb.dbPassword = canal

    # dump snapshot interval, default 24 hour

    canal.instance.tsdb.snapshot.interval = 24

    # purge snapshot expire , default 360 hour(15 days)

    canal.instance.tsdb.snapshot.expire = 360

    # aliyun ak/sk , support rds/mq

    canal.aliyun.accessKey =

    canal.aliyun.secretKey =

    #################################################

    ######### destinations #############

    #################################################

    canal.destinations =

    # conf root dir

    canal.conf.dir = ../conf

    # auto scan instance dir add/remove and start/stop instance

    canal.auto.scan = true

    canal.auto.scan.interval = 5

    canal.instance.tsdb.spring.xml = classpath:spring/tsdb/h2-tsdb.xml

    #canal.instance.tsdb.spring.xml = classpath:spring/tsdb/mysql-tsdb.xml

    canal.instance.global.mode = manager

    canal.instance.global.lazy = false

    canal.instance.global.manager.address = ${canal.admin.manager}

    #canal.instance.global.spring.xml = classpath:spring/memory-instance.xml

    canal.instance.global.spring.xml = classpath:spring/file-instance.xml

    #canal.instance.global.spring.xml = classpath:spring/default-instance.xml

    ##################################################

    #########     MQ     #############

    ##################################################

    canal.mq.servers = 127.0.0.1:6667

    canal.mq.retries = 0

    canal.mq.batchSize = 16384

    canal.mq.maxRequestSize = 1048576

    canal.mq.lingerMs = 100

    canal.mq.bufferMemory = 33554432

    canal.mq.canalBatchSize = 50

    canal.mq.canalGetTimeout = 100

    canal.mq.flatMessage = true

    canal.mq.compressionType = none

    canal.mq.acks = all

    #canal.mq.properties. =

    canal.mq.producerGroup = test

    # Set this value to "cloud", if you want open message trace feature in aliyun.

    canal.mq.accessChannel = local

    # aliyun mq namespace

    #canal.mq.namespace =

    ##################################################

    #########    Kafka Kerberos Info    #############

    ##################################################

    canal.mq.kafka.kerberos.enable = false

    canal.mq.kafka.kerberos.krb5FilePath = "../conf/kerberos/krb5.conf"

    canal.mq.kafka.kerberos.jaasFilePath = "../conf/kerberos/jaas.conf"

    二、安装canal_deployer

    从官方下载canal.deployer-1.1.4.tar.gz文件。

    1)解压文件canal.admin-1.1.4.tar.gz

    2)修改canal_local.properties

    我们有网段要求,需要直接绑定到指定IP。


    # register ip

    canal.register.ip = 172.30.xxx.xxx

    # canal admin config

    canal.admin.manager = 172.30.xxx.xxx:8089

    canal.admin.port = 11110

    canal.admin.user = admin

    canal.admin.passwd = 29C41D42A39BA0FD07C298463639112C9A190F9A

    # admin auto register

    canal.admin.register.auto = true

    canal.admin.register.cluster = Canal_Master


    3)启动

    ./startup.sh local

    4)通过界面进行实例新建即可。

    instance.properties配置参考:


    ## mysql serverId , v1.0.26+ will autoGen

    # canal.instance.mysql.slaveId=0

    # enable gtid use true/false

    canal.instance.gtidon=false

    # position info

    canal.instance.master.address=172.30.xxx.xxx:13306

    canal.instance.master.journal.name=

    canal.instance.master.position=

    canal.instance.master.timestamp=

    canal.instance.master.gtid=

    # rds oss binlog

    canal.instance.rds.accesskey=

    canal.instance.rds.secretkey=

    canal.instance.rds.instanceId=

    # table meta tsdb info

    canal.instance.tsdb.enable=true

    #canal.instance.tsdb.url=jdbc:mysql://127.0.0.1:3306/canal_tsdb

    #canal.instance.tsdb.dbUsername=canal

    #canal.instance.tsdb.dbPassword=canal

    #canal.instance.standby.address =

    #canal.instance.standby.journal.name =

    #canal.instance.standby.position =

    #canal.instance.standby.timestamp =

    #canal.instance.standby.gtid=

    # username/password

    canal.instance.dbUsername=root

    canal.instance.dbPassword=xxx#2020

    canal.instance.connectionCharset = UTF-8

    # enable druid Decrypt database password

    canal.instance.enableDruid=false

    #canal.instance.pwdPublicKey=MFwwDQYJKoZIhvcNAQEBBQADSwAwSAJBALK4BUxdDltRRE5/zXpVEVPUgunvscYFtEip3pmLlhrWpacX7y7GCMo2/JM6LeHmiiNdH1FWgGCpUfircSwlWKUCAwEAAQ==

    # table regex

    canal.instance.filter.regex=dmp_device_0\.device_[0-9]+,dmp_device_1\.device_[0-9]+

    # table black regex

    canal.instance.filter.black.regex=

    # table field filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

    #canal.instance.filter.field=test1.t_product:id/subject/keywords,test2.t_company:id/name/contact/ch

    # table field black filter(format: schema1.tableName1:field1/field2,schema2.tableName2:field1/field2)

    #canal.instance.filter.black.field=test1.t_product:subject/product_image,test2.t_company:id/name/contact/ch

    # mq config

    canal.mq.topic=example

    # dynamic topic route by schema or table regex

    #canal.mq.dynamicTopic=mytest1.user,mytest2\\..*,.*\\..*

    canal.mq.partition=0

    # hash partition config

    #canal.mq.partitionsNum=3

    #canal.mq.partitionHash=test.table:id^name,.*\\..*


    需要注意的点:

    1、canal_deployer部署的时候,两台机器与canal-admin建议都在一个用户下,建议别在root下安装。

    2、canal_deployer IP信息具体指定下。

    3、canal admin password要和数据库一致。canal_deployer里的密码配置也需要配置正确。

    密码生成方式,MySQL里执行如下语句:select password('xxx#canal');

    相关文章

      网友评论

          本文标题:Canal安装

          本文链接:https://www.haomeiwen.com/subject/catphltx.html