三、高级数据源
1、Spark Streaming接收Flume数据
基于Flume的Push模式
Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用.Flume可以将数据推送到改receiver.
- 第一步:Flume的配置文件
#bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
#定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1
#具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /home/bigdata/data/training/logs
#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100
#具体定义sink
a4.sinks = k1
a4.sinks.k1.type = avro
a4.sinks.k1.channel = c1
a4.sinks.k1.hostname = 192.168.123.1
a4.sinks.k1.port = 1234
#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
- 第二步:Spark Streaming程序
object MyFlumeStream {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(5))
//创建FlumeEvent的DStream
val flumeDStream = FlumeUtils.createStream(ssc,"192.168.123.1",1234)
//将FlumeEvent中的事件转成字符串
val data = flumeDStream.map( e => {
new String(e.event.getBody.array)
})
//输出结果
data.print()
ssc.start()
ssc.awaitTermination()
}
}
-
第三步:注意除了需要使用Flume的lib的jar包以外,还需要以下jar包:
spark-streaming-flume_2.10-2.1.0.jar -
第四步:测试
启动Spark Streaming程序
启动Flume
bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
拷贝日志文件到/root/training/logs目录
观察输出,采集到数据
基于Custom Sink的Pull模式
不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink。Flume将数据推送到sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。
这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的sink。
以下为配置步骤:
- 第一步:Flume的配置文件
#bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console
a1.channels = c1
a1.sinks = k1
a1.sources = r1
a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/bigdata/data/training/logs
a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000
a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 1234
#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
- 第二步:Spark Streaming程序
object FlumeLogPull {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
//创建FlumeEvent的DStream
val flumeEvent = FlumeUtils.createPollingStream(ssc,"bigdata02",1234,StorageLevel.MEMORY_ONLY_SER_2)
//将FlumeEvent中的事件转成字符串
val lineDStream = flumeEvent.map( e => {
new String(e.event.getBody.array)
})
//输出结果
lineDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
- 第三步:需要的jar包
spark-streaming-flume-sink_2.10-2.1.0.jar
将Spark的jar包拷贝到Flume的lib目录下
下面的这个jar包也需要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath
- 第四步:测试
启动Flume
bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console
在IDEA中启动FlumeLogPull
将测试数据拷贝到/root/training/logs
观察IDEA中的输出
2、Spark Streaming接收Kafka数据
Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。
- 启动Kafka
集群个节点全部执行
cd /home/bigdata/apps/kafka_2.11-0.10.0.0/
bin/kafka-server-start.sh -daemon config/server.properties
- 测试Kafka
创建Topic
bin/kafka-topics.sh --create --zookeeper bigdata02:2181 -replication-factor 1 --partitions 3 --topic mydemo1
发送消息
bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1
接收消息
bin/kafka-console-consumer.sh --zookeeper bigdata02:2181 --topic mydemo1
搭建Spark Streaming和Kafka的集成开发环境
由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程。
下面是依赖的pom.xml文件:
<properties>
<spark.version>2.1.0</spark.version>
<scala.version>2.11</scala.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-core_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-sql_${scala.version}</artifactId>
<version>${spark.version}</version>
</dependency>
<dependency>
<groupId>org.apache.spark</groupId>
<artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
<version>2.1.1</version>
</dependency>
</dependencies>
基于Receiver的方式
这个方法使用了Receivers来接收数据。Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。
- 开发Spark Streaming的Kafka Receivers
object KafkaWordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
//创建topic名称,1表示一次从这个topic中获取一条记录
val topics = Map("mydemo1" ->1)
//创建Kafka的输入流,指定ZooKeeper的地址
val kafkaStream = KafkaUtils.createStream(ssc,"bigdata02:2181","mygroup",topics)
//处理每次接收到的数据
val lineDStream = kafkaStream.map(e => {
new String(e.toString())
})
//输出结果
lineDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
- 测试
启动Kafka消息的生产者
bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1
在IDEA中启动任务,接收Kafka消息
直接读取方式
和基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。
- 开发Spark Streaming的程序
object DirectKafkaWordCount {
def main(args: Array[String]) {
val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
val ssc = new StreamingContext(conf, Seconds(10))
//创建topic名称,1表示一次从这个topic中获取一条记录
val topics = Set("mydemo1")
//指定Kafka的broker地址
val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata02:9092")
//创建DStream,接收Kafka的数据
val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
//处理每次接收到的数据
val lineDStream = kafkaStream.map(e => {
new String(e.toString())
})
//输出结果
lineDStream.print()
ssc.start()
ssc.awaitTermination()
}
}
- 测试
启动Kafka消息的生产者
bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1
在IDEA中启动任务,接收Kafka消息
四、性能优化
1、减少批数据的执行时间
在Spark中有几个优化可以减少批处理的时间:
① 数据接收的并行水平
通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。
② 数据处理的并行水平
如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数通过配置属性来确定spark.default.parallelism。
③ 数据序列化
可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化:
-
输入数据
-
由流操作生成的持久RDD
在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。
2、设置正确的批容量
为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。
根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。
找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。
3、内存调优
在这一节,我们重点介绍几个强烈推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。
-
Default persistence level of DStreams:和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。
-
Clearing persistent RDDs:默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。
-
Concurrent garbage collector:使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。
网友评论