- map
val valrdd=sc.parallelize(Array(1,2,3,4)).map(x=>2*x).collect
valrdd: Array[Int] = Array(2, 4, 6, 8)
- filter
val rdd=sc.parallelize(Array(1,2,3,4)).filter(x=>x>1).collect
rdd: Array[Int] = Array(2, 3, 4)
- flatMap
val array = Array("a b c"
,
"d e f"
,
"h i j")
val arrayRDD = sc.parallelize(array)
val resultRDD = arrayRDD.flatMap(_.split(" "))
resultRDD.foreach(println)
// Result: a b c d e f h i j
scala> val data =Array(Array(1, 2, 3, 4, 5),Array(4,5,6))
data: Array[Array[Int]] = Array(Array(1, 2, 3, 4, 5), Array(4, 5, 6))
scala> val rdd1=sc.parallelize(data)
rdd1: org.apache.spark.rdd.RDD[Array[Int]] = ParallelCollectionRDD[2] at parallelize at <console>:23
scala> val rdd2=rdd1.flatMap(x=>x.map(y=>y))
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[3] at flatMap at <console>:25
scala> rdd2.collect
res0: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6)
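The inner `map(y=>y)` above is just an identity. Assuming the same `rdd1`, the flattening can also be written directly; a minimal sketch:
// flatMap expands each element into a collection, so converting the inner
// Array to a Seq is enough to flatten RDD[Array[Int]] into RDD[Int].
val flattened = rdd1.flatMap(_.toSeq)
flattened.collect   // Array(1, 2, 3, 4, 5, 4, 5, 6)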
- mapPartitions
mapPartitions is a variant of map. map applies its input function to every element of the RDD, while mapPartitions applies its input function to each partition, i.e. the contents of each partition are processed as a whole:
scala> val listRDD = sc.parallelize(1 to 5)
scala> val f: Iterator[Int] => Iterator[Int] = (iterator) => {
     |   val result = scala.collection.mutable.ListBuffer[Int]()
     |   while (iterator.hasNext) {
     |     result.append(iterator.next() * 2)
     |   }
     |   result.iterator
     | }
f: Iterator[Int] => Iterator[Int] = <function1>
scala> val resultRDD=listRDD.mapPartitions(f)
resultRDD: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[10] at mapPartitions at <console>:33
scala> resultRDD.foreach(println)
2
4
6
8
10
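The ListBuffer above is only there to walk through the iterator explicitly; the same doubling can be written by transforming the iterator lazily. A sketch assuming the same `listRDD`:
// Iterator.map is lazy and avoids materializing the whole partition in memory.
val doubled = listRDD.mapPartitions(iter => iter.map(_ * 2))
doubled.collect   // Array(2, 4, 6, 8, 10)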
val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(1,2,3,4,5,6,7,8,9)
val listRDD = sc.parallelize(list,3)
listRDD.foreach(println)
val f:Iterator[Int] => Iterator[(Int,Int)] = (it) => {
// 构建返回结果的集合
val result:ListBuffer[(Int,Int)] = ListBuffer[(Int,Int)]()
// 判断下一个元素是否达到末尾 -> 判断后继当中的信息是否为空
while (it.hasNext){
// 将游标向后移动一位,取出当前指向的元素
val value = it.next()
result.append((value,value * value))
}
// 获取集合中的迭代器
result.iterator
}
listRDD.mapPartitions(f).foreach({
println
})
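A typical reason to prefer mapPartitions over map is to pay a setup cost once per partition rather than once per element. The SimpleDateFormat below is only an illustration of that pattern, not part of the original example:
import java.text.SimpleDateFormat
import java.util.Date

// Build the (non-thread-safe, relatively expensive) formatter once per partition
// and reuse it for every element in that partition.
val timestamps = sc.parallelize(List(0L, 60000L, 120000L), 2)
val formatted = timestamps.mapPartitions { iter =>
  val fmt = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
  iter.map(ts => fmt.format(new Date(ts)))
}
formatted.collect.foreach(println)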
- mapPartitionsWithIndex(function)
mapPartitionsWithIndex is a variant of mapPartitions;
the extra integer parameter passed to the function is the partition index.
val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list,3)
def f(index: Int, iterator: Iterator[Int]): Iterator[(Int,Int)] = {
  val result = scala.collection.mutable.ListBuffer[(Int,Int)]()
  while (iterator.hasNext) {
    result.append((index, iterator.next()))
  }
  result.iterator
}
val resultRDD = listRDD.mapPartitionsWithIndex(f)
resultRDD.foreach(println)
// Result: (2,5) (1,3) (1,4) (0,1) (2,6) (0,2)
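The same result can be produced without a mutable buffer by mapping the iterator directly; a sketch assuming the same `listRDD`:
// Tag each element with the index of the partition it lives in.
val tagged = listRDD.mapPartitionsWithIndex((index, it) => it.map(x => (index, x)))
tagged.collect   // e.g. Array((0,1), (0,2), (1,3), (1,4), (2,5), (2,6))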
val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(1,2,3,4,5,6,7,8,9)
val listRDD = sc.parallelize(list,3)
listRDD.foreach(println)
val f:(Int, Iterator[Int]) => Iterator[(Int,Int)] = (index, it) => {
println(index)
// 构建返回结果的集合
val result:ListBuffer[(Int,Int)] = ListBuffer[(Int,Int)]()
// 每个分区进行加和运算
var sum:Int = 0
while (it.hasNext){
sum += it.next()
}
result.append((index,sum))
// 获取集合中的迭代器
result.iterator
}
listRDD.mapPartitionsWithIndex(f).foreach({
println
})
val sparkConf = new SparkConf().setAppName("Master").setMaster("local[*]")
val sc = new SparkContext(sparkConf)
val list = List(1,2,3,4,5,6,7,8,9)
val listRDD = sc.parallelize(list,3)
listRDD.foreach(println)
val f:(Int, Iterator[Int]) => Iterator[(Int,Int)] = (index, it) => {
println(index)
// 构建返回结果的集合
val result:ListBuffer[(Int,Int)] = ListBuffer[(Int,Int)]()
if (index == 0){
while (it.hasNext){
val value = it.next()
result.append((index,value * 2))
}
}else if(index == 1){
while (it.hasNext){
val value = it.next()
result.append((index,value * 3))
}
}else{
while (it.hasNext){
val value = it.next()
result.append((index,value * 4))
}
}
// 获取集合中的迭代器
result.iterator
}
listRDD.mapPartitionsWithIndex(f).foreach({
println
})
- sample(withReplacement,fraction,seed):
Returns a new RDD sampled from the original RDD. withReplacement specifies whether sampling is done with replacement, fraction is the sampling ratio, and seed is the random seed.
(with replacement, sampling fraction, seed)
With the same seed, repeated sampling produces the same data.
If withReplacement is false, sampling is done without replacement.
val list = List(1,2,3,4,5,6)
val listRDD = sc.parallelize(list)
val resultRDD = listRDD.sample(false,0.5,0)
resultRDD.foreach(println)
// Result: 1 3 6
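For comparison, two hedged variations on the same listRDD (the exact elements returned depend on the seed and Spark version):
// With replacement, fraction is the expected number of times each element is drawn.
val withRepl = listRDD.sample(withReplacement = true, fraction = 2.0, seed = 0)

// takeSample is an action: it returns exactly `num` elements to the driver as an Array.
val exactFive = listRDD.takeSample(withReplacement = false, num = 5, seed = 0)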
- union(otherRDD): returns a new RDD that is the union of the original RDD and the other RDD
val list1 = List(1,2,3,4)
val list2 = List(3,4,5,6)
val listRDD1 = sc.parallelize(list1)
val listRDD2 = sc.parallelize(list2)
val resultRDD = listRDD1.union(listRDD2)
println(resultRDD.collect().length)
resultRDD.foreach(println)
// Result: 8
// Result: 2 1 4 3 3 4 5 6
val rdd1=sc.parallelize(1 to 5)
val rdd2=sc.parallelize(4 to 8)
rdd1.union(rdd2).collect
res13: Array[Int] = Array(1, 2, 3, 4, 5, 4, 5, 6, 7, 8)
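Note that union keeps duplicates (4 and 5 appear twice above). `++` is an alias for union on RDD, so the following sketch is equivalent:
// ++ delegates to union; no deduplication is performed.
(rdd1 ++ rdd2).collect   // Array(1, 2, 3, 4, 5, 4, 5, 6, 7, 8)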
- intersection
Returns the intersection of the two RDDs.
scala> rdd1.intersection(rdd2).collect
res14: Array[Int] = Array(4, 5)
- distinct
distinct removes duplicate elements.
scala> val rdd1=sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val rdd2=sc.parallelize(4 to 8)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21
scala> rdd1.union(rdd2).distinct.collect
res0: Array[Int] = Array(6, 1, 7, 8, 2, 3, 4, 5)
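distinct also has an overload taking the number of result partitions, since deduplication involves a shuffle; a minimal sketch:
// Deduplicate and place the result into 2 partitions in one step.
val deduped = rdd1.union(rdd2).distinct(2)
deduped.collect.sorted   // Array(1, 2, 3, 4, 5, 6, 7, 8)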
- groupByKey([numTasks])
Takes (K, V) pairs as input and returns (K, Iterable[V]). numTasks specifies the number of tasks and is optional; the example below calls groupByKey without arguments.
scala> val rdd1=sc.parallelize(1 to 5)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:21
scala> val rdd2=sc.parallelize(4 to 8)
rdd2: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:21
scala> rdd1.union(rdd2).map((_,1)).groupByKey.collect
res2: Array[(Int, Iterable[Int])] = Array((6,CompactBuffer(1)), (1,CompactBuffer(1)), (7,CompactBuffer(1)), (8,CompactBuffer(1)), (2,CompactBuffer(1)), (3,CompactBuffer(1)), (4,CompactBuffer(1, 1)), (5,CompactBuffer(1, 1)))
CompactBuffer is Spark's append-only buffer (similar to ArrayBuffer, but more memory-efficient for small collections) used to hold the grouped values.
val words = List(("good",1),("good,1),("study",1),("day",1),("day",1),("up",1))
val wordsRDD = sc.parallelize(words)
val resultRDD = wordsRDD.groupByKey()
resultRDD.foreach(println)
// Result: (study,CompactBuffer(1)) (day,CompactBuffer(1, 1)) (good,CompactBuffer(1, 1)) (up,CompactBuffer(1))
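Since groupByKey only collects the values, any per-key aggregation still has to be done afterwards, e.g. with mapValues; reduceByKey (next) does the same thing more efficiently because it combines values before the shuffle. A minimal sketch on the same wordsRDD:
// Sum the grouped counts per key; reduceByKey achieves this without
// materializing the full CompactBuffer of values for each key.
val counts = wordsRDD.groupByKey().mapValues(_.sum)
counts.foreach(println)   // (study,1) (day,2) (good,2) (up,1) in some order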
- reduceByKey(function[, numPartitions]):
Returns a new RDD of (K, V) pairs. The original RDD must be a pair RDD, and the supplied function is used to reduce the values of each key.
val words = List(("good",1),("good",1),("study",1),("day",1),("day",1),("up",1))
val wordsRDD = sc.parallelize(words)
val resultRDD = wordsRDD.reduceByKey(_ + _)
resultRDD.foreach(println)
// Result: (day,2) (up,1) (study,1) (good,2)
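Combining flatMap, map and reduceByKey gives the classic word count; a minimal sketch using a small in-memory list of lines:
// Split lines into words, pair each word with 1, then sum the counts per key.
val lines = sc.parallelize(List("good good study", "day day up"))
val wordCounts = lines
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
wordCounts.collect.foreach(println)   // (good,2) (study,1) (day,2) (up,1) in some order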
- aggregateByKey(zeroValue[, numPartitions])
aggregate means to gather/combine.
The aggregate function folds the elements of each partition together with a zero value (seqOp), then runs a combine step (combOp) that merges the per-partition results; the final result does not have to have the same type as the elements of the original RDD. Note that the example below uses the aggregate action rather than aggregateByKey; a per-key sketch follows it.
The code below computes, for each partition, the product and the sum of its elements,
and then adds the partition results together pairwise.
import org.apache.spark.{SparkConf, SparkContext}

val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
val sc = new SparkContext(sparkConf)
val list = List(1, 2, 3, 4, 5, 6, 7, 8, 9)
val listRDD = sc.parallelize(list,3)
val seqop: ((Int,Int), Int) => (Int,Int) = {
  (zero, value) => {
    println(zero)
    // Accumulate within a partition: running product and running sum
    (zero._1 * value, zero._2 + value)
  }
}
val combop: ((Int,Int), (Int,Int)) => (Int,Int) = {
  (par1, par2) => {
    // Merge the results of two partitions
    (par1._1 + par2._1, par1._2 + par2._2)
  }
}
println(listRDD.aggregate((1,0))(seqop,combop))
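The example above is the aggregate action on a plain RDD. aggregateByKey is the pair-RDD counterpart: the zero value, seqOp and combOp are applied per key. A minimal hedged sketch computing the per-key average:
// aggregateByKey: per key, fold values into the zero value within each partition (seqOp),
// then merge the per-partition accumulators across partitions (combOp).
val pairs = sc.parallelize(List(("a", 1), ("a", 2), ("b", 3), ("b", 4)), 2)
// Compute (sum, count) per key, then the average value for each key.
val sumCount = pairs.aggregateByKey((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),      // seqOp: add a value into the accumulator
  (a, b)   => (a._1 + b._1, a._2 + b._2)     // combOp: merge two accumulators
)
sumCount.mapValues { case (sum, count) => sum.toDouble / count }
  .collect.foreach(println)   // (a,1.5) (b,3.5)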