美文网首页Spark Tour
[译]Spark Streaming + Kafka集成指南

[译]Spark Streaming + Kafka集成指南

作者: steanxy | 来源:发表于2017-06-28 19:55 被阅读466次

    本文适用于Kafka broker 0.8.2.1及更高版本。

    这里会说明如何配置Spark Streaming接收Kafka的数据。有两种方法 - 老方法使用Receiver和Kafka的高层API,新方法不适用Receiver。两种方法具有不同的编程模型,性能特点和语义保证,下面具体介绍。两种方法对于当前版本的Spark(2.1.1)都有稳定的API。

    方法1:基于Receiver的方法

    这个方法使用Receiver接收数据。Receiver使用Kafka的高层消费者API实现。和所有receiver一样,通过Receiver从Kafka接收的数据存储到Spark executor中,然后由Spark Streaming启动的作业处理这些数据。

    但是,在默认配置下,这种方法会在出错时出现数据丢失(具体参见receiver reliability。为了保证零数据丢失,必须在Spark Streaming中额外启用Write Ahead Logs)。这样会同步保存所有接收到的Kafka数据到分布式文件系统(如HDFS)中,所有数据都可以从出错中进行恢复。

    下面,讨论如何使用这种方法编写streaming应用程序。

    1. 链接:对于使用SBT/Maven工程定义的Scala/Java应用程序,需要将你的streaming应用程序链接到下面的artifact。
     groupId = org.apache.spark
     artifactId = spark-streaming-kafka-0-8_2.11
     version = 2.1.1
    

    对于Python应用程序,必须字部署用用程序时添加上面的库及其依赖。参见下面的部署章节。

    1. 编程:在streaming应用程序代码中,引入KafkaUtils并创建一个输入DStream,如下。
     import org.apache.spark.streaming.kafka._
    
     val kafkaStream = KafkaUtils.createStream(streamingContext,
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
    

    需要记住的几点:

    • Kafka中的Topic分区和Spark Streaming中的RDD分区是不相关的。所以在KafkaUtils.createStream()增加指定topic分区数量只会增加单个receiver中消费topic的线程数量。不会增加Spark处理数据的并行性。
    • 对于不同group和topic可以创建多个Kafka输入DStream,使用多个receiver并行接收数据。
    • 如果已经启用了Write Ahead Logs,接收的数据会被复制到日志中。因此,需要将输入流的存储级别设置为StorageLevel.MEMORY_AND_DISK_SER(即KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER))。
    1. 部署:对于任何Spark应用程序,spark-submit用于启动应用程序。但是,对于Scala/Java应用程序和Python应用程序有些不同。

    对于Scala和Java应用程序,如果使用了SBT或者Maven管理项目,则会将spark-streaming-kafka-0-8_2.11及其依赖打包到应用程序JAR包中。确保spark-core_2.11spark-streaming_2.11标记为provided,它们在Spark安装包中已经存在了。然后使用spark-submit启动应用程序。

    对于Python应用程序缺少了SBT/Maven项目管理,需要将spark-streaming-kafka-0-8_2.11及其依赖直接添加到spark-submit,使用--packages(具体参见Application Submission Guide)。如下:

    ./bin/spark-submit --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.1.1 ...
    

    另外,也可以从Maven repository下载spark-streaming-kafka-0-8-assembly的JAR包,然后使用--jars添加到spark-submit

    方法2:直接方法(不适用Receiver)
    这种新方法从Spark 1.3开始支持,具有更强的端到端保证。和使用receiver接收数据不同,这种方法会周期性地查询Kafka每个topic+partition最新的偏移量,然后根据定义的偏移量范围在每个批次中处理数据。当处理数据的作业启动后,Kafka的简单消费者API会被用来读取定义偏移量范围的数据(和从文件系统中读取文件类似)。注意,这个特性是从Spark 1.3开始支持Scala和Java API,从Spark 1.4开始支持Python API。

    这种方法相比于基于receiver的方法具有以下优势:

    • 简化并行:不需要创建多个输入Kafka流,然后合并它们。使用directStream,Spark Streaming会创建和Kafka分区一样多的RDD分区进行消费,会并行读取Kafka的数据。所以Kafka分区和RDD分区会有一一对应,更容易理解和使用。
    • 效率:方法1中实现零数据丢失需要将数据存储到Write Ahead Log,这会复制一遍数据。这实际上是低效的,因为数据复制了两次,一次是Kafka,一次是Write Ahead Log。方法2解决了这个问题,因为没有receiver,也就不需要Write Ahead Logs。只要有足够的Kafka缓冲,可以从Kafka恢复消息。
    • 只有一次语义:方法1使用Kafka的高层API在Zookeeper中存储消费的偏移量。这是从Kafka消费数据的传统方法。虽然这种方法(结合write ahead logs)可以保证零数据丢失(即至少一次语义),但是还是会有一些情况会在出错时导致一些记录被消费两次。这是因为Spark Streaming接收数据和Zookeeper跟踪的偏移量不一致导致的。因此,在方法2中,使用了简单Kafka API不适用Zookeeper。偏移量是在Spark Streaming的检查点中跟踪的。这就消除了Spark Streaming和Zookeeper/Kafka的不一致,每条记录都只会被Spark Streaming接收一次,即便在出错的情况下。为了实现结果输出的只有一次语义,数据存到外部存储的输出操作必须是幂等的,或是保存结果和偏移量的原子事务。

    注意,这种方法的一个劣势是不在Zookeeper中更新偏移量,因此基于Zookeeper的Kafka监控工具就无法显示进度。但是,可以在每个批次中访问偏移量,然后自己更新到Zookeeper中。

    下面,讨论如何使用这种方法编程。

    1. 链接:这种方法只有Scala/Java应用程序支持。SBT/Maven工程链接下面的artifact。
     groupId = org.apache.spark
     artifactId = spark-streaming-kafka-0-8_2.11
     version = 2.1.1
    
    1. 编程:在streaming应用程序代码中,引入KafkaUtils然后创建一个输入DStream,如下。
     import org.apache.spark.streaming.kafka._
    
     val directKafkaStream = KafkaUtils.createDirectStream[
         [key class], [value class], [key decoder class], [value decoder class] ](
         streamingContext, [map of Kafka parameters], [set of topics to consume])
    

    可传递messageHandlercreateDirectStream,用于访问包含当前消息元数据的MessageAndMetadata并转为想要的格式。可参见API docsexample

    在Kafka参数中,必须指定metadata.broker.listbootstrap.servers。默认地,会从每个Kafka分区的最近偏移量开始消费。如果设置了配置auto.offset.resetsmallest,则会从最小的偏移量开始。

    也可以从任意偏移量开始消费,使用其它KafkaUtils.createDirectStream变量。另外,如果想要访问每个批次范根的Kafka偏移量,方法如下。

     // Hold a reference to the current offset ranges, so it can be used downstream
     var offsetRanges = Array.empty[OffsetRange]
    
     directKafkaStream.transform { rdd =>
       offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
       rdd
     }.map {
               ...
     }.foreachRDD { rdd =>
       for (o <- offsetRanges) {
         println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
       }
       ...
     }
    

    如果想要使用基于Zookeeper的Kafka监控工具显示streaming应用程序的过程,可使用上述代码自己更新偏移量的信息到Zookeeper。

    注意,HasOffsetRanges只在directKafkaStream调用的第一个方法中可以成功获取。可以使用transform()不用foreachRDD()作为第一个方法调用以便访问偏移量,然后再调用更多Spark方法。但是,需要意识到RDD分区和Kafka分区的一一对应关系在调用了shuffle或者repartition方法(如reduceByKey()或window())后就不存在了。

    另外需要注意的是,由于这种方法不使用Receiver,标准receiver(spark.streaming.receiver.*配置相关)不能应用于这里的输入DStream。相反,使用spark.streaming.kafka.*配置。非常重要的一个是spark.streaming.kafka.maxRatePerPartition设置每个Kafka分区通过直接API读取的最大速率(每秒钟的记录数)。

    1. 部署:和方法1一样的。

    相关文章

      网友评论

        本文标题:[译]Spark Streaming + Kafka集成指南

        本文链接:https://www.haomeiwen.com/subject/sfmucxtx.html