Spar05 RDD 转换算子

作者: 山高月更阔 | 来源:发表于2020-05-23 19:34 被阅读0次

RDD 简介

rdd 是 spark 对数据的核心抽象,全称弹性分布式数据集( Resilient Distributed Dataset 简称 RDD )
spark 中的 RDD 是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点。RDD 可以包含 Python、Java、Scala 中任意类型的数据对象,甚至可以包含用户自定义对象。
在 spark 中对 RDD 的操作有创建 RDD、转化已有的 RDD 以及调用 RDD 操作进行求值。

创建 RDD

parallelize 方法

python 中的 parallelize() 方法

>>> sc.parallelize(['a','b','c'])
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195

Scala 中的 parallelize() 方法

>>> sc.parallelize(['a','b','c'])
ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195

textFile 方法

python textFile() 方法

sc.textFile('/path/file')

Scala textFile() 方法

sc.textFile('/path/file')

文件路劲说明
hadoop 的配置 $HADOOP_HOME/etc/hadoop/core-site.xml 中配置

 <property>
        <name>fs.defaultFS</name>
        <value>hdfs://localhost:9000</value>
    </property>

如果有上述配置textFile 默认从hdfs中读取文件
如果需要指定
指定从本地文件中读取

sc.textFile('file:///path/file')

指定从hdfs文件中读取

sc.textFile('hdfs:/path/file')

db中读取数据创建 rdd

可以从db读取数据组装成 list 在通过parallelize 方式创建

有了 RDD 必然需要多 RDD 中的数据进行操作 对 RDD 操作的函数 也称为算子

map 算子

将算子应用于每一个数据并返回一个新的RDD
python

>>> rdd = sc.parallelize([1,2,3])
>>> rdd1=rdd.map(lambda x:x+1)
>>> rdd1.collect()
[2, 3, 4]  

创建一个数字列表RDD map将每个数字加1操作
Scala

