
Spark Development: RDD Operators

Author: 诺之林 | Published 2021-04-11 11:56


    Setup

    echo "Hadoop first" >> /tmp/text.txt
    
    echo "Spark second" >> /tmp/text.txt
    
    echo "Spark third" >> /tmp/text.txt
    
    cat /tmp/text.txt
    # Hadoop first
    # Spark second
    # Spark third
    

    Transformation Operators

    /opt/services/spark/bin/spark-shell
    

    Plain Values

    1. map
    val textFile = sc.textFile("/tmp/text.txt")
    
    val mapRDD = textFile.map(line => line.split(" ")(1))
    
    mapRDD.foreach(println)
    # third
    # first
    # second
    

    Transforms each element into a new value.
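    For intuition: map always produces exactly one output element per input element, whatever the mapping function returns. A small sketch on the same textFile, mapping each line to its character count (output order may vary, as with any foreach on an RDD; lengthRDD is just an illustrative name):

    val lengthRDD = textFile.map(_.length)
    
    lengthRDD.foreach(println)
    # 12
    # 12
    # 11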

    2. flatMap
    val textFile = sc.textFile("/tmp/text.txt")
    
    val flatMapRDD = textFile.flatMap(line => line.split(" "))
    
    flatMapRDD.foreach(println)
    # Hadoop
    # first
    # Spark
    # third
    # Spark
    # second
    

    Maps each element to a collection and flattens the results into a single RDD.
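    To see the difference from map on the same data: map(_.split(" ")) keeps one array per line, while flatMap flattens those arrays into individual words. A quick check:

    textFile.map(_.split(" ")).count()
    # Long = 3
    
    textFile.flatMap(_.split(" ")).count()
    # Long = 6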

    3. filter
    val textFile = sc.textFile("/tmp/text.txt")
    
    val filterRDD = textFile.filter(_.contains("Spark"))
    
    filterRDD.foreach(println)
    # Spark third
    # Spark second
    

    Keeps only the elements that satisfy the given predicate.

    For Scala's placeholder (underscore) syntax, see the SCALA CHEATSHEET.
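    The underscore is only shorthand for an anonymous function; the two lines below are equivalent:

    val filterRDD1 = textFile.filter(line => line.contains("Spark"))
    
    val filterRDD2 = textFile.filter(_.contains("Spark"))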

    4. distinct
    val textFile = sc.textFile("/tmp/text.txt")
    
    val flatMapRDD = textFile.flatMap(_.split(" "))
    
    val distinctRDD = flatMapRDD.distinct()
    
    distinctRDD.foreach(println)
    # Spark
    # second
    # third
    # first
    # Hadoop
    

    Removes duplicate elements.

    Key-Value Pairs

    1. reduceByKey
    val textFile = sc.textFile("/tmp/text.txt")
    
    val flatMapRDD = textFile.flatMap(_.split(" "))
    
    val mapRDD = flatMapRDD.map((_, 1))
    
    mapRDD.foreach(println)
    # (Spark,1)
    # (Hadoop,1)
    # (first,1)
    # (third,1)
    # (Spark,1)
    # (second,1)
    
    val reduceRDD = mapRDD.reduceByKey(_+_)
    
    reduceRDD.foreach(println)
    # (Spark,2)
    # (third,1)
    # (Hadoop,1)
    # (second,1)
    # (first,1)
    

    Combines the values that share the same key using the given function.
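    The combining function is not limited to addition; anything that merges two values into one works (it should be associative, since values are combined partition by partition before the results are merged). A small sketch on the same file, keeping the longest line for each leading word (lineKV and longestRDD are illustrative names):

    val lineKV = textFile.map(line => (line.split(" ")(0), line))
    
    val longestRDD = lineKV.reduceByKey((a, b) => if (a.length >= b.length) a else b)
    
    longestRDD.foreach(println)
    # (Hadoop,Hadoop first)
    # (Spark,Spark second)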

    2. groupByKey
    val textFile = sc.textFile("/tmp/text.txt")
    
    val flatMapRDD = textFile.flatMap(_.split(" "))
    
    val mapRDD = flatMapRDD.map((_, 1))
    
    val groupRDD = mapRDD.groupByKey()
    
    groupRDD.foreach(println)
    # (second,CompactBuffer(1))
    # (Spark,CompactBuffer(1, 1))
    # (first,CompactBuffer(1))
    # (third,CompactBuffer(1))
    # (Hadoop,CompactBuffer(1))
    

    Groups the values that share the same key.

    val mapRDD = groupRDD.map(i => (i._1,i._2.sum))
    
    mapRDD.foreach(println)
    # (second,1)
    # (first,1)
    # (Spark,2)
    # (third,1)
    # (Hadoop,1)
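    The same sum can also be written with mapValues, which transforms only the values. Note that for aggregations like this, reduceByKey is usually preferable to groupByKey followed by map, because it combines values within each partition before shuffling. A sketch (output order may vary):

    val sumRDD = groupRDD.mapValues(_.sum)
    
    sumRDD.foreach(println)
    # (second,1)
    # (first,1)
    # (Spark,2)
    # (third,1)
    # (Hadoop,1)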
    
    3. sortByKey
    val textFile = sc.textFile("/tmp/text.txt")
    
    val flatMapRDD = textFile.flatMap(_.split(" "))
    
    val mapRDD = flatMapRDD.map((_, 1))
    
    val sortRDD = mapRDD.sortByKey(false)
    
    sortRDD.foreach(println)
    # (Spark,1)
    # (third,1)
    # (Spark,1)
    # (second,1)
    # (Hadoop,1)
    # (first,1)
    

    Sorts the elements by key (ascending or descending).
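    sortByKey(true) (the default) sorts ascending and sortByKey(false) descending. Because foreach prints from the executors, the sorted order is easier to see after collecting to the driver first. A sketch:

    // uppercase letters sort before lowercase under the default String ordering
    mapRDD.sortByKey().collect().foreach(println)
    # (Hadoop,1)
    # (Spark,1)
    # (Spark,1)
    # (first,1)
    # (second,1)
    # (third,1)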

    4. join
    val textFile = sc.textFile("/tmp/text.txt")
    
    val secondRDD = textFile.filter(_.contains("second")).map(_.split(" ")).map(a => (a(0), a(1)))
    
    secondRDD.foreach(println)
    # (Spark,second)
    
    val thirdRDD = textFile.filter(_.contains("third")).map(_.split(" ")).map(a => (a(0), a(1)))
    
    thirdRDD.foreach(println)
    # (Spark,third)
    
    val joinRDD = secondRDD.join(thirdRDD)
    
    joinRDD.foreach(println)
    # (Spark,(second,third))
    

    Joins two RDDs on matching keys, pairing up their values.
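    join is an inner join, so keys that appear in only one of the two RDDs are dropped; variants such as leftOuterJoin keep them. A small sketch reusing thirdRDD from above (firstRDD is a new helper built here only for illustration):

    val firstRDD = textFile.filter(_.contains("first")).map(_.split(" ")).map(a => (a(0), a(1)))
    
    firstRDD.foreach(println)
    # (Hadoop,first)
    
    // the key Hadoop has no match in thirdRDD, so the right-hand side becomes None
    firstRDD.leftOuterJoin(thirdRDD).foreach(println)
    # (Hadoop,(first,None))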

    Action Operators

    /opt/services/spark/bin/spark-shell
    
    1. count
    val textFile = sc.textFile("/tmp/text.txt")
    
    textFile.count()
    # Long = 3
    

    Returns the number of elements.

    2. first
    val textFile = sc.textFile("/tmp/text.txt")
    
    textFile.first()
    # String = Hadoop first
    

    Returns the first element.

    3. take
    val textFile = sc.textFile("/tmp/text.txt")
    
    textFile.take(2)
    # Array[String] = Array(Hadoop first, Spark second)
    

    Returns the first N elements as an array.

    4. collect
    val textFile = sc.textFile("/tmp/text.txt")
    
    textFile.collect()
    # Array[String] = Array(Hadoop first, Spark second, Spark third)
    

    Returns all elements to the driver as an array.

    5. foreach
    val textFile = sc.textFile("/tmp/text.txt")
    
    textFile.foreach(println)
    # Spark third
    # Hadoop first
    # Spark second
    

    Applies the given function to each element.
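    One caveat: foreach runs on the executors, so on a real cluster the println output ends up in the executors' stdout rather than the driver console, and the order is not deterministic (as the shuffled output above shows). For small data, collecting first prints on the driver in file order:

    textFile.collect().foreach(println)
    # Hadoop first
    # Spark second
    # Spark third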

    6. reduce
    sc.makeRDD(List(1, 2, 3, 4, 5)).reduce(_+_)
    # Int = 15
    

    Aggregates the elements with the given function.
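    The function passed to reduce should be associative and commutative, because partial results are computed per partition and then merged; it is not limited to addition:

    sc.makeRDD(List(1, 2, 3, 4, 5)).reduce((a, b) => math.max(a, b))
    # Int = 5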

    Summary

    Transformation operators (Transformations): return a new RDD (because RDDs are immutable)
    Action operators (Actions): return a computed result
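    It is also worth remembering that transformation operators are lazy: they only record the lineage of the computation, and nothing is executed until an action operator is called. A quick way to see this in spark-shell (lazyRDD is just an illustrative name):

    // only the lineage is recorded here; no job has run yet
    val lazyRDD = sc.textFile("/tmp/text.txt").filter(_.contains("Spark"))
    
    lazyRDD.toDebugString
    # prints the planned lineage (a MapPartitionsRDD over a HadoopRDD), still without running a job
    
    // the action triggers the actual computation
    lazyRDD.count()
    # Long = 2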

    For more operators, see the RDD Programming Guide.

