美文网首页Apache Kafkaconfluent
数据库实时转移之Confluent环境搭建(二)

数据库实时转移之Confluent环境搭建(二)

作者: 七分熟pizza | 来源:发表于2019-03-18 21:30 被阅读2次

    1.前言

    上一篇主要介绍了Confluent的基本概念,如果对Confluent不了解的请回看上篇文章。

    2.系统架构

    为了保证系统可靠性,真实生产环境中都会以集群的方式搭建,以避免单机宕机造成的影响。本文以3台机器,MySQL作为源/目的数据库来进行数据库的转移实验。

    整个系统的整体结构如下图所示,因为每个组件都是独立提供服务且都能以集群的方式进行工作,因此本实验把每个服务都分别部署到3台机器来模拟集群环境。本系统主要用了Zookeeper、Kafka、Kafka-Connect、Schema-Registry 4种服务,整体架构如下:


    集群架构

    整个系统工作流程是Source Connector集群从源MySQL DB中不断实时读取变动数据(增/删/改)再经过Schema-Registry序列化后插入到Kafka消息队列中,Sink Connector会不断从Kafka消息队列中获取数据再经过反序列化插入到目的MySQL DB中。

    3.Confluent 安装

    本文以CentOS操作系统为实验环境。

    3.1 安装JDK1.8

    下载JDK1.8 64位,解压到安装目录。

    设置java环境变量,在~/.bashrc文件中增加以下信息。

     export PATH=<path-to-java>/bin:${PATH}; 
     export CLASSPATH=.:<path-to-java>/lib/dt.jar: <path-to-java>/lib/tools.jar
    

    执行source~/.bashrc 使之生效。

    3.2 Confluent 安装

    下载Confluent Community版,下载链接为https://www.confluent.io/download/,解压到安装目录,添加环境变量。

    export PATH=<path-to-confluent>/bin:${PATH};
    export CLASSPATH=<path-to-confluent>/share/java/*:${CLASSPATH}
    

    执行source~/.bashrc 使之生效。

    3.3 安装mysql-connector-jdbc.jar

    因为实验数据库为MySQL,因此下载MySQL驱动包。

    下载jar包,然后放到<path-to-confluent>/java/kafka/目录下面。

    4.服务配置

    4.1 Zookeeper 配置

    编辑<path-to-confluent>/etc/kafka/zookeeper.properites文件,修改以下配置信息。

    完整配置信息可以参看链接:

    https://docs.confluent.io/current/zookeeper/deployment.html

    tickTime=2000 #时间单元,毫秒单位
    dataDir=/var/lib/zookeeper/ #数据存储路径
    clientPort=2181 #zookeeper 客户端监听端口 initLimit=5 #followers 初始化时间
    syncLimit=2 #followers 同步时间
    maxClientCnxns=0 #最大client连接数,值为0的时候没有上限
    server.< myid >=< hostname >:< leaderport >:< electionport > 集群配置
    server.1=< IP1>:2888:3888 #server1 地址,修改为自身地址
    server.2=< IP2>:2888:3888 #server2 地址
    server.3=< IP3>:2888:3888 #server3 地址
    autopurge.snapRetainCount=3 #最近的快照保存数目
    autopurge.purgeInterval=24 #快照自动清除时间间隔

    修改好配置文件后,需要在每台Zookeeper Server的dataDir目录下创建myid文件来在集群中作为

    唯一标示。

    例如:

    server1:  echo 1 > myid
    server2:  echo 2 > myid
    server3:  echo 3 > myid
    

    集群中所有配置信息需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。

    4.2 Kafka 配置

    编辑<path-to-confluent>/etc/kafka/server.properites文件,修改以下配置信息。

    完整配置信息可以参看链接:

    https://docs.confluent.io/current/kafka/operations.html

    集群地址,修改为自己地址

    zookeeper.connect=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181
    broker.id=[1,2,3] #节点标识,每台机器依次改为1,2,3….依次递增 log.dirs=/xxx/log/kafka #日志目录,修改为自身log目录 listeners=PLAINTEXT://0.0.0.0:9092 #监听地址 advertised.listeners=PLAINTEXT://0.0.0.0:9092 #发布到zookeeper供客户端连接的地址
    num.partitions=3 #分区数设置,分区间数据无序,分区内数据有序。若需要保证有序,此处设置为1
    default.replication.factor=3 #消息备份数目默认1不做复制。此处改为2,备份一份。
    port=9092 #服务端口,默认9092

    集群中除broker.id外所有配置信息需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。

    4.3 Schema Registry 配置

    编辑<path-to-confluent>/etc/schema-registry/schema-registry.properites 文件,修改以下配置信息。

    完整配置信息可以参看链接

    https://docs.confluent.io/current/schema-registry/docs/index.html

    listeners=http://0.0.0.0:8081 #监听地址
    host.name=172.21.101.186 #主机地址,改为机器本身地址 port #端口号,默认8081
    kafkastore.connection.url=xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx: 2181,xxx.xxx.xxx.xxx:2181 #集群列表,改为机器相应地址

    集群中除host.name设置为本身ip外,所有配置信息均需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。

    4.4 Kafka-Connect

    4.4.1 Connect 配置

    编辑<path-to-confluent>/etc/schema-registry/connect-avro-distributed.properties.properites 文件,修改以下配置信息。

    完整参数参考链接:

    https://docs.confluent.io/current/connect/index.html

    group.id=connect-cluster #Connect集群组标示,集群中此处所有配置必须相同。
    kafka节点列表,host1:port1,host2:port2,...修改为相应机器地址 bootstrap.servers=xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181,xxx.xxx.xxx.xxx:2181
    key.converter=io.confluent.connect.avro.AvroConverter #使用Avro为key转化类
    key.converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #key的schema 访问URL value.converter= io.confluent.connect.avro.AvroConverter #使用Avro为value 转化类
    value.Converter.schema.registry.url=http:// xxx.xxx.xxx.xxx:8081 #value的schema 访问URL
    config.storage.replication.factor=3 #config信息备份因子,不超过集群数目 offset.storage.replication.factor=3 #偏移量备份因子,不超过集群数目 status.storage.replication.factor=3 #任务状态备份因子,不超过集群数目

    集群中除schema.registry.url设置为本身ip外,集群中所有配置信息均需保持一致,每次修改配置文件,都必须重启相应机器的服务才能生效。

    4.4.2 Connect-JDBC

    Kafka Connect主要由两部分组成,分别是Source Connector 和Sink Connector。Source Connector 负责从数据库中把信息读入到Kafka,Sink Connector负责把数据从Kafka 中读到数据库。实验使用的是MySql数据库,因此我们主要介绍Confluent JDBC Connector。

    完整参数参考链接:

    https://docs.confluent.io/current/connect/connect-jdbc/docs/index.html

    4.4.2.1 JDBC Source Connector

    以下是各个参数说明,可以根据实际业务场景来配置各个参数。

    connection.url #数据库连接地址

    connection.user #数据库用户名
    connection.password #数据库密码
    table.whitelist #查询表白名单
    connection.attempts #数据库连接次数
    numeric.precision.mapping #是否根据进度判断数据类型
    table.blacklist #表黑名单
    connection.backoff.ms #数据库尝试连接间隔时间
    schema.pattern #查询表时使用的schema mode #更新表时候用的模式,主要有以下4种模式.

      Bulk:每次都全部查询。 
      timestamp:根据时间戳字段是否变化来检测数据是否增加或更新。
      incrementing :用自增长字段来检测数据是否有增加,不能检测数据变化和删除。
      timestamp+incrementing :根据时间戳和自增长字段来检查新增更新数据,根据自增长字段来标示唯一的流数据。 incrementing.column.name  #用来判断是否有新增数据的自增长字段名称,该字段值不允许为空值。
    

    timestamp.column.name #用来判断数据是否有新增和更新的时间戳字段,该字段值不允许为空值
    validate.non.null #设置是否检测数据库中自增长和时间戳字段不允许为空值,如果检查失败connector就停止启动。
    query #设置数据查询语句,如果设置了就不进行全表轮询,而是只是用此sql语句去提取数据
    query.condition #设置自定义查询条件,会拼接到where语句后面。(非自带,修改代码添加此参数)
    poll.interval.ms #数据轮询时间间隔,对数据库执行查询语句的时间间隔,此参数对数据库性能有影响
    batch.max.rows # connector每次轮询获取数据的最大数,默认100条,和connector获取数据的性能有关
    table.poll.interval.ms #检查表是否有增加或删除的时间间隔。
    topic.prefix #使用普通查询时候以topic.prefix +表名 作为topic.自定义query语句时候以topic.prefix作为topic,此处写目的库的表名
    table.types #设置要查询的表类型,默认为table,还可以设置view、system table。 timestamp.delay.interval.ms #延迟转移时间间隔,可以延迟数据转移

    4.4.2.2 JDBC Sink Connector

    以下是各个参数说明,根据实际业务场景来配置各个参数。

    connection.url #数据库连接URL
    connection.user #数据库连接用户名
    connection.password #数据库密码。
    insert.mode # 插入数据模式 insert,upsert,update。本项目使用upsert模式,在没有相应主键数据时候直接插入,否则进行更新操作。
    batch.size #每次插入数据的最大数,和数据库性能有关
    topics #订阅的主题,也是目的数据库的表名
    table.name.format #插入表面格式化,默认为${topic}
    pk.mode #主键模式。None:不设置主键,kafka:使用kafka坐标作为主键,record_key: 使用record 的key字段作为主键,record_value:使用record 的value 字段作为主键。
    pk.fields #主键字段,以逗号分隔。项目中使用目的表的虚拟主键或者唯一约束字段。
    fields.whitelist #插入字段白名单,如果为空则所有字段都使用
    auto.create #是否自动创建目的表。
    auto.evolve #当表结构发生变化时候,是否自动修改目的表
    max.retries #失败重试次数 retry.backoff.ms #重试时间间隔

    4.4.3 Connector REST API

    在集群环境下,Connector参数只能通过RESTful接口进行参数配置,通过RESTful接口可以给任意一个服务器发送配置参数,Connector参数信息会自动转发给集群中的其它机器。使用RESTful接口配置参数后,Connector服务无需重启,即可生效。

    下面详细描述Connector RESTful接口的配置参数。

    4.3.3.1 RESTful Header

    目前 REST接口只支持Json格式参数,因此请求头应该如下设置。

    Accept: application/json
    Content-Type: application/json 
    
    4.3.3.2 RESTful URL
    接口列表

    5. 服务启动

    5.1 启动Zookeeper

    zookeeper-server-start  <path-to-confluent>/etc/kafka/zookeeper.properties 
    

    5.2 启动 Kafka

    kafka-server-start      <path-to-confluent>/etc/kafka/server.properties
    

    5.3 启动Schema-registry-start

    schema-registry-start   <path-to-confluent>/etc/schema-registry/schema-registry.properties
    

    5.4 启动Connector

    connect-distributed     <path-to-confluent>/etc/schema-registry/connect-avro-distributed.properties
    

    6. 启动Connector任务

    使用Postman对Connector的REST API 接口进行访问配置。示例如下:

    6.1 Header 设置

    Headers Tab页面增加以下参数。


    header设置

    6.2 配置Source Connector

    Source Connectors配置

    发送请求后即可完成任务的配置。

    6.3 配置Sink Connector

    Sink Connector 配置

    配置完毕点击Send按钮,即完成了Source、Sink Connector任务的配置和启动。此时若向Source 数据库添加或更改数据,目的数据库会实时更新过来。

    7.服务停止

    在集群中的机器中依次关闭以下服务,关闭时候,需要保证关闭顺序的正确性。

    7.1 关闭Connect-jdbc

    confluent connect stop
    

    执行命令后若提示 connect is [DOWN],则代表connect服务关闭成功。

    7.2 关闭Schema-registry

    confluent schema-registry stop
    

    执行命令后若提示 schema-registry is [DOWN],则代表schema-registry服务关闭成功。

    7.3 关闭Kafka

    confluent kafka stop
    

    执行命令后若提示 kafka is [DOWN],则代表upkafka服务关闭成功。

    7.4 关闭Zookeeper

    confluent zookeeper stop
    

    执行命令后若提示 zookeeper is [DOWN],则代表zookeeper服务关闭成功。

    8. 小结

    通过实验所有数据都能实时进行转移并没有遗漏。

    在破坏性测试中如杀掉服务进程、下线集群中的某台机器等异常测试中,集群都能正常的进行转移工作。

    即使把集群中所有机器的服务都停止,当服务重启后,Confluent会把宕机期间的数据转移过来。可以看出Confluent确实如官方宣传所言,是一个高性能,高可靠的系统。

    到此,本文也基本结束,希望此教程能帮助所有需要的人。

    相关文章

      网友评论

        本文标题:数据库实时转移之Confluent环境搭建(二)

        本文链接:https://www.haomeiwen.com/subject/sdrimqtx.html