美文网首页
Spark转化和行动操作

Spark转化和行动操作

作者: yanzhu728 | 来源:发表于2018-01-04 10:42 被阅读110次

    1.转化操作

    函数名 作用
    map() 参数是函数,函数应用于RDD每一个元素,返回值是新的RDD
    flatMap() 参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD
    filter() 参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD
    distinct() 没有参数,将RDD里的元素进行去重操作
    union() 参数是RDD,生成包含两个RDD所有元素的新RDD
    intersection() 参数是RDD,求出两个RDD的共同元素
    subtract() 参数是RDD,将原RDD里和参数RDD里相同的元素去掉
    cartesian() 参数是RDD,求两个RDD的笛卡儿积
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
    val rddFile:RDD[String] = sc.textFile(path, 1)
     
    val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
    val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))
     
    /* map操作 */
    println("======map操作======")
    println(rddInt.map(x => x + 1).collect().mkString(","))
    println("======map操作======")
    /* filter操作 */
    println("======filter操作======")
    println(rddInt.filter(x => x > 4).collect().mkString(","))
    println("======filter操作======")
    /* flatMap操作 */
    println("======flatMap操作======")
    println(rddFile.flatMap { x => x.split(",") }.first())
    println("======flatMap操作======")
    /* distinct去重操作 */
    println("======distinct去重======")
    println(rddInt.distinct().collect().mkString(","))
    println(rddStr.distinct().collect().mkString(","))
    println("======distinct去重======")
    /* union操作 */
    println("======union操作======")
    println(rdd01.union(rdd02).collect().mkString(","))
    println("======union操作======")
    /* intersection操作 */
    println("======intersection操作======")
    println(rdd01.intersection(rdd02).collect().mkString(","))
    println("======intersection操作======")
    /* subtract操作 */
    println("======subtract操作======")
    println(rdd01.subtract(rdd02).collect().mkString(","))
    println("======subtract操作======")
    /* cartesian操作 */
    println("======cartesian操作======")
    println(rdd01.cartesian(rdd02).collect().mkString(","))
    println("======cartesian操作======")
    

    1.行动操作

    函数名 作用
    collect() 返回RDD所有元素
    count() RDD里元素个数
    countByValue() 各元素在RDD中出现次数
    reduce() 并行整合所有RDD数据,例如求和操作
    fold(0)(func) 和reduce功能一样,不过fold带有初始值
    aggregate(0)(seqOp,combop) 和reduce功能一样,但是返回的RDD数据类型和原RDD不一样
    foreach(func) 对RDD每个元素都是使用特定函数
    val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
    val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
     
    /* count操作 */
    println("======count操作======")
    println(rddInt.count())
    println("======count操作======")  
    /* countByValue操作 */
    println("======countByValue操作======")
    println(rddInt.countByValue())
    println("======countByValue操作======")
    /* reduce操作 */
    println("======countByValue操作======")
    println(rddInt.reduce((x ,y) => x + y))
    println("======countByValue操作======")
    /* fold操作 */
    println("======fold操作======")
    println(rddInt.fold(0)((x ,y) => x + y))
    println("======fold操作======")
    /* aggregate操作 */
    println("======aggregate操作======")
    val res:(Int,Int) = rddInt.aggregate((0,0))((x,y) => (x._1 + x._2,y),(x,y) => (x._1 + x._2,y._1 + y._2))
    println(res._1 + "," + res._2)
    println("======aggregate操作======")
    /* foeach操作 */
    println("======foeach操作======")
    println(rddStr.foreach { x => println(x) })
    println("======foeach操作======")
    

    相关文章

      网友评论

          本文标题:Spark转化和行动操作

          本文链接:https://www.haomeiwen.com/subject/aualnxtx.html