011 Spark RDD

Author: 逸章 | Published 2020-10-07 22:25

1. Transformations and Actions

1.1 Transformations

The five most common transformations are filter(func), map(func), flatMap(func), groupByKey, and reduceByKey(func).
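As a preview, here is a minimal sketch that chains several of these transformations into a word count. It assumes sc is an existing SparkContext (spark-shell provides one) and the same file:///home/hadoop/word.txt used in the session below, which then walks through each transformation individually.

// Minimal word-count sketch chaining the transformations listed above.
// Assumes `sc` is an existing SparkContext (spark-shell provides one)
// and that file:///home/hadoop/word.txt exists.
val lines  = sc.textFile("file:///home/hadoop/word.txt")   // RDD[String], one element per line
val words  = lines.flatMap(line => line.split(" "))        // RDD[String], one element per word
val pairs  = words.map(word => (word, 1))                  // RDD[(String, Int)]
val counts = pairs.reduceByKey(_ + _)                      // RDD[(String, Int)], one count per word

// Transformations are lazy; collect() is the action that triggers the computation.
counts.collect().foreach(println)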

scala> val lines = sc.textFile("file:///home/hadoop/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/word.txt MapPartitionsRDD[3] at textFile at <console>:24


scala> val linesWithSpark = lines.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:25


scala> val data = Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)

scala> val rdd1 = sc.parallelize(data)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26

scala> val rdd2 = rdd1.map(x => x + 10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:25

scala> rdd2.collect
res4: Array[Int] = Array(11, 12, 13, 14, 15)



scala> val linesWithSpark = lines.map(line => line.split(" "))
linesWithSpark: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[8] at map at <console>:25

scala> linesWithSpark.collect
res5: Array[Array[String]] = Array(Array(Hadoop, is, good), Array(Spark, is, fast), Array(Spark, is, better))

scala> val linesSplit = lines.flatMap(line => line.split(" "))
linesSplit: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:25

scala> linesSplit.collect()
res6: Array[String] = Array(Hadoop, is, good, Spark, is, fast, Spark, is, better)




scala> val v = Array("one","two","two","three","three","three","three")
v: Array[String] = Array(one, two, two, three, three, three, three)

scala> val v2 = v.map(one => (one, 1))
v2: Array[(String, Int)] = Array((one,1), (two,1), (two,1), (three,1), (three,1), (three,1), (three,1))

scala> val v2 = sc.parallelize(v).map(one => (one, 1))
v2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:26

//******************** The following demonstrates the reduceByKey operation
scala> val reduceByResult = v2.reduceByKey(_+_)
reduceByResult: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:25

scala> reduceByResult.collect()
res1: Array[(String, Int)] = Array((two,2), (one,1), (three,4))                 

//******************** The following demonstrates the groupByKey operation
scala> val groupByResult = v2.groupByKey().map(t => (t._1, t._2.sum))
groupByResult: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:25

scala> v2.groupByKey().map(t => (t._1, t._2.sum)).foreach(x => println(x._1+","+x._2))
two,2
one,1
three,4
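
Both snippets above produce the same counts, but they differ in shuffle cost: reduceByKey combines values within each partition before the shuffle, while groupByKey ships every (word, 1) pair across the network and only sums afterwards. A minimal sketch of the equivalence, assuming the same pair RDD v2 (mapValues is standard pair-RDD API, though it does not appear in the session above):

// Equivalent results, different shuffle behaviour.
val viaReduce = v2.reduceByKey(_ + _)              // pre-combines per partition, then shuffles partial sums
val viaGroup  = v2.groupByKey().mapValues(_.sum)   // shuffles every (word, 1) pair, then sums

// Both collect to Array((two,2), (one,1), (three,4)) on the sample data,
// but reduceByKey usually moves far less data across the network.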


1.2 Actions

Commonly used actions include count, collect, first, take, reduce(func), and foreach(func).
For example:

scala> val rdd = sc.parallelize(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24

scala> rdd.count
res3: Long = 5

scala> rdd.first
res4: Int = 1

scala> rdd.take(3)
res5: Array[Int] = Array(1, 2, 3)

scala> rdd.reduce(_+_)
res6: Int = 15

scala> rdd.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5)

scala> rdd.foreach(println(_))
1
2
3
4
5
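
These actions return ordinary Scala values to the driver, so they compose with plain Scala code. A minimal sketch computing the mean of the same rdd (note that in cluster mode the output of foreach(println) appears in the executors' stdout, not on the driver console):

// Mean of the elements, built only from the actions shown above.
val sum  = rdd.reduce(_ + _)     // 15, computed on the executors, returned to the driver
val n    = rdd.count()           // 5
val mean = sum.toDouble / n      // 3.0, plain Scala arithmetic on the driver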

2. Persistence

scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)

scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> rdd.cache
res0: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:26

scala> println(rdd.count)
3                                                                               

scala> println(rdd.collect().mkString(","))
Hadoop,Spark,Hive

scala> println(rdd.collect().mkString("[",",","]"))
[Hadoop,Spark,Hive]

scala> rdd.persist(StorageLevel.DISK_ONLY)
<console>:26: error: not found: value StorageLevel
       rdd.persist(StorageLevel.DISK_ONLY)
                   ^

scala> rdd.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
  at org.apache.spark.rdd.RDD.persist(RDD.scala:174)
  at org.apache.spark.rdd.RDD.persist(RDD.scala:198)
  ... 47 elided

scala> val rdd2 = sc.parallelize(list)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26

// cache() above is shorthand for persist(org.apache.spark.storage.StorageLevel.MEMORY_ONLY);
// since rdd's storage level is already assigned, DISK_ONLY has to be set on a fresh RDD:
scala> rdd2.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
res6: rdd2.type = ParallelCollectionRDD[1] at parallelize at <console>:26
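
A common pattern is to set the storage level once, before the first action, reuse the cached RDD across several actions, and release it when finished. A minimal sketch, assuming the same list as above (unpersist() is standard RDD API but does not appear in the session above):

import org.apache.spark.storage.StorageLevel

val rdd3 = sc.parallelize(list)
rdd3.persist(StorageLevel.MEMORY_ONLY)    // the level cache() uses; must be set before any other level is assigned

println(rdd3.count())                     // first action: computes the RDD and caches its partitions
println(rdd3.collect().mkString(","))     // second action: served from the cache, no recomputation

rdd3.unpersist()                          // release the cached partitions when they are no longer needed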
