基本操作
- 创建RDD
var data = Array(1,2,3,4) //数组
var distData = sc.parallelize(data,3) //创建RDD
distData.collect //收集展示
distData.take(1) //取出第一行/数组中一个数据占一行
- 读取文件数据
var distFile = sc.textFile("data.txt")//按行读
distFile.take(1) //取出一行数据
//也可以同时读取多个文件,相当于多个文件拼接
//读取整个目录下的所有文件
//读取含有通配符的目录
textFile("/input/*.txt")
- map操作,指定一个函数产生新的RDD,元素直接是一对一的关系。
var rdd1 = sc.parallelize(1 to 9,3)
var rdd2 = rdd1.map(x=>x*2)
rdd2.collect
res1: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)
- filter对RDD进行过滤,返回过滤后的RDD
var rdd3 = rdd2.filter(x=>x>10)
rdd3.collect
res5: Array[Int] = Array(12, 14, 16, 18)
- flatmap是一个一对多的map
var rdd4 = rdd3.flatMap(x=>x to 20)
rdd4: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[6] at flatMap at <console>:30
scala> rdd4.collect
res6: Array[Int] = Array(12, 13, 14, 15, 16, 17, 18, 19, 20, 14, 15, 16, 17, 18, 19, 20, 16, 17, 18, 19, 20, 18, 19, 20)
- sample产生随机数组
var a = sc.parallelize(1 to 1000,3)
a.sample(false,0.1,0)
a.sample(false,0.1,0).count()
第一个参数是是否放回抽样,比例,随机种子
- 交集与并集
rdd1.union(rdd2)
rdd1.intersection(rdd2)
- 去重
distinct
- 分组groupbykey
val rdd0 = sc.parallelize(Array((1,1),(1,2),(2,3)))
val rdd11 = rdd0.groupByKey()
rdd11.collect
- reduceByKey
是数据分组聚合操作,在一个key的数据集上使用
val rdd0 = sc.parallelize(Array((1,1),(1,2),(2,3),(2,1),(2,2),(2,3),3))
var rd,d = rdd0.reduceByKey((x,y)=>x+y)
rdd.collect
(1,6)(2,6)
- combineByKey数据集合并
val data = Array((1,1.0),(1,2.0),(1,3.0),(2,4.0))
网友评论