场景
- spark streaming接受到数据后,在spark中进行存储,之后将kafka确认信息存储到zookeeper上(kafka的偏移量),这种情况下,如果streaming接收器宕机了,zookeeper上还没来得及确认信息,那么这条消息会再次被发送并处理,这种属于接收器挂掉。
- 将元数据存储到外部存储系统中,如HDFS中,这种就是对元数据进行checkpoint,这种属于驱动器挂掉。
上面存在可能都是的情况
- 当executor接收到消息后,并将其存储到了executor的内存中,再向数据源通知已经接收到的消息后,executor开始处理消息时,driver挂掉,如果driver挂掉,则所有的task都会被kill掉。存储在executor内存中的消息就不服存在。
这种解决方案是使用WAL(Write ahead log),启用WAL机制,将已经收到的消息被接收器写入到容器存储中,如HDFS,由于采用了WAL机制,driver可以从失败的点重新读取数据。
WAL可以确保数据不丢失,但是无法确保消息只被处理一次(exactly-once),
场景:接收器接收到数据,并将其存储到了WAL中,在向zookeeper中设置kafka的偏移量之前executor突然挂掉了。
这种场景下,spark将数据存储到了WAL中,而kafka却认为消息还没有被消费(zookeeper中的偏移量还没有设置成功),这是spark会重复接受这条消息,就无法达到消息只发送一次。
从spark1.3以后,spark提供了kafka direct api,spark driver只需要简单计算下一个batch需要处理的kafka中偏移量的范围,然后命令spark executor直接从kafka相应的topic中消费消息。
最终
- 不再需要Kafka接收器,Exectuor直接采用Simple Consumer API从Kafka中消费数据。
- 不再需要WAL机制,我们仍然可以从失败恢复之后从Kafka中重新消费数据;
- exactly-once语义得以保存,我们不再从WAL中读取重复的数据。
网友评论