美文网首页
2、mysql-canal-zk-kafka-es数据同步

2、mysql-canal-zk-kafka-es数据同步

作者: kobe0429 | 来源:发表于2020-09-01 10:27 被阅读0次

方案总体介绍:通过zookeeper管理canal和kafka集群, zk本身也做集群配置;通过canal作为mysql的从库实时读取binlog,然后将数据以json格式发送到kafka平台,会有一个专门消费kafka消息的微服务,负责数据处理和转换;处理后的数据存储到elasticSearch,通过es的rest api向外提供查询服务。

一、mysql

1、首先为mysql数据库新建一个只读用户

2、设置mysql的模式ROW

[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复

service mysqld restart

二、zookeeper集群搭建

zk集群搭建比较简单,就是修改配置文件,废话不多,直接上命令:


# 设置zookeeper集群
cd /usr/local/src
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local
cd /usr/local/apache-zookeeper-3.5.5-bin
 
mkdir -p /zkdata/{zookeeper-1,zookeeper-2,zookeeper-3}
 
cp conf/zoo_sample.cfg conf/zoo-1.cfg
# vim conf/zoo-1.cfg
    dataDir=/zkdata/zookeeper-1
    clientPort=2181
 
    server.1=127.0.0.1:2888:3888
    server.2=127.0.0.1:2889:3889
    server.3=127.0.0.1:2890:3890
 
cp conf/zoo-1.cfg conf/zoo-2.cfg
cp conf/zoo-1.cfg conf/zoo-3.cfg
 
vim conf/zoo-2.cfg
    dataDir=/zkdata/zookeeper-2
    clientPort=2182
 
    server.1=127.0.0.1:2888:3888
    server.2=127.0.0.1:2889:3889
    server.3=127.0.0.1:2890:3890
 
vim conf/zoo-3.cfg
    dataDir=/zkdata/zookeeper-3
    clientPort=2183
 
    server.1=127.0.0.1:2888:3888
    server.2=127.0.0.1:2889:3889
    server.3=127.0.0.1:2890:3890
 
echo '1' > /zkdata/zookeeper-1/myid
echo '2' > /zkdata/zookeeper-2/myid 
echo '3' > /zkdata/zookeeper-3/myid 
 
# 集群启动脚本
vim start.sh
    bash bin/zkServer.sh start conf/zoo-1.cfg
    bash bin/zkServer.sh start conf/zoo-2.cfg
    bash bin/zkServer.sh start conf/zoo-3.cfg
 
# 集群关闭脚本
vim start.sh
    bash bin/zkServer.sh stop conf/zoo-1.cfg
    bash bin/zkServer.sh stop conf/zoo-2.cfg
    bash bin/zkServer.sh stop conf/zoo-3.cfg

三、canal搭建

从下载到安装,启动,设置HA模式

# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p /usr/local/canal
tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal

# 修改连接数据库的配置文件
cd /usr/local/canal
vim conf/example/instance.properties
    ## mysql serverId
    canal.instance.mysql.slaveId = 123
    #position info,需要改成自己的数据库信息
    canal.instance.master.address = 127.0.0.1:3306 
    canal.instance.master.journal.name = 
    canal.instance.master.position = 
    canal.instance.master.timestamp = 
    #canal.instance.standby.address = 
    #canal.instance.standby.journal.name =
    #canal.instance.standby.position = 
    #canal.instance.standby.timestamp = 
    #username/password,需要改成自己的数据库信息
    canal.instance.dbUsername = canal  
    canal.instance.dbPassword = canal!%123AD
    canal.instance.defaultDatabaseName =
    canal.instance.connectionCharset = UTF-8
    #table regex
    canal.instance.filter.regex = .\*\\\\..\*

# 启动
bash bin/startup.sh

# 查看 server 日志
tail -n 30 logs/canal/canal.log
    2019-09-20 09:48:46.987 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
    2019-09-20 09:48:47.019 [main] INFO  com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
    2019-09-20 09:48:47.028 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
    2019-09-20 09:48:47.059 [main] INFO  com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.187(192.168.0.187):11111]
    2019-09-20 09:48:48.228 [main] INFO  com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......

