Spark Streaming集成Kafka的方式有3种:
- 早期版本的Receiver方式
- spark1.3以后的Direct方式
- spark2.4版本后出现使用新kafka api的读取方式, 类似于direct方式, kafka partition和rdd partition一一对应
方法1: Receiver-based Approach
这种方法需要一个Receiver
来接收数据. Receiver
使用kafka的high level
消费者api. 对于所有的Receiver
, 从kafka中接收的数据会存储在多个executor的BlockManager中, 然后再启动job进行数据处理. 但是在默认情况下, 这种方式可能会丢数据, 所以需要启动WAL(write ahead log
)日志, 所有从kafka接收的数据都会写入到日志中, 并将日志文件存到hdfs, 所以所有的数据都会在任务失败时恢复
使用KafkaUtils
来创建一个DStream:
import org.apache.spark.streaming.kafka._
val kafkaStream = KafkaUtils.createStream(streamingContext,
[ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
有几点需要说明:
- kafka的分区数和Spark Streaming的RDD分区数没有关系
增加KafkaUtils.createStream()
方法中的分区个数, 仅仅会增加单个Reciever
中消费Topic的线程数 - 不同的
groups
andtopics
会产生不同的DStream
, 他们会并行的收集数据 - 如果开启了
WAL
日志功能, 需要设置storageLevel:KafkaUtils.createStream(..., StorageLevel.MEMORY_AND_DISK_SER)).
方法2: Direct Approach (No Receivers)
direct方式用来确保端到端的一致性, 这种方式没有Receiver
, 而是通过轮训kafka, 获得每个topic+partition
最近的offset, 根据定义, 每个批处理内处理一定范围的offset数据. 使用kafka的simple api
读取一定范围的offset数据. 该方法相比Reciever方式有如下几个优点:
- 简化的并行度:
不再需要创建多个kafka streams然后union到一起. 使用directStream
并行接收数据, kafka的partition个数等于rdd的partition个数. 便于理解和调优 - 高效
方法一中为了达到数据的零丢失, 使用WAL日志功能, 这在失败恢复时从hdfs文件恢复会很低效; 而Direct
方式因为没有Reciever也就不需要开启WAL日志, 将使用数据副本的方式存储kafka收到的数据, 失败恢复时仍然很高效 - Exactly once语义的保证:
第一种方式, 即使开启了WAL日志功能, 数据仍可能在很小的可能性下发生二次消费, 这是at least once
语义的实现. 因为这种方式是自动提交offset到zookeeper中的; 而第二种方式, 使用simple Kafka API
就不需要zookeeper了, offset会被保存在spark Straming的checkpoint
中. 这排除了spark streaming和kafka之间的不一致性, 因此可以即使在任务失败时仍可以达到exactly once
语义; 此外, 为了完全实现exactly once
, 还需要output的外部数据存储是幂等的, 或者可以实现原子事务, 保证offset和save result的一致性
这种方式唯一的不足是不再依赖zookeeper保存offset, 使得依靠zookeeper监控的组件失效, 但是仍然可以在每次批处理后手动更新zookeeper
创建DirectStream:
import org.apache.spark.streaming.kafka._
val directKafkaStream = KafkaUtils.createDirectStream[
[key class], [value class], [key decoder class], [value decoder class] ](
streamingContext, [map of Kafka parameters], [set of topics to consume])
默认情况下, 从kafka partition的最近offset出开始消费, 如果指定auto.offset.reset=smallest
将从最小的offset处开始消费
如果使用了基于zookeeper的kafka监控工具, 可以用以下代码收到过更新zookeeper:
// Hold a reference to the current offset ranges, so it can be used downstream
var offsetRanges = Array.empty[OffsetRange]
directKafkaStream.transform { rdd =>
offsetRanges = rdd.asInstanceOf[HasOffsetRanges].offsetRanges
rdd
}.map {
...
}.foreachRDD { rdd =>
for (o <- offsetRanges) {
println(s"${o.topic} ${o.partition} ${o.fromOffset} ${o.untilOffset}")
}
...
}
使用参数spark.streaming.kafka.maxRatePerPartition
可以限制每秒最多从kafka的partition中读取多少数据
网友评论