1.spark基础-RDD

作者: 水墨点滴 | 来源:发表于2019-07-18 20:51 被阅读0次

1.RDD创建

Spark是以RDD概念为中心运行的。RDD是一个容错的、可以被并行操作的元素集合。创建一个RDD有两个方法:在你的驱动程序中并行化一个已经存在的集合;从外部存储系统中引用一个数据集,这个存储系统可以是一个共享文件系统,比如HDFS、HBase或任意提供了Hadoop输入格式的数据来源。

(1) RDD的创建—— 并行化集合
并行化集合是通过在驱动程序中一个现有的迭代器或集合上调用SparkContext的parallelize方法建立的。为了创建一个能够并行操作的分布数据集,集合中的元素都会被拷贝

data = [1, 2, 3, 4, 5]
distData = sc.parallelize(data)    #建立了分布数据集,可以进行一些并行的操作

并行化中可以自己设置数据集划分成分片的数量(一般是spark集群自动进行设定的),比如sc.parallelize(data, 10)

(2)外部数据集
PySpark可以通过Hadoop支持的外部数据源(包括本地文件系统、HDFS、 Cassandra、HBase、亚马逊S3等等)建立分布数据集。Spark支持文本文件、序列文件以及其他任何Hadoop输入格式文件.

(1)通过文本文件创建RDD要使用SparkContext的textfile方法

from pyspark import SparkContext

if __name__ == "__main__":
    sc = SparkContext(appName="zzz_KMeans")
    #调用文件的url/本地文件路径等
    lines = sc.textFile("your_hdfs_path")

注意

  • 包括textFile在内的所有基于文件的Spark读入方法,都支持将文件夹、压缩文件、包含通配符的路径作为参数
  • textFile方法也可以传入第二个可选参数来控制文件的分片数量。默认情况下,Spark会为文件的每一个块(在HDFS中块的大小默认是64MB)创建一个分片。但是你也可以通过传入一个更大的值来要求Spark建立更多的分片。注意,分片的数量绝不能小于文件块的数量。

(3)其他

除了文本文件之外,pyspark还支持一些其他的数据格式

  • SparkContext.wholeTextFiles能够读入包含多个小文本文件的目录,然后为每一个文件返回一个(文件名,内容)对。这是与textFile方法为每一个文本行返回一条记录相对应的。
  • RDD.saveAsPickleFile和SparkContext.pickleFile支持将RDD以串行化的Python对象格式存储起来。串行化的过程中会以默认10个一批的数量批量处理。
  • 序列文件和其他Hadoop输入输出格式。

数据库

2.RDD基本操作

RDD的操作,整体上分为两类: 转化操作和启动操作

转化操作

  • 都是惰性求值的,就是说它们并不会立刻真的计算出结果。相反,它们仅仅是记录下了转换操作的操作对象(比如:一个文件)。只有当一个启动操作被执行,要向驱动程序返回结果时,转化操作才会真的开始计算
  • 每一个由转化操作得到的RDD都会在每次执行启动操作时重新计算生成。也可以调用persist或者cache方法将RDD永久化到内存中。

启动操作

常见的转化操作:

转化操作 作用
map(func) 返回一个新的分布数据集,由原数据集元素经func处理后的结果组成
filter(func) 返回一个新的数据集,由传给func返回True的原数据集元素组成
flatMap(func) 与map类似,但是每个传入元素可能有0或多个返回值,func可以返回一个序列而不是一个值
mapParitions(func) 类似map,但是RDD的每个分片都会分开独立运行,所以func的参数和返回值必须都是迭代器
mapParitionsWithIndex(func) 类似mapParitions,但是func有两个参数,第一个是分片的序号,第二个是迭代器。返回值还是迭代器
sample(withReplacement, fraction, seed) 使用提供的随机数种子取样,然后替换或不替换
union(otherDataset) 返回新的数据集,包括原数据集和参数数据集的所有元素
intersection(otherDataset) 返回新数据集,是两个集的交集
distinct([numTasks]) 返回新的集,包括原集中的不重复元素
groupByKey([numTasks]) 当用于键值对RDD时返回(键,值迭代器)对的数据集
aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 用于键值对RDD时返回(K,U)对集,对每一个Key的value进行聚集计算sortByKey([ascending], [numTasks])用于键值对RDD时会返回RDD按键的顺序排序,升降序由第一个参数决定
join(otherDataset, [numTasks]) 用于键值对(K, V)和(K, W)RDD时返回(K, (V, W))对RDD
cogroup(otherDataset, [numTasks]) 用于两个键值对RDD时返回(K, (V迭代器, W迭代器))RDD
cartesian(otherDataset) 用于T和U类型RDD时返回(T, U)对类型键值对RDD
pipe(command, [envVars]) 通过shell命令管道处理每个RDD分片
coalesce(numPartitions) 把RDD的分片数量降低到参数大小
repartition(numPartitions) 重新打乱RDD中元素顺序并重新分片,数量由参数决定
repartitionAndSortWithinPartitions(partitioner) 按照参数给定的分片器重新分片,同时每个分片内部按照键排序