# 查看 instance 的日志
    2019-09-20 09:48:47.395 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2019-09-20 09:48:47.399 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2019-09-20 09:48:47.580 [main] WARN  o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
    2019-09-20 09:48:47.626 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
    2019-09-20 09:48:47.626 [main] INFO  c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
    2019-09-20 09:48:48.140 [main] INFO  c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example 
    2019-09-20 09:48:48.147 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
    2019-09-20 09:48:48.147 [main] WARN  c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter : 
    2019-09-20 09:48:48.165 [main] INFO  c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
    2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
    2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
    2019-09-20 09:48:49.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN  c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1568943354000] cost : 989ms , the next step is binlog dump

# 关闭
bash bin/stop.sh

# 端口使用情况
ss -tlnp
State       Recv-Q Send-Q            Local Address:Port      Peer Address:Port              
LISTEN      0      50                   *:11110                  *:*                   users:(("java",pid=2078,fd=109))
LISTEN      0      50                   *:11111                  *:*                   users:(("java",pid=2078,fd=105))
LISTEN      0      3                    *:11112                  *:*                   users:(("java",pid=2078,fd=87))

# 端口号说明
# admin端口:11110
# tcp端口:11111
# metric端口:11112

四、kafka安装配置

# kafka集群,伪集群
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -zxv -f kafka_2.12-2.3.0.tgz -C /usr/local/
cd /usr/local/kafka_2.12-2.3.0/config
mkdir -p /kafkadata/{kafka-1,kafka-2,kafka-3}
cp server.properties server-1.properties
vim server-1.properties
    broker.id=1
    delete.topic.enable=true
    listeners=PLAINTEXT://:9092
    advertised.listeners=PLAINTEXT://localhost:9092
    log.dirs=/kafkadata/kafka-1
    zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

cp server-1.properties server-2.properties
vim server-2.properties
    broker.id=2
    delete.topic.enable=true
    listeners=PLAINTEXT://:9093
    log.dirs=/kafkadata/kafka-2
    zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

cp server-1.properties server-3.properties
vim server-3.properties
    broker.id=3
    delete.topic.enable=true
    listeners=PLAINTEXT://:9094
    log.dirs=/kafkadata/kafka-3
    zookeeper.connect=localhost:2181,localhost:2182,localhost:2183

# 启动集群
vim start.sh
    #!/bin/bash

    bash bin/kafka-server-start.sh -daemon config/server-1.properties
    bash bin/kafka-server-start.sh -daemon config/server-2.properties
    bash bin/kafka-server-start.sh -daemon config/server-3.properties

# 停止集群
vim stop.sh 
    #!/bin/bash

    bash bin/kafka-server-stop.sh -daemon config/server-1.properties
    bash bin/kafka-server-stop.sh -daemon config/server-2.properties
    bash bin/kafka-server-stop.sh -daemon config/server-3.properties

五、消费kafka的微服务

这个微服务(基于springCloud框架)是个性化的,不同的场景(业务数据、日志监控)有不同的数据处理方案,不具有共通性,在此就不列出了。

六、ElasticSearch集群搭建

1.安装JDK

Elasticsearch是基于Java开发是一个Java程序,运行在Jvm中,所以第一步要安装JDK
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: &quot;Courier New&quot; !important; font-size: 12px !important;">yum install -y java-1.8.0-openjdk-devel # 安装1.8或1.8以上版本</pre>

2.下载elasticsearch

