1. Transformations and Actions
1.1 Transformation
Five common transformations are filter(func), map(func), flatMap(func), groupByKey(), and reduceByKey(func).
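The examples below read a local file word.txt. Judging from the collect output further down, it presumably contains the following three lines (reproduced here so the session can be replayed; the exact file contents are an assumption):
Hadoop is good
Spark is fast
Spark is better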
scala> val lines = sc.textFile("file:///home/hadoop/word.txt")
lines: org.apache.spark.rdd.RDD[String] = file:///home/hadoop/word.txt MapPartitionsRDD[3] at textFile at <console>:24
scala> val linesWithSpark = lines.filter(line => line.contains("Spark"))
linesWithSpark: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[5] at filter at <console>:25
scala> val data = Array(1,2,3,4,5)
data: Array[Int] = Array(1, 2, 3, 4, 5)
scala> val rdd1 = sc.parallelize(data)
rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:26
scala> val rdd2 = rdd1.map(x => x + 10)
rdd2: org.apache.spark.rdd.RDD[Int] = MapPartitionsRDD[7] at map at <console>:25
scala> rdd2.collect
res4: Array[Int] = Array(11, 12, 13, 14, 15)
scala> val linesWithSpark = lines.map(line => line.split(" "))
linesWithSpark: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[8] at map at <console>:25
scala> linesWithSpark.collect
res5: Array[Array[String]] = Array(Array(Hadoop, is, good), Array(Spark, is, fast), Array(Spark, is, better))
scala> val linesSplit = lines.flatMap(line => line.split(" "))
linesSplit: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[9] at flatMap at <console>:25
scala> linesSplit.collect()
res6: Array[String] = Array(Hadoop, is, good, Spark, is, fast, Spark, is, better)
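Comparing res5 with res6: map keeps one element per input line (an Array[String] here), while flatMap flattens those arrays into a single RDD of words. Chained with the pair-RDD operations shown next, this gives the classic word count; the line below is a sketch built on the same lines RDD, not part of the original session:
scala> lines.flatMap(_.split(" ")).map(w => (w, 1)).reduceByKey(_ + _).collect()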
scala> val v = Array("one","two","two","three","three","three","three")
v: Array[String] = Array(one, two, two, three, three, three, three)
scala> val v2 = v.map(one => (one, 1))
v2: Array[(String, Int)] = Array((one,1), (two,1), (two,1), (three,1), (three,1), (three,1), (three,1))
scala> val v2 = sc.parallelize(v).map(one => (one, 1))
v2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[1] at map at <console>:26
//******************** The following demonstrates the reduceByKey operation
scala> val reduceByResult = v2.reduceByKey(_+_)
reduceByResult: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[2] at reduceByKey at <console>:25
scala> reduceByResult.collect()
res1: Array[(String, Int)] = Array((two,2), (one,1), (three,4))
//******************** The following demonstrates the groupByKey operation
scala> val groupByResult = v2.groupByKey().map(t => (t._1, t._2.sum))
groupByResult: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[4] at map at <console>:25
scala> v2.groupByKey().map(t => (t._1, t._2.sum)).foreach(x => println(x._1+","x._2))
<console>:1: error: ')' expected but '.' found.
v2.groupByKey().map(t => (t._1, t._2.sum)).foreach(x => println(x._1+","x._2))
^
scala> v2.groupByKey().map(t => (t._1, t._2.sum)).foreach(x => println(x._1+","+x._2))
two,2
one,1
three,4
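Both versions produce the same counts, but reduceByKey combines values sharing a key inside each partition before the shuffle, while groupByKey ships every (key, value) pair across the network first, so reduceByKey is usually preferred for aggregations. The groupByKey version can also be written with mapValues; the line below is an equivalent sketch, not taken from the original session:
scala> v2.groupByKey().mapValues(_.sum).collect()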
1.2 Action
Commonly used actions are: count(), collect(), first(), take(n), reduce(func), and foreach(func).
Example:
scala> val rdd = sc.parallelize(Array(1,2,3,4,5))
rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[8] at parallelize at <console>:24
scala> rdd.count
res3: Long = 5
scala> rdd.first
res4: Int = 1
scala> rdd.take(3)
res5: Array[Int] = Array(1, 2, 3)
scala> rdd.reduce(_+_)
res6: Int = 15
scala> rdd.collect
res7: Array[Int] = Array(1, 2, 3, 4, 5)
scala> rdd.foreach(println(_))
1
2
3
4
5
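Note that foreach(func) runs on the executors, so the println output above reaches the console only because this session runs in local mode; on a cluster it would go to the executors' stdout instead. To print on the driver, collect the data first, as in this sketch:
scala> rdd.collect().foreach(println)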
2. Persistence Operations
scala> val list = List("Hadoop","Spark","Hive")
list: List[String] = List(Hadoop, Spark, Hive)
scala> val rdd = sc.parallelize(list)
rdd: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> rdd.cache
res0: rdd.type = ParallelCollectionRDD[0] at parallelize at <console>:26
scala> println(rdd.count)
3
scala> println(rdd.collect().mkString(","))
Hadoop,Spark,Hive
scala> println(rdd.collect().mkString("[",",","]"))
[Hadoop,Spark,Hive]
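cache() is lazy: it only marks rdd with the MEMORY_ONLY storage level, and the partitions are actually cached when the first action (count above) runs; the later collect calls then reuse the cached data instead of recomputing the RDD. The current level can be checked like this:
scala> rdd.getStorageLevel   // MEMORY_ONLY, set by cache()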
scala> rdd.persist(StorageLevel.DISK_ONLY)
<console>:26: error: not found: value StorageLevel
rdd.persist(StorageLevel.DISK_ONLY)
^
scala> rdd.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
java.lang.UnsupportedOperationException: Cannot change storage level of an RDD after it was already assigned a level
at org.apache.spark.rdd.RDD.persist(RDD.scala:174)
at org.apache.spark.rdd.RDD.persist(RDD.scala:198)
... 47 elided
scala> val rdd2 = sc.parallelize(list)
rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[1] at parallelize at <console>:26
// The storage level of rdd was already set by cache() above and cannot be changed (hence the exception), so a fresh RDD is persisted with DISK_ONLY here
scala> rdd2.persist(org.apache.spark.storage.StorageLevel.DISK_ONLY)
res6: rdd2.type = ParallelCollectionRDD[1] at parallelize at <console>:26
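To use the shorter StorageLevel.DISK_ONLY form that failed above, import StorageLevel first. Since a storage level can be assigned only once per RDD, pick it before the first action; like cache(), persist() is lazy. A minimal sketch putting these together (rdd3 is an illustrative name, not from the original session):
scala> import org.apache.spark.storage.StorageLevel
scala> val rdd3 = sc.parallelize(list)
scala> rdd3.persist(StorageLevel.DISK_ONLY)
scala> rdd3.count          // the first action materializes the RDD and writes the partitions to disk
scala> rdd3.unpersist()    // free the persisted data when it is no longer needed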