美文网首页
Spark Streaming kafka集成

Spark Streaming kafka集成

作者: lj72808up | 来源:发表于2019-11-06 00:00 被阅读0次

Spark Streaming集成Kafka的方式有3种:

  • 早期版本的Receiver方式
  • spark1.3以后的Direct方式
  • spark2.4版本后出现使用新kafka api的读取方式, 类似于direct方式, kafka partition和rdd partition一一对应

方法1: Receiver-based Approach

这种方法需要一个Receiver来接收数据. Receiver使用kafka的high level消费者api. 对于所有的Receiver, 从kafka中接收的数据会存储在多个executor的BlockManager中, 然后再启动job进行数据处理. 但是在默认情况下, 这种方式可能会丢数据, 所以需要启动WAL(write ahead log)日志, 所有从kafka接收的数据都会写入到日志中, 并将日志文件存到hdfs, 所以所有的数据都会在任务失败时恢复

使用KafkaUtils来创建一个DStream:

import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
     [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])

有几点需要说明:

  1. kafka的分区数和Spark Streaming的RDD分区数没有关系
    增加KafkaUtils.createStream()方法中的分区个数, 仅仅会增加单个Reciever中消费Topic的线程数
  2. 不同的groupsandtopics会产生不同的DStream, 他们会并行的收集数据
  3. 如果开启了WAL日志功能, 需要设置storageLevel: KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)).

方法2: Direct Approach (No Receivers)

direct方式用来确保端到端的一致性, 这种方式没有Receiver, 而是通过轮训kafka, 获得每个topic+partition最近的offset, 根据定义, 每个批处理内处理一定范围的offset数据. 使用kafka的simple api读取一定范围的offset数据. 该方法相比Reciever方式有如下几个优点:

  1. 简化的并行度:
    不再需要创建多个kafka streams然后union到一起. 使用directStream并行接收数据, kafka的partition个数等于rdd的partition个数. 便于理解和调优
  2. 高效
    方法一中为了达到数据的零丢失, 使用WAL日志功能, 这在失败恢复时从hdfs文件恢复会很低效; 而Direct方式因为没有Reciever也就不需要开启WAL日志, 将使用数据副本的方式存储kafka收到的数据, 失败恢复时仍然很高效
  3. Exactly once语义的保证:
    第一种方式, 即使开启了WAL日志功能, 数据仍可能在很小的可能性下发生二次消费, 这是at least once语义的实现. 因为这种方式是自动提交offset到zookeeper中的; 而第二种方式, 使用simple Kafka API就不需要zookeeper了, offset会被保存在spark Straming的checkpoint中. 这排除了spark streaming和kafka之间的不一致性, 因此可以即使在任务失败时仍可以达到exactly once语义; 此外, 为了完全实现exactly once, 还需要output的外部数据存储是幂等的, 或者可以实现原子事务, 保证offset和save result的一致性

这种方式唯一的不足是不再依赖zookeeper保存offset, 使得依靠zookeeper监控的组件失效, 但是仍然可以在每次批处理后手动更新zookeeper

创建DirectStream:

 import org.apache.spark.streaming.kafka._

 val directKafkaStream = KafkaUtils.createDirectStream[
     [key class], [value class], [key decoder class], [value decoder class] ](
     streamingContext, [map of Kafka parameters], [set of topics to consume])

默认情况下, 从kafka partition的最近offset出开始消费, 如果指定auto.offset.reset=smallest将从最小的offset处开始消费
如果使用了基于zookeeper的kafka监控工具, 可以用以下代码收到过更新zookeeper:

// Hold a reference to the current offset ranges, so it can be used downstream
 var offsetRanges = Array.empty[OffsetRange]

 directKafkaStream.transform { rdd =>
   offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
   rdd
 }.map {
           ...
 }.foreachRDD { rdd =>
   for (o <- offsetRanges) {
     println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
   }
   ...
 }

使用参数spark.streaming.kafka.maxRatePerPartition可以限制每秒最多从kafka的partition中读取多少数据

相关文章

网友评论

      本文标题:Spark Streaming kafka集成

      本文链接:https://www.haomeiwen.com/subject/acjybctx.html