[https://www.elastic.co/cn/downloads/elasticsearch]是ELasticsearch的官方站点,如果需要下载最新的版本,进入官网下载即可。可以下载到本地电脑然后再导入CentOS中,也可以直接在CentOS中下载。

<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: &quot;Courier New&quot; !important; font-size: 12px !important;">wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2.rpm</pre>

3.安装elasticsearch

<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: &quot;Courier New&quot; !important; font-size: 12px !important;">rpm -ivh elasticsearch-7.6.2.rpm</pre>

4.配置目录

安装完毕后会生成很多文件,包括配置文件日志文件等等,下面几个是最主要的配置文件路径

<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: &quot;Courier New&quot; !important; font-size: 12px !important;">/etc/elasticsearch/elasticsearch.yml                            # els的配置文件 /etc/elasticsearch/jvm.options                                  # JVM相关的配置,内存大小等等 /etc/elasticsearch/log4j2.properties                            # 日志系统定义 /usr/share/elasticsearch                                        # elasticsearch 默认安装目录 /var/lib/elasticsearch                                          # 数据的默认存放位置</pre>

5.创建用于存放数据与日志的目录

数据文件会随着系统的运行飞速增长,所以默认的日志文件与数据文件的路径不能满足我们的需求,那么手动创建日志与数据文件路径,可以使用NFS、可以使用Raid等等方便以后的管理与扩展

<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: &quot;Courier New&quot; !important; font-size: 12px !important;">mkdir -p /opt/elasticsearch/data
mkdir -p /opt/elasticsearch/log
chown -R elasticsearch.elasticsearch /opt/elasticsearch/*</pre>

6.集群配置

集群配置中最重要的两项是node.namenetwork.host,每个节点都必须不同。其中node.name是节点名称主要是在Elasticsearch自己的日志加以区分每一个节点信息。
```discovery.zen.ping.unicast.hosts`是集群中的节点信息,可以使用IP地址、可以使用主机名(必须可以解析)。
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">vim /etc/elasticsearch/elasticsearch.yml

cluster.name: my-els # 集群名称
node.name: els-node1 # 节点名称,仅仅是描述名称,用于在日志中区分

path.data: /opt/elasticsearch/data # 数据的默认存放路径
path.logs: /opt/elasticsearch/log # 日志的默认存放路径

network.host: 192.168.60.201 # 当前节点的IP地址
http.port: 9200 # 对外提供服务的端口,9300为集群服务的端口

添加如下内容

culster transport port

transport.tcp.port: 9300 transport.tcp.compress: true discovery.zen.ping.unicast.hosts: ["192.168.60.201", "192.168.60.202","192.168.60.203"]

集群个节点IP地址,也可以使用els、els.shuaiguoxia.com等名称,需要各节点能够解析

discovery.zen.minimum_master_nodes: 2 # 为了避免脑裂,集群节点数最少为 半数+1</pre>
注意:不要在elasticsearch.yml中添加index开头的配置项。如

<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">#index.number_of_shards: 5 #index.number_of_replicas: 1</pre>

####7、启动服务
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: &quot;Courier New&quot; !important; font-size: 12px !important;"># 需切换为es用户
su es
# 启动服务(当前的路径为:/usr/share/elasticsearch/)
./bin/elasticsearch</pre>
####8、后台运行ES
可以加入-p 命令 让es在后台运行, -p 参数 记录进程ID为一个文件
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: &quot;Courier New&quot; !important; font-size: 12px !important;"># 设置后台启动
./bin/elasticsearch -p /tmp/elasticsearch-pid -d</pre>
####9、结束进程
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: &quot;Courier New&quot; !important; font-size: 12px !important;"># 查看运行的pid
cat /tmp/elasticsearch-pid && echo
# 结束进程
kill -SIGTERM {pid}</pre>


相关文章

  • 2、mysql-canal-zk-kafka-es数据同步

    方案总体介绍:通过zookeeper管理canal和kafka集群, zk本身也做集群配置;通过canal作为m...

  • 数据技术篇

    大数据阶段 数据采集层(1)数据库同步(DataX/同步中心)(2)消息中间件(离线、实时) 数据计算层 数据服务...

  • 1.PDMan使用及自动生成代码

    1.打开PDMan开始->创建模块->创建数据库2.同步数据库到数据库模型模板->同步配置->保存版本 3.同步数...

  • DolphinScheduler 调度 DataX 实现 MyS

    数据同步的方式 数据同步的2大方式 基于SQL查询的 CDC(Change Data Capture):离线调度查...

  • redis和 mysql 数据同步的实践

    场景1:统计访问数量 场景2:后台修改数据的同步 场景3:redis数据的不同步问题 其他待续............

  • 数据同步

    数据同步无论是主备还是主从都牵扯到数据同步的问题,这也分2种情况:同步方式:当主机收到客户端写操作后,以同步方式把...

  • 设备参照的页面为空

    1、没有在数据管理中同步设备数据 2、没有分配公共基础中设备档案的查询权限 3、同步数据失败,需要开发跟踪解决

  • MySQL集群设计

    1、MySQL Proxy 实现读写分离 2、MySQL自带的同步方案replication实现数据同步 3、为了...

  • 华为手表WATCH GT 2/2e的坑

    入手2e一周问题列表: 1.数据同步出现中断,包括睡眠数据,血氧饱和度数据等。尤其是睡眠数据,连续多天数据同步中断...

  • 面试UI相关

    UI相关 1. UI视图数据源同步 并发访问,数据同步 (内存消耗 ) 串行访问(子线程耗时,会有延时) 2. 事...

网友评论

      本文标题:2、mysql-canal-zk-kafka-es数据同步

      本文链接:https://www.haomeiwen.com/subject/luwmsktx.html