美文网首页
Spark-算子-transerformation

Spark-算子-transerformation

作者: Demons_LLL | 来源:发表于2020-12-04 11:00 被阅读0次

Map[一对一]

  @Test
  def mapTest: Unit ={
    val array: Array[Int] = sc.parallelize(Seq(1, 2, 3)).map(item => item + 1).collect()
    array.foreach(e => println(e))
  }

flatMap[一对多]

  @Test
  def flatMapTest: Unit = {
    val array: Array[String] = sc.parallelize(Seq("Hello X", "Hello Y", "Hello Z"))
      .flatMap(item => item.split(" ")).collect()
    array.foreach(e => println(e))
  }

mapPartitions

  @Test
  def mapPartitions2(): Unit = {
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
      .mapPartitions(iter => {
        iter.map(item => item * 10)
      })
      .collect()
      .foreach(item => println(item))
  }

mapPartitionsWithIndex

  @Test
  def mapPartitionsWithIndex(): Unit = {
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6), 2)
      .mapPartitionsWithIndex((index, iter) => {
        println("index:" + index)
        iter.foreach(item => println(item))
        iter
      })
      .collect()
  }

reduceByKey

  @Test
  def reduceByKeyTest: Unit = {
    val result: Array[(String, Int)] = sc.parallelize(Seq("Hello X", "Hello Y", "Hello Z"))
      .flatMap(item => item.split(" "))
      .map(item => (item, 1))
      .reduceByKey((curr, agg) => curr + agg)
      .sortByKey()
      .collect()
    result.foreach(e => println(e))
  }

filter

  @Test
  def filterTest(): Unit = {
    val array: Array[Int] = sc.parallelize(Seq(1, 2, 3))
      .filter(item => item != 1)
      .collect()
    array.foreach(item => println(item))
  }

sample

  /**
   * 把大数据集 转存 小数据集
   * withReplacement true:可能有重复 false 不重复
   */
  @Test
  def sampleTest(): Unit = {
    sc.parallelize(Seq(1, 2, 3, 4, 5, 6, 7, 8, 9, 10))
      .sample(false, 0.6)
      .collect()
      .foreach(item => println(item))
  }

mapValues

  @Test
  def mapValuesTest(): Unit = {
    sc.parallelize(Seq(("a",1),("b",2),("c",3)))
      .mapValues(item => item * 10)
      .collect()
      .foreach(println(_))
  }

intersection [交集]

  /**
   * 交集
   */
  @Test
  def intersectionTest(): Unit = {
    sc.parallelize(Seq(1, 2, 4, 5, 6))
      .intersection(sc.parallelize(Seq(4, 5, 6, 7, 8, 9)))
      .collect()
      .foreach(println(_))
  }

union [并集]

  /**
   * 并集
   */
  @Test
  def unionTest(): Unit = {
    sc.parallelize(Seq(1, 2, 4, 5, 6))
      .union(sc.parallelize(Seq(4, 5, 6, 7, 8, 9)))
      .collect()
      .foreach(println(_))
  }

subtract [差集]

  /**
   * 差集
   */
  @Test
  def subtractTest(): Unit = {
    sc.parallelize(Seq(1, 2, 4, 5, 6))
      .subtract(sc.parallelize(Seq(4, 5, 6, 7, 8, 9)))
      .collect()
      .foreach(println(_))
  }

groupByKey

  /**
   * groupByKey 在 Map 端没有Combiner
   * reduceByKey 在 Map 端有Combiner,减少IO
   */
  @Test
  def groupByKeyTest(): Unit = {
    sc.parallelize(Seq(("a", 1), ("a", 2), ("b", 3)))
      .groupByKey()
      .collect()
      .foreach(println(_))
  }

combineByKey

  /**
   * 转换数据的函数(初始函数,作用于第一条数据,用于开启整个计算),在分区上进行聚合,把所有分区的聚合结果聚合为最终结果
   */
  @Test
  def combineByKey(): Unit = {
    sc.parallelize(Seq(
      ("zhangsan", 99.0),
      ("zhangsan", 88.0),
      ("lisi", 95.0),
      ("zhangsan", 80.0),
      ("lisi", 98.0))
    ).combineByKey(
      createCombiner = (curr:Double) => (curr,1),
      mergeValue = (curr:(Double,Int),nextValue:Double) => (curr._1 + nextValue,curr._2 + 1),
      mergeCombiners = (curr:(Double,Int), agg:(Double,Int)) => (curr._1 + agg._1 ,curr._2 + agg._2)
    ).map(item => (item._1,(item._2._1 / item._2._2))).collect().foreach(println(_))
  }

