美文网首页Apache Kafka@IT·大数据
SparkStreaming入门教程(三)高级输入源:Flume

SparkStreaming入门教程(三)高级输入源:Flume

作者: 胖滚猪学编程 | 来源:发表于2018-02-21 22:44 被阅读81次

    SparkStreaming+Kafka


    SparkStreaming整合Kafka有两种方式,一种是基于接收器的方法,另一种是直接方法(无接收器)。

    • Receiver方式:由Spark executors中的Receiver来接收kafka中的数据。
    • Direct方式:此方法不使用接收器接收数据,而是周期性查询Kafka中每个主题+分区中的最新偏移量,并相应地定义要在每批中处理的偏移量范围。处理数据的作业启动后,Kafka consumerAPI读取Kafka中定义的偏移量范围(类似于从文件系统读取文件)。

    由于Direct相比Receiver有诸多优势:简化并行性、效率高等,因此我们选择Direct方式。

    API文档:KafkaUtils

    def createStream[K, V, U <: Decoder[], T <: Decoder[]](ssc: StreamingContext, kafkaParams: Map[String, String], topics: Map[String, Int], storageLevel: StorageLevel)(implicit arg0: ClassTag[K], arg1: ClassTag[V], arg2: ClassTag[U], arg3: ClassTag[T]): ReceiverInputDStream[(K, V)]
    K:Kafka消息中key的类型,例如String
    V:Kafka消息中value的类型
    U:Kafka message key 的解码器,例如StringDecoder
    T:Kafka message value 的解码器
    ssc:StreamingContext 对象
    kafkaParams:KafKa配置参数的Map集合
    topics:streaming需消费的kafka中topics的集合.
    storageLevel:存储级别,如仅基于内存或仅基于磁盘
    returns DStream of (Kafka message key, Kafka message value)

    编写代码:

    • maven配置pom.xml
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-kafka-0-8_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    
    • DirectKafkaWordCount.scala
    import kafka.serializer.StringDecoder
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.kafka._
    import org.apache.spark.SparkConf
    
    object DirectKafkaWordCount {
    def main(args: Array[String]) {
    
      val Array(brokers, topics) = args
    
      val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount")
      val ssc = new StreamingContext(sparkConf, Seconds(5))
      //kafka的topic集合,即可以订阅多个topic,args传参的时候用,隔开
      val topicsSet = topics.split(",").toSet
      //设置kafka参数,定义brokers集合
      val kafkaParams = Map[String, String]("metadata.broker.list" -> brokers)
      val messages = KafkaUtils.createDirectStream[String, String, StringDecoder, StringDecoder](
        ssc, kafkaParams, topicsSet)
      print("---------:" +messages)
    
      val lines = messages.map(_._2)
      val words = lines.flatMap(_.split(" "))
      val wordCounts = words.map(x => (x, 1)).reduceByKey(_ + _)
      wordCounts.print()
    
      ssc.start()
      ssc.awaitTermination()
     }
    }
    

    程序运行:

    • IDEA中运行sparkstreaming程序,注意传参master:9092 test
    • 启动kafka服务
      bin/kafka-server-start.sh config/server.properties
    • 生产者实时生产数据
      bin/kafka-console-producer.sh --broker-list master:9092 --topic test
    • 可以看到IDEA的控制台中,spark实时处理了来自kafka的数据
    kafka + sparkstreaming.gif

    SparkStreaming+Flume


    Flume-style Push-based Approach

    推式接收器:Spark Streaming设置了一个接收器,该接收器作为Flume的Avro代理,以Avro数据池的方式工作,Flume可以将数据推送到该接收器

    • 配置Flume:把数据发到Avro数据池

    cd /usr/local/flume-1.6.0-cdh5.11.1/conf/
    vim flume-spark.conf

    a1.sources = r1
    a1.channels = c1
    a1.sinks = k1
    
    a1.sources.r1.type = spooldir
    a1.sources.r1.spoolDir = /usr/local/flume-1.6.0-cdh5.11.1/spool-test/spooldir
    a1.sources.r1.channels = c1
    
    a1.channels.c1.type = file
    a1.channels.c1.useDualCheckpoints = true
    a1.channels.c1.backupCheckpointDir = /usr/local/flume-1.6.0-cdh5.11.1/spool-test/channel_check_back
    a1.channels.c1.checkpointDir = /usr/local/flume-1.6.0-cdh5.11.1/spool-test/channel_check
    a1.channels.c1.dataDirs = /usr/local/flume-1.6.0-cdh5.11.1/spool-test/channel_data
    
    a1.sinks.k1.type = avro
    a1.sinks.k1.channel = c1
    a1.sinks.k1.hostname = master
    a1.sinks.k1.port = 9998
    
    • 编写Spark代码

    def createStream(ssc: StreamingContext, hostname: String, port: Int, storageLevel: StorageLevel = StorageLevel.MEMORY_AND_DISK_SER_2): ReceiverInputDStream[SparkFlumeEvent]
    Create a input stream from a Flume source.

    SparkFlume .scala

    package com.lyl.spark
    import org.apache.spark.SparkConf
    import org.apache.spark.streaming._
    import org.apache.spark.streaming.flume.FlumeUtils
    object SparkFlume {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Flume-Spark")
        val ssc = new StreamingContext(conf,Seconds(5))
        val lines = FlumeUtils.createStream(ssc,"master",9998)//FlumeUtils把接收器配置在特点的主机名和端口上,必须与flume中配置的端口吻合
        lines.count().map(x => "Received: " + x +"events").print()
        ssc.start()
        ssc.awaitTermination()
      }
    }
    

    pom.xml

    <!-- https://mvnrepository.com/artifact/org.apache.spark/spark-streaming-flume -->
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-streaming-flume_2.11</artifactId>
        <version>2.2.0</version>
    </dependency>
    
    • 程序运行

    (1)打包jar放入spark目录下,提交运行:
    bin/spark-submit --master local[2] --class com.lyl.spark.SparkFlume SparkStreamStudy.jar
    (2)启动flume
    bin/flume-ng agent --conf conf --name a1 --conf-file conf/flume-spark.conf
    (3)实时传输文件进入之前flume配置中定义好的目录中(a1.sources.r1.spoolDir = /usr/local/flume-1.6.0-cdh5.11.1/spool-test/spooldir)。

    [root@master spooldir]# ll
    total 4
    -rw-r--r-- 1 root root 7 Feb 12 02:16 test.txt
    [root@master spooldir]# cp test.txt test2.txt
    [root@master spooldir]# cp test.txt test3.txt
    [root@master spooldir]# cp test.txt test4.txt
    
    • 可以看到Spark控制台实时输出event个数。

    虽然这种方式简单,但是它没有事物支持,会增加运行接收器的工作节点发生错误而丢失数据的概率,不仅如此,如果运行接收器的节点故障,系统会尝试从另一个位置启动接收器,这时候需要重新配置flume才能将数据发给新节点,这比较麻烦,因此我们引进拉式接收器

    Pull-based Approach using a Custom Sink

    拉式接收器:该接收器可以从自定义的中间数据池中主动的拉取数据,而其他进程可以使用Flume把数据推进该中间数据池。spark Streaming使用可靠的Flume接收器和事务机制从该中间数据池中提取并复制数据,在收到事物成功完成的通知前,这些数据还保留在数据池中。

    补充:

    可靠的接收器 - 当数据已被接收并确认已经存储在Spark中时,可靠的接收器会向源发送确认。具有强大的容错保证,可确保零数据丢失。

    不可靠的接收器 - 不会向源发送确认信息。优点是简单实施。

    由于和Push方式类似,请自行参考:Spark Streaming + Flume Integration Guide

    相关文章

      网友评论

        本文标题:SparkStreaming入门教程(三)高级输入源:Flume

        本文链接:https://www.haomeiwen.com/subject/eccotftx.html