美文网首页
spark-RDD算子操作分类

spark-RDD算子操作分类

作者: 百年叔叔 | 来源:发表于2020-07-13 08:39 被阅读0次

RDD算子操作分类

测试用例说明 前置方法

   /**
   * 提供初始化方法,完成输出目录的清理
   * 在每个Test方法之前先运行
   */
  @Before
  def init(): Unit ={

    val fileSystem: FileSystem = FileSystem.get(new Configuration())

    val path = new Path("output")

    // 如果输出目录存在,就删除
    if (fileSystem.exists(path)){
      fileSystem.delete(path,true)
    }

  }

后置方法

  /**
   *  每次测试完成后运行
   */
  @After
  def stop(): Unit ={
    sc.stop()
  }

成员变量

val sc = new SparkContext(new SparkConf().setAppName("My app").setMaster("local[*]"))

1.taansformation(转换)

它可以实现把一个rdd转换成一个新的rdd,它是延迟加载,不会立即触发任务的真正运行。
比如 flatMap/map/reduceBykey

1.1map

/**
   * map 特点:  分区之间是并行(真正并行取决于cores)运算
   * *
   * 同一个分区内,一个元素执行完所有的转换操作后,才开始下一个元素!
   */
  @Test
  def testMap(): Unit = {
    val list: List[Int] = List(1, 2, 3, 4)

    val rdd: RDD[Int] = sc.makeRDD(list, 2)

    val rdd2: RDD[Int] = rdd.map(x => {
      println(x + "执行了第一次Map操作!")
      x
    })

    val rdd3: RDD[Int] = rdd2.map(x => {
      println(x+"执行了第二次Map操作!")
      x
    })
    rdd3.saveAsTextFile("output")
  }
image.png

  /*
      map :  def map[U: ClassTag](f: T => U): RDD[U]

              对当前RDD中的每个元素执行map操作,返回一个新的元素,将元素放入新的MapPartitionsRDD中!

              特点: ①map操作后,不会改变分区数
                     ②分区间的数据也不会发生交换
   */
  @Test
  def test2(): Unit = {

    val list = List(1, 2, 3, 4)

    val rdd: RDD[Int] = sc.makeRDD(list, 2)

    val rdd1: RDD[Int] = rdd.map(x => x + 1)

    rdd1.saveAsTextFile("output")

  }
image.png
image.png

1.2 mapPartitions

 /*
      mapPartitions :   将一个分区作为一个整体,调用一次map函数,转换后生成新的分区集合!
      def mapPartitions[U: ClassTag](
      f: Iterator[T] => Iterator[U],
      preservesPartitioning: Boolean = false): RDD[U]

        和map的区别: ①传入的函数不同,map将一个元素转为另一个元素, mapPartitions将一个集合变为另一个集合!
                     ② mapPartiion逻辑:  cleanedF(iter)    批处理
                        map逻辑:  iter.map(cleanF)         个体处理
                     ③map是全量处理:  RDD中有x个元素,返回的集合也有x个元素
                        mapPartition只要返回一个集合,进行过滤或添加操作!
                     ④ 本质是mapPartition是一个集合调用一次
                          在特殊场景,节省性能,例如将一个分区的数据,写入到数据库中
                     ⑤ map是将一个元素的所有转换操作运行结束后,再继续开始下一个元素!
                       mapPartition: 多个分区并行开始转换操作,一个分区的所有数据全部运行结束后,mapPartition才结束!
                            一旦某个分区中的元素没有处理完,整个分区的数据都无法释放!需要更大的内存!



         spark是分布式运算: 时刻分清Driver 和 Executor
                            Executor执行的是Task(封装了RDD的执行逻辑)



   */
 @Test
  def testMapPartitions(): Unit = {

    // 封装RDD操作的逻辑
    val list = List(1, 2, 3, 4)

    val rdd: RDD[Int] = sc.makeRDD(list, 2)

    // 将分区中的奇数提取出来
    val result: RDD[Int] = rdd.mapPartitions(x => {

      x.filter(elem => elem % 2 == 1).toIterator
    })

    result.saveAsTextFile("output")
  }
image.png
image.png

1.3 mapPartitionsWithIndex

 /*
      mapPartitionsWithIndex :  执行逻辑  f(index, iter) : index是当前分区的索引
                                                          iter是分区的迭代器

                                 将一个分区整体执行一次map操作,可以使用分区的index!
   */
  @Test
  def test7(): Unit = {

    val list = List(1, 2, 3, 4)

    val rdd: RDD[Int] = sc.makeRDD(list, 2)

    val result: RDD[(Int, Int)] = rdd.mapPartitionsWithIndex((index, iter) => iter.map(elem => (index, elem)))

    result.saveAsTextFile("output")
  }
image.png
image.png

1.5 flatMap


  /*
        flatMap : 先map再扁平化。不会改变分区和分区逻辑!

        将List(List(1,2),3,List(4,5))进行扁平化操作


   */
  @Test
  def test() : Unit ={

    val list = List(List(1, 2), 3, List(4, 5))

    val rdd: RDD[Any] = sc.makeRDD(list, 2)

    val result: RDD[Any] = rdd.flatMap {
      // 将单个Int,转成集合
      case x: Int => List(x)
      case y: List[_] => y

    }

    result.saveAsTextFile("output")

  }
image.png
image.png

1.6 glom

  /*
      glom():  将一个分区的所有元素合并到一个Array中
   */
  @Test
  def test2() : Unit ={

    val list = List(1,2,3,4)

    val rdd: RDD[Any] = sc.makeRDD(list, 2)

    val result: RDD[Array[Any]] = rdd.glom()

    //  result.collect() => Array [ Array[Any],Array[Any] ]
    result.collect().foreach(x=>println(x.mkString(",")))

  }
image.png

2.action(动作)

它会触发任务的真正运行
比如collect/saveAstextFile

相关文章

  • spark-RDD算子操作分类

    RDD算子操作分类 测试用例说明 前置方法 后置方法 成员变量 1.taansformation(转换) 它可以实...

  • RDD常见算子

    RDD算子的分类 RDD算子从对数据操作上讲,大致分为两类: 转换(transformations)和行动(act...

  • Spark-RDD算子

    map 说明 通过指定变换函数将原有RDD中的元素逐个进行变换,并返回一个新的RDD 示例 结果 246810 ...

  • Spark-RDD行动算子

    reduce(func) 说明通过func函数聚合RDD中所有元素,先聚合分区内元素,再聚合分区间元素 示例val...

  • Spark算子

    一、算子分类1、transformation算子:这类算子并不触发提交作业,完成作业中间过程处理Transform...

  • RDD 算子分类

    摘要: 本文主要介绍Spark算子的作用,以及算子的分类。 转换:Transformation , 行动: Act...

  • Spark-RDD操作MySQL

    Spark支持通过Java JDBC访问关系型数据库,需要通过JdbcRDD进行访问,示例如下: 添加依赖 MyS...

  • Flink 2.1State

    无状态算子:map/flatmap/filter等简单算子有状态算子:window,reduce等操作,需要依赖之...

  • Spark常用算子详解 2020-05-07

    Spark常用算子详解 Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Tran...

  • Spark算子总结版

    Spark的算子的分类 从大方向来说,Spark 算子大致可以分为以下两类: 1)Transformation 变...

网友评论

      本文标题:spark-RDD算子操作分类

      本文链接:https://www.haomeiwen.com/subject/zeiocktx.html