Kafka Connect JDBC Connector Tutorial

Author: 扶我起来改bug | Published 2019-04-09 21:12

This article walks through how to use the Kafka JDBC Connector in detail. Deploying Kafka Connect itself is omitted from this tutorial.

1. Kafka Connect JDBC MySQL Source Connector

The following example shows how to import data from MySQL into a Kafka topic.

  1. Download the MySQL JDBC driver (mysql-connector-java). In testing, version 8.0.15 did not work, so version 5.1.47 is used here.
  2. Copy the downloaded mysql-connector-java-5.1.47.jar into the plugin installation directory and restart Kafka Connect. This tutorial runs Kafka Connect in Docker, so the jar is copied into the container, e.g. docker container: /usr/share/java/kafka-connect-jdbc. For installation details, see the official documentation: Install Connectors
  3. Create a person table in the MySQL database:
CREATE TABLE `person` (
  `pid` INT(11) NOT NULL AUTO_INCREMENT,
  `firstname` VARCHAR(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` INT(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
  4. Create the connector via the REST API:
curl -X POST -H "Content-Type: application/json" \
--data '{"name":"mysql-source-person","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:mysql://127.0.0.1:3306/kyletest","connection.user":"root","connection.password":"123456","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter","topic.prefix":"mysql","tasks.max":"1","table.whitelist":"person","mode":"incrementing","incrementing.column.name":"pid"}}' \
http://127.0.0.1:8083/connectors

# Response:
{
    "name":"mysql-source-person",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url":"jdbc:mysql://127.0.0.1:3306/kyletest",
        "connection.user":"root",
        "connection.password":"123456",
        "value.converter.schemas.enable":"true",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter",
        "topic.prefix":"mysql",
        "tasks.max":"1",
        "table.whitelist":"person",
        "mode":"incrementing",
        "incrementing.column.name":"pid",
        "name":"mysql-source-person"
    },
    "tasks":[],
    "type":"source"
}
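If you prefer to build the request programmatically rather than with curl, the payload above can be assembled as follows. This is a minimal sketch: the function name is ours, and the host, credentials, and connector settings are copied from the curl example.

```python
import json

# Sketch: assemble the same source-connector payload as the curl call above.
# Host, credentials, and names mirror the example; adjust them for your setup.
def source_connector_payload():
    config = {
        "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
        "connection.url": "jdbc:mysql://127.0.0.1:3306/kyletest",
        "connection.user": "root",
        "connection.password": "123456",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "value.converter.schemas.enable": "true",
        "topic.prefix": "mysql",        # topic name = prefix + table, i.e. "mysqlperson"
        "tasks.max": "1",
        "table.whitelist": "person",
        "mode": "incrementing",         # detect new rows via a strictly increasing column
        "incrementing.column.name": "pid",
    }
    return json.dumps({"name": "mysql-source-person", "config": config})

# POST this payload to http://127.0.0.1:8083/connectors with
# Content-Type: application/json (exactly what the curl command above does).
```

Note that incrementing mode only picks up new rows (pid strictly greater than the last seen value); updates to existing rows are not captured.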
  5. Check via the REST API that the connector is running properly:
curl -X GET http://127.0.0.1:8083/connectors/mysql-source-person/status

# Response:
{
    "name":"mysql-source-person",
    "connector":{
        "state":"RUNNING",
        "worker_id":"127.0.0.1:8083"
    },
    "tasks":[
        {
            "id":0,
            "state":"RUNNING",
            "worker_id":"127.0.0.1:8083"
        }
    ],
    "type":"source"
}
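Beyond eyeballing the JSON, you can check the status programmatically: the connector and every task should be in the RUNNING state. A small helper, sketched here as our own illustration (not part of any Kafka Connect client library):

```python
# Sketch: given a parsed /status response like the one above, report whether
# the connector itself and every one of its tasks are RUNNING.
def is_healthy(status):
    if status["connector"]["state"] != "RUNNING":
        return False
    return all(task["state"] == "RUNNING" for task in status["tasks"])

# The status document returned above:
status = {
    "name": "mysql-source-person",
    "connector": {"state": "RUNNING", "worker_id": "127.0.0.1:8083"},
    "tasks": [{"id": 0, "state": "RUNNING", "worker_id": "127.0.0.1:8083"}],
    "type": "source",
}
print(is_healthy(status))  # True
```

A task in the FAILED state includes a stack trace in its entry, which is the first place to look when a connector stops moving data.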
  6. Insert a row into the person table, then check that the data was written to the topic mysqlperson:
$ ./kafka-console-consumer --bootstrap-server 127.0.0.1:8092 --topic mysqlperson --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"pid"},{"type":"string","optional":true,"field":"firstname"},{"type":"int32","optional":true,"field":"age"}],"optional":false,"name":"person"},"payload":{"pid":1,"firstname":"kyle","age":20}}
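Because value.converter.schemas.enable is true, the JsonConverter wraps each row in a schema/payload envelope, which is what the consumer output above shows. A minimal sketch of pulling the row back out (the record string is the one consumed above, abbreviated only in formatting):

```python
import json

# One record as consumed above: a {"schema": ..., "payload": ...} envelope.
record = (
    '{"schema":{"type":"struct","fields":['
    '{"type":"int32","optional":false,"field":"pid"},'
    '{"type":"string","optional":true,"field":"firstname"},'
    '{"type":"int32","optional":true,"field":"age"}],'
    '"optional":false,"name":"person"},'
    '"payload":{"pid":1,"firstname":"kyle","age":20}}'
)

envelope = json.loads(record)
row = envelope["payload"]                                  # the column values
columns = [f["field"] for f in envelope["schema"]["fields"]]  # column names
print(columns, row)
```

The sink connector in the next section relies on this embedded schema to create and populate the target table.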
  7. For the full list of parameters for this connector, see: JDBC Source Configuration Options
  8. Official example: Kafka Connect JDBC Source Connector

2. Kafka Connect JDBC MySQL Sink Connector

The following example shows how to export data from a Kafka topic into MySQL.

  1. Download the MySQL JDBC driver (mysql-connector-java). In testing, version 8.0.15 did not work, so version 5.1.47 is used here.
  2. Copy the downloaded mysql-connector-java-5.1.47.jar into the plugin installation directory and restart Kafka Connect. This tutorial runs Kafka Connect in Docker, so the jar is copied into the container, e.g. docker container: /usr/share/java/kafka-connect-jdbc. For installation details, see the official documentation: Install Connectors
  3. Create a kafkaperson table in the MySQL database:
CREATE TABLE `kafkaperson` (
  `pid` INT(11) NOT NULL AUTO_INCREMENT,
  `firstname` VARCHAR(255) CHARACTER SET utf8 DEFAULT NULL,
  `age` INT(11) DEFAULT NULL,
  PRIMARY KEY (`pid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
  4. Create the connector via the REST API:
curl -X POST -H "Content-Type: application/json" \
--data '{"name":"mysql-sink-person","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://127.0.0.1:3306/kylesinktest","connection.user":"root","connection.password":"123456","tasks.max":"1","topics":"mysqlperson","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter","table.name.format":"kafkaperson","auto.create":"true","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"pid"}}' \
http://127.0.0.1:8083/connectors

# Response:
{
    "name":"mysql-sink-person",
    "config":{
        "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
        "connection.url":"jdbc:mysql://127.0.0.1:3306/kylesinktest",
        "connection.user":"root",
        "connection.password":"123456",
        "tasks.max":"1",
        "topics":"mysqlperson",
        "value.converter.schemas.enable":"true",
        "value.converter":"org.apache.kafka.connect.json.JsonConverter",
        "table.name.format":"kafkaperson",
        "auto.create":"true",
        "insert.mode":"upsert",
        "pk.mode":"record_value",
        "pk.fields":"pid",
        "name":"mysql-sink-person"
    },
    "tasks":[],
    "type":"sink"
}
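With insert.mode=upsert, pk.mode=record_value, and pk.fields=pid, the sink writes idempotently on the primary key: replaying the same record updates the existing row instead of failing on a duplicate key. On MySQL the resulting statement has the shape of INSERT ... ON DUPLICATE KEY UPDATE. A rough sketch of that shape, as our own illustration (the real connector generates dialect-specific SQL internally):

```python
# Sketch: the statement shape the JDBC sink uses for upsert mode on MySQL.
# Illustration only -- not the connector's actual SQL generator.
def upsert_sql(table, row, pk_fields):
    cols = list(row)
    placeholders = ", ".join(["%s"] * len(cols))
    # Non-key columns are overwritten on conflict; key columns are left alone.
    updates = ", ".join(f"{c} = VALUES({c})" for c in cols if c not in pk_fields)
    return (
        f"INSERT INTO {table} ({', '.join(cols)}) "
        f"VALUES ({placeholders}) ON DUPLICATE KEY UPDATE {updates}"
    )

sql = upsert_sql("kafkaperson", {"pid": 1, "firstname": "kyle", "age": 20}, ["pid"])
print(sql)
```

Upsert mode is why re-consuming the topic from the beginning does not duplicate rows in the target table.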
  5. Check via the REST API that the connector is running properly:
curl -X GET http://127.0.0.1:8083/connectors/mysql-sink-person/status

# Response:
{
    "name":"mysql-sink-person",
    "connector":{
        "state":"RUNNING",
        "worker_id":"127.0.0.1:8083"
    },
    "tasks":[
        {
            "id":0,
            "state":"RUNNING",
            "worker_id":"127.0.0.1:8083"
        }
    ],
    "type":"sink"
}
  6. Check that the data was successfully written to the kafkaperson table:
SELECT * FROM kafkaperson;

 firstname | pid | age
-----------+-----+-----
 kyle      |   1 |   2
  7. For the full list of parameters for this connector, see: JDBC Sink Configuration Options
  8. Official example: Kafka Connect JDBC Sink Connector

3. Kafka Connect Cassandra Sink Connector

The following example shows how to export data from a Kafka topic into Cassandra.

  1. Download and install the Kafka Connect Cassandra plugin; for installation details, see the official documentation: Install Connectors. (This example runs Kafka Connect in Docker, so confluentinc-kafka-connect-cassandra-1.0.2.zip is simply unzipped, all files are copied into the container under /usr/share/java/kafka-connect-cassandra, and the container is restarted.)
  2. Create the connector via the REST API:
curl -X POST -H "Content-Type: application/json" \
--data '{"name":"cassandraSink","config":{"connector.class":"io.confluent.connect.cassandra.CassandraSinkConnector","transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey","cassandra.contact.points":"127.0.0.1","tasks.max":"1","topics":"mysqlperson","cassandra.keyspace.create.enabled":"true","transforms":"createKey","cassandra.keyspace":"kyletest","cassandra.write.mode":"Insert","transforms.createKey.fields":"pid","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter"}}' \
http://127.0.0.1:8083/connectors

# Response:
{
    "name": "cassandraSink",
    "config": {
        "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
        "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
        "cassandra.contact.points": "127.0.0.1",
        "tasks.max": "1",
        "topics": "mysqlperson",
        "cassandra.keyspace.create.enabled": "true",
        "transforms": "createKey",
        "cassandra.keyspace": "kyletest",
        "cassandra.write.mode": "Insert",
        "transforms.createKey.fields": "pid",
        "value.converter.schemas.enable": "true",
        "value.converter": "org.apache.kafka.connect.json.JsonConverter",
        "name": "cassandraSink"
    },
    "tasks": [],
    "type": "sink"
}
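The transforms.createKey entries apply Kafka Connect's built-in ValueToKey single message transform, which copies the listed value fields (pid here) into the record key, so the Cassandra sink can key rows consistently even though the source records had no key. In effect, per record:

```python
# Sketch of what the ValueToKey transform does to each record:
# copy the configured value fields into the record key.
def value_to_key(value, fields):
    return {f: value[f] for f in fields}

key = value_to_key({"pid": 1, "firstname": "kyle", "age": 20}, ["pid"])
print(key)  # {'pid': 1}
```

Without this transform, records from the JDBC source arrive with a null key, which a keyed sink cannot use.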
  3. Check via the REST API that the connector is running properly:
curl -X GET http://127.0.0.1:8083/connectors/cassandraSink/status

# Response:
{
    "name": "cassandraSink",
    "connector": {
        "state": "RUNNING",
        "worker_id": "127.0.0.1:8083"
    },
    "tasks": [
        {
            "id": 0,
            "state": "RUNNING",
            "worker_id": "127.0.0.1:8083"
        }
    ],
    "type": "sink"
}
  4. Check that the data was successfully written to Cassandra:
cqlsh> select * from kyletest.cassdataperson;
 
 pid | age | firstname
-----+-----+-----------
   1 |  20 |      kyle
   2 |  18 |      test
   3 |  19 |      test2
  5. For the full list of parameters for this connector, see: Cassandra Sink Connector Configuration Options
  6. Official example: Kafka Connect Cassandra Connector

Related articles:

An Introduction to Kafka Connect

Kafka Connector Development Guide


    Original link: https://www.haomeiwen.com/subject/hclwiqtx.html