前言:
本文章适用于在Windows上使用Flume准实时获取MySQL表中增量数据,并输出到Kafka 消费。其中跳过个别步骤可自行百度,整个过程大约配置半个小时即可。
版本:
apache-flume-1.8.0-bin
kafka_2.11-2.0.1
mysql-connector-java-5.1.25-bin.jar
sbt-0.13.15.msi
jdk1.8.0_45
准备内容:
Flume: http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
Kafka: http://kafka.apache.org/downloads
sbt: https://www.scala-sbt.org/download.html
Flume: https://github.com/keedio/flume-ng-sql-source
下载完毕解压到指定目录,本例保存在D:\com\目录下
配置环境变量:
打开“环境变量”,新建或修改系统变量
变量: FLUME_HOME
值: D:\com\apache-flume-1.8.0-bin
变量: KA_HOME
值: D:\com\kafka_2.11-2.0.1
变量: JAVA_HOME
值: C:\Program Files\Java\jdk1.8.0_45
变量: CLASSPATH
值 .;%JAVA_HOME%\lib\dt.jar;%JAVA_HOME%\lib\tools.jar
变量: PATH
追加;%FLUME_HOME%\conf;%FLUME_HOME%\bin;%KA_HOME%\bin\windows
如果在启动zookeeper.properties出错,编辑修改文件D:\com\kafka_2.11-2.0.1\bin\windows\kafka-run-class.bat
中的如下语句,将CLASSPATH前后加上引号
set COMMAND=%JAVA% %KAFKA_HEAP_OPTS% %KAFKA_JVM_PERFORMANCE_OPTS% %KAFKA_JMX_OPTS% %KAFKA_LOG4J_OPTS% -cp "%CLASSPATH%" %KAFKA_OPTS% %*
Kafka配置
首先安装sbt-0.13.15.msi,配置环境变量等略去
打开cmd命令行 cd
到目录D:\com\kafka_2.11-2.0.1
小技巧:shift+右键
选择在此处打开命令窗口
依次执行以下命令,等待出现绿色的success
再执行下一个命令
sbt update
sbt package
sbt sbt-dependency
根据需要可以自行修改文件zookeeper.properties
、log4j.properties
、server.properties
等的log存放位置(可忽略)。
配置Flume采集MySQL
1、下载Flume连接DB插件
https://github.com/keedio/flume-ng-sql-source
Download Zip
解压到任意目录(如D:\com)
对插件编译 并跳过测试用例 切换到目录D:\com\flume-ng-sql-source-develop
内
mvn compile
mvn package -Dmaven.test.skip=true
复制target文件夹下jar文件flume-ng-sql-source-1.5.3-SNAPSHOT.jar
以及mysql-connector-java-5.1.25-bin.jar
到文件夹D:\com\apache-flume-1.8.0-bin\lib
内即可。
2、编写properties
切换到目录D:\com\apache-flume-1.8.0-bin\conf
新建一个文件flume.properties
a1.channels = ch-1
a1.sources = src-1
a1.sinks = k1
###########sql source#################
# For each one of the sources, the type is defined
a1.sources.src-1.type = org.keedio.flume.source.SQLSource
a1.sources.src-1.hibernate.connection.url = jdbc:mysql://你的URL:3306/你的DB
# Hibernate Database connection properties
a1.sources.src-1.hibernate.connection.user = 你的用户名
a1.sources.src-1.hibernate.connection.password = 你的密码
a1.sources.src-1.hibernate.connection.autocommit = true
a1.sources.src-1.hibernate.dialect = org.hibernate.dialect.MySQL5Dialect
a1.sources.src-1.hibernate.connection.driver_class = com.mysql.jdbc.Driver
a1.sources.src-1.run.query.delay=10000
a1.sources.src-1.status.file.path = D://com//apache-flume-1.8.0-bin//status
a1.sources.src-1.status.file.name = sqlSource.status
#sqlSource.status文件中记录了增量字段的值 $@$
# Custom query="你的SQL语句"
a1.sources.src-1.start.from = 0
a1.sources.src-1.custom.query = select id,country,sum from income where id > $@$ order by id asc
a1.sources.src-1.batch.size = 1000
a1.sources.src-1.max.rows = 1000
a1.sources.src-1.hibernate.connection.provider_class = org.hibernate.connection.C3P0ConnectionProvider
a1.sources.src-1.hibernate.c3p0.min_size=1
a1.sources.src-1.hibernate.c3p0.max_size=10
##############################
a1.channels.ch-1.type = memory
a1.channels.ch-1.capacity = 10000
a1.channels.ch-1.transactionCapacity = 10000
a1.channels.ch-1.byteCapacityBufferPercentage = 20
a1.channels.ch-1.byteCapacity = 800000
#Kafka sink配置 a1.sinks.k1.topic = 你的topic名字
a1.sinks.k1.type = org.apache.flume.sink.kafka.KafkaSink
a1.sinks.k1.topic = mysqltest
a1.sinks.k1.brokerList = localhost:9092
a1.sinks.k1.requiredAcks = 1
a1.sinks.k1.batchSize = 20
# combination a1.sources.src-1.channels不要漏掉s
a1.sinks.k1.channel = ch-1
a1.sources.src-1.channels=ch-1
正式开始
cmd切换到指定路径 cd D:\com\kafka_2.11-2.0.1\bin\windows
1、启动Kafka内置zookeeper
D:\com\kafka_2.11-2.0.1\bin\windows>zookeeper-server-start.bat ../../config/zookeeper.properties
启动完成后不要关闭,否则导致zookeeper进程停止。
2、启动Kafka
D:\com\kafka_2.11-2.0.1\bin\windows>kafka-server-start.bat ../../config/server.properties
启动完成后不要关闭,否则导致Kafka进程停止。
3、创建一个topic mysqltest
D:\com\kafka_2.11-2.0.1\bin\windows>kafka-run-class.bat kafka.admin.TopicCommand --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic mysqltest
4、消费者接收消息
D:\com\kafka_2.11-2.0.1\bin\windows>kafka-console-consumer.bat --bootstrap-server localhost:9092 --topic mysqltest --from-beginning
此时因为没有发送消息为空,保持开启状态。
5、启动flume-ng
切换目录 cd D:\com\apache-flume-1.8.0-bin
D:\com\apache-flume-1.8.0-bin>flume-ng agent -c conf -f conf/flume.properties -n a1
查看刚刚打开的"接收消息"命令行
image.png
再次插入一条数据后观察自动获取id为7的data(可能需要等待几秒)。
image.png
网友评论