美文网首页
Windows Flume+Kafka 获取MySQL增量数据

Windows Flume+Kafka 获取MySQL增量数据

作者: CNSTT | 来源:发表于2018-11-21 17:49 被阅读0次

前言:

本文章适用于在Windows上使用Flume准实时获取MySQL表中增量数据,并输出到Kafka 消费。其中跳过个别步骤可自行百度,整个过程大约配置半个小时即可。

版本:

apache-flume-1.8.0-bin
kafka_2.11-2.0.1
mysql-connector-java-5.1.25-bin.jar
sbt-0.13.15.msi
jdk1.8.0_45

准备内容:

Flume: http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
Kafka:  http://kafka.apache.org/downloads
sbt:      https://www.scala-sbt.org/download.html
Flume: https://github.com/keedio/flume-ng-sql-source
下载完毕解压到指定目录,本例保存在D:\com\目录下

配置环境变量:

打开“环境变量”,新建或修改系统变量
变量: FLUME_HOME 值: D:\com\apache-flume-1.8.0-bin
变量: KA_HOME 值: D:\com\kafka_2.11-2.0.1
变量: JAVA_HOME 值: C:\Program Files\Java\jdk1.8.0_45
变量: CLASSPATH.;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar
变量: PATH
追加;%FLUME_HOME%\conf;%FLUME_HOME%\bin;%KA_HOME%\bin\windows
如果在启动zookeeper.properties出错,编辑修改文件D:\com\kafka_2.11-2.0.1\bin\windows\kafka-run-class.bat中的如下语句,将CLASSPATH前后加上引号
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*

Kafka配置

首先安装sbt-0.13.15.msi,配置环境变量等略去
打开cmd命令行 cd 到目录D:\com\kafka_2.11-2.0.1
小技巧:shift+右键选择在此处打开命令窗口
依次执行以下命令,等待出现绿色的success再执行下一个命令

sbt update
sbt package
sbt sbt-dependency

根据需要可以自行修改文件zookeeper.propertieslog4j.propertiesserver.properties等的log存放位置(可忽略)。

配置Flume采集MySQL

1、下载Flume连接DB插件

https://github.com/keedio/flume-ng-sql-source Download Zip
解压到任意目录(如D:\com)
对插件编译 并跳过测试用例 切换到目录D:\com\flume-ng-sql-source-develop

mvn compile
mvn package -Dmaven.test.skip=true 

复制target文件夹下jar文件flume-ng-sql-source-1.5.3-SNAPSHOT.jar以及mysql-connector-java-5.1.25-bin.jar到文件夹D:\com\apache-flume-1.8.0-bin\lib内即可。

2、编写properties

切换到目录D:\com\apache-flume-1.8.0-bin\conf
新建一个文件flume.properties

a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://你的URL:3306/你的DB

# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = 你的用户名
a1.sources.src-1.hibernate.connection.password = 你的密码
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path = D://com//apache-flume-1.8.0-bin//status
a1.sources.src-1.status.file.name = sqlSource.status
#sqlSource.status文件中记录了增量字段的值 $@$

# Custom query="你的SQL语句"
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select id,country,sum from income where id > $@$ order by id asc
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000

a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
##############################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000

#Kafka sink配置 a1.sinks.k1.topic = 你的topic名字
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mysqltest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20

# combination  a1.sources.src-1.channels不要漏掉s
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1

正式开始

cmd切换到指定路径 cd D:\com\kafka_2.11-2.0.1\bin\windows

1、启动Kafka内置zookeeper

D:\com\kafka_2.11-2.0.1\bin\windows>zookeeper-server-start.bat ../../config/zookeeper.properties

启动完成后不要关闭,否则导致zookeeper进程停止。

2、启动Kafka

D:\com\kafka_2.11-2.0.1\bin\windows>kafka-server-start.bat ../../config/server.properties

启动完成后不要关闭,否则导致Kafka进程停止。

3、创建一个topic mysqltest

D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysqltest

4、消费者接收消息

D:\com\kafka_2.11-2.0.1\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mysqltest --from-beginning

此时因为没有发送消息为空,保持开启状态。

5、启动flume-ng

切换目录 cd D:\com\apache-flume-1.8.0-bin

D:\com\apache-flume-1.8.0-bin>flume-ng agent -c conf -f conf/flume.properties -n a1

查看刚刚打开的"接收消息"命令行


image.png

再次插入一条数据后观察自动获取id为7的data(可能需要等待几秒)。


image.png

至此在Windows环境下使用Flume+Kafka获取MySQL中表内增量数据完成!

谢谢阅读,有帮助的点个❤!

相关文章

网友评论

      本文标题:Windows Flume+Kafka 获取MySQL增量数据

      本文链接:https://www.haomeiwen.com/subject/ejycqqtx.html