Preparation
echo "Hadoop first" >> /tmp/text.txt
echo "Spark second" >> /tmp/text.txt
echo "Spark third" >> /tmp/text.txt
cat /tmp/text.txt
# Hadoop first
# Spark second
# Spark third
Transformation Operators
/opt/services/spark/bin/spark-shell
Regular Values
- map
val textFile = sc.textFile("/tmp/text.txt")
val mapRDD = textFile.map(line => line.split(" ")(1))
mapRDD.foreach(println)
# third
# first
# second
Applies the given function to each element, transforming the data.
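As a further illustration (a minimal sketch; the variable name lengthRDD is not from the original), map always produces exactly one output element per input element, here the word count of each line:
val textFile = sc.textFile("/tmp/text.txt")
val lengthRDD = textFile.map(_.split(" ").length)   // one Int per input line
lengthRDD.foreach(println)
# 2
# 2
# 2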
- flatMap
val textFile = sc.textFile("/tmp/text.txt")
val flatMapRDD = textFile.flatMap(line => line.split(" "))
flatMapRDD.foreach(println)
# Hadoop
# first
# Spark
# third
# Spark
# second
Maps each element to a sequence and flattens all the results into a single RDD.
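For contrast, applying the same split with map keeps one array per line instead of flattening (a minimal sketch):
val textFile = sc.textFile("/tmp/text.txt")
textFile.map(_.split(" ")).count()      // RDD[Array[String]], one array per line
# Long = 3
textFile.flatMap(_.split(" ")).count()  // RDD[String], one element per word
# Long = 6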
- filter
val textFile = sc.textFile("/tmp/text.txt")
val filterRDD = textFile.filter(_.contains("Spark"))
filterRDD.foreach(println)
# Spark third
# Spark second
Selects the elements that satisfy the given predicate.
Scala placeholder syntax: see the SCALA CHEATSHEET
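For example, the placeholder form used above is just shorthand for an explicit anonymous function; the two filters below are equivalent (variable names are illustrative):
val textFile = sc.textFile("/tmp/text.txt")
val filterRDD1 = textFile.filter(line => line.contains("Spark"))   // explicit parameter
val filterRDD2 = textFile.filter(_.contains("Spark"))              // placeholder shorthand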
- distinct
val textFile = sc.textFile("/tmp/text.txt")
val flatMapRDD = textFile.flatMap(_.split(" "))
val distinctRDD = flatMapRDD.distinct()
distinctRDD.foreach(println)
# Spark
# second
# third
# first
# Hadoop
Removes duplicate elements.
Key-Value Pairs
- reduceByKey
val textFile = sc.textFile("/tmp/text.txt")
val flatMapRDD = textFile.flatMap(_.split(" "))
val mapRDD = flatMapRDD.map((_, 1))
mapRDD.foreach(println)
# (Spark,1)
# (Hadoop,1)
# (first,1)
# (third,1)
# (Spark,1)
# (second,1)
val reduceRDD = mapRDD.reduceByKey(_+_)
reduceRDD.foreach(println)
# (Spark,2)
# (third,1)
# (Hadoop,1)
# (second,1)
# (first,1)
Combines the values that share the same key using the given function.
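Since every transformation returns a new RDD, the steps above can also be chained into a single word-count expression (a sketch; the variable name is illustrative):
val wordCountRDD = sc.textFile("/tmp/text.txt")
  .flatMap(_.split(" "))
  .map((_, 1))
  .reduceByKey(_ + _)
wordCountRDD.foreach(println)
# (Spark,2)
# (third,1)
# (Hadoop,1)
# (second,1)
# (first,1)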
- groupByKey
val textFile = sc.textFile("/tmp/text.txt")
val flatMapRDD = textFile.flatMap(_.split(" "))
val mapRDD = flatMapRDD.map((_, 1))
val groupRDD = mapRDD.groupByKey()
groupRDD.foreach(println)
# (second,CompactBuffer(1))
# (Spark,CompactBuffer(1, 1))
# (first,CompactBuffer(1))
# (third,CompactBuffer(1))
# (Hadoop,CompactBuffer(1))
Groups the values that share the same key.
val mapRDD = groupRDD.map(i => (i._1,i._2.sum))
mapRDD.foreach(println)
# (second,1)
# (first,1)
# (Spark,2)
# (third,1)
# (Hadoop,1)
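The same sum can be written with mapValues, which transforms only the values and keeps the keys; for an aggregation like this, reduceByKey is generally preferred over groupByKey, since it combines values within each partition before shuffling (a sketch; the variable name is illustrative):
val sumRDD = groupRDD.mapValues(_.sum)   // equivalent to the map above
sumRDD.foreach(println)
# (second,1)
# (first,1)
# (Spark,2)
# (third,1)
# (Hadoop,1)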
- sortByKey
val textFile = sc.textFile("/tmp/text.txt")
val flatMapRDD = textFile.flatMap(_.split(" "))
val mapRDD = flatMapRDD.map((_, 1))
val sortRDD = mapRDD.sortByKey(false)
sortRDD.foreach(println)
# (Spark,1)
# (third,1)
# (Spark,1)
# (second,1)
# (Hadoop,1)
# (first,1)
Sorts the elements by key (true for ascending, false for descending).
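sortByKey only applies to key-value RDDs; for a regular RDD the more general sortBy takes a key-extraction function (a minimal sketch; note that uppercase letters sort before lowercase in the default String ordering):
val wordsRDD = sc.textFile("/tmp/text.txt").flatMap(_.split(" "))
wordsRDD.sortBy(w => w).collect()   // sort the words themselves, ascending
# Array[String] = Array(Hadoop, Spark, Spark, first, second, third)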
- join
val textFile = sc.textFile("/tmp/text.txt")
val secondRDD = textFile.filter(_.contains("second")).map(_.split(" ")).map(a => (a(0), a(1)))
secondRDD.foreach(println)
# (Spark,second)
val thirdRDD = textFile.filter(_.contains("third")).map(_.split(" ")).map(a => (a(0), a(1)))
thirdRDD.foreach(println)
# (Spark,third)
val joinRDD = secondRDD.join(thirdRDD)
joinRDD.foreach(println)
# (Spark,(second,third))
Joins two RDDs, pairing up the values of elements that share the same key.
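join is an inner join: keys that appear in only one RDD are dropped, and every combination of matching values is emitted. In the sketch below (variable names are illustrative), the (Hadoop,first) pair has no matching key in thirdRDD and disappears from the result:
val textFile = sc.textFile("/tmp/text.txt")
val allRDD = textFile.map(_.split(" ")).map(a => (a(0), a(1)))
val thirdRDD = textFile.filter(_.contains("third")).map(_.split(" ")).map(a => (a(0), a(1)))
allRDD.join(thirdRDD).foreach(println)
# (Spark,(second,third))
# (Spark,(third,third))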
Action Operators
/opt/services/spark/bin/spark-shell
- count
val textFile = sc.textFile("/tmp/text.txt")
textFile.count()
# Long = 3
Returns the number of elements.
- first
val textFile = sc.textFile("/tmp/text.txt")
textFile.first()
# String = Hadoop first
Returns the first element.
- take
val textFile = sc.textFile("/tmp/text.txt")
textFile.take(2)
# Array[String] = Array(Hadoop first, Spark second)
Returns the first N elements as an array.
- collect
val textFile = sc.textFile("/tmp/text.txt")
textFile.collect()
# Array[String] = Array(Hadoop first, Spark second, Spark third)
Returns all elements as an array.
- foreach
val textFile = sc.textFile("/tmp/text.txt")
textFile.foreach(println)
# Spark third
# Hadoop first
# Spark second
Applies the given function to every element.
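Because println runs on the executors, the output order above is not deterministic. To print in file order on the driver, collect the elements first (fine here only because the file is tiny):
val textFile = sc.textFile("/tmp/text.txt")
textFile.collect().foreach(println)
# Hadoop first
# Spark second
# Spark third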
- reduce
sc.makeRDD(List(1, 2, 3, 4, 5)).reduce(_+_)
# Int = 15
Aggregates the elements using the given function.
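The function passed to reduce should be commutative and associative, because partial results are combined across partitions; any such operator works, for example taking the maximum:
sc.makeRDD(List(1, 2, 3, 4, 5)).reduce(_ max _)
# Int = 5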
Summary
Transformation Operators | Action Operators |
---|---|
Transformations | Actions |
Return a new RDD (because RDDs are immutable) | Return a computed result |
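A practical consequence of this split: transformations are lazy and only record the lineage of a new RDD, while the actual computation runs when an action is called (a minimal sketch):
val textFile = sc.textFile("/tmp/text.txt")
val wordsRDD = textFile.flatMap(_.split(" "))   // nothing is computed yet
wordsRDD.count()                                // the action triggers the job
# Long = 6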
For more operators, see the RDD Programming Guide.