美文网首页
mysql+canal+kafka+canal-adapter+

mysql+canal+kafka+canal-adapter+

作者: 宋小红帽 | 来源:发表于2020-04-20 10:47 被阅读0次

背景:公司订单数据越来越多,因为有海外业务,数据也比较分散,业务查询数据,需要遍历多个库,查询数据慢,需要做一个数仓来统一查询且需要延时控制

架构图如下:

测试环境,资源有限,没有做canal-server集群,为了改善性能,canal-server  跟canal-adapter之间 加了一层 kafka,图中没有展示

 资源下载:

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.adapter-1.1.4.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.admin-1.1.4.tar.gz

wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz

wget https://mirror.bit.edu.cn/apache/kafka/2.5.0/kafka_2.12-2.5.0.tgz

wget http://mirrors.hust.edu.cn/apache/zookeeper/zookeeper-3.4.14/zookeeper-3.4.14.tar.gz

提前安装好java 环境 版本推荐jdk1.8

步骤一:

先安装 kafka,zk,es  安装都比较简单,如果需要集群,请参考官网文档

1.1 zookeeper

解压

#tar -zxvf zookeeper-3.4.14.tar.gz &&cd zookeeper-3.4.14

#cp conf/zoo_sample.cfg conf/zoo.cfg

 修改数据目录与日志目录

cat  conf/zoo.cfg<< EOF

dataDir=/mnt/zookeeper/data

dataLogDir=/mnt/zookeeper/log

EOF

启动 zookeeper

# bin/zkServer.sh start

1.2 kafka

解压

#tar zxvf kafka_2.12-2.5.0.tgz  && cd  kafka_2.12-2.5.0

修改依赖的zookeeper地址,及其自己的port  跟日志存放路径 

#cat config/server.properties << EOF

zookeeper.connect=localhost:2181

listeners=PLAINTEXT://:9092

log.dirs=/mnt/kafka-logs

启动

./bin/kafka-server-start.sh -daemon ../config/server.properties

1.3 ES

创建 ES 用户 useradd es && passwd es && su - es

解压

#tar -zxvf elasticsearch-6.2.4.tar.gz && cd elasticsearch-6.2.4

修改配置文件

#cat conf/elasticsearch.yml <<EOF

cluster.name: test  ##集群名称,如果是单机可不填

network.host: 0.0.0.0

http.port:9200

path.data: /data/es/data

path.logs: /data/es/logs

# 增加参数,使head插件可以访问es

http.cors.enabled: true

http.cors.allow-origin: "*"

EOF

启动

./bin/elasticsearch  -d

验证:curl -X GET http://localhost:9200

1.4 安装elasticsearch-head 查看ES数据

wget https://github.com/mobz/elasticsearch-head/archive/master.zip

unzip master.zip && cd elasticsearch-head-master

修改配置文件

cat   Gruntfile.js <<EOF

安装 node.js

#curl-sL https://rpm.nodesource.com/setup_8.x|bash-

#yum install-y nodejs

#npm install -g grunt-cli && npm install 

启动

grunt server

IE  访问 http://ip:9400

1.5 安装canal 伪装 slave  并配置web界面 

解压 tar zxvf  canal.deployer-1.1.4.tar.gz  -C /data/canal-server

解压 tar zxvf  canal.admin-1.1.4.tar.gz  -C /data/canal-web

先配置canal-web

cd  /data/canal-web   && cat /data/canal-web/conf/application.yml<<EOF

server:

  port: 8089

spring:

  jackson:

    date-format: yyyy-MM-dd HH:mm:ss

    time-zone: GMT+8

spring.datasource:

  address: 127.0.0.1:3306

  database: canal_manager

  username: admin

  password: admin

  driver-class-name: com.mysql.jdbc.Driver

  url: jdbc:mysql://${spring.datasource.address}/${spring.datasource.database}?useUnicode=true&characterEncoding=UTF-8&useSSL=false

  hikari:

    maximum-pool-size: 30

    minimum-idle: 1

canal:

  adminUser: admin

  adminPasswd: 123456

配置元数据库,并初始化 

mysql -uadmin -padmin <canal_manager.sql

启动:

/bin/start.sh

http://127.0.0.1:8089

配置canal

cd  /data/canal-server   && cat /data/canal-server/conf/canal.properties<<EOF

# canal admin config

canal.admin.manager = 127.0.0.1:8089

canal.admin.port = 11110

canal.admin.user = admin

canal.admin.passwd = 6BB4837EB74329105EE4568DDA7DC67ED2CA2AD9

canal.admin.register.auto = true

canal.admin.register.cluster =

##配置kafka

canal.serverMode = kafka

canal.mq.servers = 127.0.0.1:9092

canal.mq.retries = 0

canal.mq.batchSize = 16384

canal.mq.maxRequestSize = 1048576

canal.mq.lingerMs = 100

canal.mq.bufferMemory = 33554432

canal.mq.canalBatchSize = 50

canal.mq.canalGetTimeout = 100

canal.mq.flatMessage = true

canal.mq.compressionType = none

启动 ./bin/start.sh

 这个时候可以通过 canal-web  配置 instance  http://127.0.0.1:8089

1.6安装canal-adapter

解压 tar zxvf  canal.deployer-1.1.4.tar.gz  -C /data/canal-deployer && cd /data/canal-deployer

修改配置文件:

srcDataSources:

    defaultDS:

      url: jdbc:mysql://127.0.0.1:3306/test?useUnicode=true

      username: admin

      password: admin

  canalAdapters:

  - instance: example # canal instance Name or mq topic name

    groups:

    - groupId: g1

      outerAdapters:

      - name: logger

- name: es

        hosts: 127.0.0.1:9200 # 127.0.0.1:9200 for rest mode

        properties:

          mode: rest # or rest

          cluster.name: test

建立表的映射关系

cat es/conf/canal.yml <<

dataSourceKey: defaultDS

destination: example

groupId: g1

esMapping:

  _index: test

  _type: _doc

  _id: _id

  pk: id

  upsert: true

  sql: "select t.id as _id, t.name, t.address from test t"

  etlCondition: "where t._id>={100}"

  commitBatch: 3000

启动

./bin/start.sh

 通过 es-head 创建索引

{

    "mappings":{

        "_doc":{

            "properties":{

                "name":{

                    "type":"text"                },

                "address":{

                    "type":"text"                }

            }

        }

    }

}

测试

相关文章

  • mysql+canal+kafka+canal-adapter+

    背景:公司订单数据越来越多,因为有海外业务,数据也比较分散,业务查询数据,需要遍历多个库,查询数据慢,需要做一个数...

网友评论

      本文标题:mysql+canal+kafka+canal-adapter+

      本文链接:https://www.haomeiwen.com/subject/jatlihtx.html