前几章介绍了Flume、Spark Streaming入门、Spark Streaming进阶。这一章一起学习Spark Streaming整合Flume。
概述
flume作为日志实时采集的框架,可以与SparkStreaming实时处理框架进行对接,flume实时产生数据,sparkStreaming做实时处理。Spark Streaming对接Flume有两种方式,一种是Flume将消息Push推给Spark Streaming,还有一种是Spark Streaming从flume 中Pull拉取数据。在实际中用Pull方式较多,因为其中使用到的SparkSink具有缓冲区的作用,SparkStreaming通过有效的flume receiver去从Sink中拉取数据,拉取的数据有多副本的存储方式,增加了容错性,稳定性好并且在拉取过程中具有事务性。而Push方式可能出现数据丢失的问题。所以这里只演示Pull的方式。
整合步骤
- 配置flume agent
监听 /root/data/data.log文件,在文件中有新的输入时,把输入的内容输出到SparkSink。同时SparkSink绑定的IP为虚拟机本地的IP,端口只要是未占用的都行。
# Name the components on this agent
a1.sources = r1
a1.sinks = k1
a1.channels = c1
# Describe/configure the source
a1.sources.r1.type = exec
a1.sources.r1.command= tail -F /root/data/data.log
a1.sources.r1.shell = /bin/sh -c
# Describe the sink
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.hostname=192.168.30.131
a1.sinks.k1.port = 8888
a1.sinks.k1.batchSize= 2000
# Use a channel which buffers events in memory
a1.channels.c1.type = memory
# Bind the source and sink to the channel
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 启动flume agent
../bin/flume-ng agent -n a1 -c conf/ -f exec-memory-spark.conf -Dflume.root.logger=INFO,console
- 本地Scala 程序
pom文件
<dependency>
<groupId>org.scala-lang</groupId>
<artifactId>scala-library</artifactId>
<version>${scala.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-flume_2.11</artifactId>
<version>2.2.0</version>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
<version>3.5</version>
</dependency>
代码
import org.apache.spark.SparkConf
import org.apache.spark.streaming.flume.FlumeUtils
import org.apache.spark.streaming.{Seconds, StreamingContext}
/**
* Spark Streaming整合Flume的Pull方式
*/
object FlumePullWordCount {
def main(args: Array[String]): Unit = {
if(args.length != 2) {
System.err.println("Usage: FlumePullWordCount <hostname> <port>")
System.exit(1)
}
val Array(hostname, port) = args
val sparkConf = new SparkConf().setMaster("local[2]").setAppName("FlumePullWordCount")
val ssc = new StreamingContext(sparkConf, Seconds(1))
//获取flume中数据
val flumeStream = FlumeUtils.createPollingStream(ssc, hostname, port.toInt)
flumeStream.map(x=> new String(x.event.getBody.array()).trim)
.flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).print()
ssc.start()
ssc.awaitTermination()
}
}
-
本地测试
在程序中监听的IP地址和端口是通过外部传参的方式处理,这么做方便在服务器部署时指定任意监听的位置,避免了硬编码的局限。
在本地测试时需要在如图位置指定参数。这里的IP和端口一定要和flume agent配置中的相同。
image.png
启动程序后,在data.log中输入字符。
image.png
观察控制台信息,发现可以统计出字符出现的次数。
image.png -
服务器测试
在实际开发中,程序开发完毕时需要部署在服务器运行。此时只需要把程序打成jar包,在根据spark-submit的方式运行即可。 -
首先通过idea跳过测试打包
image.png -
把jar包上传到服务器
-
spark-submit启动作业
要在有网的环境下,--packages需要拉取spark streaming和flume集成的包。
./spark-submit --master local[2] --name FlumePullWordCount --packages org.apache.spark:spark-streaming-flume_2.11:2.2.0 --class com.imooc.spark.FlumePullWordCount ~/lib/sparktrain-1.0.jar 192.168.30.131 8888
-
测试
在flume监听的输入文件中输入字符
image.png
观察服务器上运行作业的日志
image.png
成功
网友评论