
Spark Day 1 Assignments

Author: 吾为天帝乎 | Published 2018-11-06 17:22

    1. Define a collection val list = List("张三:20","李四:21","王五:20") with three partitions, and use the mapPartitions operator to build a new collection whose elements have the form (String, Int), representing name and age respectively.

    2. Define a collection val list = List(1,2,3,4,5,6,7,8,9) with three partitions, and use the mapPartitionsWithIndex operator to build a new collection that records each partition's elements together with the sum of squares of that partition's elements.

    3. Define a collection val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8)).
    (1) Use the aggregateByKey operator to build a new collection that outputs the product of the elements for each key.

    (2) Use the aggregateByKey operator to build a new collection that outputs the sum of the elements for each key. (A short note on aggregateByKey follows this list.)
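
For assignment 3, recall the shape of aggregateByKey(zeroValue)(seqOp, combOp): seqOp folds each partition's values for a key into the zero value, and combOp merges the per-partition results. A minimal sketch under that reading (pairRDD names the RDD built from the list above, as in the solution code further down; 1 and 0 are the multiplicative and additive identities, so the zero value does not distort the result):

    // Per-key product: the zero value must be 1 (multiplicative identity)
    pairRDD.aggregateByKey(1)(_ * _, _ * _).collect().foreach(println)   // expected: (1,105), (2,384)
    // Per-key sum: the zero value must be 0 (additive identity)
    pairRDD.aggregateByKey(0)(_ + _, _ + _).collect().foreach(println)   // expected: (1,16), (2,20)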

    import org.apache.spark.rdd.RDD
    import org.apache.spark.{HashPartitioner, SparkConf, SparkContext}

    import scala.collection.mutable.ListBuffer

    object Work01 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
        val sc = new SparkContext(sparkConf)
        val list = List("张三:20","李四:21","王五:20")
        val listRDD: RDD[String] = sc.parallelize(list,3)
        val f : (Iterator[String]) => (Iterator[(String,Int)]) = it => {
          val result: ListBuffer[(String, Int)] = ListBuffer()
          while (it.hasNext){
            val value = it.next()
            val name = value.split(":")(0)
            val age = value.split(":")(1).toInt
            result.append((name,age))
          }
          result.iterator
        }
        println(listRDD.mapPartitions(f).partitions.size)
        // Repartition by passing in a partitioner (constructed with the target number of partitions)
        println(listRDD.mapPartitions(f).partitionBy(new HashPartitioner(1)).partitions.size)
      }
    }
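
A note on the Work01 solution: mapPartitions hands the function an Iterator for each partition, so the parsing can also be written without a mutable ListBuffer by transforming that iterator directly. A minimal equivalent sketch, assuming the same listRDD and the same "name:age" record format:

    // Buffer-free equivalent: map the partition's iterator lazily.
    val parsed: RDD[(String, Int)] = listRDD.mapPartitions { it =>
      it.map { record =>
        val Array(name, age) = record.split(":")
        (name, age.toInt)
      }
    }
    parsed.collect().foreach(println)   // e.g. (张三,20), (李四,21), (王五,20)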
    
    
    object Work02 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
        val sc = new SparkContext(sparkConf)
        val list = List(1,2,3,4,5,6,7,8,9)
        val listRDD: RDD[Int] = sc.parallelize(list,3)
        val f: (Int,Iterator[Int]) => (Iterator[(Int,Int)]) = (index,it) => {
          var sum = 0
          val result: ListBuffer[(Int, Int)] = ListBuffer()
          while (it.hasNext){
            val value = it.next()
            sum += (value * value)
            result.append((index,value))
          }
          result.append((index,sum))
          result.iterator
        }
        listRDD.mapPartitionsWithIndex(f).groupByKey().foreach(println)
    
      }
    }
    
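
With the nine elements split evenly across the three partitions (Spark's default slicing for parallelize on a local collection), each pair carries its partition index as the key, so grouping by key lists a partition's elements followed by that partition's sum of squares: (0, [1, 2, 3, 14]), (1, [4, 5, 6, 77]), (2, [7, 8, 9, 194]). The same per-partition computation can also be sketched without a mutable buffer (an alternative to the function above, not part of the original solution):

    // Buffer-free alternative for Work02's per-partition function.
    val g: (Int, Iterator[Int]) => Iterator[(Int, Int)] = (index, it) => {
      val values = it.toList                        // materialize this partition's elements
      val squareSum = values.map(v => v * v).sum    // sum of squares for this partition
      (values.map(v => (index, v)) :+ (index, squareSum)).iterator
    }
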
    object Work03 {
      def main(args: Array[String]): Unit = {
        val sparkConf = new SparkConf().setMaster("local[*]").setAppName("Master")
        val sc = new SparkContext(sparkConf)
        val list = List((1,1),(1,3),(1,5),(1,7),(2,2),(2,4),(2,6),(2,8))
        val pairRDD: RDD[(Int,Int)] = sc.parallelize(list)
        pairRDD.groupByKey().foreach(println)
        val seq1: (Int,Int) => (Int) = (zero,value) => {
          zero * value
        }
        val com1: (Int,Int) => (Int) = (par,otherPar) => {
          par * otherPar
        }
        pairRDD.aggregateByKey(1)(seq1,com1).foreach(println)
        val seq2: (Int,Int) => (Int) = (zero,value) => {
          zero + value
        }
        val com2: (Int,Int) => (Int) = (par,otherPar) => {
          par + otherPar
        }
        pairRDD.aggregateByKey(0)(seq2,com2).foreach(println)
      }
    }
    
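
As a quick cross-check on assignment 3 (a sketch, not part of the assignment), reduceByKey gives the same per-key results, since both the product and the sum are associative and need no zero value:

    // Cross-check: per-key product and sum via reduceByKey
    pairRDD.reduceByKey(_ * _).collect().foreach(println)   // expected: (1,105), (2,384)
    pairRDD.reduceByKey(_ + _).collect().foreach(println)   // expected: (1,16), (2,20)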