常见的启动操作:

启动操作 作用
reduce(func) 使用func进行聚集计算,func的参数是两个,返回值一个,两次func运行应当是完全解耦的,这样才能正确地并行运算
collect() 向驱动程序返回数据集的元素组成的数组
count() 返回数据集元素的数量
first() 返回数据集的第一个元素
take(n) 返回前n个元素组成的数组
takeSample(withReplacement, num, [seed]) 返回一个由原数据集中任意num个元素的suzuki,并且替换之
takeOrder(n, [ordering]) 返回排序后的前n个元素
saveAsTextFile(path) 将数据集的元素写成文本文件
saveAsSequenceFile(path) 将数据集的元素写成序列文件,这个API只能用于Java和Scala程序
saveAsObjectFile(path) 将数据集的元素使用Java的序列化特性写到文件中,这个API只能用于Java和Scala程序
countByCount() 只能用于键值对RDD,返回一个(K, int) hashmap,返回每个key的出现次数
foreach(func) 对数据集的每个元素执行func, 通常用于完成一些带有副作用的函数,比如更新累加器(见下文)或与外部存储交互等

RDD持久化

主要用的两个方法persistcache

  • Spark的一个重要功能就是在将数据集持久化(或缓存)到内存中以便在多个操作中重复使用。当我们持久化一个RDD是,每一个节点将这个RDD的每一个分片计算并保存到内存中以便在下次对这个数据集(或者这个数据集衍生的数据集)的计算中可以复用。这使得接下来的计算过程速度能够加快(经常能加快超过十倍的速度).

  • 每一个持久化的RDD都有一个可变的存储级别,这个级别使得用户可以改变RDD持久化的储存位置.

  • Spark会自动监视每个节点的缓存使用同时使用LRU算法丢弃旧数据分片。如果你想手动删除某个RDD而不是等待它被自动删除,调用RDD.unpersist()方法。

共享变量

  • 广播变量
  • 累加器

3.RDD分区

有时候需要重新设置Rdd的分区数量:

  • 比如Rdd的分区中,Rdd分区比较多,但是每个Rdd的数据量比较小,需要设置一个比较合理的分区。或者需要把Rdd的分区数量调大。
  • 还有就是通过设置一个Rdd的分区来达到设置生成的文件的数量。

有两种方法是可以重设Rdd的分区:分别是 coalesce()方法和repartition()

spark中的数据是分布式的

4.数据输出/保存

rdd.saveAsTextFile()

【参考资料】

相关文章

  • 1.spark基础-RDD

    1.RDD创建 Spark是以RDD概念为中心运行的。RDD是一个容错的、可以被并行操作的元素集合。创建一个RDD...

  • spark基本工作原理

    1.spark基本工作原理 2.RDD 以及其特性a、RDD是Spark提供的核心抽象,全称为Resillient...

  • Spark Core

    1.spark core1.1 学习方法 1.2 什么是RDD 1.3 源码解释 1.3.1 源码中体现RDD的五...

  • 解析RDD在Spark中的地位

    1.Spark的核心概念是RDD (resilient distributed dataset),指的是一个 只读...

  • Spark 初探总结

    1.spark:分布式/流式数据处理,学习算法 2.数据处理:RDD -> Resilient Distribut...

  • 2020-12.4--Spark-12(Spark-Core)

    distinct算子 数据写入mysql topN案例的性能分析 1.spark基本概念的复习 RDD:是一个...

  • spark技术

    spark技术 1.spark core 基于RDD提供操作接口,利用DAG进行统一的任务规划 2.spark S...

  • Spark面试知识点

    一.Spark架构 1.Spark架构中的组件 2.spark架构揭示了spark的具体流程如下: 二.RDD 1...

  • Spark的算子(函数)

    Spark的算子 1、RDD基础 什么是RDD? RDD(Resilient Distributed Datase...

  • 【Spark入门-笔记】RDD编程

    1 RDD基础 2 创建RDD-两种方式 2.1 进行并行化创建RDD 2.2 读取数据创建RDD 3 RDD操作...

网友评论

    本文标题:1.spark基础-RDD

    本文链接:https://www.haomeiwen.com/subject/mzvrlctx.html