带着问题阅读论文?
有什么应用SPARK支持的要比MAP REDUCE/HADOOP支持的要棒?
这篇文章我不是按照翻译论文的思路来写,翻译的论文在这:
https://zhuanlan.zhihu.com/p/28533365
我建议先通读论文,再来看这篇博客。
为什么我们要学习SPARK?
首先他被广泛的使用,一个流行的开源项目。 比MR 更加适合迭代的应用。同时它的容错也十分的强大。也是ACM年度最佳论文奖。
MR使得开发者在编写分布式的应用时十分简单,甚至不怎么需要分布式的知识。因为MR为我们做了很多, 比如:节点间的通讯,分布式的代码,任务调度,容错。
但是MR暴露的模式,可能有一些APP 不是能很好的适应。
下面我们说其中一列。
迭代算法
有一类算法叫迭代。也就是说输出的东西又变为输入,如此反复。
比如大名鼎鼎的PAGE-RANK就是这样一个算法。
首先他会为每个网页计算一个RANK,然后基于这个RANK和多有DOC指向它,会不断反复的重新计算。
在每一次的迭代中,我们会根据这个网页的RANK,然后往这个网页指向的其他网页,平均的分配他自己的RANK。
这个是巨大的计算。因为要跑在世界上所有的网页。即使有很多机器,这也要跑很久。
在传统的MR里,MR是适合只做一次的迭代。因为下一次的迭代,MR不得不去GFS捞出上次迭代的结果。然后要重新启动一个MR JOB, 这些都十分昂贵。
这就是一个挑战,如何设计出更加好的模型去支持迭代计算,同时也依然保持容错。
分布式共享内存
一个解决方案是用DSM。也就是分布式共享内存。每个网页的RANK可以放进共享内存里,每个WORKER可以去读和写。 但是DSM在容错时非常麻烦。
传统的解决方案是每小时对当前的内存状态写一个CHECK POINT。
一边计算一边写CHECK POINT 从共享内存去磁盘是十分昂贵的。还有必须REDO所有WORK在这个CP之后。
SPARK
那么SPARK同时解决了上面2个问题,第一个他能很好的容错,不像 DSM这么麻烦。第二,他也对迭代式的应用非常友好。
在SPARK里,数据被存储在一个叫RDD的结构里。 持久化这些RDD在内存中,那么下一次迭代可以基于内存里的RDD去做。
同时RDD也是SPARK里的核心概念。
image.pngRDD
首先RDD是不可变的,一旦创建出来就无法修改。
RDD支持2种操作。transform 和 action
transform就是把一个RDD通过一个运算变成另一个RDD。比如常见的有JOIN, FILTER,REDUCE BY KEY等。 transform是懒做的,也就是不会立即计算。他只是计算的一种描述。只有当RESULT需要的时候,才会去计算。
image.png
那么ACTION 就是需要结果了,可以是拿COUNT的结果,或结果本身(collect),或者是一个特定的值。
例子:
lines = spark.textFile("hdfs://...")
errors = lines.filter(_.startsWith("ERROR")) // lazy!
errors.persist() // no work yet
errors.count() // an action that computes a result
// now errors is materialized in memory
// partitioned across many nodes
// Spark, will try to keep in RAM (will spill to disk when RAM is full)
RDD可以被重用
errors.filter(_.contains("MySQL")).count()
// this will be fast because reuses results computed by previous fragment
// Spark will schedule jobs across machines that hold partition of errors
RDD的血统(lineage)
当执行ACTION的时候,SPARK会构建一副LINEAGE图。他会描述这个结果是经过哪些变化得来的。
如
lines -> filter w ERROR -> errors -> filter w. HDFS -> map -> timed fields
Spark 使用血统来调度JOB。
变换(transform)在相同的分块上组成一个STAGE。 一个JOB运行单个STAGE。
调度器根据目标RDD的Lineage图创建一个由stage构成的无回路有向图(DAG)。每个stage内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化(pipeline)。stage的边界有两种情况:一是宽依赖上的Shuffle操作;二是已缓存分区,它可以缩短父RDD的计算过程。例如图6。父RDD完成计算后,可以在stage内启动一组任务计算丢失的分区。
image.png图6 Spark怎样划分任务阶段(stage)的例子。实线方框表示RDD,实心矩形表示分区(黑色表示该分区被缓存)。要在RDD G上执行一个动作,调度器根据宽依赖创建一组stage,并在每个stage内部将具有窄依赖的转换流水线化(pipeline)。 本例不用再执行stage 1,因为B已经存在于缓存中了,所以只需要运行2和3。
血统和容错
有了血统,容错就变得非常容易了。
我们假设有一台机器挂了。我们只需要重新计算这个挂的机器的状态就可以。血统图可以告诉我们什么是需要被重新计算的。沿着血统图去找到所有的需要的分块。然后根据这些分块的RDD去计算这个挂掉机器上的RDD。
宽依赖和窄依赖
我们发现RDD之间的依赖关系可以分为两类,即:(1)窄依赖(narrow dependencies):子RDD的每个分区依赖于常数个父分区(即与数据规模无关);(2)宽依赖(wide dependencies):子RDD的每个分区依赖于所有父RDD分区。例如,map产生窄依赖,而join则是宽依赖(除非父RDD被哈希分区)。另一个例子见图5。
image图5 窄依赖和宽依赖的例子。(方框表示RDD,实心矩形表示分区)
区分这两种依赖很有用。首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。例如,逐个元素地执行map、然后filter操作;而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。第二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。
RDD的实现
每个RDD都包含:(1)一组RDD分区(partition,即数据集的原子组成部分);(2)对父RDD的一组依赖,这些依赖描述了RDD的Lineage;(3)一个函数,即在父RDD上执行何种计算;(4)元数据,描述分区模式和数据存放的位置。例如,一个表示HDFS文件的RDD包含:各个数据块的一个分区,并知道各个数据块放在哪些节点上。而且这个RDD上的map操作结果也具有同样的分区,map函数是在父数据上执行的。表3总结了RDD的内部接口。
image为什么RDD上需要携带METADATA?
因为依赖多个RDD的变换需要知道是否需要重新整理数据,宽依赖需要,窄依赖不需要。 同时也可以让用户控制更好的局部性和减少整理的工作量。
为什么要区分窄依赖和宽依赖?
为了防止失败,窄依赖可以很快的被重计算,掉1-2个PARITION就可以重计算。宽依赖需要一个完整的RDD才可以被重计算。
RDD实例
HDFS文件:目前为止我们给的例子中输入RDD都是HDFS文件,对这些RDD可以执行:partitions操作返回各个数据块的一个分区(每个Partition对象中保存数据块的偏移),preferredLocations操作返回数据块所在的节点列表,iterator操作对数据块进行读取。
map:任何RDD上都可以执行map操作,返回一个MappedRDD对象。该操作传递一个函数参数给map,对父RDD上的记录按照iterator的方式执行这个函数,并返回一组符合条件的父RDD分区及其位置。
union:在两个RDD上执行union操作,返回两个父RDD分区的并集。通过相应父RDD上的窄依赖关系计算每个子RDD分区(注意union操作不会过滤重复值,相当于SQL中的UNION ALL)。
sample:抽样与映射类似,但是sample操作中,RDD需要存储一个随机数产生器的种子,这样每个分区能够确定哪些父RDD记录被抽样。
join:对两个RDD执行join操作可能产生窄依赖(如果这两个RDD拥有相同的哈希分区或范围分区),可能是宽依赖,也可能两种依赖都有(比如一个父RDD有分区,而另一父RDD没有)。
利用RDD来实现PAGE RANK
image.png通过使用存储在HDFS上的49G Wikipedia导出数据,我们比较了使用RDD实现的Pregel与使用Hadoop计算PageRank的性能。PageRank算法通过10轮迭代处理了大约400万文章的链接图数据,图10显示了在30个节点上,Spark处理速度是Hadoop的2倍多,改进后对输入进行Hash分区速度提升到2.6倍,使用Combiner后提升到3.6倍,这些结果数据也随着节点扩展到60个时同步放大。
image虽然性能很好,实现简单。
但是我们会发现这个血统的长度和迭代次数成正比。如果失败了,我们重新计算花的代价可能很大。
解决方案是用户可以指定去复制RDD。程序员可以传入"reliable" flage去persist()里。
当然还可以传入REPLICATE flag,这就这个RDD的结果就会被写入HDFS,类似于CHECK POINT的功能。
当然这些都是有代价的,快速的回复,意味着较慢的执行。因为要写磁盘和复制。
Q:PERSIST是一个transformation 还是action?
A: 都不是,他不创建一个新的RDD,也不会引起物化。他只是一个调度器的指令。
Q:如果PERSIST不传FLAG,是否可以保证在发生故障的情况下不必重新计算RDD?
A: 不能保证,因为没有复制,这个节点的PARITION是可以失效的,复制是重要的。
Q:为啥还需要检查点?
A:长血统引起的恢复时间会很长。或者遇到一个宽依赖时,失败会引起很多PARTITION重新计算。这时检查点很有价值。
Q: spark可以应对网络分区吗?
A: 节点不能和调度器沟通,等价于死亡。如果部分网络还是可以被调度器REACH的情况下,是可以继续计算。只要有足够的数据去启动这个血统即可。如果所有的复制都不能REACH了,那么集群将会卡主。
Q:都放在内存里,内存不够怎么办?
会使用LRU的策略应用在PARITION上。
当然用户也可以指定PARTITION落盘的优先级
性能
在论文里的图7,逻辑回归在100个HADOOP节点上,消耗了80秒
在图12里,在25个节点消耗了70秒
差距是因为,MR需要复制存储在REDUCE之后。但是SPARK都在内存里,只有内存不够的时候,才会用到磁盘。一般是没有网络的延迟,和磁盘的IO在复制上。
对于非迭代的任务,没有架构上的原因使得MR比SPARK更慢。但是对迭代的任务来说,SPARK因为不用去磁盘会快很多。
当然也没有原因SPARK会比MR慢。
总结
SPARK的目标是批处理,迭代的应用程序。 SPARK也可以藐视其他模型如MR, PREGEL。
但是不能很好处理新进来的数据。SPARK也不擅长构建K/V存储,因为RDD是不可变的。
要解决上面2个问题,我们请看下一篇论文:Naiad: A Timely Dataflow System
网友评论