RDD算子操作分类
测试用例说明 前置方法
/**
* 提供初始化方法,完成输出目录的清理
* 在每个Test方法之前先运行
*/
@Before
def init(): Unit ={
val fileSystem: FileSystem = FileSystem.get(new Configuration())
val path = new Path("output")
// 如果输出目录存在,就删除
if (fileSystem.exists(path)){
fileSystem.delete(path,true)
}
}
后置方法
/**
* 每次测试完成后运行
*/
@After
def stop(): Unit ={
sc.stop()
}
成员变量
val sc = new SparkContext(new SparkConf().setAppName("My app").setMaster("local[*]"))
1.taansformation(转换)
它可以实现把一个rdd转换成一个新的rdd,它是延迟加载,不会立即触发任务的真正运行。
比如 flatMap/map/reduceBykey
1.1map
/**
* map 特点: 分区之间是并行(真正并行取决于cores)运算
* *
* 同一个分区内,一个元素执行完所有的转换操作后,才开始下一个元素!
*/
@Test
def testMap(): Unit = {
val list: List[Int] = List(1, 2, 3, 4)
val rdd: RDD[Int] = sc.makeRDD(list, 2)
val rdd2: RDD[Int] = rdd.map(x => {
println(x + "执行了第一次Map操作!")
x
})
val rdd3: RDD[Int] = rdd2.map(x => {
println(x+"执行了第二次Map操作!")
x
})
rdd3.saveAsTextFile("output")
}
image.png
/*
map : def map[U: ClassTag](f: T => U): RDD[U]
对当前RDD中的每个元素执行map操作,返回一个新的元素,将元素放入新的MapPartitionsRDD中!
特点: ①map操作后,不会改变分区数
②分区间的数据也不会发生交换
*/
@Test
def test2(): Unit = {
val list = List(1, 2, 3, 4)
val rdd: RDD[Int] = sc.makeRDD(list, 2)
val rdd1: RDD[Int] = rdd.map(x => x + 1)
rdd1.saveAsTextFile("output")
}
image.png
image.png
1.2 mapPartitions
/*
mapPartitions : 将一个分区作为一个整体,调用一次map函数,转换后生成新的分区集合!
def mapPartitions[U: ClassTag](
f: Iterator[T] => Iterator[U],
preservesPartitioning: Boolean = false): RDD[U]
和map的区别: ①传入的函数不同,map将一个元素转为另一个元素, mapPartitions将一个集合变为另一个集合!
② mapPartiion逻辑: cleanedF(iter) 批处理
map逻辑: iter.map(cleanF) 个体处理
③map是全量处理: RDD中有x个元素,返回的集合也有x个元素
mapPartition只要返回一个集合,进行过滤或添加操作!
④ 本质是mapPartition是一个集合调用一次
在特殊场景,节省性能,例如将一个分区的数据,写入到数据库中
⑤ map是将一个元素的所有转换操作运行结束后,再继续开始下一个元素!
mapPartition: 多个分区并行开始转换操作,一个分区的所有数据全部运行结束后,mapPartition才结束!
一旦某个分区中的元素没有处理完,整个分区的数据都无法释放!需要更大的内存!
spark是分布式运算: 时刻分清Driver 和 Executor
Executor执行的是Task(封装了RDD的执行逻辑)
*/
@Test
def testMapPartitions(): Unit = {
// 封装RDD操作的逻辑
val list = List(1, 2, 3, 4)
val rdd: RDD[Int] = sc.makeRDD(list, 2)
// 将分区中的奇数提取出来
val result: RDD[Int] = rdd.mapPartitions(x => {
x.filter(elem => elem % 2 == 1).toIterator
})
result.saveAsTextFile("output")
}
image.png
image.png
1.3 mapPartitionsWithIndex
/*
mapPartitionsWithIndex : 执行逻辑 f(index, iter) : index是当前分区的索引
iter是分区的迭代器
将一个分区整体执行一次map操作,可以使用分区的index!
*/
@Test
def test7(): Unit = {
val list = List(1, 2, 3, 4)
val rdd: RDD[Int] = sc.makeRDD(list, 2)
val result: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, iter) => iter.map(elem => (index, elem)))
result.saveAsTextFile("output")
}
image.png
image.png
1.5 flatMap
/*
flatMap : 先map再扁平化。不会改变分区和分区逻辑!
将List(List(1,2),3,List(4,5))进行扁平化操作
*/
@Test
def test() : Unit ={
val list = List(List(1, 2), 3, List(4, 5))
val rdd: RDD[Any] = sc.makeRDD(list, 2)
val result: RDD[Any] = rdd.flatMap {
// 将单个Int,转成集合
case x: Int => List(x)
case y: List[_] => y
}
result.saveAsTextFile("output")
}
image.png
image.png
1.6 glom
/*
glom(): 将一个分区的所有元素合并到一个Array中
*/
@Test
def test2() : Unit ={
val list = List(1,2,3,4)
val rdd: RDD[Any] = sc.makeRDD(list, 2)
val result: RDD[Array[Any]] = rdd.glom()
// result.collect() => Array [ Array[Any],Array[Any] ]
result.collect().foreach(x=>println(x.mkString(",")))
}
image.png
2.action(动作)
它会触发任务的真正运行
比如collect/saveAstextFile
网友评论