美文网首页
Spark之RDD

Spark之RDD

作者: wilsonLwx | 来源:发表于2021-10-26 16:53 被阅读0次

    最近在学习Spark, 网上搜集了一些学习文章,把便于自己理解的揉杂在一起,方便自己学习回顾。

    RDD概念

    RDD (Resilient Distrubuted Dataset) 弹性分布式数据集,简称RDD.

    Resilient: 何为弹性?一个节点挂了,内存中的数据集可以恢复。
    Distributed: 分布式?内存集分布在多个节点上。
    Dataset: rdd的数据集由records组成,独立可辨识的数据,类似表的一行或text的一行等。

    是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型.每个RDD本质上就是一个只读的分区记录集合,可以分成多个分区,每个分区就是一个数据集片段,并且相同RDD上的不同分区可以被保存在集群的不同节点上,从而可以在集群中的不同节点上进行并行计算。不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和groupBy)而创建得到新的RDD。RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操作的主要区别是,转换操作(比如map、filter、groupBy、join等)接受RDD并返回RDD,而行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果)。

    RDD典型的执行过程如下:
    1.RDD读入外部数据源(或者内存中的集合)进行创建;
    2.RDD经过一系列的“转换”操作,每一次都会产生不同的RDD,供给下一个“转换”使用;
    3.最后一个RDD经“行动”操作进行处理,并输出到外部数据源(或者变成Scala集合或标量)。


    RDD执行过程图

    RDD采用惰性调用,只有在RDD的“行动”操作之后才真正的发生计算,而之前的“转换”操作Spark都只是记录了RDD之间的生成和依赖关系,当要进行"行动"操作时会根据依赖关系生成DAG(有向无环图),才真正意义上的开始计算

    RDD的创建

    RDD可以通过两种方式创建:

    • 第一种:读取一个外部数据集。比如,从本地文件加载数据集,或者从HDFS文件系统、HBase、Cassandra、Amazon S3等外部数据源中加载数据集。Spark可以支持文本文件、SequenceFile文件(Hadoop提供的 SequenceFile是一个由二进制序列化过的key/value的字节流组成的文本存储文件)和其他符合Hadoop InputFormat格式的文件。
    • 第二种:调用SparkContext的parallelize方法,在Driver中一个已经存在的集合(数组)上创建。

    A.从本地文件加载数据

    >>> textFile = sc.textFile("/Users/wilson/text.txt")
    

    上面代码中,sc.textFile()中的这个textFile是sc(SparkContext)的一个方法名称,这个方法用来加载文件数据。执行上上面这条命令以后,并不会马上显示结果,因为,Spark采用惰性机制,只有遇到“行动”类型的操作,才会从头到尾执行所有操作。所以,下面我们执行一条“行动”类型的语句,就可以看到结果:

    >>> textFile.first()
    >>> 'Hello Spark'
    

    first()是一个“行动”(Action)类型的操作,会启动真正的计算过程,从文件中加载数据到变量textFile中,并取出第一行文本。
    正因为Spark采用了惰性机制,在执行转换操作的时候,即使我们输入了错误的语句,pyspark也不会马上报错,而是等到执行“行动”类型的语句时启动真正的计算,那个时候“转换”操作语句中的错误就会显示出来

    下面代码中,练习将textFile变量中的内容写入另外的目录中,之后可到指定对应目录下查看是否保存成功
    saveAsTextFile()是一个“行动”类型的操作。

    >>> textFile.saveAsTextFile("/Users/wilson/another_text.txt")
    

    也可以加载HDFS中的文件,可先安装Hadoop,文件路径使用HDFS对应的文件目录即可:

    fileRDD = sc.textFile('hdfs://localhost:9000/test.txt')
    filterRDD = fileRDD.filter(lambda line:’hello world’ in line)
    filterRDD.cache()
    filterRDD.count()
    

    这个程序的执行过程如下:

    • 创建这个Spark程序的执行上下文,即创建SparkContext对象;
    • 从外部数据源(即HDFS文件)中读取数据创建fileRDD对象;
    • 构建起fileRDD和filterRDD之间的依赖关系,形成DAG图,这时候并没有发生真正的计算,只是记录转换的轨迹;
    • 第3行代码表示对filterRDD进行持久化,把它保存在内存或磁盘中(这里采用cache接口把数据集保存在内存中),方便后续重复使用,当数据被反复访问时(比如查询一些热点数据,或者运行迭代算法),这是非常有用的,而且通过cache()可以缓存非常大的数据集,支持跨越几十甚至上百个节点
    • 执行到第4行代码时,count()是一个行动类型的操作,触发真正的计算,开始实际执行从fileRDD到filterRDD的转换操作,并把结果持久化到内存中,最后计算出filterRDD中包含的元素个数。

    B.通过并行集合(数组)创建RDD

    >>> nums = [1,2,3,4,5]
    >>> rdd = sc.parallelize(nums)
    >>> rdd.collect()
    [1, 2, 3, 4, 5]
    

    RDD之间的依赖关系

    RDD中不同的操作会使得不同RDD中的分区会产生不同的依赖。RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency),如图:


    a、窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区;

    b、宽依赖表现为一个父RDD的一个分区对应一个子RDD的多个分区。

    窄依赖与宽依赖的区别:
    Spark的这种依赖关系设计,使其具有了天生的容错性,大大加快了Spark的执行速度。因为,RDD数据集通过“血缘关系”记住了它是如何从其它RDD中演变过来的,血缘关系记录的是粗颗粒度的转换操作行为,当这个RDD的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,由此带来了性能的提升。相对而言,在两种依赖关系中,窄依赖的失败恢复更为高效,它只需要根据父RDD分区重新计算丢失的分区即可(不需要重新计算所有分区),而且可以并行地在不同节点进行重新计算。而对于宽依赖而言,单个节点失效通常意味着重新计算过程会涉及多个父RDD分区,开销较大。此外,Spark还提供了数据检查点和记录日志,用于持久化中间RDD,从而使得在进行失败恢复时不需要追溯到最开始的阶段。在进行故障恢复时,Spark会对数据检查点开销和重新计算RDD分区的开销进行比较,从而自动选择最优的恢复策略。

    阶段的划分

    Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算(具体的阶段划分算法请参见AMP实验室发表的论文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》)。例如,如图所示,假设从HDFS中读入数据生成3个不同的RDD(即A、C和E),通过一系列转换操作后再将计算结果保存回HDFS。对DAG进行解析时,在依赖图中进行
    反向解析,由于从RDD A到RDD B的转换以及从RDD B和F到RDD G的转换,都属于宽依赖,因此,在宽依赖处断开后可以得到三个阶段,即阶段1、阶段2和阶段3。可以看出,在阶段2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,比如,分区7通过map操作生成的分区9,可以不用等待分区8到分区9这个转换操作的计算结束,而是继续进行union操作,转换得到分区13,这样流水线执行大大提高了计算的效率。
    如图:


    RDD的运行过程

    通过上述对RDD概念、依赖关系和阶段划分的介绍,结合之前介绍的Spark运行基本流程,这里再总结一下RDD在Spark架构中的运行过程(如图下图所示):


    (1)创建RDD对象;
    (2)SparkContext负责计算RDD之间的依赖关系,构建DAG;
    (3)DAGScheduler负责把DAG图分解成多个阶段,每个阶段中包含了多个任务,每个任务会被任务调度器分发给各个工作节点(Worker Node)上的Executor去执行。

    RDD的转换操作

    对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
    下面列出一些常见的转换操作(Transformation API):

    • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
    • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
    • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
    • groupByKey():应用于(K,V)键值对的数据集时,返回一个新的(K, Iterable)形式的数据集
    • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

    RDD的行动操作

    行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
    下面列出一些常见的行动操作(Action API):

    • count() 返回数据集中的元素个数
    • collect() 以数组的形式返回数据集中的所有元素
    • first() 返回数据集中的第一个元素
    • take(n) 以数组的形式返回数据集中的前n个元素
    • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
    • foreach(func) 将数据集中的每个元素传递到函数func中运行*

    RDD的持久化

    前面我们已经说过,在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执行计算。如果整个Spark程序中只有一次行动操作,这当然不会有什么问题。但是,在一些情形下,我们需要多次调用不同的行动操作,这就意味着,每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据。
    比如,下面就是多次计算同一个DD的例子:

    >>> list = ["Hadoop","Spark","Hive"]
    >>> rdd = sc.parallelize(list)
    >>> print(rdd.count()) //行动操作,触发一次真正从头到尾的计算
    3
    >>> print(','.join(rdd.collect())) //行动操作,触发一次真正从头到尾的计算
    Hadoop,Spark,Hive
    

    上面代码执行过程中,前后共触发了两次从头到尾的计算。
    实际上,可以通过持久化(缓存)机制避免这种重复计算的开销。可以使用persist()方法对一个RDD标记为持久化,之所以说“标记为持久化”,是因为出现persist()语句的地方,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化,持久化后的RDD将会被保留在计算节点的内存中被后面的行动操作重复使用。
    persist()的圆括号中包含的是持久化级别参数,比如,persist(MEMORY_ONLY)表示将RDD作为反序列化的对象存储于JVM中,如果内存不足,就要按照LRU原则替换缓存中的内容。persist(MEMORY_AND_DISK)表示将RDD作为反序列化的对象存储在JVM中,如果内存不足,超出的分区将会被存放在硬盘上。一般而言,使用cache()方法时,会调用persist(MEMORY_ONLY)。
    例子如下:

    >>> list = ["Hadoop","Spark","Hive"]
    >>> rdd = sc.parallelize(list)
    >>> rdd.cache()  //会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,这是rdd还没有被计算生成
    >>> print(rdd.count()) //第一次行动操作,触发一次真正从头到尾的计算,这时才会执行上面的rdd.cache(),把这个rdd放到缓存中
    3
    >>> print(','.join(rdd.collect())) //第二次行动操作,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
    Hadoop,Spark,Hive
    

    最后,可以使用unpersist()方法手动地把持久化的RDD释放。

    相关文章

      网友评论

          本文标题:Spark之RDD

          本文链接:https://www.haomeiwen.com/subject/ybxybktx.html