美文网首页
Spark学习之Spark Streaming(二)

Spark学习之Spark Streaming(二)

作者: 羋学僧 | 来源:发表于2020-10-19 21:15 被阅读0次

    三、高级数据源

    1、Spark Streaming接收Flume数据

      基于Flume的Push模式

    Flume被用于在Flume agents之间推送数据.在这种方式下,Spark Streaming可以很方便的建立一个receiver,起到一个Avro agent的作用.Flume可以将数据推送到改receiver.

    • 第一步:Flume的配置文件
    #bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
    #定义agent名, source、channel、sink的名称
    a4.sources = r1
    a4.channels = c1
    a4.sinks = k1
    
    #具体定义source
    a4.sources.r1.type = spooldir
    a4.sources.r1.spoolDir = /home/bigdata/data/training/logs
    
    #具体定义channel
    a4.channels.c1.type = memory
    a4.channels.c1.capacity = 10000
    a4.channels.c1.transactionCapacity = 100
    
    #具体定义sink
    a4.sinks = k1
    a4.sinks.k1.type = avro
    a4.sinks.k1.channel = c1
    a4.sinks.k1.hostname = 192.168.123.1
    a4.sinks.k1.port = 1234
    
    #组装source、channel、sink
    a4.sources.r1.channels = c1
    a4.sinks.k1.channel = c1
    
    • 第二步:Spark Streaming程序
    object MyFlumeStream {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(5))
    
        //创建FlumeEvent的DStream
        val flumeDStream = FlumeUtils.createStream(ssc,"192.168.123.1",1234)
    
        //将FlumeEvent中的事件转成字符串
        val data = flumeDStream.map( e => {
           new String(e.event.getBody.array)
        })
    
        //输出结果
        data.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 第三步:注意除了需要使用Flume的lib的jar包以外,还需要以下jar包:
      spark-streaming-flume_2.10-2.1.0.jar

    • 第四步:测试

    启动Spark Streaming程序

    启动Flume

    bin/flume-ng agent -n a4 -f myagent/a4.conf -c conf -Dflume.root.logger=INFO,console
    

    拷贝日志文件到/root/training/logs目录

    观察输出,采集到数据



      基于Custom Sink的Pull模式

    不同于Flume直接将数据推送到Spark Streaming中,第二种模式通过以下条件运行一个正常的Flume sink。Flume将数据推送到sink中,并且数据保持buffered状态。Spark Streaming使用一个可靠的Flume接收器和转换器从sink拉取数据。只要当数据被接收并且被Spark Streaming备份后,转换器才运行成功。

    这样,与第一种模式相比,保证了很好的健壮性和容错能力。然而,这种模式需要为Flume配置一个正常的sink。

    以下为配置步骤:

    • 第一步:Flume的配置文件
    #bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console
    a1.channels = c1
    a1.sinks = k1
    a1.sources = r1
    
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /home/bigdata/data/training/logs
    
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 100000
    a1.channels.c1.transactionCapacity = 100000
    
    a1.sinks.k1.type = org.apache.spark.streaming.flume.sink.SparkSink
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = bigdata02
    a1.sinks.k1.port = 1234
    
    #组装source、channel、sink
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    
    • 第二步:Spark Streaming程序
    object FlumeLogPull {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //创建FlumeEvent的DStream
        val flumeEvent = FlumeUtils.createPollingStream(ssc,"bigdata02",1234,StorageLevel.MEMORY_ONLY_SER_2)
    
        //将FlumeEvent中的事件转成字符串
        val lineDStream = flumeEvent.map( e => {
          new String(e.event.getBody.array)
        })
    
        //输出结果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    
    • 第三步:需要的jar包

    spark-streaming-flume-sink_2.10-2.1.0.jar

    将Spark的jar包拷贝到Flume的lib目录下

    下面的这个jar包也需要拷贝到Flume的lib目录下,同时加入IDEA工程的classpath

    • 第四步:测试

    启动Flume

    bin/flume-ng agent -n a1 -f myagent/a1.conf -c conf -Dflume.root.logger=INFO,console
    

    在IDEA中启动FlumeLogPull

    将测试数据拷贝到/root/training/logs

    观察IDEA中的输出


    2、Spark Streaming接收Kafka数据

    Apache Kafka是一种高吞吐量的分布式发布订阅消息系统。


    • 启动Kafka
      集群个节点全部执行
    cd /home/bigdata/apps/kafka_2.11-0.10.0.0/
    
    bin/kafka-server-start.sh -daemon config/server.properties
    
    • 测试Kafka

    创建Topic

    bin/kafka-topics.sh --create --zookeeper bigdata02:2181 -replication-factor 1 --partitions 3 --topic mydemo1
    

    发送消息

    bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1
    

    接收消息

    bin/kafka-console-consumer.sh --zookeeper bigdata02:2181 --topic mydemo1
    

    搭建Spark Streaming和Kafka的集成开发环境

    由于Spark Streaming和Kafka集成的时候,依赖的jar包比较多,而且还会产生冲突。强烈建议使用Maven的方式来搭建项目工程

    下面是依赖的pom.xml文件:

      <properties>
            <spark.version>2.1.0</spark.version>
            <scala.version>2.11</scala.version>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_${scala.version}</artifactId>
                <version>${spark.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
                <version>2.1.1</version>
            </dependency>
        </dependencies>
    

    基于Receiver的方式

    这个方法使用了Receivers来接收数据。Receivers的实现使用到Kafka高层次的消费者API。对于所有的Receivers,接收到的数据将会保存在Spark executors中,然后由Spark Streaming启动的Job来处理这些数据。


    • 开发Spark Streaming的Kafka Receivers
    object KafkaWordCount {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //创建topic名称,1表示一次从这个topic中获取一条记录
        val topics = Map("mydemo1" ->1)
    
        //创建Kafka的输入流,指定ZooKeeper的地址
        val kafkaStream = KafkaUtils.createStream(ssc,"bigdata02:2181","mygroup",topics)
    
        //处理每次接收到的数据
        val lineDStream = kafkaStream.map(e => {
          new String(e.toString())
        })
        //输出结果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    • 测试

    启动Kafka消息的生产者

    bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1 
    

    在IDEA中启动任务,接收Kafka消息

    直接读取方式

    和基于Receiver接收数据不一样,这种方式定期地从Kafka的topic+partition中查询最新的偏移量,再根据定义的偏移量范围在每个batch里面处理数据。当作业需要处理的数据来临时,spark通过调用Kafka的简单消费者API读取一定范围的数据。


    • 开发Spark Streaming的程序
    object DirectKafkaWordCount {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("SparkFlumeNGWordCount").setMaster("local[2]")
        val ssc = new StreamingContext(conf, Seconds(10))
    
        //创建topic名称,1表示一次从这个topic中获取一条记录
        val topics = Set("mydemo1")
        //指定Kafka的broker地址
        val kafkaParams = Map[String, String]("metadata.broker.list" -> "bigdata02:9092")
    
        //创建DStream,接收Kafka的数据
        val kafkaStream = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topics)
    
        //处理每次接收到的数据
        val lineDStream = kafkaStream.map(e => {
          new String(e.toString())
        })
        //输出结果
        lineDStream.print()
    
        ssc.start()
        ssc.awaitTermination()
      }
    }
    
    
    • 测试

    启动Kafka消息的生产者

    bin/kafka-console-producer.sh --broker-list bigdata02:9092 --topic mydemo1
    

    在IDEA中启动任务,接收Kafka消息


    四、性能优化

    1、减少批数据的执行时间

    在Spark中有几个优化可以减少批处理的时间:

    ① 数据接收的并行水平

    通过网络(如kafka,flume,socket等)接收数据需要这些数据反序列化并被保存到Spark中。如果数据接收成为系统的瓶颈,就要考虑并行地接收数据。注意,每个输入DStream创建一个receiver(运行在worker机器上)接收单个数据流。创建多个输入DStream并配置它们可以从源中接收不同分区的数据流,从而实现多数据流接收。例如,接收两个topic数据的单个输入DStream可以被切分为两个kafka输入流,每个接收一个topic。这将在两个worker上运行两个receiver,因此允许数据并行接收,提高整体的吞吐量。多个DStream可以被合并生成单个DStream,这样运用在单个输入DStream的transformation操作可以运用在合并的DStream上。


    ② 数据处理的并行水平

    如果运行在计算stage上的并发任务数不足够大,就不会充分利用集群的资源。默认的并发任务数通过配置属性来确定spark.default.parallelism

    ③ 数据序列化

    可以通过改变序列化格式来减少数据序列化的开销。在流式传输的情况下,有两种类型的数据会被序列化:

    • 输入数据

    • 由流操作生成的持久RDD

    在上述两种情况下,使用Kryo序列化格式可以减少CPU和内存开销。

    2、设置正确的批容量

    为了Spark Streaming应用程序能够在集群中稳定运行,系统应该能够以足够的速度处理接收的数据(即处理速度应该大于或等于接收数据的速度)。这可以通过流的网络UI观察得到。批处理时间应该小于批间隔时间。

    根据流计算的性质,批间隔时间可能显著的影响数据处理速率,这个速率可以通过应用程序维持。可以考虑WordCountNetwork这个例子,对于一个特定的数据处理速率,系统可能可以每2秒打印一次单词计数(批间隔时间为2秒),但无法每500毫秒打印一次单词计数。所以,为了在生产环境中维持期望的数据处理速率,就应该设置合适的批间隔时间(即批数据的容量)。

    找出正确的批容量的一个好的办法是用一个保守的批间隔时间(5-10,秒)和低数据速率来测试你的应用程序。

    3、内存调优

    在这一节,我们重点介绍几个强烈推荐的自定义选项,它们可以减少Spark Streaming应用程序垃圾回收的相关暂停,获得更稳定的批处理时间。

    •   Default persistence level of DStreams:和RDDs不同的是,默认的持久化级别是序列化数据到内存中(DStream是StorageLevel.MEMORY_ONLY_SER,RDD是StorageLevel.MEMORY_ONLY)。即使保存数据为序列化形态会增加序列化/反序列化的开销,但是可以明显的减少垃圾回收的暂停。

    •   Clearing persistent RDDs:默认情况下,通过Spark内置策略(LUR),Spark Streaming生成的持久化RDD将会从内存中清理掉。如果spark.cleaner.ttl已经设置了,比这个时间存在更老的持久化RDD将会被定时的清理掉。正如前面提到的那样,这个值需要根据Spark Streaming应用程序的操作小心设置。然而,可以设置配置选项spark.streaming.unpersist为true来更智能的去持久化(unpersist)RDD。这个配置使系统找出那些不需要经常保有的RDD,然后去持久化它们。这可以减少Spark RDD的内存使用,也可能改善垃圾回收的行为。

    •   Concurrent garbage collector:使用并发的标记-清除垃圾回收可以进一步减少垃圾回收的暂停时间。尽管并发的垃圾回收会减少系统的整体吞吐量,但是仍然推荐使用它以获得更稳定的批处理时间。

    相关文章

      网友评论

          本文标题:Spark学习之Spark Streaming(二)

          本文链接:https://www.haomeiwen.com/subject/gfvlmktx.html