Spark Day 2

Author: 吾为天帝乎 | Published 2018-11-06 23:09

    1. Define a collection val list = List("张三:20","李四:21","王五:20") with three partitions. Use the mapPartitions operator to build a new collection whose elements have the form (String, Int), representing a name and an age.

    2. Define a collection val list = List(1,2,3,4,5,6,7,8,9) with three partitions. Use the mapPartitionsWithIndex operator to build a new collection that records the elements of each partition together with the sum of squares of each partition's elements.

    3. Define a collection val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8)). (1) Use the aggregateByKey operator to build a new collection that outputs the product of the elements for each key.

    (2) Use the aggregateByKey operator to build a new collection that outputs the sum of the elements for each key.

    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}
    import org.apache.spark.rdd.RDD
    import scala.collection.mutable.ListBuffer
    
    // Exercise 1: mapPartitions over "name:age" strings
    object Work01 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
        val sc = new SparkContext(sparkConf)
        val list = List("张三:20","李四:21","王五:20")
        val listRDD: RDD[String] = sc.parallelize(list,3)
        // Turn each partition's "name:age" strings into (name, age) tuples
        val f: Iterator[String] => Iterator[(String, Int)] = it => {
          val result: ListBuffer[(String, Int)] = ListBuffer()
          while (it.hasNext) {
            val value = it.next()
            val name = value.split(":")(0)
            val age = value.split(":")(1).toInt
            result.append((name, age))
          }
          result.iterator
        }
        println(listRDD.mapPartitions(f).partitions.size)
        // Repartition: pass in a partitioner with the desired number of partitions
        println(listRDD.mapPartitions(f).partitionBy(new HashPartitioner(1)).partitions.size)
      }
    }
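    The exercise asks for the new collection itself, not just its partition count. A minimal follow-up sketch, assuming it is placed inside the same main method (it reuses listRDD and f), that materializes and prints the (name, age) tuples:
    
    // Collect the converted tuples on the driver and print them;
    // collect() is safe here because the dataset is tiny.
    val pairs: Array[(String, Int)] = listRDD.mapPartitions(f).collect()
    pairs.foreach(println)   // e.g. (张三,20), (李四,21), (王五,20)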
    
    
    // Exercise 2: mapPartitionsWithIndex, per-partition elements plus sum of squares
    object Work02 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
        val sc = new SparkContext(sparkConf)
        val list = List(1,2,3,4,5,6,7,8,9)
        val listRDD: RDD[Int] = sc.parallelize(list,3)
        // For every partition, emit (partitionIndex, element) for each element,
        // then append (partitionIndex, sumOfSquares) for that partition
        val f: (Int, Iterator[Int]) => Iterator[(Int, Int)] = (index, it) => {
          var sum = 0
          val result: ListBuffer[(Int, Int)] = ListBuffer()
          while (it.hasNext) {
            val value = it.next()
            sum += value * value
            result.append((index, value))
          }
          result.append((index, sum))
          result.iterator
        }
        listRDD.mapPartitionsWithIndex(f).groupByKey().foreach(println)
      }
    }
    
    // Exercise 3: aggregateByKey, product and sum of the elements for each key
    object Work03 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
        val sc = new SparkContext(sparkConf)
        val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8))
        val pairRDD: RDD[(Int,Int)] = sc.parallelize(list)
        pairRDD.groupByKey().foreach(println)
        // (1) Product per key: zero value 1, multiply within and across partitions
        val seq1: (Int, Int) => Int = (zero, value) => zero * value
        val com1: (Int, Int) => Int = (par, otherPar) => par * otherPar
        pairRDD.aggregateByKey(1)(seq1, com1).foreach(println)   // (1,105), (2,384)
        // (2) Sum per key: zero value 0, add within and across partitions
        val seq2: (Int, Int) => Int = (zero, value) => zero + value
        val com2: (Int, Int) => Int = (par, otherPar) => par + otherPar
        pairRDD.aggregateByKey(0)(seq2, com2).foreach(println)   // (1,16), (2,20)
      }
    }
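    The two seqOp/combOp pairs above can also be written inline as anonymous functions; a minimal equivalent sketch on the same pairRDD:
    
    // Product per key: zero value 1, multiply within and across partitions
    pairRDD.aggregateByKey(1)(_ * _, _ * _).foreach(println)   // (1,105), (2,384)
    // Sum per key: zero value 0, add within and across partitions
    pairRDD.aggregateByKey(0)(_ + _, _ + _).foreach(println)   // (1,16), (2,20)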
    
    • sortByKey([ascending], [numTasks])

      Sorts an RDD of (K, V) pairs by key; ascending defaults to true.

    scala> var data = sc.parallelize(List((1,3),(1,2),(1, 4),(2,3),(7,9),(2,4)))
    
    scala> data.sortByKey(true).collect
    res10: Array[(Int, Int)] = Array((1,3), (1,2), (1,4), (2,3), (2,4), (7,9))
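    Passing false for the ascending flag sorts by key in descending order. A minimal sketch on the same data RDD:
    
    // Descending sort by key: the ascending flag defaults to true
    data.sortByKey(false).collect
    // Keys now come back in descending order: 7 first, then 2, then 1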
    
    • join(otherDataset, [numTasks])

      For RDDs of type (K, V) and (K, W), join returns an RDD of type (K, (V, W)).

    scala> val rdd1=sc.parallelize(Array((1,2),(1,3)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:21
    
    scala> val rdd2=sc.parallelize(Array((1,3)))
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:21
    
    scala> rdd1.join(rdd2).collect
    res13: Array[(Int, (Int, Int))] = Array((1,(3,3)), (1,(2,3)))
    
    
    • cogroup(otherDataset, [numTasks])

      If the input RDDs have types (K, V) and (K, W), the returned RDD has type (K, (Iterable[V], Iterable[W])). This operation is equivalent to groupWith.

    scala> val rdd1=sc.parallelize(Array((1,2),(1,3)))
    rdd1: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[24] at parallelize at <console>:21
    
    scala> val rdd2=sc.parallelize(Array((1,3)))
    rdd2: org.apache.spark.rdd.RDD[(Int, Int)] = ParallelCollectionRDD[32] at parallelize at <console>:21
    
    scala> rdd1.cogroup(rdd2).collect
    res17: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(3, 2),CompactBuffer(3))))
    
    scala> rdd1.groupWith(rdd2).collect
    res18: Array[(Int, (Iterable[Int], Iterable[Int]))] = Array((1,(CompactBuffer(2, 3),CompactBuffer(3))))
    
    
    • pipe(command):

      Returns a new RDD by piping each partition of the original RDD through a shell command.

    val list = List(1,2,3,4,5,6,7,8)
    val listRDD = sc.parallelize(list,4)
    val resultRDD = listRDD.pipe("head -n 1")
    resultRDD.foreach(println)
    // Result: 1 3 5 7 (the first element of each of the four partitions)
    
    
    • coalesce(numPartitions):

      Returns a new RDD with the data repartitioned down to the specified number of partitions; by default no shuffle is performed (a sketch with shuffle = true follows the example below).

    val list = List(1,2,3,4,5,6,7,8)
    val listRDD = sc.parallelize(list,4)
    listRDD.foreach(println)
    // Result (one group per partition): 1 2 -> 3 4 -> 5 6 -> 7 8
    val resultRDD = listRDD.coalesce(2)
    resultRDD.foreach(println)
    // Result (one group per partition): 1 2 3 4 -> 5 6 7 8
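    coalesce also takes an optional shuffle flag; setting it to true redistributes the data with a shuffle instead of simply merging adjacent partitions. A minimal sketch on the same listRDD:
    
    // With shuffle = true, coalesce behaves like repartition and shuffles the data
    val shuffledRDD = listRDD.coalesce(2, shuffle = true)
    println(shuffledRDD.partitions.size)   // 2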
    
    
    • repartition(numPartitions):

      Returns a new RDD with the data repartitioned into the given number of partitions, keeping the partitions as balanced as possible; a shuffle is performed.

    val list = List(1,2,3,4,5,6,7,8)
    val listRDD = sc.parallelize(list,4)
    val resultRDD = listRDD.repartition(2)
    resultRDD.foreach(println)
    // Result (one group per partition): 1 3 5 7 -> 2 4 6 8
    

    Actions

    • reduce(func)

      Aggregates the elements of the RDD with the given binary function (the function should be commutative and associative).

    val list = List(1,2,3,4,5,6)
    val listRDD = sc.parallelize(list)
    val result = listRDD.reduce(_ + _)
    // Result: 21
    
    
    • collect

      Returns all the elements of the RDD as an array.

    val list = List(1,2,3,4,5,6)
    val listRDD = sc.parallelize(list)
    val result = listRDD.collect
    // Result: Array(1, 2, 3, 4, 5, 6)
    
    • count: returns the number of elements in the RDD as a Long
    val list = List(1,2,3,4,5,6)
    val listRDD = sc.parallelize(list)
    val result = listRDD.count
    // Result: 6
    

    Key point:

    • first:

      Returns the first element of the RDD.

    val list = List(1,2,3,4,5,6)
    val listRDD = sc.parallelize(list)
    val result = listRDD.first
    // Result: 1
    
    • takeSample(withReplacement, num[, seed])

      Takes a random sample from the RDD; withReplacement specifies whether sampling is done with replacement, num is the number of elements to return, and the result is an array (an optional seed is shown in the sketch after the example below).

    val list = List(1,2,3,4,5,6,7,8,9,10)
    val listRDD = sc.parallelize(list)
    val result = listRDD.takeSample(false,4)
    // Result (one possible sample): Array(5, 8, 10, 6)
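    takeSample also accepts an optional seed; fixing it makes the sample reproducible across runs. A minimal sketch (the seed value 42 is arbitrary):
    
    // Same seed, same "random" sample on every run
    val seededSample = listRDD.takeSample(false, 4, seed = 42)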
    
    • take(num):

      Returns the first num elements of the RDD as an array.

    val list = List(1,2,3,4,5,6,7,8,9,10)
    val listRDD = sc.parallelize(list)
    val result = listRDD.take(4)
    // Result: Array(1, 2, 3, 4)
    
    
    • takeOrdered(num):

      Returns the first num elements of the RDD, sorted in ascending order by default; a custom ordering can also be supplied (see the sketch below the example). The result is an array.

    val list = List(6,5,4,3,2,1)
    val listRDD = sc.parallelize(list)
    val result = listRDD.takeOrdered(4)
    // Result: Array(1, 2, 3, 4)
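    The custom ordering mentioned above is passed in takeOrdered's second (implicit) parameter list; a minimal sketch that takes the four largest elements by reversing the default Int ordering:
    
    // Reversed ordering: takeOrdered now returns the largest elements
    val top4 = listRDD.takeOrdered(4)(Ordering[Int].reverse)
    // Result: Array(6, 5, 4, 3)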
    
    
