RDD 简介
rdd 是 spark 对数据的核心抽象,全称弹性分布式数据集( Resilient Distributed Dataset 简称 RDD )
spark 中的 RDD 是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点。RDD 可以包含 Python、Java、Scala 中任意类型的数据对象,甚至可以包含用户自定义对象。
在 spark 中对 RDD 的操作有创建 RDD、转化已有的 RDD 以及调用 RDD 操作进行求值。
创建 RDD
parallelize 方法
python 中的 parallelize() 方法
>>> sc.parallelize(['a','b','c'])
ParallelCollectionRDD[0] at parallelize at PythonRDD.scala:195
Scala 中的 parallelize() 方法
>>> sc.parallelize(['a','b','c'])
ParallelCollectionRDD[2] at parallelize at PythonRDD.scala:195
textFile 方法
python textFile() 方法
sc.textFile('/path/file')
Scala textFile() 方法
sc.textFile('/path/file')
文件路劲说明
hadoop 的配置 $HADOOP_HOME/etc/hadoop/core-site.xml 中配置
<property>
<name>fs.defaultFS</name>
<value>hdfs://localhost:9000</value>
</property>
如果有上述配置textFile 默认从hdfs中读取文件
如果需要指定
指定从本地文件中读取
sc.textFile('file:///path/file')
指定从hdfs文件中读取
sc.textFile('hdfs:/path/file')
db中读取数据创建 rdd
可以从db读取数据组装成 list 在通过parallelize 方式创建
有了 RDD 必然需要多 RDD 中的数据进行操作 对 RDD 操作的函数 也称为算子
map 算子
将算子应用于每一个数据并返回一个新的RDD
python
>>> rdd = sc.parallelize([1,2,3])
>>> rdd1=rdd.map(lambda x:x+1)
>>> rdd1.collect()
[2, 3, 4]
创建一个数字列表RDD map将每个数字加1操作
Scala
scala> val rdd = sc.parallelize(List(1,2,3))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:24
scala> val rdd1 = rdd.map(_+1)
rdd1: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[1] at map at <console>:25
scala> rdd1.collect()
res0: Array[Int] = Array(2, 3, 4)
从python和Scala的方式来看 语法上略有差异 本质操作还是一样的
flatMap 算子
flatMap 也是作用于每个元素并返回值,将新值组装一个新的 rdd 与 map 不同的是 map返回一个元素 flatMap返回一个列表 flatMap通常用来切分单词
python
>>> rdd = sc.parallelize(['张三 李四','王五 李丽'])
>>> rdd1 = rdd.flatMap(lambda line : line.split(' '))
>>> rdd1.collect()
['张三', '李四', '王五', '李丽']
假如用map做同样操作
>>> rdd = sc.parallelize(['张三 李四','王五 李丽'])
>>> rdd1 = rdd.map(lambda line : line.split(' '))
>>> rdd1.collect()
[['张三', '李四'], ['王五', '李丽']]
通过比较结果很清晰flatMap与map不同。 map返回元素作为rdd 的一个元素 即使是list 也是 rdd 的一个元素 只是这个元素是list flatMap返回list中每个元素都是rdd中的一个元素
Scala
scala> val rdd = sc.parallelize(List("张三 李四","王五 李丽"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[4] at parallelize at <console>:24
scala> val rdd1 = rdd.flatMap(_.split(" "))
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at flatMap at <console>:25
scala> rdd1.collect()
res1: Array[String] = Array(张三, 李四, 王五, 李丽)
distinct 算子
去重
python
>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd1 = rdd.distinct()
>>> rdd1.collect()
['b', 'c', 'a']
Scala
scala> val rdd = sc.parallelize(List("a","a","b","c"))
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[10] at parallelize at <console>:24
scala> val rdd1 = rdd.distinct()
rdd1: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at distinct at <console>:25
scala> rdd1.collect()
res2: Array[String] = Array(a, b, c)
sample 算子
从原 rdd 中抽样 有三个参数
- withReplacement: true表示有放回的采样,false表示无放回采样
- fraction:期望样本的大小作为RDD大小的一部分
当withReplacement=false时:选择每个元素的概率;分数一定是[0,1] ;
当withReplacement=true时:选择每个元素的期望次数; 分数必须大于等于0。 - seed:随机数生成器的种子
如: 元素不可以多次抽样:withReplacement=false,每个元素被抽取到的概率为0.5:fraction=0.5
python
>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.sample(False,0.5).collect()
['b', 'c']
>>> rdd.sample(False,0.5).collect()
['a', 'c']
>>> rdd.sample(False,0.5).collect()
['a', 'b', 'c']
如: 元素可以多次抽样:withReplacement=true,每个元素被抽取到的期望次数为2:fraction=2
python
>>> rdd = sc.parallelize(['a','a','b','c'])
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'b', 'b', 'b', 'c', 'c', 'c']
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'a', 'b', 'c', 'c']
>>> rdd.sample(True,2).collect()
['a', 'a', 'a', 'b', 'c', 'c', 'c']
以上算子都是对单个 rdd 操作 下面介绍两个 rdd 之间操作的算子
union 算子
生成一个包含两个rdd中所有元素的rdd
python
>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.union(rdd1).collect()
['a', 'b', 'c', 'c', 'd', 'd']
intersection 算子
求两个rdd的共同元素
>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.intersection(rdd1).collect()
['c']
这个返回结果会去重
比如:
>>> rdd = sc.parallelize(['a','b','c','c'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.intersection(rdd1).collect()
['c']
rdd 中有两个'c' 通过intersection后仍然只有一个'c'
subtract 算子
移除一个 RDD中的内容
>>> rdd = sc.parallelize(['a','b','c','c','a'])
>>> rdd1 = sc.parallelize(['c','d','d'])
>>> rdd.subtract(rdd1).collect()
['a', 'a', 'b']
次例中表示从 rdd中移除含rdd1中元素
cartesian 算子
计算两个算子的笛卡尔积
>>> rdd = sc.parallelize(['a','b','c'])
>>> rdd1 = sc.parallelize(['1','2','3'])
>>> rdd.cartesian(rdd1).collect()
[('a', '1'), ('a', '2'), ('a', '3'), ('b', '1'), ('b', '2'), ('b', '3'), ('c', '1'), ('c', '2'), ('c', '3')]
在计算相似度时特别有用 比如rdd是用户集 rdd1上商品集 需要给用户推荐商品 需计算用户与每个商品的推荐指数 就会用到用户集和商品集的笛卡尔乘积。只是特别注意,求大规模的rdd笛卡尔积开销巨大
以上介绍 rdd 基本算子 下一篇介绍 rdd 行动算子
glom 算子
按分区进行分组
Python
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
说明将数据分为4个分区 第一个分区有数据 1,2 。 第二个分区数据 3,4。第三个分区 5,6 。 第四个分区 7,8,9,10
repartition 将 RDD 进行重新分区
Pyhon
>>> rdd = sc.parallelize([1,2,3,4,5,6,7,8,9,10])
>>> rdd.glom().collect()
[[1, 2], [3, 4], [5, 6], [7, 8, 9, 10]]
>>> rdd.repartition(2).glom().collect()
[[1, 2, 5, 6, 7, 8, 9, 10], [3, 4]]
repartition 会对RDD 数据进行重新组合比较消息性能,但在某些特定场合配置foreachPartition 使用。会对新能有极大提升
conf = SparkConf().setMaster("local[*]").setAppName("My Demo")
sc = SparkContext(conf=conf)
rdd = sc.parallelize([1, 2, 3, 4, 5, 6, 7, 8, 9, 10])
rdd1 = rdd.repartition(2)
def func(ite):
for i, value in enumerate(ite):
# 这里会在executor中执行 这里print 只是个例子 说明怎么使用ite
print(value)
rdd1.foreachPartition(func)
网友评论