美文网首页
【Spark学习笔记】RDD篇

【Spark学习笔记】RDD篇

作者: 小透明苞谷 | 来源:发表于2017-11-11 14:47 被阅读0次

每个Spark应用都由一个驱动器程序(driver program)(例如Spark Shell本身)来发起集群上的各种并行操作,一般要管理多个执行器(executor)节点。
驱动器程序是整个程序的入口,通过一个SparkContext对象来访问Spark。这个对象代表集群上的一个连接。


初始化Spark
from pyspark import SparkConf, SparkContext
conf = SparkConf().setMaster('local').setAppName('Myapp')
sc = SparkContext(conf = conf)
关闭Spark

sc.stop()

RDD

RDD(弹性分布式数据集Resilient Distributed Dataset)就是分布式的元素集合
Spark中,对数据的所有操作不外乎创建RDD转化已有RDD调用RDD操作进行求值
Spark会自动将RDD中的数据分发到集群上,并将操作并行化执行。
RDD是一个不可变的分布式集合对象
一个RDD内部有许多partitions组成,partition 是RDD的最小单元,RDD是由分布在各个节点上的partition 组成的。

创建RDD

  • 读取一个外部数据集
    lines = sc.textFile("README.md")
  • 在驱动器程序里对一个集合进行并行化(如list,set)
    words = sc.parallelize(["scala","java","hadoop","spark","akka"])

RDD操作

  • 转化操作
    惰性求值,不会立即执行操作
    Spark会在内部记录下所要求执行的操作的相关信息。
    会生成一个新的RDD
    如map(),filter(),union()
    返回RDD
    pylines = lines.filter(lambda line: 'python' in line )
  • 行动操作
    用到RDD时会触发实际的计算,强制执行那些用到的RDD的转化操作
    最终结果会返回到驱动器程序或写入外部存储系统
    如count(),first()

每当调用一个新的行动操作时,整个RDD都会从头开始计算。可以通过将中间结果持久化,避免这种低效的行为。

把数据读取到RDD的操作也是惰性的。,到可以执行一个行动操作来强制执行。如count()

Spark使用谱系图来记录这些不同的RDD之间的依赖关系。Spark需要这些信息来按需计算每个RDD,也可以依靠谱系图在持久化的RDD丢失部分数据时恢复所丢失的数据。

常见操作

  1. 转化操作
  • map()与 flatMap()
    将函数用于RDD中每一个元素
    flatMap()对每个输入元素生成多个输出元素,通常用于切分单词
  • distinct() 保持元素唯一性
    开销大,需要将所有数据通过通过网络进行shuffle(混洗),保证每个元素只有一份
  • filter() 过滤

2.伪集合操作

  • union() 合集,有重复元素
  • intersection()交集,无重复元素(shuffle,单个RDD内的重复元素也一起被移除)
  • substract() 也需要shuffle
  • 计算两个RDD的笛卡尔积 RDD1.cartesian(RDD2)
  1. 行动操作
  • reduce(),接收函数作为参数,函数操作两个相同元素类型的RDD数据并返回一个同样类型的新元素
    sum = rdd.reduce(lambda x,y: x+y)
  • fold()
  • collect() 可以用来获取整个RDD中的数据(数据规模小的情况下)
  • count() RDD中元素个数
  • countByValue() RDD中各元素出现次数
  • take(num) 返回num个元素,无序
  • top(num) 返回前num个元素
  • foreach(func) 对每个元素进行操作,不需把RDD返回本地
    lines.foreach(println)

4.数据持久化

  • 如果要在多个行动操作中重用同一个RDD,应该用RDD.persist()让Spark把整个RDD的内容保存到内存中(以分区的方式存储到集群中的各机器上)。也可以缓存到磁盘上。
  • persist()调用本身不会触发强制求值
  • cache()
  • unpersist()手动把持久化的RDD从缓存中移除

persist()与cache():
1)RDD的cache()方法其实调用的就是persist方法,缓存策略均为MEMORY_ONLY;
2)可以通过persist方法手工设定StorageLevel来满足工程需要的存储级别;
3)cache或者persist并不是action;
4)cache和persist都可以用unpersist来取消

相关文章

  • 【Spark学习笔记】RDD篇

    每个Spark应用都由一个驱动器程序(driver program)(例如Spark Shell本身)来发起集群上...

  • Spark Architecture

    OReilly.Learning.Spark 学习笔记 Spark里所有操作都是对RDD来的。分为两种 1. Tr...

  • Spark RDD学习笔记

    一、学习Spark RDD RDD是Spark中的核心数据模型,一个RDD代表着一个被分区(partition)的...

  • spark任务执行过程

    ​ 在学习了Spark RDD和RDD操作之后,是不是很想快点写个Spark程序来巩固一下所学的知识。学习大数...

  • Spark入门(Python)--1.1 RDD基础

    该系列spark学习笔记基于Python Spark. RDD(弹性分布式数据集)是一个不可变的分布式对象集合,可...

  • Spark RDD 笔记

    本文内容是 是学习 范东来《Spark 课程》 笔记 RDD 不可变, 只读,经过变化会生成新的对象 弹性 表...

  • 【Spark学习笔记】详解RDD

    1.Driver program 包含程序的main()方法,RDDs的定义和操作。它管理很多节点,我们称为exe...

  • Spark学习笔记(1)RDD

    RDD RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最...

  • Spark RDD Api使用指南

    ​ 在Spark快速入门-RDD文章中学了spark的RDD。spark包含转换和行动操作。在进行spark程...

  • Spark源码之DAGScheduler

    Spark源码之DAGScheduler介绍篇 Spark Application中的RDD经过一系列的Trans...

网友评论

      本文标题:【Spark学习笔记】RDD篇

      本文链接:https://www.haomeiwen.com/subject/ijmwmxtx.html