美文网首页
Spark -Core 之---Transformation

Spark -Core 之---Transformation

作者: wudl | 来源:发表于2020-12-03 11:37 被阅读0次

    Spark 的核心函数value类型

    1.Transform 算子之 -- 1.map 函数:

    遍历RDD 中的每一个元素。

    package com.wudl.core
    import org.apache.spark.{SparkConf, SparkContext}
    /**
     * 函数模型:
     *          def map[U:ClassTag](f.T =>U): RDD[U]
     *  功能说明:
     *            参数是一个函数,当rdd 需要执行map 的时候 ,会遍历该rdd中的每一个数据项,并
     *            依次执行f 函数, 从而产生一个新的函数,即这个新的rdd中的每一个元素都是原来rdd中
     *            每个元素依次应用f 函数而得到。
     */
    object SparkMap {
      def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setMaster("local[1]").setAppName("SparkMap");
        val sc = new SparkContext(conf)
        val rdd = sc.makeRDD(1 to (4), 2)
        val mapRdd = rdd.map(_ * 2)
        mapRdd.collect().foreach(println)
        sc.stop()
      }
    }
    

    2.Transform 算子之 -- mapPartitions 就是一分区为单位的执行map

    实例:

    package com.wudl.core
    import org.apache.spark.{SparkConf, SparkContext}
    /**
     * MapPartions 一次只处理一个分区的数,而map 是一次处理一个元素
     */
    object SparkMapPartitions {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[1]").setAppName("SparkMapPartitions")
        val sc = new SparkContext(conf)
        val rdd = sc.makeRDD(1 to (4), 2)
        val mapParRdd = rdd.mapPartitions(x => x.map(_ * 2))
        mapParRdd.collect().foreach(println)
        sc.stop()
      }
    }
    
    

    3.Transform 算子之 -- mapParitionsWithIndex

    功能说明: 类似于mapPartitions 比mapPartitions多一个整数参数表示分区号

    package com.wudl.core
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * 功能说明: 类似于mapPartitions  比mapPartitions多一个整数参数表示分区号
     * 需求说明:创建一个RDD  使每个元素跟所在的分区号形成一个元组,组成一个新的rdd
     */
    object SparkParittionWitsWithInex {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkConf").setMaster("local[1]")
        val sc = new SparkContext(conf)
        val rdd: RDD[Int] = sc.makeRDD(1 to 8, 2)
        val rddmap = rdd.mapPartitionsWithIndex((index, item) => {
          item.map((index, _))
        })
        rddmap.collect().foreach(println)
        sc.stop()
      }
    }
    
    

    执行的结果:

    (0,1)
    (0,2)
    (1,3)
    (1,4)
    

    4.Transform 算子之 -- flatMap

    将Rdd 中的每一个元素通过应用的函数, 转化为新的元素,并封装到新RDD 中。

    package com.wudl.core
    import org.apache.spark.{SparkConf, SparkContext}
    /**
     * 将Rdd 中的每一个元素通过应用的函数, 转化为新的元素,并封装到新RDD 中
     */
    object SparkFlatMap {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setMaster("local[*]").setAppName("SparkFlatMap")
        val sc = new SparkContext(conf)
        //  创建一个Rdd
        val rdd = sc.makeRDD(List(List(1, 2), List(3, 4), List(5, 6)))
        rdd.flatMap(list => list).collect().foreach(println)
        sc.stop()
      }
    }
    

    相关文章

      网友评论

          本文标题:Spark -Core 之---Transformation

          本文链接:https://www.haomeiwen.com/subject/thfmwktx.html