美文网首页
Spark学习之Spark Streaming(二)

Spark学习之Spark Streaming(二)

作者: 羋学僧 | 来源:发表于2020-10-19 21:15 被阅读0次

三、高级数据源

1、Spark Streaming接收Flume数据

  基于Flume的Push模式

Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用.Flume可以将数据推送到改receiver.

  • 第一步:Flume的配置文件
#bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
#定义agent名, source、channel、sink的名称
a4.sources = r1
a4.channels = c1
a4.sinks = k1

#具体定义source
a4.sources.r1.type = spooldir
a4.sources.r1.spoolDir = /home/bigdata/data/training/logs

#具体定义channel
a4.channels.c1.type = memory
a4.channels.c1.capacity = 10000
a4.channels.c1.transactionCapacity = 100

#具体定义sink
a4.sinks = k1
a4.sinks.k1.type = avro
a4.sinks.k1.channel = c1
a4.sinks.k1.hostname = 192.168.123.1
a4.sinks.k1.port = 1234

#组装source、channel、sink
a4.sources.r1.channels = c1
a4.sinks.k1.channel = c1
  • 第二步:Spark Streaming程序
object MyFlumeStream {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(5))

    //创建FlumeEvent的DStream
    val flumeDStream = FlumeUtils.createStream(ssc,"192.168.123.1",1234)

    //将FlumeEvent中的事件转成字符串
    val data = flumeDStream.map( e => {
       new String(e.event.getBody.array)
    })

    //输出结果
    data.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 第三步:注意除了需要使用Flume的lib的jar包以外,还需要以下jar包:
    spark-streaming-flume_2.10-2.1.0.jar

  • 第四步:测试

启动Spark Streaming程序

启动Flume

bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console

拷贝日志文件到/root/training/logs目录

观察输出,采集到数据



  基于Custom Sink的Pull模式

不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink。Flume将数据推送到sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。

这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的sink。

以下为配置步骤:

  • 第一步:Flume的配置文件
#bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console
a1.channels = c1
a1.sinks = k1
a1.sources = r1

a1.sources.r1.type = spooldir
a1.sources.r1.spoolDir = /home/bigdata/data/training/logs

a1.channels.c1.type = memory
a1.channels.c1.capacity = 100000
a1.channels.c1.transactionCapacity = 100000

a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
a1.sinks.k1.channel = c1
a1.sinks.k1.hostname = bigdata02
a1.sinks.k1.port = 1234

#组装source、channel、sink
a1.sources.r1.channels = c1
a1.sinks.k1.channel = c1
  • 第二步:Spark Streaming程序
object FlumeLogPull {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    //创建FlumeEvent的DStream
    val flumeEvent = FlumeUtils.createPollingStream(ssc,"bigdata02",1234,StorageLevel.MEMORY_ONLY_SER_2)

    //将FlumeEvent中的事件转成字符串
    val lineDStream = flumeEvent.map( e => {
      new String(e.event.getBody.array)
    })

    //输出结果
    lineDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

  • 第三步:需要的jar包

spark-streaming-flume-sink_2.10-2.1.0.jar

将Spark的jar包拷贝到Flume的lib目录下

下面的这个jar包也需要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath

  • 第四步:测试

启动Flume

bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console

在IDEA中启动FlumeLogPull

将测试数据拷贝到/root/training/logs

观察IDEA中的输出


2、Spark Streaming接收Kafka数据

Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。


  • 启动Kafka
    集群个节点全部执行
cd /home/bigdata/apps/kafka_2.11-0.10.0.0/

bin/kafka-server-start.sh -daemon config/server.properties
  • 测试Kafka

创建Topic

bin/kafka-topics.sh --create --zookeeper bigdata02:2181 -replication-factor 1 --partitions 3 --topic mydemo1

发送消息

bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1

接收消息

bin/kafka-console-consumer.sh --zookeeper bigdata02:2181 --topic mydemo1

搭建Spark Streaming和Kafka的集成开发环境

由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程

下面是依赖的pom.xml文件:

  <properties>
        <spark.version>2.1.0</spark.version>
        <scala.version>2.11</scala.version>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_${scala.version}</artifactId>
            <version>${spark.version}</version>
        </dependency>

        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
            <version>2.1.1</version>
        </dependency>
    </dependencies>

基于Receiver的方式

这个方法使用了Receivers来接收数据。Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。


  • 开发Spark Streaming的Kafka Receivers
object KafkaWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    //创建topic名称,1表示一次从这个topic中获取一条记录
    val topics = Map("mydemo1" ->1)

    //创建Kafka的输入流,指定ZooKeeper的地址
    val kafkaStream = KafkaUtils.createStream(ssc,"bigdata02:2181","mygroup",topics)

    //处理每次接收到的数据
    val lineDStream = kafkaStream.map(e => {
      new String(e.toString())
    })
    //输出结果
    lineDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}
  • 测试

启动Kafka消息的生产者

bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1 

在IDEA中启动任务,接收Kafka消息

直接读取方式

和基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。


  • 开发Spark Streaming的程序
object DirectKafkaWordCount {
  def main(args: Array[String]) {
    val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
    val ssc = new StreamingContext(conf, Seconds(10))

    //创建topic名称,1表示一次从这个topic中获取一条记录
    val topics = Set("mydemo1")
    //指定Kafka的broker地址
    val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata02:9092")

    //创建DStream,接收Kafka的数据
    val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)

    //处理每次接收到的数据
    val lineDStream = kafkaStream.map(e => {
      new String(e.toString())
    })
    //输出结果
    lineDStream.print()

    ssc.start()
    ssc.awaitTermination()
  }
}

  • 测试

启动Kafka消息的生产者

bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1

在IDEA中启动任务,接收Kafka消息


四、性能优化

1、减少批数据的执行时间

在Spark中有几个优化可以减少批处理的时间:

① 数据接收的并行水平

通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。


② 数据处理的并行水平

如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数通过配置属性来确定spark.default.parallelism

③ 数据序列化

可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化:

  • 输入数据

  • 由流操作生成的持久RDD

在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。

2、设置正确的批容量

为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。

根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。

找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。

3、内存调优

在这一节,我们重点介绍几个强烈推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。

  •   Default persistence level of DStreams:和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。

  •   Clearing persistent RDDs:默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。

  •   Concurrent garbage collector:使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。

相关文章

网友评论

      本文标题:Spark学习之Spark Streaming(二)

      本文链接:https://www.haomeiwen.com/subject/gfvlmktx.html