1.基于receiver的方式
这种方式使用receiver来获取数据。receiver是使用kafka的高层次Consumer API来实现的。receiver从kafka中获取的数据是存储在spark executor的内存中(如果突然数据暴增,大量batch堆积,很容易出现内存溢出的问题),然后spark Streaming启动的job会去处理那些数据。
然而,在默认的配置下,这种方式可能会因为底层的失败而丢失数据。如果要启动高可靠机制,让数据零丢失,就必须启用spark Streaming的预写日志机制(write Ahead Log,WAL)。该机制会同步地将接收到的kafka数据写入分布式文件系统(比如HDFS)上的预写日志中。所以,即使底层节点出现了失败,也可以使用预写日志中的数据进行恢复。
需要注意的要点:
kafka中topic的partition和spark中RDD的partition是没有关系的。KafkaUtils.createStream()中提高partition的数量,只会增加一个receiver中读取partition的线程的数量,不会增加spark处理数据的并行度。
可以创建多个kafka输入DStream,使用不同的consumer group和topic,来通过多个receiver并行接收数据。
如果基于容错的文件系统,比如HDFS,启动了预写日志机制,接收到的数据都会被复制一份到预写日志中。因此,在KafkaUtils.createStream()中,设置的持久化级别是StorageLevel.MEMORY_AND_DISK_SER。
2.基于Direct的方式
这种新的不基于receiver的直接方式,是在spark 1.3中引入的,从而能够确保更加健壮的机制。替代掉使用receiver来接收数据后,这种方式会周期性地查询kafka,来获得每个topic+partition的最新的offset,从而定义每个batch的offset的范围。当处理数据的job启动时,就会使用kafka的简单consumer api来获取kafka指定的offset范围的数据。
优点如下
简化并行读取:如果要读取多个partition,不需要创建多个输入DStream然后对它们进行union操作。spark会创建跟kafka partition一样多的RDD partition,并且会并行从kafka中读取数据,所以在kafka partition和RDD partition之间,有一个一对一的映射关系。
高性能:如果要保证零数据丢失,在基于receiver的方式中,需要开启WAL机制。这种方式其实效率低下,因为数据实际上被复制了两份,kafka本身就有高可靠的机制,会对数据复制一份,而这里又会复制一份到WAL中。基于direct的方式,不依赖receiver,不用开启WAL机制,只要kafka中做了数据复制,就可以通过kafka的副本进行恢复。
一次且仅一次的事务机制。
3.对比
基于receiver的方式,是使用kafka的高阶API来在ZooKeeper中保存消费过的offset的,这是消费kafka数据的传统方式。这种方式配合着WAL机制,可以保证数据零丢失的高可靠性,但是无法保证数据仅被处理一次,可能会处理多次,因为spark和zookeeper之间可能是不同步的。
基于direct的方式,使用kafka的简单api,spark streaming自己就负责追踪消费的offset,并保存在checkpoint中。spark自己一定是同步的,因此可以保证数据仅消费一次。
生产中大多用Direct方式。
网友评论