Kafka Connect JDBC Connector Tutorial
This article walks through using the Kafka JDBC Connector step by step. Deploying Kafka Connect itself is outside the scope of this tutorial; questions are welcome in the comments below.
1. Kafka Connect JDBC MySQL Source Connector
The following example shows how to import data from MySQL into a Kafka topic.
- Download the MySQL driver, mysql-connector-java. In testing, version 8.0.15 did not work; version 5.1.47 is used here.
- Copy the downloaded mysql-connector-java-5.1.47.jar into the plugin installation directory and restart Kafka Connect. This tutorial runs Kafka Connect in Docker, so the jar goes into the container directory /usr/share/java/kafka-connect-jdbc, as sketched below. For installation details, see the official docs: Install Connectors.
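A minimal sketch of this step, assuming the Kafka Connect container is named connect (a hypothetical name; adjust to your setup):
# copy the driver into the plugin directory and restart Kafka Connect
docker cp mysql-connector-java-5.1.47.jar connect:/usr/share/java/kafka-connect-jdbc/
docker restart connect
# confirm the JDBC connector plugin is loaded
curl http://127.0.0.1:8083/connector-plugins | grep -i jdbc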
- Create a table named person in the MySQL database:
CREATE TABLE `person` (
`pid` INT(11) NOT NULL AUTO_INCREMENT,
`firstname` VARCHAR(255) CHARACTER SET utf8 DEFAULT NULL,
`age` INT(11) DEFAULT NULL,
PRIMARY KEY (`pid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
- Create the connector via the REST API (mode=incrementing makes the connector poll for new rows using the strictly increasing pid column; inserts are captured, but updates to existing rows are not):
curl -X POST -H "Content-Type: application/json" \
--data '{"name":"mysql-source-person","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:mysql://127.0.0.1:3306/kyletest","connection.user":"root","connection.password":"123456","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter","topic.prefix":"mysql","tasks.max":"1","table.whitelist":"person","mode":"incrementing","incrementing.column.name":"pid"}}' \
http://127.0.0.1:8083/connectors
# The response looks like this
{
  "name": "mysql-source-person",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSourceConnector",
    "connection.url": "jdbc:mysql://127.0.0.1:3306/kyletest",
    "connection.user": "root",
    "connection.password": "123456",
    "value.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "topic.prefix": "mysql",
    "tasks.max": "1",
    "table.whitelist": "person",
    "mode": "incrementing",
    "incrementing.column.name": "pid",
    "name": "mysql-source-person"
  },
  "tasks": [],
  "type": "source"
}
- Check via the REST API whether the connector is running:
curl -X GET http://127.0.0.1:8083/connectors/mysql-source-person/status
# The response looks like this
{
  "name": "mysql-source-person",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "source"
}
- Insert a row into the table (a sample statement is sketched below), then consume the topic mysqlperson (topic.prefix plus the table name) to check that the data arrived. Note that because value.converter.schemas.enable is true, each record in the output is wrapped in a schema/payload envelope.
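A hypothetical INSERT that would produce the record shown in the consumer output below (values taken from that output; pid is auto-generated):
INSERT INTO person (firstname, age) VALUES ('kyle', 20);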
$ ./kafka-console-consumer --bootstrap-server 127.0.0.1:8092 --topic mysqlperson --from-beginning
{"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"pid"},{"type":"string","optional":true,"field":"firstname"},{"type":"int32","optional":true,"field":"age"}],"optional":false,"name":"person"},"payload":{"pid":1,"firstname":"kyle","age":20}}
- For the full list of configuration parameters, see: JDBC Source Configuration Options
- Official example: Kafka Connect JDBC Source Connector
2. Kafka Connect JDBC MySQL Sink Connector
The following example shows how to export data from a Kafka topic into MySQL.
- As in Section 1, download the MySQL driver mysql-connector-java (5.1.47; version 8.0.15 did not work in testing), copy it into the plugin installation directory (/usr/share/java/kafka-connect-jdbc in the Docker container used here), and restart Kafka Connect. For details, see the official docs: Install Connectors.
- Create a table named kafkaperson in the MySQL database (with auto.create set to true below, the connector could also create the table itself; creating it manually keeps the schema under your control):
CREATE TABLE `kafkaperson` (
`pid` INT(11) NOT NULL AUTO_INCREMENT,
`firstname` VARCHAR(255) CHARACTER SET utf8 DEFAULT NULL,
`age` INT(11) DEFAULT NULL,
PRIMARY KEY (`pid`)
) ENGINE=INNODB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
- Create the connector via the REST API (see the note after the response for what upsert mode does):
curl -X POST -H "Content-Type: application/json" \
--data '{"name":"mysql-sink-person","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://127.0.0.1:3306/kylesinktest","connection.user":"root","connection.password":"123456","tasks.max":"1","topics":"mysqlperson","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter","table.name.format":"kafkaperson","auto.create":"true","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"pid"}}' \
http://127.0.0.1:8083/connectors
# The response looks like this
{
  "name": "mysql-sink-person",
  "config": {
    "connector.class": "io.confluent.connect.jdbc.JdbcSinkConnector",
    "connection.url": "jdbc:mysql://127.0.0.1:3306/kylesinktest",
    "connection.user": "root",
    "connection.password": "123456",
    "tasks.max": "1",
    "topics": "mysqlperson",
    "value.converter.schemas.enable": "true",
    "value.converter": "org.apache.kafka.connect.json.JsonConverter",
    "table.name.format": "kafkaperson",
    "auto.create": "true",
    "insert.mode": "upsert",
    "pk.mode": "record_value",
    "pk.fields": "pid",
    "name": "mysql-sink-person"
  },
  "tasks": [],
  "type": "sink"
}
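With insert.mode set to upsert and pk.mode set to record_value, the sink reads the pid field from each record's value and inserts or updates the row with that key. Against MySQL the effect is roughly that of the statement below (a sketch of the behavior, not the connector's literal SQL):
INSERT INTO kafkaperson (pid, firstname, age)
VALUES (1, 'kyle', 20)
ON DUPLICATE KEY UPDATE firstname = VALUES(firstname), age = VALUES(age);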
- Check via the REST API whether the connector is running:
curl -X GET http://127.0.0.1:8083/connectors/mysql-sink-person/status
# The response looks like this
{
  "name": "mysql-sink-person",
  "connector": {
    "state": "RUNNING",
    "worker_id": "127.0.0.1:8083"
  },
  "tasks": [
    {
      "id": 0,
      "state": "RUNNING",
      "worker_id": "127.0.0.1:8083"
    }
  ],
  "type": "sink"
}
- Check that the data was written to the table kafkaperson:
SELECT * FROM kafkaperson;

firstname | pid | age
----------+-----+-----
kyle      |   1 |  20
- For the full list of configuration parameters, see: JDBC Sink Configuration Options
- Official example: Kafka Connect JDBC Sink Connector
3. Kafka Connect Cassandra Sink Connector
The following example shows how to export data from a Kafka topic into Cassandra.
- Download and install the Kafka Connect Cassandra plugin; see the official docs for details: Install Connectors. (Kafka Connect runs in Docker in this example, so the downloaded confluentinc-kafka-connect-cassandra-1.0.2.zip is simply unzipped, all files are copied into the container directory /usr/share/java/kafka-connect-cassandra, and the container is restarted, as sketched below.)
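A minimal sketch of the install, again assuming a Connect container named connect (hypothetical; adjust to your setup):
unzip confluentinc-kafka-connect-cassandra-1.0.2.zip
docker cp confluentinc-kafka-connect-cassandra-1.0.2/. connect:/usr/share/java/kafka-connect-cassandra/
docker restart connect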
- Create the connector via the REST API. The createKey transform below uses the ValueToKey SMT to copy the pid field from the record value into the record key, which the Cassandra sink uses as the table's primary key:
curl -X POST -H "Content-Type: application/json" \
--data '{"name":"cassandraSink","config":{"connector.class":"io.confluent.connect.cassandra.CassandraSinkConnector","transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey","cassandra.contact.points":"127.0.0.1","tasks.max":"1","topics":"mysqlperson","cassandra.keyspace.create.enabled":"true","transforms":"createKey","cassandra.keyspace":"kyletest","cassandra.write.mode":"Insert","transforms.createKey.fields":"pid","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter"}}' \
http://127.0.0.1:8083/connectors
# The response looks like this
{
"name": "cassandraSink",
"config": {
"connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
"transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
"cassandra.contact.points": "127.0.0.1",
"tasks.max": "1",
"topics": "mysqlperson",
"cassandra.keyspace.create.enabled": "true",
"transforms": "createKey",
"cassandra.keyspace": "kyletest",
"cassandra.write.mode": "Insert",
"transforms.createKey.fields": "pid",
"value.converter.schemas.enable": "true",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"name": "cassandraSink"
},
"tasks": [],
"type": "sink"
}
- Check via the REST API whether the connector is running:
curl -X GET http://127.0.0.1:8083/connectors/cassandraSink/status
# The response looks like this
{
"name": "cassandraSink",
"connector": {
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
},
"tasks": [
{
"id": 0,
"state": "RUNNING",
"worker_id": "127.0.0.1:8083"
}
],
"type": "sink"
}
- Check in cqlsh that the data was written to Cassandra (with cassandra.keyspace.create.enabled set to true, the connector creates the kyletest keyspace automatically if it does not exist):
cqlsh> select * from kyletest.cassdataperson;
pid | age | firstname
-----+-----+-----------
1 | 20 | kyle
2 | 18 | test
3 | 19 | test2
- For the full list of configuration parameters, see: Cassandra Sink Connector Configuration Options
- Official example: Kafka Connect Cassandra Connector
Related articles:
Kafka Connect Overview
Kafka Connector Development Guide