本文全部手写原创,请勿复制粘贴、转载请注明出处,谢谢配合!
Action操作和Transformation操作的区别
-
惰性求值:Action操作会触发实际的计算,而Transformation是没有触发实际计算的,是惰性求值的(见下一篇博客)
-
返回类型:Transformation操作是一个RDD转化为一个新的RDD,即返回RDD,而Action操作返回其他数据类型。
-
输出结果:Action操作会有实际结果的输出,向驱动器程序返回结果或者把结果写入外部系统。Transformation并无实际输出。
Action操作常用函数
-
reduce(func) 根据函数规则对数据集进行整合
-
count() 返回元素个数
-
first() 返回第一个元素
-
collect() 返回数据集所有元素,注意内存溢出问题,只有当你的整个数据集在单台机器中内存放得下时才使用
-
top(n) 按默认或指定排序返回前n个元素,默认按降序
-
take(n) 返回前n个元素
scala> val rdd = sc.parallelize(1 to 10)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[51] at parallelize at <console>:24
scala> rdd.reduce(_+_)
res25: Int = 55
scala> rdd.count()
res26: Long = 10
scala> rdd.first()
res27: Int = 1
scala> rdd.collect()
res28: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
scala> rdd.take(5)
res29: Array[Int] = Array(1, 2, 3, 4, 5)
- saveAsTextFile(path) 存储最后结果到文件系统中
scala> rdd.map(x=>(x,1)).saveAsTextFile("hdfs://master/user/out1")
- countByKey() 分别计算每个Key的个数
scala> val rdd = sc.parallelize(List(("spark",3),("spark",2),("hello",2)))
rdd: org.apache.spark.rdd.RDD[(String, Int)] = ParallelCollectionRDD[56] at parallelize at <console>:24
scala> rdd.countByKey()
res40: scala.collection.Map[String,Long] = Map(spark -> 2, hello -> 1)
- aggregate聚合函数
def aggregate[U](zeroValue: U)(seqOp: (U, T) ⇒ U, combOp: (U, U) ⇒ U)(implicit arg0: ClassTag[U]): U
意思是说,对于每个分区的元素,进行某种操作seqOp: (U, T) ⇒ U,然后聚合这些分区的元素,combOp: (U, U) ⇒ U,(zeroValue: U)是一个初始值。看案例解释比较清楚:
scala> val rdd = sc.parallelize(1 to 10,2)
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[49] at parallelize at <console>:24
scala> rdd.aggregate(1)((x,y)=>x+y,(a,b)=>(a*b))
res23: Int = 656
1,2,3,4,5,6,7,8,9,10分成了两个区
分区一:1,2,3,4,5 进行(x,y)=>x+y 注意有初始值1 即1+1+2+3+4+5=16
分区二:6,7,8,9,10 进行(x,y)=>x+y 注意有初始值1 即 1+6+7+8+9+10=41
对这两个分区的结果16和41 进行(a,b)=>ab 即1641=656
更多函数请参考spark API,看API永远是最好的学习方式没有之一。
网友评论