美文网首页
两种类型的算子:transformation和actio

两种类型的算子:transformation和actio

作者: lehuai | 来源:发表于2018-01-05 10:02 被阅读0次

    练习

    package day07
    
    import org.apache.spark.{SparkConf, SparkContext}
    
    object SparkRDDTest {
    
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("SparkRDDTest").setMaster("local")
    
        val sc = new SparkContext(conf)
    
        // 通过并行化生成rdd
        val rdd1 = sc.parallelize(List(5,6,4,7,3,8,2,9,1,10))
    
        // 对rdd1里的每一个元素乘2然后排序
    //    val res1 = rdd1.map(_ * 2).sortBy(x => x,true)
    //    println(res1.collect().toBuffer)
        // 过滤出大于等于10的元素
    //    val res2 = res1.filter(_ >= 10)
    
        // 将元素以数组的方式打印出来
    //    println(res2.collect().toBuffer)
    
        val rdd2 = sc.parallelize(Array("a b c","d e f","h i j"))
        // 将rdd2里面的每一个元素先切分再压平
    //    val res = rdd2.flatMap(_.split(' '))
    //    println(res.collect.toBuffer)
    
        // 来个复杂的,
        val rdd3 = sc.parallelize(List(List("a b c","a b b"),List("e f g","a f g"),List("h i j","a a b")))
        // 将rdd3里面的每一个元素先切分再压平
    //    val res = rdd3.flatMap(_.flatMap(_.split(" ")))
    //    println(res.collect().toBuffer)
    
        val rdd4 = sc.parallelize(List(5,6,4,3))
        val rdd5 = sc.parallelize(List(1,2,3,4))
        // 求并集
        val unionres = rdd4 union rdd5
    //    println(res.collect().toBuffer)
        // 求交集
    //    println(rdd4.intersection(rdd5).collect().toBuffer)
        // 去重
    //    println(unionres.distinct().collect().toBuffer)
    
        val rdd6 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2)))
        val rdd7 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
        // 求join
    //    println((rdd6 join rdd7).collect().toBuffer)
    
        // 求左连接和右连接
    //    val res1 = rdd6.leftOuterJoin(rdd7)
    //    val res2 = rdd6.rightOuterJoin(rdd7)
    //    println(res1.collect().toBuffer)
        // 求并集
    //    val res = rdd6 union(rdd7)
    
        // 按key进行分组
    //    println(res.groupByKey().collect().toBuffer)
    
        // 分别用groupByKey和reduceByKey实现单词计数,注意groupByKey与reduceByKey的区别
        // groupByKey
    //    println(res.groupByKey().mapValues(_.sum).collect().toBuffer)
        // reduceByKey
    //    println(res.reduceByKey(_ + _).collect.toBuffer)
    
        val rdd8 = sc.parallelize(List(("tom",1),("tom",2),("jerry",3),("kitty",2)))
        val rdd9 = sc.parallelize(List(("jerry",2),("tom",1),("shuke",2)))
        // cogroup  注意cogroup与groupByKey的区别
    //    println(rdd8.cogroup(rdd9).collect().toBuffer)
    
        val rdd10 = sc.parallelize(List(1,2,3,4,5))
        // reduce聚合
    //    println(rdd10.reduce(_+_))
    
        val rdd11 = sc.parallelize(List(("tom",1),("jerry",3),("kitty",2),("shuke",1)))
        val rdd12 = sc.parallelize(List(("jerry",2),("tom",3),("shuke",2),("kitty",5)))
        val rdd13 = rdd11.union(rdd12)
        // 按key进行聚合
    //    reduceByKey
    
    
        // 按value的降序排序
        val res = rdd13.reduceByKey(_+_).map(t => (t._2,t._1)).sortByKey(false).map(t => (t._2,t._1))
        println(res.collect.toBuffer)
    
        // 笛卡尔积
    //    println(rdd11.cartesian(rdd12).collect.toBuffer)
    
        // 其他:count、top、take、first、takeOrdered
        
    
    
      }
    }
    
    

    相关文章

      网友评论

          本文标题:两种类型的算子:transformation和actio

          本文链接:https://www.haomeiwen.com/subject/eqirnxtx.html