Spark 的核心函数value类型
1.Transform 算子之 -- 1.map 函数:
遍历RDD 中的每一个元素。
package com.wudl.core
import org.apache.spark.{SparkConf, SparkContext}
/**
* 函数模型:
* def map[U:ClassTag](f.T =>U): RDD[U]
* 功能说明:
* 参数是一个函数,当rdd 需要执行map 的时候 ,会遍历该rdd中的每一个数据项,并
* 依次执行f 函数, 从而产生一个新的函数,即这个新的rdd中的每一个元素都是原来rdd中
* 每个元素依次应用f 函数而得到。
*/
object SparkMap {
def main(args: Array[String]): Unit = {
val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("SparkMap");
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(1 to (4), 2)
val mapRdd = rdd.map(_ * 2)
mapRdd.collect().foreach(println)
sc.stop()
}
}
2.Transform 算子之 -- mapPartitions 就是一分区为单位的执行map
实例:
package com.wudl.core
import org.apache.spark.{SparkConf, SparkContext}
/**
* MapPartions 一次只处理一个分区的数,而map 是一次处理一个元素
*/
object SparkMapPartitions {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[1]").setAppName("SparkMapPartitions")
val sc = new SparkContext(conf)
val rdd = sc.makeRDD(1 to (4), 2)
val mapParRdd = rdd.mapPartitions(x => x.map(_ * 2))
mapParRdd.collect().foreach(println)
sc.stop()
}
}
3.Transform 算子之 -- mapParitionsWithIndex
功能说明: 类似于mapPartitions 比mapPartitions多一个整数参数表示分区号
package com.wudl.core
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}
/**
* 功能说明: 类似于mapPartitions 比mapPartitions多一个整数参数表示分区号
* 需求说明:创建一个RDD 使每个元素跟所在的分区号形成一个元组,组成一个新的rdd
*/
object SparkParittionWitsWithInex {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setAppName("SparkConf").setMaster("local[1]")
val sc = new SparkContext(conf)
val rdd: RDD[Int] = sc.makeRDD(1 to 8, 2)
val rddmap = rdd.mapPartitionsWithIndex((index, item) => {
item.map((index, _))
})
rddmap.collect().foreach(println)
sc.stop()
}
}
执行的结果:
(0,1)
(0,2)
(1,3)
(1,4)
4.Transform 算子之 -- flatMap
将Rdd 中的每一个元素通过应用的函数, 转化为新的元素,并封装到新RDD 中。
package com.wudl.core
import org.apache.spark.{SparkConf, SparkContext}
/**
* 将Rdd 中的每一个元素通过应用的函数, 转化为新的元素,并封装到新RDD 中
*/
object SparkFlatMap {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local[*]").setAppName("SparkFlatMap")
val sc = new SparkContext(conf)
// 创建一个Rdd
val rdd = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6)))
rdd.flatMap(list => list).collect().foreach(println)
sc.stop()
}
}
网友评论