1.前言
上一篇主要介绍了Confluent的基本概念,如果对Confluent不了解的请回看上篇文章。
2.系统架构
为了保证系统可靠性,真实生产环境中都会以集群的方式搭建,以避免单机宕机造成的影响。本文以3台机器,MySQL作为源/目的数据库来进行数据库的转移实验。
整个系统的整体结构如下图所示,因为每个组件都是独立提供服务且都能以集群的方式进行工作,因此本实验把每个服务都分别部署到3台机器来模拟集群环境。本系统主要用了Zookeeper、Kafka、Kafka-Connect、Schema-Registry 4种服务,整体架构如下:
集群架构
整个系统工作流程是Source Connector集群从源MySQL DB中不断实时读取变动数据(增/删/改)再经过Schema-Registry序列化后插入到Kafka消息队列中,Sink Connector会不断从Kafka消息队列中获取数据再经过反序列化插入到目的MySQL DB中。
3.Confluent 安装
本文以CentOS操作系统为实验环境。
3.1 安装JDK1.8
下载JDK1.8 64位,解压到安装目录。
设置java环境变量,在~/.bashrc文件中增加以下信息。
export PATH=<path-to-java>/bin:${PATH};
export CLASSPATH=.:<path-to-java>/lib/dt.jar: <path-to-java>/lib/tools.jar
执行source~/.bashrc 使之生效。
3.2 Confluent 安装
下载Confluent Community版,下载链接为https://www.confluent.io/download/,解压到安装目录,添加环境变量。
export PATH=<path-to-confluent>/bin:${PATH};
export CLASSPATH=<path-to-confluent>/share/java/*:${CLASSPATH}
执行source~/.bashrc 使之生效。
3.3 安装mysql-connector-jdbc.jar
因为实验数据库为MySQL,因此下载MySQL驱动包。
下载jar包,然后放到<path-to-confluent>/java/kafka/目录下面。
4.服务配置
4.1 Zookeeper 配置
编辑<path-to-confluent>/etc/kafka/zookeeper.properites文件,修改以下配置信息。
完整配置信息可以参看链接:
https://docs.confluent.io/current/zookeeper/deployment.html
tickTime=2000 #时间单元,毫秒单位
dataDir=/var/lib/zookeeper/ #数据存储路径
clientPort=2181 #zookeeper 客户端监听端口 initLimit=5 #followers 初始化时间
syncLimit=2 #followers 同步时间
maxClientCnxns=0 #最大client连接数,值为0的时候没有上限
server.< myid >=< hostname >:< leaderport >:< electionport > 集群配置
server.1=< IP1>:2888:3888 #server1 地址,修改为自身地址
server.2=< IP2>:2888:3888 #server2 地址
server.3=< IP3>:2888:3888 #server3 地址
autopurge.snapRetainCount=3 #最近的快照保存数目
autopurge.purgeInterval=24 #快照自动清除时间间隔
修改好配置文件后,需要在每台Zookeeper Server的dataDir目录下创建myid文件来在集群中作为
唯一标示。
例如:
server1: echo 1 > myid
server2: echo 2 > myid
server3: echo 3 > myid
集群中所有配置信息需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。
4.2 Kafka 配置
编辑<path-to-confluent>/etc/kafka/server.properites文件,修改以下配置信息。
完整配置信息可以参看链接:
https://docs.confluent.io/current/kafka/operations.html
集群地址,修改为自己地址
zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181
broker.id=[1,2,3] #节点标识,每台机器依次改为1,2,3….依次递增 log.dirs=/xxx/log/kafka #日志目录,修改为自身log目录 listeners=PLAINTEXT://0.0.0.0:9092 #监听地址 advertised.listeners=PLAINTEXT://0.0.0.0:9092 #发布到zookeeper供客户端连接的地址
num.partitions=3 #分区数设置,分区间数据无序,分区内数据有序。若需要保证有序,此处设置为1
default.replication.factor=3 #消息备份数目默认1不做复制。此处改为2,备份一份。
port=9092 #服务端口,默认9092
集群中除broker.id外所有配置信息需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。
4.3 Schema Registry 配置
编辑<path-to-confluent>/etc/schema-registry/schema-registry.properites 文件,修改以下配置信息。
完整配置信息可以参看链接
https://docs.confluent.io/current/schema-registry/docs/index.html
listeners=http://0.0.0.0:8081 #监听地址
host.name=172.21.101.186 #主机地址,改为机器本身地址 port #端口号,默认8081
kafkastore.connection.url=xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx:2181 #集群列表,改为机器相应地址
集群中除host.name设置为本身ip外,所有配置信息均需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。
4.4 Kafka-Connect
4.4.1 Connect 配置
编辑<path-to-confluent>/etc/schema-registry/connect-avro-distributed.properties.properites 文件,修改以下配置信息。
完整参数参考链接:
https://docs.confluent.io/current/connect/index.html
group.id=connect-cluster #Connect集群组标示,集群中此处所有配置必须相同。
kafka节点列表,host1:port1,host2:port2,...修改为相应机器地址 bootstrap.servers=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181
key.converter=io.confluent.connect.avro.AvroConverter #使用Avro为key转化类
key.converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #key的schema 访问URL value.converter= io.confluent.connect.avro.AvroConverter #使用Avro为value 转化类
value.Converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #value的schema 访问URL
config.storage.replication.factor=3 #config信息备份因子,不超过集群数目 offset.storage.replication.factor=3 #偏移量备份因子,不超过集群数目 status.storage.replication.factor=3 #任务状态备份因子,不超过集群数目
集群中除schema.registry.url设置为本身ip外,集群中所有配置信息均需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。
4.4.2 Connect-JDBC
Kafka Connect主要由两部分组成,分别是Source Connector 和Sink Connector。Source Connector 负责从数据库中把信息读入到Kafka,Sink Connector负责把数据从Kafka 中读到数据库。实验使用的是MySql数据库,因此我们主要介绍Confluent JDBC Connector。
完整参数参考链接:
https://docs.confluent.io/current/connect/connect-jdbc/docs/index.html
4.4.2.1 JDBC Source Connector
以下是各个参数说明,可以根据实际业务场景来配置各个参数。
connection.url #数据库连接地址
connection.user #数据库用户名
connection.password #数据库密码
table.whitelist #查询表白名单
connection.attempts #数据库连接次数
numeric.precision.mapping #是否根据进度判断数据类型
table.blacklist #表黑名单
connection.backoff.ms #数据库尝试连接间隔时间
schema.pattern #查询表时使用的schema mode #更新表时候用的模式,主要有以下4种模式.Bulk:每次都全部查询。 timestamp:根据时间戳字段是否变化来检测数据是否增加或更新。 incrementing :用自增长字段来检测数据是否有增加,不能检测数据变化和删除。 timestamp+incrementing :根据时间戳和自增长字段来检查新增更新数据,根据自增长字段来标示唯一的流数据。 incrementing.column.name #用来判断是否有新增数据的自增长字段名称,该字段值不允许为空值。
timestamp.column.name #用来判断数据是否有新增和更新的时间戳字段,该字段值不允许为空值
validate.non.null #设置是否检测数据库中自增长和时间戳字段不允许为空值,如果检查失败connector就停止启动。
query #设置数据查询语句,如果设置了就不进行全表轮询,而是只是用此sql语句去提取数据
query.condition #设置自定义查询条件,会拼接到where语句后面。(非自带,修改代码添加此参数)
poll.interval.ms #数据轮询时间间隔,对数据库执行查询语句的时间间隔,此参数对数据库性能有影响
batch.max.rows # connector每次轮询获取数据的最大数,默认100条,和connector获取数据的性能有关
table.poll.interval.ms #检查表是否有增加或删除的时间间隔。
topic.prefix #使用普通查询时候以topic.prefix +表名 作为topic.自定义query语句时候以topic.prefix作为topic,此处写目的库的表名
table.types #设置要查询的表类型,默认为table,还可以设置view、system table。 timestamp.delay.interval.ms #延迟转移时间间隔,可以延迟数据转移
4.4.2.2 JDBC Sink Connector
以下是各个参数说明,根据实际业务场景来配置各个参数。
connection.url #数据库连接URL
connection.user #数据库连接用户名
connection.password #数据库密码。
insert.mode # 插入数据模式 insert,upsert,update。本项目使用upsert模式,在没有相应主键数据时候直接插入,否则进行更新操作。
batch.size #每次插入数据的最大数,和数据库性能有关
topics #订阅的主题,也是目的数据库的表名
table.name.format #插入表面格式化,默认为${topic}
pk.mode #主键模式。None:不设置主键,kafka:使用kafka坐标作为主键,record_key: 使用record 的key字段作为主键,record_value:使用record 的value 字段作为主键。
pk.fields #主键字段,以逗号分隔。项目中使用目的表的虚拟主键或者唯一约束字段。
fields.whitelist #插入字段白名单,如果为空则所有字段都使用
auto.create #是否自动创建目的表。
auto.evolve #当表结构发生变化时候,是否自动修改目的表
max.retries #失败重试次数 retry.backoff.ms #重试时间间隔
4.4.3 Connector REST API
在集群环境下,Connector参数只能通过RESTful接口进行参数配置,通过RESTful接口可以给任意一个服务器发送配置参数,Connector参数信息会自动转发给集群中的其它机器。使用RESTful接口配置参数后,Connector服务无需重启,即可生效。
下面详细描述Connector RESTful接口的配置参数。
4.3.3.1 RESTful Header
目前 REST接口只支持Json格式参数,因此请求头应该如下设置。
Accept: application/json
Content-Type: application/json
4.3.3.2 RESTful URL
接口列表5. 服务启动
5.1 启动Zookeeper
zookeeper-server-start <path-to-confluent>/etc/kafka/zookeeper.properties
5.2 启动 Kafka
kafka-server-start <path-to-confluent>/etc/kafka/server.properties
5.3 启动Schema-registry-start
schema-registry-start <path-to-confluent>/etc/schema-registry/schema-registry.properties
5.4 启动Connector
connect-distributed <path-to-confluent>/etc/schema-registry/connect-avro-distributed.properties
6. 启动Connector任务
使用Postman对Connector的REST API 接口进行访问配置。示例如下:
6.1 Header 设置
Headers Tab页面增加以下参数。
header设置
6.2 配置Source Connector
Source Connectors配置发送请求后即可完成任务的配置。
6.3 配置Sink Connector
Sink Connector 配置配置完毕点击Send按钮,即完成了Source、Sink Connector任务的配置和启动。此时若向Source 数据库添加或更改数据,目的数据库会实时更新过来。
7.服务停止
在集群中的机器中依次关闭以下服务,关闭时候,需要保证关闭顺序的正确性。
7.1 关闭Connect-jdbc
confluent connect stop
执行命令后若提示 connect is [DOWN],则代表connect服务关闭成功。
7.2 关闭Schema-registry
confluent schema-registry stop
执行命令后若提示 schema-registry is [DOWN],则代表schema-registry服务关闭成功。
7.3 关闭Kafka
confluent kafka stop
执行命令后若提示 kafka is [DOWN],则代表upkafka服务关闭成功。
7.4 关闭Zookeeper
confluent zookeeper stop
执行命令后若提示 zookeeper is [DOWN],则代表zookeeper服务关闭成功。
8. 小结
通过实验所有数据都能实时进行转移并没有遗漏。
在破坏性测试中如杀掉服务进程、下线集群中的某台机器等异常测试中,集群都能正常的进行转移工作。
即使把集群中所有机器的服务都停止,当服务重启后,Confluent会把宕机期间的数据转移过来。可以看出Confluent确实如官方宣传所言,是一个高性能,高可靠的系统。
到此,本文也基本结束,希望此教程能帮助所有需要的人。
网友评论