方案总体介绍:通过zookeeper管理canal和kafka集群, zk本身也做集群配置;通过canal作为mysql的从库实时读取binlog,然后将数据以json格式发送到kafka平台,会有一个专门消费kafka消息的微服务,负责数据处理和转换;处理后的数据存储到elasticSearch,通过es的rest api向外提供查询服务。
一、mysql
1、首先为mysql数据库新建一个只读用户
2、设置mysql的模式ROW
[mysqld]
log-bin=mysql-bin # 开启 binlog
binlog-format=ROW # 选择 ROW 模式
server_id=1 # 配置 MySQL replaction 需要定义,不要和 canal 的 slaveId 重复
service mysqld restart
二、zookeeper集群搭建
zk集群搭建比较简单,就是修改配置文件,废话不多,直接上命令:
# 设置zookeeper集群
cd /usr/local/src
wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.5.5/apache-zookeeper-3.5.5-bin.tar.gz
tar -zxvf apache-zookeeper-3.5.5-bin.tar.gz -C /usr/local
cd /usr/local/apache-zookeeper-3.5.5-bin
mkdir -p /zkdata/{zookeeper-1,zookeeper-2,zookeeper-3}
cp conf/zoo_sample.cfg conf/zoo-1.cfg
# vim conf/zoo-1.cfg
dataDir=/zkdata/zookeeper-1
clientPort=2181
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
cp conf/zoo-1.cfg conf/zoo-2.cfg
cp conf/zoo-1.cfg conf/zoo-3.cfg
vim conf/zoo-2.cfg
dataDir=/zkdata/zookeeper-2
clientPort=2182
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
vim conf/zoo-3.cfg
dataDir=/zkdata/zookeeper-3
clientPort=2183
server.1=127.0.0.1:2888:3888
server.2=127.0.0.1:2889:3889
server.3=127.0.0.1:2890:3890
echo '1' > /zkdata/zookeeper-1/myid
echo '2' > /zkdata/zookeeper-2/myid
echo '3' > /zkdata/zookeeper-3/myid
# 集群启动脚本
vim start.sh
bash bin/zkServer.sh start conf/zoo-1.cfg
bash bin/zkServer.sh start conf/zoo-2.cfg
bash bin/zkServer.sh start conf/zoo-3.cfg
# 集群关闭脚本
vim start.sh
bash bin/zkServer.sh stop conf/zoo-1.cfg
bash bin/zkServer.sh stop conf/zoo-2.cfg
bash bin/zkServer.sh stop conf/zoo-3.cfg
三、canal搭建
从下载到安装,启动,设置HA模式
# 下载
wget https://github.com/alibaba/canal/releases/download/canal-1.1.4/canal.deployer-1.1.4.tar.gz
mkdir -p /usr/local/canal
tar -zxv -f canal.deployer-1.1.4.tar.gz -C /usr/local/canal
# 修改连接数据库的配置文件
cd /usr/local/canal
vim conf/example/instance.properties
## mysql serverId
canal.instance.mysql.slaveId = 123
#position info,需要改成自己的数据库信息
canal.instance.master.address = 127.0.0.1:3306
canal.instance.master.journal.name =
canal.instance.master.position =
canal.instance.master.timestamp =
#canal.instance.standby.address =
#canal.instance.standby.journal.name =
#canal.instance.standby.position =
#canal.instance.standby.timestamp =
#username/password,需要改成自己的数据库信息
canal.instance.dbUsername = canal
canal.instance.dbPassword = canal!%123AD
canal.instance.defaultDatabaseName =
canal.instance.connectionCharset = UTF-8
#table regex
canal.instance.filter.regex = .\*\\\\..\*
# 启动
bash bin/startup.sh
# 查看 server 日志
tail -n 30 logs/canal/canal.log
2019-09-20 09:48:46.987 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## set default uncaught exception handler
2019-09-20 09:48:47.019 [main] INFO com.alibaba.otter.canal.deployer.CanalLauncher - ## load canal configurations
2019-09-20 09:48:47.028 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## start the canal server.
2019-09-20 09:48:47.059 [main] INFO com.alibaba.otter.canal.deployer.CanalController - ## start the canal server[192.168.0.187(192.168.0.187):11111]
2019-09-20 09:48:48.228 [main] INFO com.alibaba.otter.canal.deployer.CanalStarter - ## the canal server is running now ......
# 查看 instance 的日志
2019-09-20 09:48:47.395 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-09-20 09:48:47.399 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-09-20 09:48:47.580 [main] WARN o.s.beans.GenericTypeAwarePropertyDescriptor - Invalid JavaBean property 'connectionCharset' being accessed! Ambiguous write methods found next to actually used [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.lang.String)]: [public void com.alibaba.otter.canal.parse.inbound.mysql.AbstractMysqlEventParser.setConnectionCharset(java.nio.charset.Charset)]
2019-09-20 09:48:47.626 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [canal.properties]
2019-09-20 09:48:47.626 [main] INFO c.a.o.c.i.spring.support.PropertyPlaceholderConfigurer - Loading properties file from class path resource [example/instance.properties]
2019-09-20 09:48:48.140 [main] INFO c.a.otter.canal.instance.spring.CanalInstanceWithSpring - start CannalInstance for 1-example
2019-09-20 09:48:48.147 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table filter : ^.*\..*$
2019-09-20 09:48:48.147 [main] WARN c.a.o.canal.parse.inbound.mysql.dbsync.LogEventConvert - --> init table black filter :
2019-09-20 09:48:48.165 [main] INFO c.a.otter.canal.instance.core.AbstractCanalInstance - start successful....
2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> begin to find start position, it will be long time for reset or first position
2019-09-20 09:48:48.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - prepare to find start position just show master status
2019-09-20 09:48:49.288 [destination = example , address = /127.0.0.1:3306 , EventParser] WARN c.a.o.c.p.inbound.mysql.rds.RdsBinlogEventParserProxy - ---> find start position successfully, EntryPosition[included=false,journalName=mysql-bin.000004,position=4,serverId=1,gtid=<null>,timestamp=1568943354000] cost : 989ms , the next step is binlog dump
# 关闭
bash bin/stop.sh
# 端口使用情况
ss -tlnp
State Recv-Q Send-Q Local Address:Port Peer Address:Port
LISTEN 0 50 *:11110 *:* users:(("java",pid=2078,fd=109))
LISTEN 0 50 *:11111 *:* users:(("java",pid=2078,fd=105))
LISTEN 0 3 *:11112 *:* users:(("java",pid=2078,fd=87))
# 端口号说明
# admin端口:11110
# tcp端口:11111
# metric端口:11112
四、kafka安装配置
# kafka集群,伪集群
cd /usr/local/src
wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/2.3.0/kafka_2.12-2.3.0.tgz
tar -zxv -f kafka_2.12-2.3.0.tgz -C /usr/local/
cd /usr/local/kafka_2.12-2.3.0/config
mkdir -p /kafkadata/{kafka-1,kafka-2,kafka-3}
cp server.properties server-1.properties
vim server-1.properties
broker.id=1
delete.topic.enable=true
listeners=PLAINTEXT://:9092
advertised.listeners=PLAINTEXT://localhost:9092
log.dirs=/kafkadata/kafka-1
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
cp server-1.properties server-2.properties
vim server-2.properties
broker.id=2
delete.topic.enable=true
listeners=PLAINTEXT://:9093
log.dirs=/kafkadata/kafka-2
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
cp server-1.properties server-3.properties
vim server-3.properties
broker.id=3
delete.topic.enable=true
listeners=PLAINTEXT://:9094
log.dirs=/kafkadata/kafka-3
zookeeper.connect=localhost:2181,localhost:2182,localhost:2183
# 启动集群
vim start.sh
#!/bin/bash
bash bin/kafka-server-start.sh -daemon config/server-1.properties
bash bin/kafka-server-start.sh -daemon config/server-2.properties
bash bin/kafka-server-start.sh -daemon config/server-3.properties
# 停止集群
vim stop.sh
#!/bin/bash
bash bin/kafka-server-stop.sh -daemon config/server-1.properties
bash bin/kafka-server-stop.sh -daemon config/server-2.properties
bash bin/kafka-server-stop.sh -daemon config/server-3.properties
五、消费kafka的微服务
这个微服务(基于springCloud框架)是个性化的,不同的场景(业务数据、日志监控)有不同的数据处理方案,不具有共通性,在此就不列出了。
六、ElasticSearch集群搭建
1.安装JDK
Elasticsearch是基于Java开发是一个Java程序,运行在Jvm中,所以第一步要安装JDK
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">yum install -y java-1.8.0-openjdk-devel # 安装1.8或1.8以上版本</pre>
2.下载elasticsearch
[https://www.elastic.co/cn/downloads/elasticsearch]是ELasticsearch的官方站点,如果需要下载最新的版本,进入官网下载即可。可以下载到本地电脑然后再导入CentOS中,也可以直接在CentOS中下载。
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">wget https://artifacts.elastic.co/downloads/elasticsearch/elasticsearch-7.6.2.rpm</pre>
3.安装elasticsearch
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">rpm -ivh elasticsearch-7.6.2.rpm</pre>
4.配置目录
安装完毕后会生成很多文件,包括配置文件日志文件等等,下面几个是最主要的配置文件路径
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">/etc/elasticsearch/elasticsearch.yml # els的配置文件 /etc/elasticsearch/jvm.options # JVM相关的配置,内存大小等等 /etc/elasticsearch/log4j2.properties # 日志系统定义 /usr/share/elasticsearch # elasticsearch 默认安装目录 /var/lib/elasticsearch # 数据的默认存放位置</pre>
5.创建用于存放数据与日志的目录
数据文件会随着系统的运行飞速增长,所以默认的日志文件与数据文件的路径不能满足我们的需求,那么手动创建日志与数据文件路径,可以使用NFS、可以使用Raid等等方便以后的管理与扩展
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">mkdir -p /opt/elasticsearch/data
mkdir -p /opt/elasticsearch/log
chown -R elasticsearch.elasticsearch /opt/elasticsearch/*</pre>
6.集群配置
集群配置中最重要的两项是node.name
与network.host
,每个节点都必须不同。其中node.name
是节点名称主要是在Elasticsearch自己的日志加以区分每一个节点信息。
```discovery.zen.ping.unicast.hosts`是集群中的节点信息,可以使用IP地址、可以使用主机名(必须可以解析)。
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">vim /etc/elasticsearch/elasticsearch.yml
cluster.name: my-els # 集群名称
node.name: els-node1 # 节点名称,仅仅是描述名称,用于在日志中区分
path.data: /opt/elasticsearch/data # 数据的默认存放路径
path.logs: /opt/elasticsearch/log # 日志的默认存放路径
network.host: 192.168.60.201 # 当前节点的IP地址
http.port: 9200 # 对外提供服务的端口,9300为集群服务的端口
添加如下内容
culster transport port
transport.tcp.port: 9300 transport.tcp.compress: true discovery.zen.ping.unicast.hosts: ["192.168.60.201", "192.168.60.202","192.168.60.203"]
集群个节点IP地址,也可以使用els、els.shuaiguoxia.com等名称,需要各节点能够解析
discovery.zen.minimum_master_nodes: 2 # 为了避免脑裂,集群节点数最少为 半数+1</pre>
注意:不要在elasticsearch.yml中添加index开头的配置项。如
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;">#index.number_of_shards: 5 #index.number_of_replicas: 1</pre>
####7、启动服务
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;"># 需切换为es用户
su es
# 启动服务(当前的路径为:/usr/share/elasticsearch/)
./bin/elasticsearch</pre>
####8、后台运行ES
可以加入-p 命令 让es在后台运行, -p 参数 记录进程ID为一个文件
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;"># 设置后台启动
./bin/elasticsearch -p /tmp/elasticsearch-pid -d</pre>
####9、结束进程
<pre style="margin: 0px; padding: 0px; overflow-wrap: break-word; font-family: "Courier New" !important; font-size: 12px !important;"># 查看运行的pid
cat /tmp/elasticsearch-pid && echo
# 结束进程
kill -SIGTERM {pid}</pre>
网友评论