RDDs的创建方法:
加载外部数据集
val rddText=sc.textFile("helloSpark.txt")
Scala变量声明:
val:变量值不可修改,一旦分配不能重新指向别的值
var:分配后,可以指向类型相同的值
val lines=sc.textFile("helloSpark.txt")
lines=sc.textFile("helloSpark.txt")
var lines=sc.textFile("helloSpark.txt")
lines=sc.textFile("helloSpark2.txt")
Scala的匿名函数和类型推断
lines.filter(line=>line.contains("world"))
定义一个匿名函数,接受一个参数line,使用line这个String类型变量上的contains方法,并且返回结果。line的类型不需要指定,能够推断出来。
Transformation
Transformations(转换),从之前的RDD
构建一个新的RDD,像map()和filter()
- 逐元素Transformation
map():map()接受函数,把函数应用到RDD的每一个元素,返回新的RDD。
val lines=sc.parallelize(Array("hello","spark","hello","world","!"))
lines.foreach(println)
hello
spark
hello
world
!
val lines2=lines.map(word=>(word,1))
lines2.foreach(println)
(hello,1)
(spark,1)
(hello,1)
(world,1)
(!,1)
- flatMap():
对每个输入元素,输出多个输出元素。flat压扁的意思,将RDD中元素压扁后返回一个新的RDD。
inputs.foreach(println)
hello !
hello spark
hello world
val lines=inputs.flatMap(line=>line.split(" "))
lines.foreach(print)
hellosparkhelloworldhello! #拆开打散压扁了
集合运算
RDDs支持数学集合的计算,例如并集,交集计算。
val rdd1=sc.parallelize(Array("caffe","caffe","panda","monkey","tea"))
val rdd2=sc.parallelize(Array("caffe","monkey","kitty"))
- 去重
val rdd_distinct=rdd1.distinct()
rdd_distinct.foreach(println)
monkey
coffe
panda
tea
- 并集
rdd_union=rdd1.union(rdd2)
rdd_union.foreach(println)
caffe
caffe
panda
monkey
tea
caffe
monkey
kitty
- 交集
val rdd_inter=rdd1.intersection(rdd2)
rdd_inter.foreach(println)
monkey
coffe
- 减集
val rdd_sub=rdd1.subtract(rdd2)
rdd_sub.foreach(println)
tea
panda
网友评论