RDD编程

作者: zlcook | 来源:发表于2017-07-21 17:41 被阅读28次

RDD基础

  • RDD:Resilient Distributed Datasets,弹性分布式数据集

  • 分布在集群中的只读对象集合(由多个分区(Partition)构成,这些分区运行在集群中的不同节点上)

  • 可以存储在磁盘或内存中(多种存储级别)

  • 通过并行“转换”操作构造

  • 失效后自动重构

  • RDD可以包含Python、java、Scala中任意类型的对象,甚至可以包含用户自定义的对象。

  • 两种方法创建RDD:

  • 1.读取外部数据集。

  • 2.在驱动程序里分发驱动器程序中的对象集合(比如list和set)。

  • RDD支持两种类型操作

    • 1.转化操作(tranformation):由一个RDD生成一个新的RDD.旧的RDD不会被改变。map、filter、groupBy、reduceBy
    • 2.行动操作(action):对RDD计算出一个结果,并把结果返回到驱动程序中,或者写入外部存储系统中。count、collect、saveAsTextFile
      • 注:转化操作返回的是RDD,行动操作返回的是其它数据类型。


        操作示例
  • 惰性求值

  • 转化操作和行动操作的区别在于Spark计算RDD的方式不同。RDD的转化操作都是惰性求值,即对RDD调用转化操作(如map())时,操作不会立即执行,它们只有第一次在一个行动操作中用到时才会真正计算。

  • 默认情况下,Spark的RDD会在每次对它们进行行动操作时重新计算,如果想在多个行动操作中重用同一个RDD,可以使用RDD.persist(),让Spark把这个RDD缓存起来。默认缓存到内存中(以分区方式存储到集群中各个机器上)

  • 持久化(缓存)

    • 持久化原因如上。持久化数据丢失怎么办?让Spark持久化一个RDD,计算出RDD的节点会分别保存它们所求出的分区数据,如果一个有持久化的节点发生故障,Spark会在用到缓存的数据时重算丢失的数据分区,当然可以把数据备份到多个节点上,以避免单节点故障拖累进度。
  • 持久化数据方式:默认情况下persist()会把数据以序列化的形式缓存在JVM的堆空间中,当我们把数据写到磁盘或堆外存储上是也总是使用序列化数据。

  • 持久化级别:
    如果采用缓存在内存中的级别,当内存放不下是,Spark会自动利用最近最少使用(LRU)的策略吧最老的分区从内存中移除。

  • image.png

向Spark传递函数

  • Spark 的大部分转化操作和一部分行动操作,都需要依赖用户传递的函数来计算。
  • 在Scala 中,我们可以把定义的内联函数方法的引用静态方法传递给Spark,就像Scala 的其他函数式API 一样。我们还要考虑其他一些细节,比如所传递的函数及其引用的数据需要是可序列化的(实现了Java 的Serializable 接口)。
  • 如果在Scala 中出现了NotSerializableException,通常问题就在于我们传递了一个不可序列化的类中的函数或字段。记住,传递局部可序列化变量或顶级对象中的函数始终是安全的。

常见转化操作和行动操作

Transformation与Action实现
  • sample(withReplacement, fraction, seed):对RDD采样,以及是否替换。

  • 对一个RDD的转化操作
  • 对两个RDD的转化操作
  • 对一个RDD进行的行动操作

一个完整案例

wordcount案例 程序执行流程

相关文章

网友评论

      本文标题:RDD编程

      本文链接:https://www.haomeiwen.com/subject/rdvskxtx.html