scala> val rdd = sc.parallelize(List(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd1 = rdd.map(_+1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
scala> rdd1.collect()
res0: Array[Int] = Array(2, 3, 4)   

从python和Scala的方式来看 语法上略有差异 本质操作还是一样的

flatMap 算子

flatMap 也是作用于每个元素并返回值,将新值组装一个新的 rdd 与 map 不同的是 map返回一个元素 flatMap返回一个列表 flatMap通常用来切分单词
python

>>> rdd = sc.parallelize(['张三 李四','王五 李丽'])
>>> rdd1 = rdd.flatMap(lambda line : line.split(' '))
>>> rdd1.collect()
['张三', '李四', '王五', '李丽']

假如用map做同样操作

>>> rdd = sc.parallelize(['张三 李四','王五 李丽'])
>>> rdd1 = rdd.map(lambda line : line.split(' '))
>>> rdd1.collect()
[['张三', '李四'], ['王五', '李丽']]

通过比较结果很清晰flatMap与map不同。 map返回元素作为rdd 的一个元素 即使是list 也是 rdd 的一个元素 只是这个元素是list flatMap返回list中每个元素都是rdd中的一个元素

Scala

scala> val rdd = sc.parallelize(List("张三 李四","王五 李丽"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24

scala> val rdd1 = rdd.flatMap(_.split(" "))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at flatMap at <console>:25

scala> rdd1.collect()
res1: Array[String] = Array(张三, 李四, 王五, 李丽)

distinct 算子

去重
python

>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd1 = rdd.distinct()
>>> rdd1.collect()
['b', 'c', 'a'] 

Scala

scala> val rdd = sc.parallelize(List("a","a","b","c"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24

scala> val rdd1 = rdd.distinct()
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at distinct at <console>:25
scala> rdd1.collect()
res2: Array[String] = Array(a, b, c)

sample 算子

从原 rdd 中抽样 有三个参数

  • withReplacement: true表示有放回的采样,false表示无放回采样
  • fraction:期望样本的大小作为RDD大小的一部分
    当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
    当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。
  • seed:随机数生成器的种子

如: 元素不可以多次抽样:withReplacement=false,每个元素被抽取到的概率为0.5:fraction=0.5
python

>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.sample(False,0.5).collect()
['b', 'c']
>>> rdd.sample(False,0.5).collect()
['a', 'c']
>>> rdd.sample(False,0.5).collect()
['a', 'b', 'c']

如: 元素可以多次抽样:withReplacement=true,每个元素被抽取到的期望次数为2:fraction=2
python

>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c']
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'a', 'b', 'c', 'c']
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'b', 'c', 'c', 'c']

以上算子都是对单个 rdd 操作 下面介绍两个 rdd 之间操作的算子

union 算子

生成一个包含两个rdd中所有元素的rdd
python

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.union(rdd1).collect()
['a', 'b', 'c', 'c', 'd', 'd']

intersection 算子

求两个rdd的共同元素

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.intersection(rdd1).collect()
['c']

这个返回结果会去重
比如:

>>> rdd = sc.parallelize(['a','b','c','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.intersection(rdd1).collect()
['c']

rdd 中有两个'c' 通过intersection后仍然只有一个'c'

subtract 算子

移除一个 RDD中的内容

>>> rdd = sc.parallelize(['a','b','c','c','a'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.subtract(rdd1).collect()
['a', 'a', 'b']

次例中表示从 rdd中移除含rdd1中元素

cartesian 算子

计算两个算子的笛卡尔积

>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['1','2','3'])
>>> rdd.cartesian(rdd1).collect()
[('a', '1'), ('a', '2'), ('a', '3'), ('b', '1'), ('b', '2'), ('b', '3'), ('c', '1'), ('c', '2'), ('c', '3')]

在计算相似度时特别有用 比如rdd是用户集 rdd1上商品集 需要给用户推荐商品 需计算用户与每个商品的推荐指数 就会用到用户集和商品集的笛卡尔乘积。只是特别注意,求大规模的rdd笛卡尔积开销巨大

以上介绍 rdd 基本算子 下一篇介绍 rdd 行动算子

glom 算子

按分区进行分组
Python

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]

说明将数据分为4个分区 第一个分区有数据 1,2 。 第二个分区数据 3,4。第三个分区 5,6 。 第四个分区 7,8,9,10

repartition 将 RDD 进行重新分区

Pyhon

>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
>>> rdd.repartition(2).glom().collect()
[[1, 2, 5, 6, 7, 8, 9, 10], [3, 4]]

repartition 会对RDD 数据进行重新组合比较消息性能,但在某些特定场合配置foreachPartition 使用。会对新能有极大提升

conf = SparkConf().setMaster("local[*]").setAppName("My Demo")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rdd1 = rdd.repartition(2)
def func(ite):
  for i, value in enumerate(ite):
    # 这里会在executor中执行 这里print 只是个例子 说明怎么使用ite
    print(value)

rdd1.foreachPartition(func)

相关文章

  • Spar05 RDD 转换算子

    RDD 简介 rdd 是 spark 对数据的核心抽象,全称弹性分布式数据集( Resilient Distrib...

  • 算子

    spark--------- rdd算子学习 ---------转换算子: 返回值还是一个rdd就是转换是懒加载的...

  • 【Spark】RDD操作详解1——Transformation和

    Spark算子的作用 下图描述了Spark在运行转换中通过算子对RDD进行转换。 算子是RDD中定义的函数,可以对...

  • RDD算子

    一、RDD算子简介 RDD算子分为两类:Transformation(转换)与Action(行动)Transfor...

  • spark面试题

    1、rdd有哪些算子? 主要分为转换算子和action算子。 transformation:map、filte...

  • RDD常见算子

    RDD算子的分类 RDD算子从对数据操作上讲,大致分为两类: 转换(transformations)和行动(act...

  • Spark 算子- Value Transformation

    Spark算子的作用 Spark的输入、运行转换、输出过程,在运行转换中通过算子对RDD进行转换 输入:外部数据空...

  • Spark 转换算子源码

    Spark 转换算子源码 MapPartitionsRDD map 算子 map算子是对RDD中的每一个函数应用传...

  • Spark算子:RDD基本转换操作(7)–zipWithInde

    关键字:Spark算子、Spark RDD基本转换、zipWithIndex、zipWithUniqueId zi...

  • spark常用算子-transformation

    spark常用算子有两种: transformation:RDD中所有转换算子都是延迟加载,从一个RDD到另一个R...

网友评论

    本文标题:Spar05 RDD 转换算子

    本文链接:https://www.haomeiwen.com/subject/rigsohtx.html