foldByKey

  /**
   * foldByKey 和 spark 中 reduceByKey 的区别是 可以指定初始值
   * foldByKey 和 Scala 中 foldLeft 和 foldRight 的区别是 初始值作用于每一个数据
   */
  @Test
  def foldByKey(): Unit = {
    sc.parallelize(Seq(("a", 1), ("b", 2), ("c", 3)))
      .foldByKey(10)((curr, agg) => curr + agg)
      .collect()
      .foreach(println(_))
  }

aggregateByKey

/**
   * aggregateByKey = (zerovalue,(seqop,combop))
   * zerovalue : 指定初始值
   * seqOp : 作用于每一个元素,根据初始值 进行计算
   * combOp : 将 seqOp 处理过得结果进行聚合
   *
   * aggregateByKey 特别适合每一条数据 先处理 后聚合
   */
  @Test
  def aggregateByKey(): Unit ={
    sc.parallelize(Seq(("手机",10),("手机",15),("电脑",20)))
      .aggregateByKey(0.8)(
        seqOp = (curr,zero) => curr * zero,
        combOp = (curr,agg) => curr + agg
      )
      .collect()
      .foreach(println(_))
  }

join

  @Test
  def join(): Unit ={
    val rdd1: RDD[(String, Int)] = sc.parallelize(Seq(("a", 1), ("b", 10), ("c", 20)))
    val rdd2: RDD[(String, Int)] = sc.parallelize(Seq(("a", 2), ("a", 11), ("c", 21)))
    rdd1.join(rdd2).collect().foreach(println(_))
  }

sort

  /**
   * sortBy 可以作用于任何数据的RDD ,sortbykey 只有KV 类型数据的RDD中才有
   * sortBy 可以按照任何部分来排序,sortByKey只能按照Key来排序
   * sortByKey 写法简单,不用编写函数了
   */
  @Test
  def sort(): Unit ={
//    sc.parallelize(Seq(4,65,32,68,312,9)).sortBy(item => item,ascending = false).collect().foreach(println(_))
    sc.parallelize(Seq(("a",1),("a",9),("a",7))).sortBy(item => item,ascending = false).collect().foreach(println(_))
  }

partitioning

  /**
   * repartition 进行重分区的时候,默认是shuffle 的
   * coalesce 进行重分区的时候,默认是不 shuffle 的 ,coalesce 默认不能增大分区数
   */
  @Test
  def partitioning(): Unit ={
//    println(sc.parallelize(Seq(1, 2, 3, 4, 5), 2).repartition(5).partitions.size)
    println(sc.parallelize(Seq(1, 2, 3, 4, 5), 2).coalesce(4 ,shuffle = true).partitions.size)
  }

相关文章

  • Spark-算子-transerformation

    Map[一对一] flatMap[一对多] mapPartitions mapPartitionsWithInde...

  • Spark-算子-Active

    reduce foreach countAndCountByKey take

  • spark-源码-action算子触发

    基于spark1.6 创建完SparkContext,然后执行Action算子 当RDD执行Action算子时(形...

  • spark-天池O2O竞赛

    地址转移到 : spark-天池O2O竞赛

  • Hive 入门

    Hive官网 Hive概述 Hive 的底层执行引擎有 :MapReduce,Tez,Spark- Hive on...

  • Spark-

    之前一直用yomail,但是最近觉得youmail这家伙有点丑。然后自带的那个设置又打不开,一气之下改换成spar...

  • 计算机视觉:matlab实现9种边缘检测算子

    9种边缘算子 Canny算子 Sobel算子 Susan算子 Prewitt算子 Laplace算子 LOG算子 ...

  • Spark- OutOfMemoryError

    java.lang.OutOfMemoryError: heap spacejava.lang.OutOfMemo...

  • Spark-缓存

    RDD通过persist方法或cache方法可以将前面得计算结果进行缓存,默认情况下persist会将数据序列化后...

  • Spark-前言

    主要内容说明 主要按以下3个大点进行说明,也符合进行操作过程种的步骤,读取数据-操作数据-分析保存数据.总结起来就...

网友评论

      本文标题:Spark-算子-transerformation

      本文链接:https://www.haomeiwen.com/subject/gaimiktx.html