RDD 不仅是一组不可变的JVM对象的分布集,可以让你执行高速运算。改数据集是分布式的。基于某种关键字,该数据集被划分成块,同时分发到执行器节点。RDD将跟踪(记入日志)应用于每个块的所有转换,以加快计算速度,并在发生错误和部分数据丢失时提供回退。
内部运行方式:RDD并行操作,每个转换并行执行,从而大大提高速度。数据集转换通常是惰性的。这就意味着任何转换仅在调用数据集上的操作时才执行。
一个示例数据集:
1.统计出某一列中不同值出现的次数
2.选出以字母 t 开头的
3.将结果打印到屏幕上
import findspark
findspark.init()
from pyspark import SparkContext,SparkConf
conf = SparkConf().setAppName("wordcount")
sc =SparkContext(conf=conf)
text_example = " Hooray! It's snowing! It's time to make a snowman.James runs out. He makes a big pile of sn
ow. He puts a big snowball on top. He adds a scarf and a hat. He adds an orange for the nose. He adds coal f
or the eyes and buttons.In the evening, James opens the door. What does he see? The snowman is moving! James
invites him in. The snowman has never been inside a house. He says hello to the cat. He plays with paper to
wels.A moment later, the snowman takes James's hand and goes out.They go up, up, up into the air! They are f
lying! What a wonderful night!The next morning, James jumps out of bed. He runs to the door.He wants to than
k the snowman. But he's gone."
wordCount= sc.parallelize(text_example.split(" ")).map(lambda word:(word,1)).filter(lambda val: val[0].startswith('t')).reduceByKey(lambda a, b : a + b
)
print(wordCount.collect())
输出:[('time', 1), ('to', 4), ('top.', 1), ('the', 9), ('towels.A', 1), ('takes', 1), ('thank', 1)]
创建RDD:
使用.parallelize(...) 集合(元素list 或 array)
data = sc.parallelize([('time', 1), ('to', 4), ('top.', 1), ('the', 9), ('towels.A', 1), ('takes', 1), ('thank', 1)])
引用位于本地或外部的某个文件(或者多个文件)
text_file = sc.textFile("/root/workdir/charlotte.txt")
wordCount= text_file.flatMap(lambda line: line.split(" ")).map(lambda word:(word,1)).reduceByKey(lambda a, b : a + b)
Schema
RDD是无schema的数据结构,可以使用任何类型的数据结构:tuple、dict、list。
data_heterogenous = sc.parallelize([('Ferrari','fast'),{'Porsche':1000000},['Spain','visited',4504]]).collect()
可以访问对象中的数据:data_heterogeous[1]['Porsche']
.collect()方法把RDD的所有元素返回给驱动程序,驱动程序将其序列化成了一个列表。
转换
转换可以调整数据集。包括映射、筛选、连接、转换数据集中的值。
.map(...)转换
该方法应用在每个RDD元素上
.filter(...)转换
该方法可以让你从数据集中选择元素,该元素符合特定的标准。
.flatMap(...)转换
.flatMap(...)返回一个扁平的结果,而不是一个列表。
.distinct(...)转换
该方法返回指定列中不同值的列表。
.sample(...)转换
该方法返回数据集中的随机样本。
.leftOuterJoin(...)转换
根据两个数据集中都有的值来连接两个RDD,并返回左侧的RDD记录,而右边的记录附加在两个RDD匹配的地方。
.repartition(...)转换
重新对数据集进行分区,改变了数据集分区的数量。
操作
和转换不同,操作执行数据集上的计划任务。一旦完成数据转换,则可以执行相应转换。
.take(...)方法
返回单个数据分区的前n行
.takeSample(...)
返回随机记录
.collect(...)
返回所有RDD的元素给驱动程序。
.reduce(...)方法
使用指定的方法减少RDD中的元素。
.reduceByKey(...)方法
和.reduce(...)类似,但只在键-键基础上进行。
.count(...)方法
统计RDD里的元素数量
.saveAsTextFile(...)方法
对RDD执行.saveAsTextFile(...)可以让RDD保存为文本文件:每个文件一个分区。
.foreach(...)方法
对RDD里的每个元素,用迭代的方法应用相同的函数。
总结:RDD是无schema的数据结构,是Spark的核心。Spark中的转换是惰性的,它们只在操作被调用是执行。
网友评论