美文网首页Apache KafkakafkaConfluent 组件
Kafka Connect JDBC Connector使用教程

Kafka Connect JDBC Connector使用教程

作者: 扶我起来改bug | 来源:发表于2019-04-09 21:12 被阅读8次

    Kafka Connect JDBC Connector使用教程

    本文章介绍如何使用kafka JDBC Connector,步骤很详细,Connector部署本教程在此省略,如有其他疑问可在底部留言或者在底部扫描添加作者微信,添加时还请备注。

    1. Kafka Connect JDBC Mysql Source Connector

    以下示例演示如何将数据从mysql导入至kafka的topic中

    1. 下载mysql驱动包,mysql-connector-java,经测试8.0.15版本的目前不可用,使用的是 5.1.47 这个版本。
    2. 将下载好的mysql驱动包 mysql-connector-java-5.1.47.jar 复制到插件安装目录,我是用docker启动的,所有将jar包copy至容器的目录 eg: docker container: /usr/share/java/kafka-connect-jdbc,重启Kafka Connect。具体安装可参考官方文档:Install Connectors
    3. 在mysql数据库中新建表 person
    CREATE TABLE `person` (
      `pid` INT(11) NOT NULL AUTO_INCREMENT,
      `firstname` VARCHAR(255) CHARACTER SET utf8 DEFAULT NULL,
      `age` INT(11) DEFAULT NULL,
      PRIMARY KEY (`pid`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    
    1. 使用 REST API创建connectors
    curl -X POST -H "Content-Type: application/json" \
    --data '{"name":"mysql-source-person","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector","connection.url":"jdbc:mysql://127.0.0.1:3306/kyletest","connection.user":"root","connection.password":"123456","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter","topic.prefix":"mysql","tasks.max":"1","table.whitelist":"person","mode":"incrementing","incrementing.column.name":"pid"}}' \
    http://127.0.0.1:8083/connectors
    
    # 请求响应如下
    {
        "name":"mysql-source-person",
        "config":{
            "connector.class":"io.confluent.connect.jdbc.JdbcSourceConnector",
            "connection.url":"jdbc:mysql://127.0.0.1:3306/kyletest",
            "connection.user":"root",
            "connection.password":"123456",
            "value.converter.schemas.enable":"true",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "topic.prefix":"mysql",
            "tasks.max":"1",
            "table.whitelist":"person",
            "mode":"incrementing",
            "incrementing.column.name":"pid",
            "name":"mysql-source-person"
        },
        "tasks":[],
        "type":"source"
    }
    
    1. 使用REST API创建connectors查看connectors是否正常
    curl -X GET http://127.0.0.1:8083/connectors/mysql-source-person/status
    
    # 请求响应如下
    {
        "name":"mysql-source-person",
        "connector":{
            "state":"RUNNING",
            "worker_id":"127.0.0.1:8083"
        },
        "tasks":[
            {
                "id":0,
                "state":"RUNNING",
                "worker_id":"127.0.0.1:8083"
            }
        ],
        "type":"source"
    }
    
    1. 在表中插入一条数据,然后查看topic:mysqlperson 数据是否成功写入。
    $ ./kafka-console-consumer --bootstrap-server 127.0.0.1:8092 --topic mysqlperson --from-beginning
    {"schema":{"type":"struct","fields":[{"type":"int32","optional":false,"field":"pid"},{"type":"string","optional":true,"field":"firstname"},{"type":"int32","optional":true,"field":"age"}],"optional":false,"name":"person"},"payload":{"pid":1,"firstname":"kyle","age":20}}
    
    1. 关于该Connector的详细参数可参考:JDBC Source Configuration Options
    2. 官方示例: Kafka Connect JDBC Source Connector

    2. Kafka Connect JDBC Mysql Sink Connector

    以下示例演示如何将数据从kafka的topic导入至mysql中

    1. 下载mysql驱动包,mysql-connector-java,经测试8.0.15版本的目前不可用,使用的是 5.1.47 这个版本。
    2. 将下载好的mysql驱动包 mysql-connector-java-5.1.47.jar 复制到插件安装目录,我是用docker启动的,所有将jar包copy至容器的目录 eg: docker container: /usr/share/java/kafka-connect-jdbc,重启Kafka Connect。具体安装可参考官方文档:Install Connectors
    3. 在mysql数据库中新建表 kafkaperson
    CREATE TABLE `kafkaperson` (
      `pid` INT(11) NOT NULL AUTO_INCREMENT,
      `firstname` VARCHAR(255) CHARACTER SET utf8 DEFAULT NULL,
      `age` INT(11) DEFAULT NULL,
      PRIMARY KEY (`pid`)
    ) ENGINE=INNODB DEFAULT CHARSET=utf8 COLLATE=utf8_bin;
    
    1. 使用 REST API创建connectors
    curl -X POST -H "Content-Type: application/json" \
    --data '{"name":"mysql-sink-person","config":{"connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector","connection.url":"jdbc:mysql://127.0.0.1:3306/kylesinktest","connection.user":"root","connection.password":"123456","tasks.max":"1","topics":"mysqlperson","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter","table.name.format":"kafkaperson","auto.create":"true","insert.mode":"upsert","pk.mode":"record_value","pk.fields":"pid"}}' \
    http://127.0.0.1:8083/connectors
    
    # 请求响应如下
    {
        "name":"mysql-sink-person",
        "config":{
            "connector.class":"io.confluent.connect.jdbc.JdbcSinkConnector",
            "connection.url":"jdbc:mysql://127.0.0.1:3306/kylesinktest",
            "connection.user":"root",
            "connection.password":"123456",
            "tasks.max":"1",
            "topics":"mysqlperson",
            "value.converter.schemas.enable":"true",
            "value.converter":"org.apache.kafka.connect.json.JsonConverter",
            "table.name.format":"kafkaperson",
            "auto.create":"true",
            "insert.mode":"upsert",
            "pk.mode":"record_value",
            "pk.fields":"pid",
            "name":"mysql-sink-person"
        },
        "tasks":[
     
        ],
        "type":"sink"
    }
    
    1. 使用REST API创建connectors查看connectors是否正常
    curl -X GET http://127.0.0.1:8083/connectors/mysql-sink-person/status
    
    # 请求响应如下
    {
        "name":"mysql-sink-person",
        "connector":{
            "state":"RUNNING",
            "worker_id":"127.0.0.1:8083"
        },
        "tasks":[
            {
                "id":0,
                "state":"RUNNING",
                "worker_id":"127.0.0.1:8083"
            }
        ],
        "type":"sink"
    }
    
    1. 查看表 kafkaperson 是否成功有数据写入
    SELECT * FROM kafkaperson
    
    firstname pid age
    kyle 1 2
    1. 关于该Connector的详细参数可参考:JDBC Source Configuration Options
    2. 官方示例: Kafka Connect JDBC Source Connector

    3. Kafka Connect JDBC Cassandra Sink Connector

    以下示例演示如何将数据从kafka的topic导入至Cassandra中

    1. 下载 Kafka Connect Cassandra插件,并安装,具体安装可参考官方文档:Install Connectors (本示例Kafka Connect 使用docker启动,故直接将下载好的confluentinc-kafka-connect-cassandra-1.0.2.zip解压,将所有文件copy值 容器 /usr/share/java/kafka-connect-cassandra目录内,重启容器即可。)
    2. 使用 REST API创建connectors
    curl -X POST -H "Content-Type: application/json" \
    --data '{"name":"cassandraSink","config":{"connector.class":"io.confluent.connect.cassandra.CassandraSinkConnector","transforms.createKey.type":"org.apache.kafka.connect.transforms.ValueToKey","cassandra.contact.points":"127.0.0.1","tasks.max":"1","topics":"mysqlperson","cassandra.keyspace.create.enabled":"true","transforms":"createKey","cassandra.keyspace":"kyletest","cassandra.write.mode":"Insert","transforms.createKey.fields":"pid","value.converter.schemas.enable":"true","value.converter":"org.apache.kafka.connect.json.JsonConverter"}}' \
    http://127.0.0.1:8083/connectors
    
    # 请求响应如下
    {
        "name": "cassandraSink",
        "config": {
            "connector.class": "io.confluent.connect.cassandra.CassandraSinkConnector",
            "transforms.createKey.type": "org.apache.kafka.connect.transforms.ValueToKey",
            "cassandra.contact.points": "127.0.0.1",
            "tasks.max": "1",
            "topics": "mysqlperson",
            "cassandra.keyspace.create.enabled": "true",
            "transforms": "createKey",
            "cassandra.keyspace": "kyletest",
            "cassandra.write.mode": "Insert",
            "transforms.createKey.fields": "pid",
            "value.converter.schemas.enable": "true",
            "value.converter": "org.apache.kafka.connect.json.JsonConverter",
            "name": "cassandraSink"
        },
        "tasks": [],
        "type": "sink"
    }
    
    1. 使用REST API创建connectors查看connectors是否正常
    curl -X GET http://127.0.0.1:8083/connectors/cassandraSink/status
    
    # 请求响应如下
    {
        "name": "cassandraSink",
        "connector": {
            "state": "RUNNING",
            "worker_id": "127.0.0.1:8083"
        },
        "tasks": [
            {
                "id": 0,
                "state": "RUNNING",
                "worker_id": "127.0.0.1:8083"
            }
        ],
        "type": "sink"
    }
    
    1. 查看表 cassandra库中 数据是否成功写入
    cqlsh> select * from kyletest.cassdataperson;
     
     pid | age | firstname
    -----+-----+-----------
       1 |  20 |      kyle
       2 |  18 |      test
       3 |  19 |      test2
    
    1. 关于该Connector的详细参数可参考:Cassandra Sink Connector Configuration Options
    2. 官方示例: Kafka Connect Cassandra Connector

    相关文章:

    Kafka Connect 简介

    Kafka Connector 开发指南

    可以添加作者微信进行相互学习交流,还请填写备注信息。

    image

    相关文章

      网友评论

        本文标题:Kafka Connect JDBC Connector使用教程

        本文链接:https://www.haomeiwen.com/subject/hclwiqtx.html