美文网首页消息中间件Kafkaconfluent
confluent jdbc connector - mysql

confluent jdbc connector - mysql

作者: 愚公300代 | 来源:发表于2017-07-05 19:01 被阅读470次

    JDBC Source Connector

    Quickstart

    数据库环境准备

    CREATE DATABASE connector;
    USE connector;
    CREATE TABLE `from_source` (
      `fdsid` int(11) NOT NULL AUTO_INCREMENT,
      `dsid` int(11) DEFAULT NULL,
      `from` int(11) DEFAULT NULL,
      `stype` int(11) DEFAULT NULL,
      PRIMARY KEY (`fdsid`)
    ) ENGINE=InnoDB AUTO_INCREMENT=6 DEFAULT CHARSET=utf8mb4;
    INSERT INTO from_source VALUES(1, 2, 1, 1);
    INSERT INTO from_source VALUES(2, 2, 1, 2);
    INSERT INTO from_source VALUES(3, 2, 1, 5);
    INSERT INTO from_source VALUES(4, 2, 1, 6);
    

    MySQL JDBC 驱动准备

    测试环境使用的mysql版本信息如下:


    mysql-info.png

    在mysql官网上选择合适的驱动下载,测试中下载的是mysql-connector-java-5.1.42.tar.gz。
    将此驱动拷贝到$CONFLUENT_PATH/share/java/kafka-connect-jdbc目录下,使用解压命令解压

    tar -xzvf mysql-connector-java-5.1.42.tar.gz
    cd mysql-connector-java-5.1.42
    cp mysql-connector-java-5.1.42-bin.jar ../
    

    最终的目的就是将mysql-connector-java-5.1.42-bin.jar放在$CONFLUENT_PATH/share/java/kafka-connect-jdbc目录下,这样confluecnt connector在启动是就可以找到mysql的jdbc驱动了

    配置文件准备

    单机环境下运行connector的命令如下:

    //INFO Usage: ConnectStandalone worker.properties connector1.properties [connector2.properties ...] (org.apache.kafka.connect.cli.ConnectStandalone:61)
    bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/mysql-source.properties
    

    其中:connect-avro-standalone.properties可使用默认配置;
    mysql-source.properties的内容如下:

    # tasks to create:
    name=test-mysql-jdbc-autoincrement
    connector.class=io.confluent.connect.jdbc.JdbcSourceConnector
    tasks.max=1
    # a table called 'users' will be written to the topic 'test-mysql-jdbc-users'.
    connection.url=jdbc:mysql://172.24.8.114:3306/connector?user=$USER&password=$PASSWORD
    mode=incrementing
    incrementing.column.name=fdsid
    topic.prefix=test-mysql-jdbc-
    

    其中,$USER,$PASSWORD是访问mysql数据库的用户名和地址。
    参考:
    JDBC驱动下载地址
    MySQL Connector配置参考

    从MySQL导入数据到Kafka中

    启动Connector

     bin/connect-standalone etc/schema-registry/connect-avro-standalone.properties etc/kafka-connect-jdbc/mysql-source.properties
    

    验证产生了相应的topic

    bin/kafka-topics --zookeeper localhost:2181 --list
    

    topic列表中会包含:test-mysql-jdbc-from_source

    验证topic中的数据

    bin/kafka-avro-console-consumer --new-consumer --bootstrap-server localhost:9092 --topic test-mysql-jdbc-from_source --from-beginning
    

    此时,可以获取topic:test-mysql-jdbc-from_source中的所有数据。

    验证可增量从MySQL中导入数据

    向from_source表中添加一个数据

    insert into from_source values(7, 2, 1, 1)
    

    相应的消费者会接收到插入到from_source表中的数据。
    总参考:
    JDBC Source Connector 官网

    相关文章

      网友评论

        本文标题:confluent jdbc connector - mysql

        本文链接:https://www.haomeiwen.com/subject/eosqhxtx.html