美文网首页
Spark算子、实例代码

Spark算子、实例代码

作者: AceCream佳 | 来源:发表于2019-11-01 18:57 被阅读0次

    从这里学习的,讲的真好~ https://www.bilibili.com/video/av62992342/?p=41

    单数据

    MAP

    1.作用

    返回一个新RDD,该RDD由每个输入元素经过func函数转换后组成。

    2.需求

    创建一个1-10数组的RDD,将所有元素*2形成新的RDD

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("mapTest")
    val sc = new SparkContext(conf)
    val inclusive = 1 to 10
    // 算子
    val listRDD :RDD[Int] = sc.makeRDD(inclusive)
    // 做了映射转换后的RDD
    val mapRDD = listRDD.map( _ * 2 )
    //
    mapRDD.collect().foreach(println)
    
    map.png

    mapPartitions(func)案例

    1.作用

    类似于map,但是独立在RDD的每个分片(区)上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T]=>Iterator[U]。假设有N个元素,有M个分区,那么map函数将被调用N次,mapPartitions函数被调用M次,一个函数一次处理所有分区。

    2.需求

    创建一个RDD,使每个元素*2 组成新RDD

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("mapPartitionsTest")
        val sc = new SparkContext(conf)
        val inclusive = 1 to 10
        // 算子
        val listRDD: RDD[Int] = sc.makeRDD(inclusive)
        // 做了映射转换后的RDD
        // mapPartitions可以对一个RDD中所有分区进行遍历
        val mapPartitions = listRDD.mapPartitions(datas => {
          datas.map(data => data * 2)
        })
        mapPartitions.collect().foreach(println)
    

    mapPartitions效率优于map算子,减少了发送到执行器执行交互的次数。
    但是要注意!发送的是整个分区数据,可能会比较大。。。这里回收机制和GC里可达性分析类似,所以处理完的分区不会立即释放,内存占用高,可能会出现内存溢出OOM。


    mapPartitions.png

    mapPartitionsWithIndex(func)案例

    1.作用

    类似mapPartitions,但是func带一个整数参数,表示分片的索引值。因此在类型为T的RDD上运行,func的函数类型必须是(Int,Interator[T])=>Iterator[U];

    2.需求

    创建一个RDD,使每个元素跟所在分区形成一个元组组成一个新的RDD

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("mapPartitionsWithIndex")
    val sc = new SparkContext(conf)
    
    val listRDD = sc.makeRDD(1 to 10,2)
    
    val indexRdd = listRDD.mapPartitionsWithIndex {
      case (num, datas) => {
        datas.map((_, "分区号:" + num))
      }
    }
    
    indexRdd.collect().foreach(println)
    

    FlatMap(func)案例

    1.作用

    类似于map,但是每个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而非单一元素)

    2.需求

    创建一个元素1-5的RDD,运用FlatMap创建一个新RDD,新RDD为原RDD每个元素2倍

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("mapPartitionsWithIndex")
    val sc = new SparkContext(conf)
    
    val listRDD = sc.makeRDD(Array(List(1,2),List(3,4)))
    
    // flatMap
    // 1,2,3,4
    val flatMapRDD = listRDD.flatMap(datas=>datas)
    
    val value = flatMapRDD.map(_*2)
    
    value.collect().foreach(println)
    

    glom

    1.作用

    将每一个分区形成一个数组,组成新的RDD类型时RDD[Array[T]]

    2.例子

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val listRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,7,8),3)
    // 将我们一个分区的数据放到一个数组中
    val glomRDD: RDD[Array[Int]] = listRDD.glom()
    
    glomRDD.collect().foreach(array=>{
      println(array.mkString(","))
    })
    

    groupBy

    1.作用

    分组,按照传入函数的返回值进行分组。将相同的key 对应值放入一个迭代器中。

    2.例子

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val listRDD: RDD[Int] = sc.makeRDD(List(1,2,3,4))
    
    // 生成数据 按照指定规则进行分组
    // 分组后的数据形成了对偶元组(k-V),k表示分组的key,value表示分组数据集合
    val groupByRDD: RDD[(Int, Iterable[Int])] = listRDD.groupBy(i=>i%2)
    
    groupByRDD.collect().foreach(println)
    

    filter

    1.作用

    过滤,返回新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成。

    2.需求

    创建一个由字符组成的RDD,过滤出一个新的包含“xiao”子串的RDD

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val listRDD: RDD[String] = sc.makeRDD(Array("xiaoming","xiaojiang","xiaole","dazhi"))
    
    val filterRDD: RDD[String] = listRDD.filter(x => {
      x.contains("xiao")
    })
    filterRDD
    
    filterRDD.collect().foreach(println)
    

    Sample(withReplacement,fraction,seed)案例

    1.作用

    抽样,以指定随机种子随机抽样出数量为fraction的数据,withReplacement是表示抽出的数据是否放回,true为放回,false无放回。seed用于指定随机数生成器种子。

    1.1 使用场景

    大数量里面进行采样,进行大致分析。1亿数据,不需要全部遍历,采集不同部分数据进行多次分析。最后去平均值。

    2.需求

    创建一个RDD(1-10),从中选择返回和不放回抽样

    3.解

     val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
        val sc = new SparkContext(conf)
    
        // 从指定的数据集合中抽样处理,根据不同算法进行抽样
        val listRDD: RDD[Int] = sc.makeRDD(1 to 10)
        // 不放回
        val sampleRDD: RDD[Int] = listRDD.sample(false,0.4,System.currentTimeMillis())
        // 放回
    //  val sampleRDD: RDD[Int] = listRDD.sample(true,4,System.currentTimeMillis())
    
       sampleRDD.collect().foreach(println)
    

    Distinct([numTasks])

    1.作用

    对源RDD进行去重,返回一个新的RDD。默认情况下,只有8个并行任务来操作,但是可以传入一个可选的numTasks参数来改变。

    2.需求

    创建一个RDD,使用distinct()去重

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
        val sc = new SparkContext(conf)
    
        val listRDD: RDD[Int] = sc.makeRDD(List(1,2,1,5,2,9,6,1))
    
    //    val distinctRDD: RDD[Int] = listRDD.distinct()
        
        // 结果存在两个分区中
        // 使用distinct算子对数据去重,但是因为去重后会导致数据减少,所以可以改变默认分区数量
        val distinctRDD: RDD[Int] = listRDD.distinct(2)
    
        // 我们发现顺序被打乱了
    //    distinctRDD.collect().foreach(println)
        distinctRDD.saveAsTextFile("output")
    

    可以看到output目录下,两个文件,证明两个分区。

    Coalesce(numPartitions)

    1.作用

    缩减分区数,用于大数据集过滤后,提高小数据集的执行效率。

    2.需求

    创建一个4分区的RDD,对其缩减分区。

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val listRDD: RDD[Int] = sc.makeRDD(1 to 16,4)
    
    println("缩减分区前 = "+listRDD.partitions.size)
    
    val coalesceRDD: RDD[Int] = listRDD.coalesce(3)
    
    println("缩减分区前 = "+coalesceRDD.partitions.size)
    

    所谓的缩减,其实是合并了。

    Repartition(numerPartitions)

    1.作用

    根据分区数,重新通过网络随机洗牌所有数据。

    coalesce和repartition的区别

    coalesce可以选择shuffle,但是reparation会调用coalesce而且shuffle一定是true

    sortBy(func,[ascending],[numTasks])

    1.作用

    使用func先对数据进行处理,按照处理后的数据比较结果排序,默认正序。
    ascending的参数 true表示升序,false表示降序。

    2.需求

    创建一个RDD,按照不同需求排序

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val listRDD: RDD[Int] = sc.makeRDD(List(2,1,3,4))
    
    // 按照自身大小排序
    val sortRDD: RDD[Int] = listRDD.sortBy(x=>x)
    sortRDD.collect().foreach(println)
    // 按照与3余数大小排序
    val sort3RDD: RDD[Int] = listRDD.sortBy(x=>x%3)
    sort3RDD.collect().foreach(println)
    

    多数据交互

    union(otherDataset)

    1.作用

    对源RDD和参数RDD求并集,然后返回新RDD

    2.需求

    创建两个RDD

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd1: RDD[Int] = sc.makeRDD(5 to 10)
    val rdd2: RDD[Int] = sc.makeRDD(1 to 5)
    
    val unionRDD: RDD[Int] = rdd1.union(rdd2)
    
    unionRDD.collect().foreach(println)
    

    subtract(otherDataset)

    1.作用

    去除两个RDD中相同的元素,留下不同的元素

    2.案例

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd1: RDD[Int] = sc.makeRDD(3 to 8)
    val rdd2: RDD[Int] = sc.makeRDD(1 to 5)
    
    val subtractRDD: RDD[Int] = rdd1.subtract(rdd2)
    
    subtractRDD.collect().foreach(println)
    

    intersection(otherDataset)

    1.作用

    两个RDD取交集,就不举例了嗷!!!

    cartesian(otherDataset)

    1.作用

    笛卡尔积(尽量别用!!!)不演示。

    zip(otherDataset)

    1.作用

    将两个RDD组成Key-Value形式的RDD,这里默认两个RDD的partition数量和元素都相同。否则会抛异常。

    2.需求

    创建两个RDD,把他俩组合到一起形成k-v RDD

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd1: RDD[Int] = sc.makeRDD(Array(1,2,3),3)
    val rdd2: RDD[String] = sc.makeRDD(Array("a","b","c"),3)
    
     val zipRDD: RDD[(Int, String)] = rdd1.zip(rdd2)
    
    zipRDD.collect().foreach(println)
    

    key-value类型

    PartitionBy

    1.作用

    对pairRDD分区操作,,如果原partitionRDD和现有partitionRDD是一致的话就不进行分区,否则会生成shuffleRDD,即会产生shuffle过程。

    2.需求

    创建一个4个分区的RDD,对其重新分区。

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    // 从指定的数据集合中抽样处理,根据不同算法进行抽样
    val rdd: RDD[(Int, String)] = sc.makeRDD(Array((1, "aaa"), (2, "bbb"), (3, "ccc"), (4, "ddd")), 4)
    
    println(rdd.partitions.size)
    
    val rdd2: RDD[(Int, String)] = rdd.partitionBy(new org.apache.spark.HashPartitioner(2))
    
    println(rdd2.partitions.size)
    

    groupByKey

    1.作用

    对每个key进行操作,生成一个sequence

    2.需求

    创建一个pairRDD,将相同的key对应值聚合到一个sequence中,并计算相同key对应值的相加结果。

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val words = Array("one","two","three","two","three","three")
    
    val wordPairRDD: RDD[(String, Int)] = sc.parallelize(words).map(word=>(word,1))
    
    val group: RDD[(String, Iterable[Int])] = wordPairRDD.groupByKey()
    
    val result: RDD[(String, Int)] = group.map(t=>(t._1,t._2.sum))
    
    result.collect().foreach(println)
    

    reduceByKey(func[numTasks])

    1.作用

    在一个kv的RDD上调用,返回一个kv的RDD,使用指定的reduce函数,将相同的值聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

    2.需求

    创建一个pairRDD,计算相同key对应值的相加结果。

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val words = Array("one","two","three","two","three","three")
    
    val wordPairRDD: RDD[(String, Int)] = sc.parallelize(words).map(word=>(word,1))
    
    val result: RDD[(String, Int)] = wordPairRDD.reduceByKey(_+_)
    
    result.collect().foreach(println)
    
    reduceByKey和groupByKey的区别

    两个都是聚合。
    1.reduceByKey:按照key进行聚合,在shuffle之前有combine操作,预聚合。返回结果是RDD[k,v]
    2.groupByKey:按照key进行分组,直接进行shuffle
    建议使用reduceByKey

    aggregateByKey

    1.作用

    kv对的RDD中,按key将value进行分组合并,合并时,每个value和初始值作为seq函数的参数,进行计算。返回的结果作为一个新的kv对,然后再将结果按照key进行合并,最后将每个分组的value传递给combine函数去计算。将key与计算结果作为一个新的kv对输出。
    参数
    zeroValue:给每个分区的key 一个初始值
    seqOp:函数用于在每个分区中用初始值逐步迭代value
    combOp:函数用于合并每个分区中的结果

    2.需求

    创建一个pairRDD,取出每个分区相同key对应值的最大值,然后相加。

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd: RDD[(String, Int)] = sc.parallelize(List(("a",3),("a",2),("c",4),("b",3),("c",6),("c",8)),2)
    
    val agg: RDD[(String, Int)] = rdd.aggregateByKey(0)(math.max(_,_),_+_)
    
    agg.collect().foreach(println)
    

    foldByKey

    1.作用

    aggregateByKey的简化操作,seqop和combop相同

    sortByKey

    1.作用

    在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD。

    2.需求

    创建一个pairRDD,按照key的正序和倒序进行排序

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd: RDD[(Int, String)] = sc.parallelize(Array((3,"aa"),(6,"cc"),(2,"bb"),(1,"dd")))
    
    // 正序
    val sortRDD: RDD[(Int, String)] = rdd.sortByKey(true)
    
    sortRDD.collect().foreach(println)
    

    mapValue

    1.作用

    针对k-v形式的类型,只对v进行操作

    2.需求

    创建一个pairRDD,并将value添加字符串"|||"

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd3: RDD[(Int, String)] = sc.parallelize(Array((1,"a"),(1,"d"),(2,"b"),(3,"c")))
    
    val kvRDD: RDD[(Int, String)] = rdd3.mapValues(_+"|||")
    
    kvRDD.collect().foreach(println)
    

    join

    1.作用

    在类型k-v和k-w的RDD上调用,返回一个相同key对应的所有元素对在一起的(k,(v,w))的RDD

    2.需求

    创建两个pairRDD,将key相同的数据聚合到一个元组。

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd: RDD[(Int, Int)] = sc.parallelize(Array((1,4),(2,5),(3,6)))
    val rdd1: RDD[(Int, String)] = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    
    val joinRDD: RDD[(Int, (Int, String))] = rdd.join(rdd1)
    
    joinRDD.collect().foreach(println)
    

    cogroup

    1.作用

    在类型k-v和k-w的RDD上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的RDD

    2.需求

    创建两个pairRDD,key相同的数据聚合到一个迭代器里面。

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd1: RDD[(Int, Int)] = sc.parallelize(Array((1,4),(2,5),(3,6),(4,7)))
    val rdd: RDD[(Int, String)] = sc.parallelize(Array((1,"a"),(2,"b"),(3,"c")))
    
    val cogroupRDD: RDD[(Int, (Iterable[String], Iterable[Int]))] = rdd.cogroup(rdd1)
    
    cogroupRDD.collect().foreach(println)
    

    举栗子!!

    1.数据结构

    时间戳,省份,城市,用户,广告,中间字段用空格分隔
    agent.log

    1516609143867 6 7 64 16
    1516609143869 9 4 75 18
    1516609143869 1 7 87 12

    2.需求

    统计出每个省份广告被点击次数的top3

    3.思路分析

    思路

    4.实现

    package com.george.bigdata.agent
    
    import org.apache.spark.rdd.RDD
    import org.apache.spark.{SparkConf, SparkContext}
    
    /**
     * Created with IntelliJ IDEA.
     * Description:
     * User: weicaijia
     * Date: 2019/11/1 17:41
     * Time: 14:15
     */
    /**
     * 统计出每一个省份广告被点击次数的TOP3
     * 数据结构:时间戳,省份,城市,用户,广告,中间字段使用空格分割
     */
    object Agent {
    
      def main(args: Array[String]): Unit = {
    
        val conf = new SparkConf().setMaster("local[*]").setAppName("agent")
        val sc = new SparkContext(conf)
    
        //读取文件到RDD
        val fileRDD = sc.textFile("input/data/agent/agent.log")
        //    查看
        //    fileRDD.collect().foreach(println)
    
        // 1.map 操作
        val provinceAdMap: RDD[((String, String), Int)] = fileRDD.map(datas => {
          val fields: Array[String] = datas.split(" ")
          //返回格式: ((省份-广告),1)
          ((fields(1), fields(4)), 1)
        })
        //    查看
        //    proveinceAdMap.collect().foreach(println)
    
        // 2.reduceByKey 操作 返回格式 ((省份-广告),sum)
        val provinceAdMapSum: RDD[((String, String), Int)] = provinceAdMap.reduceByKey(_ + _)
        //    查看
        //    provinceAdMapSum.collect().foreach(println)
    
        // 3.map 操作把格式转换一下  ((省份-广告),sum) ====> (省份,(广告,sum))
        val provinceToAdSum: RDD[(String, (String, Int))] = provinceAdMapSum.map(datas => {
          // 返回 格式
          (datas._1._1, (datas._1._2, datas._2))
        })
        //    查看
        //    provinceToAdSum.collect().foreach(println)
    
        // 4.groupByKey 操作 返回格式 ((省份1,List((广告1,sum1),(广告2,sum2)...)),(省份2,List((广告1,sum1),(广告2,sum2)...)))
        val provinceAdGroup: RDD[(String, Iterable[(String, Int)])] = provinceToAdSum.groupByKey()
        //    查看
        //    provinceAdGroup.collect().foreach(println)
    
    
        // 5. 排名并取3条
        // 利用 mapValues 是只对value进行操作的特性
        val result: RDD[(String, List[(String, Int)])] = provinceAdGroup.mapValues(datas => {
          val list: List[(String, Int)] = datas.toList
          list.sortWith((x, y) => {
            x._2 > y._2
          }).take(3)
        })
    
        result.collect().foreach(println)
        sc.stop()
    
      }
    
    }
    

    具体的日志文件我就不提供了,可以去视频下面拿别人Git存的。我没有传到我的Git
    以上代码和笔记都是手敲,结果也都对照过。
    视频没写这一集。。。

    Action

    reduce(func)

    1.作用

    通过func函数聚集RDD中的所有元素,先聚合分区内数据,再聚合分区间数据。

    2.需求

    创建一个RDD,将所有元素聚合得到结果

    3.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd1: RDD[Int] = sc.makeRDD(1 to 20, 2)
    // 聚合
    val rdd1Result: Int = rdd1.reduce(_ + _)
    println(rdd1Result)
    
    val rdd2: RDD[(String, Int)] = sc.parallelize(Array(("a", 1), ("a", 3), ("c", 3), ("d", 5)))
    // 聚合
    val rdd2Result: (String, Int) = rdd2.reduce((x, y) => {
      (x._1 + y._1, x._2 + y._2)
    })
    println(rdd2Result)
    

    collect()

    1.作用

    驱动程序中,以数组的形式返回数据集的所有元素。

    count()

    1.作用

    返回RDD中元素个数

    first()

    1.作用

    返回RDD中第一个元素

    take(n)

    1.作用

    返回一个由RDD的前n个元素组成的数组。

    takeOrdered(n)

    1.作用

    返回RDD排序后,前n个元素组成的数组

    aggregate

    1.参数

    (zeroValue:U)(seqOp:(U,T)=>U,combOp(U,U)=>U)

    2.作用

    将每个分区里面的元素,通过seqOp和初始值进行聚合。然后用combine函数将每个分区的结果和初始值(zeroValue)进行combine操作,这个函数最终返回的类型不需要和RDD中元素类型一致。

    3.需求

    创建RDD,所有元素相加得到结果。
    4.解

    val conf = new SparkConf().setMaster("local[*]").setAppName("glom")
    val sc = new SparkContext(conf)
    
    val rdd1: RDD[Int] = sc.makeRDD(1 to 10, 2)
    // 初始值为0  分区内相加   分区间相加
    val result: Int = rdd1.aggregate(0)(_+_,_+_)
    println(result)
    

    fold(num)(func)

    1.作用

    折叠操作,aggregate的简化版。但是!返回值必须与rdd的数据类型相同。

    saveAsTextFile

    1.作用

    将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统。对于每个元素,Spark将会调用toString方法,转换为文件中的文本。

    saveAsSequenceFile

    1.作用

    将数据集中的元素以Hadoop sequencefile的格式保存到指定目录下,可以使HDFS或其他Hadoop支持的文件系统。

    saveAsObjectFile

    1.作用

    将RDD中的元素序列化成对象,存到文件

    countByKey

    1.作用

    针对kv类型RDD,返回一个(K,Int)的map,表示每个key对应的元素个数。

    foreach(func)

    1.作用

    在数据集的每个元素上,运行函数func进行更新

    相关文章

      网友评论

          本文标题:Spark算子、实例代码

          本文链接:https://www.haomeiwen.com/subject/fbjqbctx.html