本节我们将介绍Spark是如何管理中间数据的,以及我们如何配置数据的访问模式,引导Spark实现更高的可用性和更好的性能。
Spark执行过程回顾
我们先来快速回顾一下Spark应用的执行过程。RDD是分区的,并且整个执行层是基于分区构建的。每个Task同一时间只处理少量的分区,在必要时对所有分区进行reshuffle。Spark传输数据的最小单位不是分区,而是数据块,也就是固定大小的数据,通常是若干MB。
Cache的场景引入
下面我们举一个例子。假设我们在Spark shell下,已经获取了SparkContext,创建好了RDD,并调用了actions的算子。
我们想统计HDFS的文件中,单词Spark出现的次数,可能会写出以下的代码:
sc = SparkContext(...)
wiki = sc.textFile("wikipedia.dump.txt")
spark_articles = wiki.filter(lambda x: "Spark" in x)
print(spark_articles.count())
Spark的执行过程图如下:

现在我们想统计每个出现Spark的文章中,分别出现Hadoop和MapReduce单词的数量,对应的代码如下:
sc = SparkContext(...)
wiki = sc.textFile("wikipedia.dump.txt")
spark_articles = wiki.filter(lambda x: "Spark" in x)
hadoop_articles = spark_articles.filter(lambda x: "Hadoop" in x)
mapreduce_articles = spark_articles.filter(lambda x: "MapReduce" in x)
print(hadoop_articles.count())
print(mapreduce_articles.count())
Spark的执行过程图如下:

Spark在完成第一个Job的执行后,会将所有的中间结果丢弃中间结果和中间的RDD。一个更好的策略是将这些加载的数据保存在内存中,直到整个程序的session结束才会释放。Spark会允许将一些RDD进行缓存,比如下面我们的代码我们将spark_articles进行缓存处理。
spark_articles = ...
spark_articles.cache()
加了缓存后,下图中黄色的RDD将可以重用。

Cache和Persist
Cache方法是比它更通用的Persist方法的快捷方式。调用Persist方法的RDD会在第一次计算后将数据集进行持久化。Persist的参数是设置的持久化级别,共分为以下六种:
- DISK_ONLY:数据保存在磁盘中;
- MEMORY_ONLY:数据保存在内存中;
- MEMORY_AND_DISK:数据保存在内存中,如果内存用尽,则保存在磁盘中
- DISK_ONLY_2/MEMORY_ONLY_2/MEMORY_AND_DISK_2:保存方式和以上相同,区别是会冗余保存两份数据副本,数据副本的数量是可以调整的。
rdd.cache() = rdd.persist(MEMORY_ONLY)
cache方法是persist方法持久化到内存的快捷方式。
最佳实践
- 在以交互式方式运行Spark时,最好将预处理后的数据集进行cache,这样将会得到更好的交互式体验;
- 在批量计算中,最好将需要进行join的字典表进行cache,因为join的字典表会经常reshuffle,cache后会减少数据读取的时间;
- 在需要进行迭代的计算中,将迭代中不变的数据,比如字典表等进行cache,可以减少迭代中这部分数据的读取时间。
小结
本节我们介绍了如何通过cache来提高Spark应用的性能,我们可以任意设置哪些中间数据需要persist,以及persist的方式和数据副本个数等。记住,将计算过程中,重复使用的而且不变的数据进行cache是最佳选择。
网友评论