image.png
reduce
/**
* 算出总价
* 注意点:
* 1. 函数中的curr参数,并不是value,而是一整条数据
* 2. reduce 整体上的结果,只有一个
*/
@Test
def reduce(): Unit = {
val tuple: (String, Double) = sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
.reduce((curr, agg) => ("总价", curr._2 + agg._2))
println(tuple)
}
foreach
@Test
def foeach(): Unit = {
sc.parallelize(Seq(1, 3, 2, 4, 5, 7, 6))
.foreach(println(_))
}
countAndCountByKey
/**
* count 和 countByKey 的结果相差很远,每次调用Action 都会生成一个job,job会运行获取结果
* 所以两个job 会有大量的log,其实就是在运行 job
*
* countByKey 的运算结果是 Map(电脑 -> 1, 手机 -> 2)
* 数据倾斜,如果要解决数据倾斜的问题,是不是要先知道谁倾斜,通过countByKey可以查看对应数据的总数,从而解决数据倾斜的问题
*/
@Test
def countAndCountByKey(): Unit = {
println(sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
.count())
println("================================== ")
println(sc.parallelize(Seq(("手机", 10.0), ("手机", 15.0), ("电脑", 20.0)))
.countByKey())
}
take
/**
* take 和 takeSample都是获取数据,一个是 直接获取,一个是采样获取
* frist 一般情况下,action会从所有分区获取数据,相对来说速度就比较慢,
* frist 只是获取第一个元素,所以frist只会处理第一个分区,所以速度很快,无序处理所有数据
*/
@Test
def take(): Unit = {
val rdd: RDD[Int] = sc.parallelize(Seq(1, 2, 3, 4, 5, 6))
rdd.take(3).foreach(println(_))
println(rdd.first())
rdd.takeSample(withReplacement = true, num = 3).foreach(println(_))
}
网友评论