RDD通过persist
方法或cache
方法可以将前面得计算结果进行缓存,默认情况下persist会将数据序列化后缓存在JVM的堆中。
但并不是调用方法后立刻缓存,而是触发action算子是,该RDD会被缓存在Executor的内存中,供后续使用。
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def persist(): this.type = persist(StorageLevel.MEMORY_ONLY)
/**
* Persist this RDD with the default storage level (`MEMORY_ONLY`).
*/
def cache(): this.type = persist()
通过查看源码我们发现,cache
方法实际也是调用了persist
方法,只是默认设置存储级别为只存在内存中。
我们同样可以指定存储级别来指定缓存位置,存储级别定义在StorageLevel
中,常用的级别定义有:
object StorageLevel {
// 不缓存
val NONE = new StorageLevel(false, false, false, false)
// 只存磁盘
val DISK_ONLY = new StorageLevel(true, false, false, false)
val DISK_ONLY_2 = new StorageLevel(true, false, false, false, 2)
// 只存内存
val MEMORY_ONLY = new StorageLevel(false, true, false, true)
val MEMORY_ONLY_2 = new StorageLevel(false, true, false, true, 2)
// 只存内存并序列化
val MEMORY_ONLY_SER = new StorageLevel(false, true, false, false)
val MEMORY_ONLY_SER_2 = new StorageLevel(false, true, false, false, 2)
// 存内存和磁盘,当内存空间不足时,将缓存存储在磁盘上
val MEMORY_AND_DISK = new StorageLevel(true, true, false, true)
val MEMORY_AND_DISK_2 = new StorageLevel(true, true, false, true, 2)
// 存内存和磁盘并序列化
val MEMORY_AND_DISK_SER = new StorageLevel(true, true, false, false)
val MEMORY_AND_DISK_SER_2 = new StorageLevel(true, true, false, false, 2)
// 存储在堆外内存
val OFF_HEAP = new StorageLevel(true, true, true, false, 1)
其中带有_2
后缀的表示缓存的副本数量为2,使得缓存数据更不容易丢失。
缓存可能丢失,或者由于内存不足而被回收,缓存丢失后也可以通过RDD的血缘关系进行重新计算。由于RDD的各个分区是独立的,所以只需要重新计算丢失的那个分区数据即可。
网友评论