美文网首页
Spark Streaming kafka集成

Spark Streaming kafka集成

作者: lj72808up | 来源:发表于2019-11-06 00:00 被阅读0次

    Spark Streaming集成Kafka的方式有3种:

    • 早期版本的Receiver方式
    • spark1.3以后的Direct方式
    • spark2.4版本后出现使用新kafka api的读取方式, 类似于direct方式, kafka partition和rdd partition一一对应

    方法1: Receiver-based Approach

    这种方法需要一个Receiver来接收数据. Receiver使用kafka的high level消费者api. 对于所有的Receiver, 从kafka中接收的数据会存储在多个executor的BlockManager中, 然后再启动job进行数据处理. 但是在默认情况下, 这种方式可能会丢数据, 所以需要启动WAL(write ahead log)日志, 所有从kafka接收的数据都会写入到日志中, 并将日志文件存到hdfs, 所以所有的数据都会在任务失败时恢复

    使用KafkaUtils来创建一个DStream:

    import org.apache.spark.streaming.kafka._
    val kafkaStream = KafkaUtils.createStream(streamingContext,
         [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
    

    有几点需要说明:

    1. kafka的分区数和Spark Streaming的RDD分区数没有关系
      增加KafkaUtils.createStream()方法中的分区个数, 仅仅会增加单个Reciever中消费Topic的线程数
    2. 不同的groupsandtopics会产生不同的DStream, 他们会并行的收集数据
    3. 如果开启了WAL日志功能, 需要设置storageLevel: KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)).

    方法2: Direct Approach (No Receivers)

    direct方式用来确保端到端的一致性, 这种方式没有Receiver, 而是通过轮训kafka, 获得每个topic+partition最近的offset, 根据定义, 每个批处理内处理一定范围的offset数据. 使用kafka的simple api读取一定范围的offset数据. 该方法相比Reciever方式有如下几个优点:

    1. 简化的并行度:
      不再需要创建多个kafka streams然后union到一起. 使用directStream并行接收数据, kafka的partition个数等于rdd的partition个数. 便于理解和调优
    2. 高效
      方法一中为了达到数据的零丢失, 使用WAL日志功能, 这在失败恢复时从hdfs文件恢复会很低效; 而Direct方式因为没有Reciever也就不需要开启WAL日志, 将使用数据副本的方式存储kafka收到的数据, 失败恢复时仍然很高效
    3. Exactly once语义的保证:
      第一种方式, 即使开启了WAL日志功能, 数据仍可能在很小的可能性下发生二次消费, 这是at least once语义的实现. 因为这种方式是自动提交offset到zookeeper中的; 而第二种方式, 使用simple Kafka API就不需要zookeeper了, offset会被保存在spark Straming的checkpoint中. 这排除了spark streaming和kafka之间的不一致性, 因此可以即使在任务失败时仍可以达到exactly once语义; 此外, 为了完全实现exactly once, 还需要output的外部数据存储是幂等的, 或者可以实现原子事务, 保证offset和save result的一致性

    这种方式唯一的不足是不再依赖zookeeper保存offset, 使得依靠zookeeper监控的组件失效, 但是仍然可以在每次批处理后手动更新zookeeper

    创建DirectStream:

     import org.apache.spark.streaming.kafka._
    
     val directKafkaStream = KafkaUtils.createDirectStream[
         [key class], [value class], [key decoder class], [value decoder class] ](
         streamingContext, [map of Kafka parameters], [set of topics to consume])
    

    默认情况下, 从kafka partition的最近offset出开始消费, 如果指定auto.offset.reset=smallest将从最小的offset处开始消费
    如果使用了基于zookeeper的kafka监控工具, 可以用以下代码收到过更新zookeeper:

    // Hold a reference to the current offset ranges, so it can be used downstream
     var offsetRanges = Array.empty[OffsetRange]
    
     directKafkaStream.transform { rdd =>
       offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
       rdd
     }.map {
               ...
     }.foreachRDD { rdd =>
       for (o <- offsetRanges) {
         println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
       }
       ...
     }
    

    使用参数spark.streaming.kafka.maxRatePerPartition可以限制每秒最多从kafka的partition中读取多少数据

    相关文章

      网友评论

          本文标题:Spark Streaming kafka集成

          本文链接:https://www.haomeiwen.com/subject/acjybctx.html