2020-11-28-Spark-7(Spark-Core)

Author: 冰菓_ | Published 2020-12-05 08:02

Topics covered: the differences between the aggregation operators; how flatMapValues is implemented; implementing the various joins with cogroup (an application of pattern matching); several ways to do a grouped sum, with an eye on operator performance and efficiency.

1. Review of the four aggregation operators

aggregateByKey: the intra-partition function and the inter-partition function can be different
reduceByKey: no initial value can be specified; the intra-partition and inter-partition functions are the same
foldByKey: a simplified aggregateByKey; the intra-partition and inter-partition functions are the same
combineByKey: instead of aggregateByKey's fixed initial value, its first parameter builds the accumulator from the first value of each key, which fits the meaning of aggregation better and determines the value type of the resulting RDD (see the sketch after this list)
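
A minimal sketch comparing the four calls on the same per-key sum (the object name AggCompare and the sample data are mine, not from the original code):

import org.apache.spark.{SparkConf, SparkContext}

object AggCompare {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("agg").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(("A", 1), ("B", 8), ("C", 7), ("A", 7)), 2)
    // aggregateByKey: zero value, then an intra-partition function and an inter-partition function
    rdd.aggregateByKey(0)(_ + _, _ + _).collect.foreach(println)
    // foldByKey: zero value, one function used both within and across partitions
    rdd.foldByKey(0)(_ + _).collect.foreach(println)
    // reduceByKey: no zero value, one function
    rdd.reduceByKey(_ + _).collect.foreach(println)
    // combineByKey: the first function builds the accumulator from the first value of each key
    // and thereby fixes the value type of the result RDD
    rdd.combineByKey((v: Int) => v, (acc: Int, v: Int) => acc + v, (a: Int, b: Int) => a + b)
      .collect.foreach(println)
    sc.stop()
  }
}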

2. keys, values, mapValues, flatMapValues

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test1 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("t").setMaster("local[*]"))
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("B", 8), ("C", 7), ("A", 7)))
    // keys/values simply project one side of each pair: def keys: RDD[K] = self.map(_._1)
    val keysrdd: RDD[String] = rdd.keys
    val valrdd: RDD[Int] = rdd.values
    // mapValues transforms only the value and leaves the key untouched
    val mvrdd: RDD[(String, Int)] = rdd.mapValues(data => data)
    keysrdd.collect.foreach(println)
    sc.stop()
  }
}

flatMapValues source:

  def flatMapValues[U](f: V => TraversableOnce[U]): RDD[(K, U)] = self.withScope {
    val cleanF = self.context.clean(f)
    new MapPartitionsRDD[(K, U), (K, V)](self,
      (context, pid, iter) => iter.flatMap { case (k, v) =>
        cleanF(v).map(x => (k, x))
      },
      preservesPartitioning = true)
  }
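
Because the key never changes, the new RDD is built with preservesPartitioning = true, so an existing partitioner survives the call. A small sketch of the effect (the object name FmvPartitioner and the data are mine):

import org.apache.spark.HashPartitioner
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object FmvPartitioner {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("fmv").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(("A", "1,2"), ("B", "3"))).partitionBy(new HashPartitioner(2))
    val fmv: RDD[(String, String)] = rdd.flatMapValues(_.split(","))
    // keys are untouched, so the HashPartitioner is kept and a later reduceByKey can skip the shuffle
    println(fmv.partitioner.isDefined)               // true
    // a plain map over the whole tuple cannot make that promise, so the partitioner is dropped
    println(rdd.map(identity).partitioner.isDefined) // false
    sc.stop()
  }
}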

3. Grouped sums (groupByKey, reduceByKey, flatMapValues)

Of the three approaches below, groupByKey shuffles every value across the network, while the reduceByKey-based variants combine values inside each partition first, so they are generally the more efficient choice.

//groupByKey 
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test2 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test2").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(("小明", "1,2,3"), ("小明", "4,5,6"), ("小莫", "2,3,4")))
    //group by key, then split each value string and sum all of its numbers
    val value: RDD[(String, Iterable[String])] = rdd.groupByKey()
    val value1: RDD[(String, Iterable[Array[Int]])] = value.mapValues(data => {
      data.map(str => {
        val strings: Array[String] = str.split(",")
        val ints: Array[Int] = strings.map(_.toInt)
        ints
      })
    })
    val result = value1.map(data => (data._1, data._2.map(_.sum).sum))
    result.collect.foreach(println)
    sc.stop()
  }
}
//reduceByKey
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test3 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test2").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(("小明", "1,2,3"), ("小明", "4,5,6"), ("小莫", "2,3,4")))
    // sum the numbers inside each record first, then combine records that share a key
    val mvrdd: RDD[(String, Array[String])] = rdd.mapValues(data => data.split(","))
    val maprdd: RDD[(String, Int)] = mvrdd.map(data => (data._1, data._2.map(_.toInt).sum))
    val result: RDD[(String, Int)] = maprdd.reduceByKey(_ + _)
    result.collect.foreach(println)
    sc.stop()
  }
}
//flatMapValues
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test4 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test2").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(("小明", "1,2,3"), ("小明", "4,5,6"), ("小莫", "2,3,4")),5)
    // flatten each value string into one (key, number) pair per number
    val fmvrdd: RDD[(String, Int)] = rdd.flatMapValues(data => data.split(",").map(_.toInt))
    val value: RDD[(String, Int)] = fmvrdd.reduceByKey(_ + _)
    value.collect.foreach(println)
    sc.stop()
  }
}
//reproducing flatMapValues with map + flatMap (pattern matching)
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test5 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("test2").setMaster("local[*]"))
    val rdd = sc.makeRDD(List(("小明", "1,2,3"), ("小明", "4,5,6"), ("小莫", "2,3,4")),2)
    val result: RDD[Array[(String, String)]] = rdd.map(data => {
      data match {
        case (x, y) => {
          // for each record, build an array of (key, piece) pairs
          val tuples: Array[(String, String)] = y.split(",").map(piece => (x, piece))
          tuples
        }
      }
    })
    // flattening those arrays gives the same shape flatMapValues would produce
    val value: RDD[(String, String)] = result.flatMap(data => data)
    value.collect.foreach(println)
    //result.collect.foreach(println)
    sc.stop()
  }
}

4. Implementing inner join, left (right) outer join, and full outer join with cogroup

(built on flatMapValues / mapValues over the cogroup result)

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Test6 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("t").setMaster("local[*]"))
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("B", 8), ("C", 7), ("A", 7)))
    val rdd1 = sc.makeRDD(List(("A", "60"), ("A", "80"), ("C", "90"), ("F", "100")))
    // cogroup ("connect and group") collects the values of both RDDs for each key
    // val cagrdd: RDD[(String, (Iterable[Int], Iterable[String]))] = rdd.cogroup(rdd1)
    // cagrdd.collect.foreach(println)
    //(A,(CompactBuffer(1, 7),CompactBuffer(60, 80)))
    //(B,(CompactBuffer(8),CompactBuffer()))
    //(C,(CompactBuffer(7),CompactBuffer(90)))
    //(F,(CompactBuffer(),CompactBuffer(100)))
    //rdd.join(rdd1).collect.foreach(println)
    //implement an inner join
    val value: RDD[(String, (Int, String))] = rdd.cogroup(rdd1).flatMapValues(x => {
      // the for-comprehension walks both buffers of a key; if either side is empty it yields
      // nothing, which is exactly the inner-join semantics
      for (num <- x._1; num2 <- x._2) yield (num, num2)
    })
    value.collect.foreach(println)
    sc.stop()
  }
}

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Test6 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("t").setMaster("local[*]"))
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("B", 8), ("C", 7), ("A", 7)))
    val rdd1 = sc.makeRDD(List(("A", "60"), ("A", "80"), ("C", "90"), ("F", "100")))
    //implement leftOuterJoin: keys that exist only in the right RDD are dropped
    val cagrdd: RDD[(String, (Iterable[Int], Iterable[String]))] = rdd.cogroup(rdd1)
    val value = cagrdd.flatMapValues(data => {
      data match {
        // no match on the right side: keep the left value, pad the right with None
        case (vs, Seq()) => vs.iterator.map(data => (data, None))
        // otherwise emit the per-key Cartesian product
        case (vs, ws) => for (x <- vs.iterator; y <- ws.iterator) yield (x, y)
      }
    })
    value.collect.foreach(println)
    sc.stop()
  }
}
import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Test6 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("t").setMaster("local[*]"))
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("B", 8), ("C", 7), ("A", 7)))
    val rdd1 = sc.makeRDD(List(("A", "60"), ("A", "80"), ("C", "90"), ("F", "100")))
    //implement fullOuterJoin: keep keys from both sides, padding the missing side with None
    val cagrdd: RDD[(String, (Iterable[Int], Iterable[String]))] = rdd.cogroup(rdd1)
    val value = cagrdd.flatMapValues(data => {
      data match {
        // key only on the left: pad the right side with None
        case (vs, Seq()) => vs.iterator.map(data => (data, None))
        // key only on the right: pad the left side with None
        case (Seq(), ws) => ws.iterator.map(data => (None, data))
        // key on both sides: emit the per-key Cartesian product
        case (vs, ws) => for (x <- vs.iterator; y <- ws.iterator) yield (x, y)
      }
    })
    value.collect.foreach(println)
    sc.stop()
  }
}

(The mixed value type surfaces as io.Serializable; Spark's own source avoids this by wrapping the values in Some().)

import java.io

import org.apache.spark.{SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

object Test7 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("t").setMaster("local[*]"))
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("B", 8), ("C", 7), ("A", 7)))
    val rdd1 = sc.makeRDD(List(("A", "60"), ("A", "80"), ("C", "90"), ("F", "100")))
    val cagrdd: RDD[(String, (Iterable[Int], Iterable[String]))] = rdd.cogroup(rdd1)
    // same idea with if instead of pattern matching; the branches must be chained with else if,
    // otherwise the result of the first if is thrown away and right-only keys (like "F") are lost.
    // Because the left slot can now hold either an Int or None, its static type widens to Any.
    val value: RDD[(String, (Any, io.Serializable))] = cagrdd.flatMapValues(data => {
      if (data._1.isEmpty && data._2.nonEmpty) {
        data._2.map(data => (None, data))
      } else if (data._1.nonEmpty && data._2.isEmpty) {
        data._1.map(data => (data, None))
      } else {
        for (vs <- data._1; ws <- data._2) yield (vs, ws)
      }
    })
    value.collect.foreach(println)
    sc.stop()
  }
}
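
As noted above, Spark's own implementation keeps the value types clean by wrapping each side in Option. A minimal sketch in that style on the same data (the object name Test8 is mine, and this is not Spark's actual source):

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object Test8 {
  def main(args: Array[String]): Unit = {
    val sc = new SparkContext(new SparkConf().setAppName("t").setMaster("local[*]"))
    val rdd: RDD[(String, Int)] = sc.makeRDD(List(("A", 1), ("B", 8), ("C", 7), ("A", 7)))
    val rdd1 = sc.makeRDD(List(("A", "60"), ("A", "80"), ("C", "90"), ("F", "100")))
    // full outer join where each side is wrapped in Option, so the value type stays
    // (Option[Int], Option[String]) instead of io.Serializable / Any
    val value: RDD[(String, (Option[Int], Option[String]))] = rdd.cogroup(rdd1).flatMapValues {
      case (vs, ws) if ws.isEmpty => vs.iterator.map(v => (Some(v), None))
      case (vs, ws) if vs.isEmpty => ws.iterator.map(w => (None, Some(w)))
      case (vs, ws)               => for (v <- vs.iterator; w <- ws.iterator) yield (Some(v), Some(w))
    }
    value.collect.foreach(println)
    sc.stop()
  }
}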
