首先介绍下RDD的存储级别
在RDD的操作中persist(或cache)是数据集是Spark最重要功能的其中之一。当你持久化一个RDD的时候,每个节点会存储它内存中或重用的数据集的那些分区。这样的话,以后的操作会更快(通常大于10倍)。 caching 对于迭代算法和快速的交互应用是一个关键的工具。
通过使用persist()或者cache()方法把一个RDD标记为持久化。当它第一次计算的时候,它就会被节点保存在内存中。Spark的cache具有容错性-如果某个RDD的分区丢失了,它将会使用之前创建这个RDD的转化方法自动的重新计算。
另外,每个持久化的RDD可以使用一个不同的存储级别进行存储。举个例子,持久化数据到磁盘,以序列化的方式持久化到内存(为了节省空间)。可以通过给persist()方法传入不同参数设置不同存储级别。cache()是存储级别是StorageLevel.MEMORY_ONLY的一个快捷方式。 所有的存储级别如下:
Storage Level | Meaning |
---|---|
MEMORY_ONLY | 默认选项,RDD的(分区)数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不足,某些分区的数据将不会被缓存,需要在使用的时候重新计算。 |
MEMORY_AND_DISK | RDD的数据直接以Java对象的形式存储于JVM的内存中,如果内存空间不中,某些分区的数据会被存储至磁盘,使用的时候从磁盘读取。 |
MEMORY_ONLY_SER (Java and Scala) | RDD的数据(Java对象)序列化之后存储于JVM的内存中(一个分区的数据为内存中的一个字节数组),相比于MEMORY_ONLY能够有效节约内存空间(特别是使用一个快速序列化工具的情况下),但读取数据时需要更多的CPU开销;如果内存空间不足,处理方式与MEMORY_ONLY相同。 |
MEMORY_AND_DISK_SER (Java and Scala) | 相比于MEMORY_ONLY_SER,在内存空间不足的情况下,将序列化之后的数据存储于磁盘。 |
DISK_ONLY | 仅仅使用磁盘存储RDD的数据(未经序列化)。 |
MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. | 以MEMORY_ONLY_2为例,MEMORY_ONLY_2相比于MEMORY_ONLY存储数据的方式是相同的,不同的是会将数据备份到集群中两个不同的节点,其余情况类似。 |
OFF_HEAP (experimental) | 与MEMORY_ONLY_SER类似,但是存储在非堆的内存中,需要开启非堆内存。 |
注意:
即使没有使用persisit,Spark在进行shuffle的操作(比如,reduceByKey)的时候也会自动的持久化某些中间数据。这样可以避免在有节点丢失的情况下,重复计算input。对于 resulting RDD如果需要重复使用的话, 强烈建议使用persist进行持久化。
如何选择存储级别
Spark的存储级别其实是提供了一个内存使用率和cpu效率的两者之间的一个平衡。建议按照下面的流程选择存储级别:
- 如果你的RDD在MEMORY_ONLY的级别可以存储的开,那么可以就不用管了。因为这样是CPU效率最高的方式,能使你的RDD以最快的速度运行。
- 如果内存空间不够用,可以使用MEMORY_ONLY_SER 级别,选择一个快的序列化库可以节省空间,并且执行的速度也很快。(scala 或java)
- 尽量不要把数据写入到磁盘,除非数据集的操作计算比较耗费资源或者数据量特别大。否则的话重新计算分区的和从磁盘读取的数据一样快。
- 如果希望快速的故障恢复的话,可以使用混合的存储的级别。比如使用spark作为web服务的请求。所有的存储级别都通过重复计算丢失的数据提供了容错性。但是混合的存储级可以继续运行task而不需要重复计算丢失的分区。
删除数据
Spark能够自动的监控每个节点cache的使用,并根据LRU算法丢弃老的数据。如果你想要手动的删除RDD的数据而不是等待数据的移除。可以使用RDD.unpersist()方法。
适合场景
以上基本是官方的介绍,这儿我来介绍使用的场景:
具体的就是RDD需要继续两个不同的操作
具体的case:一个RDD需要写入到两个存储之中,比如当前RDD的数据需要写入到HBase,又要写入ES,或者Mysql。这个时候如果不进行persist操作,则会有执行多次写入操作之前的。
网友评论