美文网首页
Resilient Distributed Datasets:

Resilient Distributed Datasets:

作者: 西部小笼包 | 来源:发表于2019-12-15 21:05 被阅读0次

    带着问题阅读论文?

    有什么应用SPARK支持的要比MAP REDUCE/HADOOP支持的要棒?

    这篇文章我不是按照翻译论文的思路来写,翻译的论文在这:
    https://zhuanlan.zhihu.com/p/28533365
    我建议先通读论文,再来看这篇博客。

    为什么我们要学习SPARK?

    首先他被广泛的使用,一个流行的开源项目。 比MR 更加适合迭代的应用。同时它的容错也十分的强大。也是ACM年度最佳论文奖。

    MR使得开发者在编写分布式的应用时十分简单,甚至不怎么需要分布式的知识。因为MR为我们做了很多, 比如:节点间的通讯,分布式的代码,任务调度,容错。
    但是MR暴露的模式,可能有一些APP 不是能很好的适应。
    下面我们说其中一列。

    迭代算法

    有一类算法叫迭代。也就是说输出的东西又变为输入,如此反复。
    比如大名鼎鼎的PAGE-RANK就是这样一个算法。
    首先他会为每个网页计算一个RANK,然后基于这个RANK和多有DOC指向它,会不断反复的重新计算。
    在每一次的迭代中,我们会根据这个网页的RANK,然后往这个网页指向的其他网页,平均的分配他自己的RANK。
    这个是巨大的计算。因为要跑在世界上所有的网页。即使有很多机器,这也要跑很久。

    在传统的MR里,MR是适合只做一次的迭代。因为下一次的迭代,MR不得不去GFS捞出上次迭代的结果。然后要重新启动一个MR JOB, 这些都十分昂贵。

    这就是一个挑战,如何设计出更加好的模型去支持迭代计算,同时也依然保持容错。

    分布式共享内存

    一个解决方案是用DSM。也就是分布式共享内存。每个网页的RANK可以放进共享内存里,每个WORKER可以去读和写。 但是DSM在容错时非常麻烦。
    传统的解决方案是每小时对当前的内存状态写一个CHECK POINT。
    一边计算一边写CHECK POINT 从共享内存去磁盘是十分昂贵的。还有必须REDO所有WORK在这个CP之后。

    SPARK

    那么SPARK同时解决了上面2个问题,第一个他能很好的容错,不像 DSM这么麻烦。第二,他也对迭代式的应用非常友好。

    在SPARK里,数据被存储在一个叫RDD的结构里。 持久化这些RDD在内存中,那么下一次迭代可以基于内存里的RDD去做。

    同时RDD也是SPARK里的核心概念。

    image.png

    RDD

    首先RDD是不可变的,一旦创建出来就无法修改。
    RDD支持2种操作。transform 和 action
    transform就是把一个RDD通过一个运算变成另一个RDD。比如常见的有JOIN, FILTER,REDUCE BY KEY等。 transform是懒做的,也就是不会立即计算。他只是计算的一种描述。只有当RESULT需要的时候,才会去计算。


    image.png

    那么ACTION 就是需要结果了,可以是拿COUNT的结果,或结果本身(collect),或者是一个特定的值。

    例子:

      lines = spark.textFile("hdfs://...")
      errors = lines.filter(_.startsWith("ERROR"))    // lazy!
      errors.persist()    // no work yet
      errors.count()      // an action that computes a result
      // now errors is materialized in memory
      // partitioned across many nodes
      // Spark, will try to keep in RAM (will spill to disk when RAM is full)
    

    RDD可以被重用

      errors.filter(_.contains("MySQL")).count()
      // this will be fast because reuses results computed by previous fragment
      // Spark will schedule jobs across machines that hold partition of errors
    

    RDD的血统(lineage)

    当执行ACTION的时候,SPARK会构建一副LINEAGE图。他会描述这个结果是经过哪些变化得来的。

    lines -> filter w ERROR -> errors -> filter w. HDFS -> map -> timed fields

    Spark 使用血统来调度JOB。
    变换(transform)在相同的分块上组成一个STAGE。 一个JOB运行单个STAGE。

    调度器根据目标RDD的Lineage图创建一个由stage构成的无回路有向图(DAG)。每个stage内部尽可能多地包含一组具有窄依赖关系的转换,并将它们流水线并行化(pipeline)。stage的边界有两种情况:一是宽依赖上的Shuffle操作;二是已缓存分区,它可以缩短父RDD的计算过程。例如图6。父RDD完成计算后,可以在stage内启动一组任务计算丢失的分区。

    image.png

    图6 Spark怎样划分任务阶段(stage)的例子。实线方框表示RDD,实心矩形表示分区(黑色表示该分区被缓存)。要在RDD G上执行一个动作,调度器根据宽依赖创建一组stage,并在每个stage内部将具有窄依赖的转换流水线化(pipeline)。 本例不用再执行stage 1,因为B已经存在于缓存中了,所以只需要运行2和3。

    血统和容错

    有了血统,容错就变得非常容易了。
    我们假设有一台机器挂了。我们只需要重新计算这个挂的机器的状态就可以。血统图可以告诉我们什么是需要被重新计算的。沿着血统图去找到所有的需要的分块。然后根据这些分块的RDD去计算这个挂掉机器上的RDD。

    宽依赖和窄依赖

    我们发现RDD之间的依赖关系可以分为两类,即:(1)窄依赖(narrow dependencies):子RDD的每个分区依赖于常数个父分区(即与数据规模无关);(2)宽依赖(wide dependencies):子RDD的每个分区依赖于所有父RDD分区。例如,map产生窄依赖,而join则是宽依赖(除非父RDD被哈希分区)。另一个例子见图5。

    image

    图5 窄依赖和宽依赖的例子。(方框表示RDD,实心矩形表示分区)
    区分这两种依赖很有用。首先,窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区。例如,逐个元素地执行map、然后filter操作;而宽依赖则需要首先计算好所有父分区数据,然后在节点之间进行Shuffle,这与MapReduce类似。第二,窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;而对于一个宽依赖关系的Lineage图,单个节点失效可能导致这个RDD的所有祖先丢失部分分区,因而需要整体重新计算。

    RDD的实现

    每个RDD都包含:(1)一组RDD分区(partition,即数据集的原子组成部分);(2)对父RDD的一组依赖,这些依赖描述了RDD的Lineage;(3)一个函数,即在父RDD上执行何种计算;(4)元数据,描述分区模式和数据存放的位置。例如,一个表示HDFS文件的RDD包含:各个数据块的一个分区,并知道各个数据块放在哪些节点上。而且这个RDD上的map操作结果也具有同样的分区,map函数是在父数据上执行的。表3总结了RDD的内部接口。

    image

    为什么RDD上需要携带METADATA?
    因为依赖多个RDD的变换需要知道是否需要重新整理数据,宽依赖需要,窄依赖不需要。 同时也可以让用户控制更好的局部性和减少整理的工作量。

    为什么要区分窄依赖和宽依赖?
    为了防止失败,窄依赖可以很快的被重计算,掉1-2个PARITION就可以重计算。宽依赖需要一个完整的RDD才可以被重计算。

    RDD实例

    HDFS文件:目前为止我们给的例子中输入RDD都是HDFS文件,对这些RDD可以执行:partitions操作返回各个数据块的一个分区(每个Partition对象中保存数据块的偏移),preferredLocations操作返回数据块所在的节点列表,iterator操作对数据块进行读取。

    map:任何RDD上都可以执行map操作,返回一个MappedRDD对象。该操作传递一个函数参数给map,对父RDD上的记录按照iterator的方式执行这个函数,并返回一组符合条件的父RDD分区及其位置。

    union:在两个RDD上执行union操作,返回两个父RDD分区的并集。通过相应父RDD上的窄依赖关系计算每个子RDD分区(注意union操作不会过滤重复值,相当于SQL中的UNION ALL)。

    sample:抽样与映射类似,但是sample操作中,RDD需要存储一个随机数产生器的种子,这样每个分区能够确定哪些父RDD记录被抽样。

    join:对两个RDD执行join操作可能产生窄依赖(如果这两个RDD拥有相同的哈希分区或范围分区),可能是宽依赖,也可能两种依赖都有(比如一个父RDD有分区,而另一父RDD没有)。

    利用RDD来实现PAGE RANK

    image.png

    通过使用存储在HDFS上的49G Wikipedia导出数据,我们比较了使用RDD实现的Pregel与使用Hadoop计算PageRank的性能。PageRank算法通过10轮迭代处理了大约400万文章的链接图数据,图10显示了在30个节点上,Spark处理速度是Hadoop的2倍多,改进后对输入进行Hash分区速度提升到2.6倍,使用Combiner后提升到3.6倍,这些结果数据也随着节点扩展到60个时同步放大。

    image

    虽然性能很好,实现简单。
    但是我们会发现这个血统的长度和迭代次数成正比。如果失败了,我们重新计算花的代价可能很大。
    解决方案是用户可以指定去复制RDD。程序员可以传入"reliable" flage去persist()里。
    当然还可以传入REPLICATE flag,这就这个RDD的结果就会被写入HDFS,类似于CHECK POINT的功能。
    当然这些都是有代价的,快速的回复,意味着较慢的执行。因为要写磁盘和复制。

    Q:PERSIST是一个transformation 还是action?
    A: 都不是,他不创建一个新的RDD,也不会引起物化。他只是一个调度器的指令。

    Q:如果PERSIST不传FLAG,是否可以保证在发生故障的情况下不必重新计算RDD?
    A: 不能保证,因为没有复制,这个节点的PARITION是可以失效的,复制是重要的。

    Q:为啥还需要检查点?
    A:长血统引起的恢复时间会很长。或者遇到一个宽依赖时,失败会引起很多PARTITION重新计算。这时检查点很有价值。

    Q: spark可以应对网络分区吗?
    A: 节点不能和调度器沟通,等价于死亡。如果部分网络还是可以被调度器REACH的情况下,是可以继续计算。只要有足够的数据去启动这个血统即可。如果所有的复制都不能REACH了,那么集群将会卡主。

    Q:都放在内存里,内存不够怎么办?
    会使用LRU的策略应用在PARITION上。
    当然用户也可以指定PARTITION落盘的优先级

    性能

    在论文里的图7,逻辑回归在100个HADOOP节点上,消耗了80秒
    在图12里,在25个节点消耗了70秒
    差距是因为,MR需要复制存储在REDUCE之后。但是SPARK都在内存里,只有内存不够的时候,才会用到磁盘。一般是没有网络的延迟,和磁盘的IO在复制上。

    对于非迭代的任务,没有架构上的原因使得MR比SPARK更慢。但是对迭代的任务来说,SPARK因为不用去磁盘会快很多。
    当然也没有原因SPARK会比MR慢。

    总结

    SPARK的目标是批处理,迭代的应用程序。 SPARK也可以藐视其他模型如MR, PREGEL。
    但是不能很好处理新进来的数据。SPARK也不擅长构建K/V存储,因为RDD是不可变的。

    要解决上面2个问题,我们请看下一篇论文:Naiad: A Timely Dataflow System

    相关文章

      网友评论

          本文标题:Resilient Distributed Datasets:

          本文链接:https://www.haomeiwen.com/subject/ogdlnctx.html