(1)弹性分布数据集
RDD以分区的形式分布在集群中多个机器上,每个分区代表了数据集的一个子集。分区定义了spark中数据的并行单位。Spark 框架并行处理多个分区,一个 分区内的数据对象则是顺序处理。创建 RDD 最简单的方法是在本地对象集合上调用 SparkContext 的 parallelize 方法。
(2)RDD基础
#在python中使用textFile创建一个字符串的RDD
'''
测试文本abc.txt
Jimmy A 1
Jimmy B 8
Sam C 10
Tom A 7
'''
lines=sc.textFile("/user/test/abc.txt")
pythonlines=lines.filter(lambda line:"Jimmy" in line)
pythonlines.first()
#u'Jimmy\tA\t1'
spark或shell 会话都按照如下方式工作:
(i)从外部数据创建出输入RDD
(ii)使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
(iii)告诉spark对需要被重用的中间结果RDD执行persist()操作。
(iv)使用行动操作来触发一次并行计算,spark会对计算进行优化后再执行。
(3)创建RDD
#python 中的parallelize()方法
lines=sc.parallelize(["pandas","i like pands"])
(4)RDD操作
RDD 操作两种类型:转化操作和行动操作
# filter 转化操作
lines=sc.textFile("/user/test/abc.txt")
flines=lines.filter(lambda x:"Sam" in x)
# union() 转化操作
glines=lines.filter(lambda x:"Tom" in x)
linesunion=flines.union(glines)
# 行动操作,计数
print linesunion.count()
# 2
(5)向spark传递函数
# 在python中传递函数
word=lines.filter(lambda s:"Tom" in s)
def containsTom(s):
return "Tom" in s
word=lines.filter(containsTom)
在scala中,可以把定义的内联函数、方法的引用或静态方法传递给spark。
#scala 中传递函数
import org.apache.spark.rdd.RDD
class SearchFunction(val query:String){
def isMatch(s:String):Boolean={
s.contains(query)
}
def getMatchedNoReference(rdd:RDD[String]):RDD[String]={
val query_ = this.query
rdd.flatMap(x => x.split(query_))
}
}
(4)常见的转化操作和行动操作
基本RDD
--针对各个元素的转化操作
最常用的转化操作是map()和filter()。转化操作map()接收一个函数,把这个函数用于RDD的每个元素,将函数的返回结果作为结果RDD中的对应元素的值。
map
# python 中计算RDD中各个值的平方
nums=sc.parallelize([1,2,3,5])
squared=nums.map(lambda x:x*x).collect()
for num in squared:
print "%i" %(num)
'''
1
4
9
25
'''
// scala中计算RDD中各个值的平方
val input=sc.parallelize(List(1,2,3,4))
val result=input.map(x=>x*x)
println(result.collect().mkString(","))
// 1,4,9,16
flatMap
#python 中的flatMap()将行数据切分为单词
lines = sc.parallelize(["hello world","hi"])
words=lines.flatMap(lambda line:line.split(" "))
words.first()
# hello
//scala 中的flatMap()中将行数据切分为单词
val lines=sc.parallelize(List("hello world","hi"))
val words=lines.flatMap(line=>line.split(" "))
words.first()
//res1: String = hello
伪集合操作
最简单的集合操作是union(other),返回一个包含两个RDD中所有元素的RDD。RDD.distinct()转化操作可以生成一个包含不同元素的新RDD,distinct开销很大,需要将所有数据通过网络进行混洗(shuffle)。
spark 提供了intersection(other)方法,只返回两个RDD中都有的元素。intersection会在运行时去掉所有重复的元素(单个RDD内重复的元素也会一起移除),intersection()和union()的概念类似,但是intersection性能差很多,它需要通过网络混洗数据来发现共有的元素。
计算笛卡尔积 cartesian(other)转化操作会返回所有可能的(a,b)对。
单个RDD操作
map() 将函数应用于RDD中的每个元素,将返回值构成新的RDD,rdd.map(x=>x+1)
flatMap()将函数应用于RDD中的每个元素,将返回的迭代器的所有内容构成新的RDD,通常用来切分单词。rdd.flatMap(x=>x.to(3))
filter() 返回一个由通过传给filter()的函数的元素组成吃的RDD。rdd.filter(x=>x!=1)
distinct()去重 rdd.distinct()
sample(withReplacement,fraction,[seed]) 采样,是否替换。 rdd.sample(false,0.5)
两个RDD操作
union 生成一个包含两个RDD中所有元素的RDD。rdd.union(other)
intersection() 求两个RDD共同的元素的RDD。rdd.intersection(other)
subtract() 移除一个RDD中的内容。rdd.subtract()
cartesian() 与另一个RDD的笛卡尔积。 rdd.cartesian(other)
行动操作
基本的RDD最长见的行动操作reduce()。接收一个函数作为参数,这个函数要操作两个相同元素类型的RDD数据并返回一个同样类型的新元素。
#python 中的reduce()
sum=rdd.reduce(lambda x,y:x+y)
//scala中的reduce()
val sum=rdd.reduce((x,y)=>x+y)
fold和reduce类似,接收一个与reduce()接收的函数签名相同的函数,再加上一个初始值来作为每个分区第一次调用时的结果。
aggregate()函数把从返回值类型必须与所操作的RDD类型相同的限制中解放出来。
#python 中的aggregate()
nums=sc.parallelize([1,2,3,4])
seq0p=(lambda x,y:(x[0]+y,x[1]+1))
comb0p=(lambda x,y:(x[0]+y[0],x[1]+y[1]))
nums.aggregate((0,0),seq0p,comb0p)
//(10, 4)
//scala中的aggregate()
val result=input.aggregate((0,0))(
(acc,value)=>(acc._1+value,acc._2+1),
(acc1,acc2)=>(acc1._1+acc2._1,acc1._2+acc2._2)
)
基本的RDD行动操作
collect() 返回RDD中的所有元素。rdd.collect()
count() RDD中的元素个数。 rdd.count()
countByValue() 各元素在RDD中出现的次数。rdd.countByValue()
take(num) 从RDD中返回num个元素。rdd.take(num)
top(num) 从RDD中按照提供的顺序返回最前面的num个元素。rdd.top()
takeOrdered(num) 从RDD中按照提供的顺序返回最前面的num个元素。rdd.takeOrdered(num)
takeSample(withReplacement,num,[seed]) 从RDD中返回一些任意元素。
reduce(func) 并行整合RDD中所有数据。rdd.reduce((x,y)=>x+y)
fold(zero)(func) 和reduce一样,但是需要初始值。rdd.fold(0)((x,y)=>x+y)
aggregate(zerovalue)(seq0p,comb0p)和reduce相似,但是通常返回不同类型的函数。rdd.aggregate((0,0))((x,y)=>(x._1+y,x._2+1),(x,y)=>(x._1+y._1,x._2+y._2)
foreach(func) 对RDD中的每个元素使用给定的函数。rdd.foreach(func)
(5)持久化(缓存)
SparkRDD是惰性求值的。有时候希望能多次使用同一个RDD,可以给RDD选择不同的持久化级别。在Scala和Java中,默认情况下,persist()会把数据以序列化的形式缓存在JVM的堆空间中,在python中,会始终序列化要持久化存储的数据,所以持久化级别默认值就是以序列话后的对象存储在JVM的堆空间中。
//scala中使用persist()
import org.apache.spark.storage.StorageLevel
val result=input.map(x=>x*x)
result.persist(StorageLevel.DISK_ONLY)
println(result.count())
println(result.collect().mkString(","))
)
网友评论