美文网首页
【Spark学习笔记】RDD篇

【Spark学习笔记】RDD篇

作者: 小透明苞谷 | 来源:发表于2017-11-11 14:47 被阅读0次

    每个Spark应用都由一个驱动器程序(driver program)(例如Spark Shell本身)来发起集群上的各种并行操作,一般要管理多个执行器(executor)节点。
    驱动器程序是整个程序的入口,通过一个SparkContext对象来访问Spark。这个对象代表集群上的一个连接。


    初始化Spark
    from pyspark import SparkConf, SparkContext
    conf = SparkConf().setMaster('local').setAppName('Myapp')
    sc = SparkContext(conf = conf)
    
    关闭Spark

    sc.stop()

    RDD

    RDD(弹性分布式数据集Resilient Distributed Dataset)就是分布式的元素集合
    Spark中,对数据的所有操作不外乎创建RDD转化已有RDD调用RDD操作进行求值
    Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。
    RDD是一个不可变的分布式集合对象
    一个RDD内部有许多partitions组成,partition 是RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。

    创建RDD

    • 读取一个外部数据集
      lines = sc.textFile("README.md")
    • 在驱动器程序里对一个集合进行并行化(如list,set)
      words = sc.parallelize(["scala","java","hadoop","spark","akka"])

    RDD操作

    • 转化操作
      惰性求值,不会立即执行操作
      Spark会在内部记录下所要求执行的操作的相关信息。
      会生成一个新的RDD
      如map(),filter(),union()
      返回RDD
      pylines = lines.filter(lambda line: 'python' in line )
    • 行动操作
      用到RDD时会触发实际的计算,强制执行那些用到的RDD的转化操作
      最终结果会返回到驱动器程序或写入外部存储系统
      如count(),first()

    每当调用一个新的行动操作时,整个RDD都会从头开始计算。可以通过将中间结果持久化,避免这种低效的行为。

    把数据读取到RDD的操作也是惰性的。,到可以执行一个行动操作来强制执行。如count()

    Spark使用谱系图来记录这些不同的RDD之间的依赖关系。Spark需要这些信息来按需计算每个RDD,也可以依靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。

    常见操作

    1. 转化操作
    • map()与 flatMap()
      将函数用于RDD中每一个元素
      flatMap()对每个输入元素生成多个输出元素,通常用于切分单词
    • distinct() 保持元素唯一性
      开销大,需要将所有数据通过通过网络进行shuffle(混洗),保证每个元素只有一份
    • filter() 过滤

    2.伪集合操作

    • union() 合集,有重复元素
    • intersection()交集,无重复元素(shuffle,单个RDD内的重复元素也一起被移除)
    • substract() 也需要shuffle
    • 计算两个RDD的笛卡尔积 RDD1.cartesian(RDD2)
    1. 行动操作
    • reduce(),接收函数作为参数,函数操作两个相同元素类型的RDD数据并返回一个同样类型的新元素
      sum = rdd.reduce(lambda x,y: x+y)
    • fold()
    • collect() 可以用来获取整个RDD中的数据(数据规模小的情况下)
    • count() RDD中元素个数
    • countByValue() RDD中各元素出现次数
    • take(num) 返回num个元素,无序
    • top(num) 返回前num个元素
    • foreach(func) 对每个元素进行操作,不需把RDD返回本地
      lines.foreach(println)

    4.数据持久化

    • 如果要在多个行动操作中重用同一个RDD,应该用RDD.persist()让Spark把整个RDD的内容保存到内存中(以分区的方式存储到集群中的各机器上)。也可以缓存到磁盘上。
    • persist()调用本身不会触发强制求值
    • cache()
    • unpersist()手动把持久化的RDD从缓存中移除

    persist()与cache():
    1)RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY;
    2)可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别;
    3)cache或者persist并不是action;
    4)cache和persist都可以用unpersist来取消

    相关文章

      网友评论

          本文标题:【Spark学习笔记】RDD篇

          本文链接:https://www.haomeiwen.com/subject/ijmwmxtx.html