美文网首页
Windows Flume+Kafka 获取MySQL增量数据

Windows Flume+Kafka 获取MySQL增量数据

作者: CNSTT | 来源:发表于2018-11-21 17:49 被阅读0次

    前言:

    本文章适用于在Windows上使用Flume准实时获取MySQL表中增量数据,并输出到Kafka 消费。其中跳过个别步骤可自行百度,整个过程大约配置半个小时即可。

    版本:

    apache-flume-1.8.0-bin
    kafka_2.11-2.0.1
    mysql-connector-java-5.1.25-bin.jar
    sbt-0.13.15.msi
    jdk1.8.0_45

    准备内容:

    Flume: http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
    Kafka:  http://kafka.apache.org/downloads
    sbt:      https://www.scala-sbt.org/download.html
    Flume: https://github.com/keedio/flume-ng-sql-source
    下载完毕解压到指定目录,本例保存在D:\com\目录下

    配置环境变量:

    打开“环境变量”,新建或修改系统变量
    变量: FLUME_HOME 值: D:\com\apache-flume-1.8.0-bin
    变量: KA_HOME 值: D:\com\kafka_2.11-2.0.1
    变量: JAVA_HOME 值: C:\Program Files\Java\jdk1.8.0_45
    变量: CLASSPATH.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar
    变量: PATH
    追加;%FLUME_HOME%\conf;%FLUME_HOME%\bin;%KA_HOME%\bin\windows
    如果在启动zookeeper.properties出错,编辑修改文件D:\com\kafka_2.11-2.0.1\bin\windows\kafka-run-class.bat中的如下语句,将CLASSPATH前后加上引号
    set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

    Kafka配置

    首先安装sbt-0.13.15.msi,配置环境变量等略去
    打开cmd命令行 cd 到目录D:\com\kafka_2.11-2.0.1
    小技巧:shift+右键选择在此处打开命令窗口
    依次执行以下命令,等待出现绿色的success再执行下一个命令

    sbt update
    
    sbt package
    
    sbt sbt-dependency
    

    根据需要可以自行修改文件zookeeper.propertieslog4j.propertiesserver.properties等的log存放位置(可忽略)。

    配置Flume采集MySQL

    1、下载Flume连接DB插件

    https://github.com/keedio/flume-ng-sql-source Download Zip
    解压到任意目录(如D:\com)
    对插件编译 并跳过测试用例 切换到目录D:\com\flume-ng-sql-source-develop

    mvn compile
    
    mvn package -Dmaven.test.skip=true 
    

    复制target文件夹下jar文件flume-ng-sql-source-1.5.3-SNAPSHOT.jar以及mysql-connector-java-5.1.25-bin.jar到文件夹D:\com\apache-flume-1.8.0-bin\lib内即可。

    2、编写properties

    切换到目录D:\com\apache-flume-1.8.0-bin\conf
    新建一个文件flume.properties

    a1.channels = ch-1
    a1.sources = src-1
    a1.sinks = k1
    ###########sql source#################
    # For each one of the sources, the type is defined
    a1.sources.src-1.type = org.keedio.flume.source.SQLSource
    a1.sources.src-1.hibernate.connection.url = jdbc:mysql://你的URL:3306/你的DB
    
    # Hibernate Database connection properties
    a1.sources.src-1.hibernate.connection.user = 你的用户名
    a1.sources.src-1.hibernate.connection.password = 你的密码
    a1.sources.src-1.hibernate.connection.autocommit = true
    a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
    a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
    a1.sources.src-1.run.query.delay=10000
    a1.sources.src-1.status.file.path = D://com//apache-flume-1.8.0-bin//status
    a1.sources.src-1.status.file.name = sqlSource.status
    #sqlSource.status文件中记录了增量字段的值 $@$
    
    # Custom query="你的SQL语句"
    a1.sources.src-1.start.from = 0
    a1.sources.src-1.custom.query = select id,country,sum from income where id > $@$ order by id asc
    a1.sources.src-1.batch.size = 1000
    a1.sources.src-1.max.rows = 1000
    
    a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
    a1.sources.src-1.hibernate.c3p0.min_size=1
    a1.sources.src-1.hibernate.c3p0.max_size=10
    ##############################
    a1.channels.ch-1.type = memory
    a1.channels.ch-1.capacity = 10000
    a1.channels.ch-1.transactionCapacity = 10000
    a1.channels.ch-1.byteCapacityBufferPercentage = 20
    a1.channels.ch-1.byteCapacity = 800000
    
    #Kafka sink配置 a1.sinks.k1.topic = 你的topic名字
    a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
    a1.sinks.k1.topic = mysqltest
    a1.sinks.k1.brokerList = localhost:9092
    a1.sinks.k1.requiredAcks = 1
    a1.sinks.k1.batchSize = 20
    
    # combination  a1.sources.src-1.channels不要漏掉s
    a1.sinks.k1.channel = ch-1
    a1.sources.src-1.channels=ch-1
    
    

    正式开始

    cmd切换到指定路径 cd D:\com\kafka_2.11-2.0.1\bin\windows

    1、启动Kafka内置zookeeper

    D:\com\kafka_2.11-2.0.1\bin\windows>zookeeper-server-start.bat ../../config/zookeeper.properties
    

    启动完成后不要关闭,否则导致zookeeper进程停止。

    2、启动Kafka

    D:\com\kafka_2.11-2.0.1\bin\windows>kafka-server-start.bat ../../config/server.properties
    

    启动完成后不要关闭,否则导致Kafka进程停止。

    3、创建一个topic mysqltest

    D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysqltest
    

    4、消费者接收消息

    D:\com\kafka_2.11-2.0.1\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mysqltest --from-beginning
    

    此时因为没有发送消息为空,保持开启状态。

    5、启动flume-ng

    切换目录 cd D:\com\apache-flume-1.8.0-bin

    D:\com\apache-flume-1.8.0-bin>flume-ng agent -c conf -f conf/flume.properties -n a1
    

    查看刚刚打开的"接收消息"命令行


    image.png

    再次插入一条数据后观察自动获取id为7的data(可能需要等待几秒)。


    image.png

    至此在Windows环境下使用Flume+Kafka获取MySQL中表内增量数据完成!

    谢谢阅读,有帮助的点个❤!

    相关文章

      网友评论

          本文标题:Windows Flume+Kafka 获取MySQL增量数据

          本文链接:https://www.haomeiwen.com/subject/ejycqqtx.html