美文网首页
Spark常用Transformations算子(一)

Spark常用Transformations算子(一)

作者: piziyang12138 | 来源:发表于2018-10-19 09:43 被阅读0次

    介绍以下Transformations算子:
    map
    flatMap
    mapPartitions
    mapPartitionsWithIndex
    filter
    sample
    union
    intersection
    sortBy
    sortByKey
    groupByKey
    reduceByKey
    distinct
    coalesce
    repartition


    (1) map、mapPartitions、mapPartitionsWithIndex

    1. map以一条记录为单位进行操作
    val arr = Array("Tom","Bob","Tony","Jerry")
    
    //把4条数据分到两个分区中
    val rdd = sc.parallelize(arr,2)
    
    /*
     * 模拟把RDD中的元素写入数据库的过程
     */
    rdd.map(x => {
      println("创建数据库连接...")
      println("写入数据库...")
      println("关闭数据库连接...")
      println()
    }).count()
    
    结果:
    
    创建数据库连接...
    写入数据库...
    关闭数据库连接...
    
    创建数据库连接...
    写入数据库...
    关闭数据库连接...
    
    创建数据库连接...
    写入数据库...
    关闭数据库连接...
    
    创建数据库连接...
    写入数据库...
    关闭数据库连接...
    
    
    1. mapPartitions以分区为单位进行操作
    val arr = Array("Tom","Bob","Tony","Jerry")
    //把4条数据分到两个分区中
    val rdd = sc.parallelize(arr,2)
    
    /*
    * 将RDD中的数据写入到数据库中,绝大部分使用mapPartitions算子来实现
    */
    rdd.mapPartitions(x => {
      println("创建数据库连接...")
      val list = new ListBuffer[String]()
      while(x.hasNext) {
        // 模拟写入数据库
        list += x.next() + "写入数据库"
      }
      // 模拟执行SQL语句,批量插入
      list.iterator
    }).foreach(println)
    
    结果:
    
    创建数据库
    Tom:写入数据库
    Bob:写入数据库 
    创建数据库
    Tony:写入数据库
    Jerry:写入数据库
    
    
    1. mapPartitionsWithIndex
    val dataArr = Array("Tom01","Tom02","Tom03"
                      ,"Tom04","Tom05","Tom06"
                      ,"Tom07","Tom08","Tom09"
                      ,"Tom10","Tom11","Tom12")
    val rdd = sc.parallelize(dataArr, 3);
    val result = rdd.mapPartitionsWithIndex((index,x) => {
        val list = ListBuffer[String]()
        while (x.hasNext) {
          list += "partition:"+ index + " content:" + x.next
        }
        list.iterator
    })
    println("分区数量:" + result.partitions.size)
    val resultArr = result.collect()
    for(x <- resultArr){
      println(x)
    }
    
    结果:
    
    分区数量:3
    partition:0 content:Tom01
    partition:0 content:Tom02
    partition:0 content:Tom03
    partition:0 content:Tom04
    partition:1 content:Tom05
    partition:1 content:Tom06
    partition:1 content:Tom07
    partition:1 content:Tom08
    partition:2 content:Tom09
    partition:2 content:Tom10
    partition:2 content:Tom11
    partition:2 content:Tom12
    
    

    (2) flatMap

    val conf = new SparkConf().setAppName("FlatMapTest").setMaster("local")
    val sc = new SparkContext(conf)
    val data = Array("hello hadoop","hello hive", "hello spark")
    val rdd = sc.makeRDD(data)
    
    rdd.flatMap(_.split(" ")).foreach(println)
    /*
    结果:
    hello
    hadoop
    hello
    hive
    hello
    spark
    */
    rdd.map(_.split(" ")).foreach(println)
    /*
    [Ljava.lang.String;@3c986196
    [Ljava.lang.String;@113116a6
    [Ljava.lang.String;@542d75a6
    */
    
    

    map 和 flatMap的区别
    map:输入一条数据,返回一条数据
    flatMap:输入一条数据,可能返回多条数据

    image.png

    以下scala程序可以说明map函数、flatMap函数和flatten函数的区别和联系:

    scala> val arr = Array("hello hadoop","hello hive","hello spark")
    arr: Array[String] = Array(hello hadoop, hello hive, hello spark)
    
    scala> val map = arr.map(_.split(" "))
    map: Array[Array[String]] = Array(Array(hello, hadoop), Array(hello, hive), Array(hello, spark))
    
    scala> map.flatten
    res1: Array[String] = Array(hello, hadoop, hello, hive, hello, spark)
    
    scala> arr.flatMap(_.split(" "))
    res2: Array[String] = Array(hello, hadoop, hello, hive, hello, spark)
    
    

    (3) filter :过滤

    val rdd = sc.makeRDD(Array("hello","hello","hello","world"))
    // filter(boolean) 返回的是判断条件为true的记录
    rdd.filter(!_.contains("hello")).foreach(println)
    
    结果:
    world
    
    

    (4) sample :随机抽样

    sample(withReplacement: Boolean, fraction: Double, seed: Long)  
    
    withReplacement : 是否是放回式抽样  
        true代表如果抽中A元素,之后还可以抽取A元素
        false代表如果抽中了A元素,之后都不在抽取A元素  
    fraction : 抽样的比例  
    seed : 抽样算法的随机数种子,不同的数值代表不同的抽样规则,可以手动设置,默认为long的随机数
    
    
    val rdd = sc.makeRDD(Array(
      "hello1","hello2","hello3","hello4","hello5","hello6",
      "world1","world2","world3","world4"
    ))
    rdd.sample(false, 0.3).foreach(println)
    
    结果:理论上会随机抽取30%的数据,但是在数据量不大的时候,不一定很准确
    
    hello1
    hello3
    world3
    
    

    (5) union:把两个RDD进行逻辑上的合并

    val rdd1 =sc.makeRDD(1 to 3)
    val rdd2 = sc.parallelize(4 until 6)
    rdd1.union(rdd2).foreach {println}
    
    结果:
    
    1
    2
    3
    4
    5
    
    

    (6) intersection:求两个RDD的交集

    val rdd1 =sc.makeRDD(1 to 3)
    val rdd2 = sc.parallelize(2 to 5)
    
    rdd1.intersection(rdd2).foreach(println)
    
    结果:
    2
    3
    
    

    (7) sortBy和sortByKey

    1. sortBy:手动指定排序的字段
    val arr = Array(
            Tuple3(190,100,"Jed"),
            Tuple3(100,202,"Tom"),
            Tuple3(90,111,"Tony")
        )
    val rdd = sc.parallelize(arr)
    rdd.sortBy(_._1).foreach(println)
    /* 按第一个元素排序
       (90,111,Tony)
       (100,202,Tom)
       (190,100,Jed)
     */
    
    rdd.sortBy(_._2, false).foreach(println)
    /* 按照第二个元素排序,降序
       (100,202,Tom)
       (90,111,Tony)
       (190,100,Jed)
     */
    
    rdd.sortBy(_._3).foreach(println)
    /* 按照第三个元素排序
       (190,100,Jed)
       (100,202,Tom)
       (90,111,Tony)
     */
    
    }
    
    
    1. sortByKey:按key进行排序
    val rdd = sc.makeRDD(Array(
          (5,"Tom"),(10,"Jed"),(3,"Tony"),(2,"Jack")
        ))
    rdd.sortByKey().foreach(println)
    
    结果:
    
    (2,Jack)
    (3,Tony)
    (5,Tom)
    (10,Jed)
    
    

    (8) groupByKey和reduceByKey

    val rdd = sc.makeRDD(Array(
          ("Tom",1),("Tom",1),("Tony",1),("Tony",1)
        ))
    
    rdd.groupByKey().foreach(println)
    /*
    (Tom,CompactBuffer(1, 1))
    (Tony,CompactBuffer(1, 1))
    */
    
    rdd.reduceByKey(_+_).foreach(println)
    /*
    (Tom,2)
    (Tony,2)
    */
    
    
    image.png

    (9) distinct:去掉重复数据

    val rdd = sc.makeRDD(Array(
          "hello",
          "hello",
          "hello",
          "world"
        ))
    
    rdd.distinct().foreach {println}
    /*
    hello
    world
    */
    
    // dinstinct = map + reduceByKey + map
    val distinctRDD = rdd
      .map {(_,1)}
      .reduceByKey(_+_)
      .map(_._1)
    distinctRDD.foreach {println}
    /*
    hello
    world
    */
    
    
    image.png

    (10) coalesce、repartition:改变RDD分区数

    1. coalesce
    /*
     * false:不产生shuffle
     * true:产生shuffle
     * 如果重分区的数量大于原来的分区数量,必须设置为true,否则分区数不变
     * 增加分区会把原来的分区中的数据随机分配给设置的分区中
     * 默认为false
     */
    object CoalesceTest {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("MapTest").setMaster("local")
        val sc = new SparkContext(conf)
        val arr = Array(
          "partition:0 content:Tom01",
          "partition:0 content:Tom02",
          "partition:0 content:Tom03",
          "partition:0 content:Tom04",
          "partition:1 content:Tom05",
          "partition:1 content:Tom06",
          "partition:1 content:Tom07",
          "partition:1 content:Tom08",
          "partition:2 content:Tom09",
          "partition:2 content:Tom10",
          "partition:2 content:Tom11",
          "partition:2 content:Tom12")
    
        val rdd = sc.parallelize(arr, 3);
    
        val coalesceRdd = rdd.coalesce(6,true)
    
        val results = coalesceRdd.mapPartitionsWithIndex((index,x) => {
          val list = ListBuffer[String]()
          while (x.hasNext) {
            list += "partition:"+ index + " content:[" + x.next + "]"
          }
          list.iterator
        })
    
        println("分区数量:" + results.partitions.size)
        results.foreach(println)
        /*
        分区数量:6
        partition:0 content:[partition:1 content:Tom07]
        partition:0 content:[partition:2 content:Tom10]
        partition:1 content:[partition:0 content:Tom01]
        partition:1 content:[partition:1 content:Tom08]
        partition:1 content:[partition:2 content:Tom11]
        partition:2 content:[partition:0 content:Tom02]
        partition:2 content:[partition:2 content:Tom12]
        partition:3 content:[partition:0 content:Tom03]
        partition:4 content:[partition:0 content:Tom04]
        partition:4 content:[partition:1 content:Tom05]
        partition:5 content:[partition:1 content:Tom06]
        partition:5 content:[partition:2 content:Tom09]
        */
    
        // 增加分区肯定会发生shuffle,如果设置为false,结果是不变的
        val coalesceRdd2 = rdd.coalesce(6,false)
        val results2 = coalesceRdd2.mapPartitionsWithIndex((index,x) => {
          val list = ListBuffer[String]()
          while (x.hasNext) {
            list += "partition:"+ index + " content:[" + x.next + "]"
          }
          list.iterator
        })
    
        println("分区数量:" + results2.partitions.size)
        results2.foreach(println)
        /*
        分区数量:3
        partition:0 content:[partition:0 content:Tom01]
        partition:0 content:[partition:0 content:Tom02]
        partition:0 content:[partition:0 content:Tom03]
        partition:0 content:[partition:0 content:Tom04]
        partition:1 content:[partition:1 content:Tom05]
        partition:1 content:[partition:1 content:Tom06]
        partition:1 content:[partition:1 content:Tom07]
        partition:1 content:[partition:1 content:Tom08]
        partition:2 content:[partition:2 content:Tom09]
        partition:2 content:[partition:2 content:Tom10]
        partition:2 content:[partition:2 content:Tom11]
        partition:2 content:[partition:2 content:Tom12]
        */
    
        val coalesceRdd3 = rdd.coalesce(2,true)
        val results3 = coalesceRdd3.mapPartitionsWithIndex((index,x) => {
          val list = ListBuffer[String]()
          while (x.hasNext) {
            list += "partition:"+ index + " content:[" + x.next + "]"
          }
          list.iterator
        })
    
        println("分区数量:" + results3.partitions.size)
        results3.foreach(println)
        /*
        分区数量:2
        partition:0 content:[partition:0 content:Tom01]
        partition:0 content:[partition:0 content:Tom03]
        partition:0 content:[partition:1 content:Tom05]
        partition:0 content:[partition:1 content:Tom07]
        partition:0 content:[partition:2 content:Tom09]
        partition:0 content:[partition:2 content:Tom11]
        partition:1 content:[partition:0 content:Tom02]
        partition:1 content:[partition:0 content:Tom04]
        partition:1 content:[partition:1 content:Tom06]
        partition:1 content:[partition:1 content:Tom08]
        partition:1 content:[partition:2 content:Tom10]
        partition:1 content:[partition:2 content:Tom12]
        */
    
        val coalesceRdd4 = rdd.coalesce(2,false)
        val results4 = coalesceRdd4.mapPartitionsWithIndex((index,x) => {
          val list = ListBuffer[String]()
          while (x.hasNext) {
            list += "partition:"+ index + " content:[" + x.next + "]"
          }
          list.iterator
        })
    
        println("分区数量:" + results4.partitions.size)
        results4.foreach(println)
        /*
        分区数量:2
        partition:0 content:[partition:0 content:Tom01]
        partition:0 content:[partition:0 content:Tom02]
        partition:0 content:[partition:0 content:Tom03]
        partition:0 content:[partition:0 content:Tom04]
        partition:1 content:[partition:1 content:Tom05]
        partition:1 content:[partition:1 content:Tom06]
        partition:1 content:[partition:1 content:Tom07]
        partition:1 content:[partition:1 content:Tom08]
        partition:1 content:[partition:2 content:Tom09]
        partition:1 content:[partition:2 content:Tom10]
        partition:1 content:[partition:2 content:Tom11]
        partition:1 content:[partition:2 content:Tom12]
        */
      }
    
    }
    
    

    以下图片说明这些情况:

    image.png
    1. repartition
    repartition(int n) = coalesce(int n, true)
    
    
    1. partitionBy:自定义分区器,重新分区
    package com.aura.transformations
    
    import org.apache.spark.{Partitioner, SparkConf, SparkContext}
    
    /**
      * Author: Jed
      * Description: 自定义分区规则
      * Date: Create in 2018/1/12
      */
    class MyPartition extends Partitioner {
    
      // 分区数量为2
      override def numPartitions: Int = 2
    
      // 自定义分区规则
      override def getPartition(key: Any): Int = {
        if(key.hashCode() % 2 == 0) {
          0
        }else {
          1
        }
      }
    }
    
    object PartitionByTest {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("PartitionByTest").setMaster("local")
        val sc = new SparkContext(conf)
    
        val arr = Array((1,1),(2,2),(3,3),(4,4),(5,5),(6,6),(7,7),(8,8),(9,9))
        val rdd = sc.makeRDD(arr,3)
        println("分区数量:" + rdd.partitions.length)
        rdd.foreachPartition(x => {
          println("*******")
          while(x.hasNext) {
            println(x.next())
          }
        })
        /*
        分区数量:3
        *******
        (1,1)
        (2,2)
        (3,3)
        *******
        (4,4)
        (5,5)
        (6,6)
        *******
        (7,7)
        (8,8)
        (9,9)
         */
    
        val partitionRDD = rdd.partitionBy(new MyPartition)
        println("分区数量:" + partitionRDD.partitions.length)
        partitionRDD.foreachPartition(x => {
          println("*******")
          while(x.hasNext) {
            println(x.next())
          }
        })
        /*
        分区数量:2
        *******
        (2,2)
        (4,4)
        (6,6)
        (8,8)
        *******
        (1,1)
        (3,3)
        (5,5)
        (7,7)
        (9,9)
         */
      }
    
    }
    
    

    相关文章

      网友评论

          本文标题:Spark常用Transformations算子(一)

          本文链接:https://www.haomeiwen.com/subject/sntyzftx.html