Reposted from: Spark Notes: RDD Basic Operations (Part 1)
An RDD behaves like a distributed collection of elements, so when building one from in-memory data you pass a local Scala collection such as a List or an Array.
/* Create RDDs with makeRDD */
/* List */
val rdd01 = sc.makeRDD(List(1, 2, 3, 4, 5, 6))
val r01 = rdd01.map { x => x * x }
println(r01.collect().mkString(","))
/* Array */
val rdd02 = sc.makeRDD(Array(1, 2, 3, 4, 5, 6))
val r02 = rdd02.filter { x => x < 5 }
println(r02.collect().mkString(","))
/* Create RDDs with parallelize */
/* List */
val rdd03 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 1)
val r03 = rdd03.map { x => x + 1 }
println(r03.collect().mkString(","))
/* Array */
val rdd04 = sc.parallelize(Array(1, 2, 3, 4, 5, 6), 1)
val r04 = rdd04.filter { x => x > 3 }
println(r04.collect().mkString(","))
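The second argument to parallelize (and makeRDD) is the number of partitions the local collection is split into. A minimal sketch to make that split visible, assuming the same sc as above:

```scala
// Ask Spark to split the local collection into 3 partitions.
val rdd05 = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3)
println(rdd05.getNumPartitions) // 3
// glom() gathers each partition into an array, so the split becomes visible.
println(rdd05.glom().collect().map(_.mkString("[", ",", "]")).mkString(" "))
```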
RDD Transformations
Function | Description |
---|---|
map() | Takes a function, applies it to every element of the RDD, and returns a new RDD |
flatMap() | Takes a function, applies it to every element, splits each result into an iterator of pieces, and returns a new RDD with the pieces flattened (see the sketch after this table) |
filter() | Takes a predicate function and returns a new RDD containing only the elements that satisfy it |
distinct() | Takes no arguments; returns a new RDD with duplicate elements removed |
union() | Takes another RDD; returns a new RDD containing all elements of both RDDs |
intersection() | Takes another RDD; returns the elements common to both RDDs |
subtract() | Takes another RDD; removes from the original RDD the elements that also appear in the argument RDD |
cartesian() | Takes another RDD; returns the Cartesian product of the two RDDs |
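To make the map/flatMap difference concrete, here is a minimal sketch; the two sample sentences are made up purely for illustration, and sc is the same SparkContext used above:

```scala
// Illustrative sample data.
val lines = sc.parallelize(List("hello world", "hello spark"))

// map keeps one output per input line: RDD[Array[String]]
println(lines.map(_.split(" ")).collect().map(_.mkString("[", ",", "]")).mkString(" "))
// [hello,world] [hello,spark]

// flatMap flattens the split results into a single RDD[String]
println(lines.flatMap(_.split(" ")).collect().mkString(","))
// hello,world,hello,spark
```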
RDD Actions
Function | Description |
---|---|
collect() | Returns all elements of the RDD |
count() | Returns the number of elements in the RDD |
countByValue() | Returns how many times each element appears in the RDD |
reduce() | Combines all elements of the RDD in parallel, e.g. summing them |
fold(0)(func) | Same as reduce, but takes an initial (zero) value |
aggregate(zero)(seqOp, combOp) | Like reduce, but the result type may differ from the RDD's element type (see the sketch after this table) |
foreach(func) | Applies the given function to every element of the RDD |
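aggregate is easiest to see with the classic (sum, count) pattern, where the result type (a pair of Ints) differs from the element type (Int). A minimal sketch with made-up numbers, assuming the same sc as above:

```scala
val nums = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
val (sum, cnt) = nums.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),   // seqOp: fold one value into the partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2)    // combOp: merge accumulators across partitions
)
println(sum + "/" + cnt + " = " + sum.toDouble / cnt) // 29/9 = 3.222...
```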
RDD Transformation Code Examples
import org.apache.spark.rdd.RDD

val rddInt: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
val rddStr: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "b", "a"), 1)
val rddFile: RDD[String] = sc.textFile(path, 1) // path points to a text file; define it before running
val rdd01: RDD[Int] = sc.makeRDD(List(1, 3, 5, 3))
val rdd02: RDD[Int] = sc.makeRDD(List(2, 4, 5, 1))
/* map */
println("======map======")
println(rddInt.map(x => x + 1).collect().mkString(","))
println("======map======")
/* filter */
println("======filter======")
println(rddInt.filter(x => x > 4).collect().mkString(","))
println("======filter======")
/* flatMap */
println("======flatMap======")
println(rddFile.flatMap { x => x.split(",") }.first())
println("======flatMap======")
/* distinct */
println("======distinct======")
println(rddInt.distinct().collect().mkString(","))
println(rddStr.distinct().collect().mkString(","))
println("======distinct======")
/* union */
println("======union======")
println(rdd01.union(rdd02).collect().mkString(","))
println("======union======")
/* intersection */
println("======intersection======")
println(rdd01.intersection(rdd02).collect().mkString(","))
println("======intersection======")
/* subtract */
println("======subtract======")
println(rdd01.subtract(rdd02).collect().mkString(","))
println("======subtract======")
/* cartesian */
println("======cartesian======")
println(rdd01.cartesian(rdd02).collect().mkString(","))
println("======cartesian======")
RDD Action Code Examples
val rddInt: RDD[Int] = sc.makeRDD(List(1, 2, 3, 4, 5, 6, 2, 5, 1))
val rddStr: RDD[String] = sc.parallelize(Array("a", "b", "c", "d", "b", "a"), 1)
/* count */
println("======count======")
println(rddInt.count())
println("======count======")
/* countByValue */
println("======countByValue======")
println(rddInt.countByValue())
println("======countByValue======")
/* reduce */
println("======reduce======")
println(rddInt.reduce((x, y) => x + y))
println("======reduce======")
/* fold */
println("======fold======")
println(rddInt.fold(0)((x, y) => x + y))
println("======fold======")
/* aggregate */
println("======aggregate======")
// Fold the ints into a (sum, count) pair: seqOp folds one value into the partition
// accumulator, combOp merges the accumulators of different partitions.
val res: (Int, Int) = rddInt.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),
  (a, b) => (a._1 + b._1, a._2 + b._2)
)
println(res._1 + "," + res._2)
println("======aggregate======")
/* foreach */
println("======foreach======")
rddStr.foreach { x => println(x) }
println("======foreach======")
First steps in day-to-day work:
// rdd here is an HBase scan RDD of (ImmutableBytesWritable, Result) pairs (see the sketch below).
// Print the value length of the first cell of every row.
rdd.map(e => e._2.listCells().get(0).getValueLength).foreach(println(_))
// Print the first cell's value decoded as a string.
rdd.map(e => {
  Bytes.toString(CellUtil.cloneValue(e._2.listCells().get(0)))
}).foreach(println(_))
// Keep rows whose first cell value is longer than 13 bytes, key each row by the
// first 13 characters of that value, and count the rows per key.
val rdd2 = rdd.filter(e => e._2.listCells().get(0).getValueLength > 13).map(e => {
  val cells = e._2.listCells()
  val content = Bytes.toString(CellUtil.cloneValue(cells.get(0)))
  // println(content.substring(0, 13))
  (content.substring(0, 13), 1L)
}).reduceByKey(_ + _).collect()
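For context, an rdd of this shape usually comes from newAPIHadoopRDD with HBase's TableInputFormat, which yields (ImmutableBytesWritable, Result) pairs. A minimal sketch of that setup; the table name "news" is a made-up placeholder:

```scala
import org.apache.hadoop.hbase.HBaseConfiguration
import org.apache.hadoop.hbase.client.Result
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.hadoop.hbase.mapreduce.TableInputFormat

// Hypothetical table name, used only for illustration.
val hbaseConf = HBaseConfiguration.create()
hbaseConf.set(TableInputFormat.INPUT_TABLE, "news")

// Each record is a (row key, Result) pair, which is what the snippets above consume.
val rdd = sc.newAPIHadoopRDD(
  hbaseConf,
  classOf[TableInputFormat],
  classOf[ImmutableBytesWritable],
  classOf[Result]
)
```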
Pulling data out of HBase and then calling rdd.collect() raises a serialization exception.
Problem:
18/11/11 17:14:15 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 38 39 31 36 33)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (38 39 31 36 33,keyvalues={111210/cf1:assigner/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:author/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:canComment/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:channelNames/1541926899148/Put/vlen=12/seqid=0, 111210/cf1:content/1541926899148/Put/vlen=862/seqid=0, 111210/cf1:createTime/1541926899148/Put/vlen=23/seqid=0, 111210/cf1:creator/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:creatorName/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:editor/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:modifyTime/1541926899148/Put/vlen=22/seqid=0, 111210/cf1:newsType/1541926899148/Put/vlen=4/seqid=0, 111210/cf1:originTitle/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:publishTime/1541926899148/Put/vlen=23/seqid=0, 111210/cf1:publisher/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:publisherName/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:rejectTime/1541926899148/Put/vlen=21/seqid=0, 111210/cf1:responsibleEditor/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:showType/1541926899148/Put/vlen=3/seqid=0, 111210/cf1:source/1541926899148/Put/vlen=12/seqid=0, 111210/cf1:status/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:summary/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:tag/1541926899148/Put/vlen=6/seqid=0, 111210/cf1:title/1541926899148/Put/vlen=18/seqid=0, 111210/cf1:voicebroadcast/1541926899148/Put/vlen=1/seqid=0}))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 495)
at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
18/11/11 17:14:15 ERROR TaskSetManager: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
- object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 38 39 31 36 33)
- field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
- object (class scala.Tuple2, (38 39 31 36 33,keyvalues={111210/cf1:assigner/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:author/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:canComment/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:channelNames/1541926899148/Put/vlen=12/seqid=0, 111210/cf1:content/1541926899148/Put/vlen=862/seqid=0, 111210/cf1:createTime/1541926899148/Put/vlen=23/seqid=0, 111210/cf1:creator/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:creatorName/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:editor/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:modifyTime/1541926899148/Put/vlen=22/seqid=0, 111210/cf1:newsType/1541926899148/Put/vlen=4/seqid=0, 111210/cf1:originTitle/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:publishTime/1541926899148/Put/vlen=23/seqid=0, 111210/cf1:publisher/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:publisherName/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:rejectTime/1541926899148/Put/vlen=21/seqid=0, 111210/cf1:responsibleEditor/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:showType/1541926899148/Put/vlen=3/seqid=0, 111210/cf1:source/1541926899148/Put/vlen=12/seqid=0, 111210/cf1:status/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:summary/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:tag/1541926899148/Put/vlen=6/seqid=0, 111210/cf1:title/1541926899148/Put/vlen=18/seqid=0, 111210/cf1:voicebroadcast/1541926899148/Put/vlen=1/seqid=0}))
- element of array (index: 0)
- array (class [Lscala.Tuple2;, size 495); not retrying
Solution: specify the serializer in SparkConf:
val sparkConf = new SparkConf().setAppName("test").setMaster("local").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
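Switching to Kryo lets Spark serialize ImmutableBytesWritable, which the default Java serializer refuses. As an optional variant (an assumption, not something the fix above requires), the class can also be registered with Kryo so the stream does not carry full class names:

```scala
import org.apache.hadoop.hbase.io.ImmutableBytesWritable
import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("test")
  .setMaster("local")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  // Optional: registering the class keeps Kryo from writing full class names.
  .registerKryoClasses(Array(classOf[ImmutableBytesWritable]))
```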
Using flatMap to turn each record into multiple records
// For each row, read its content_id, look up the channel-id list for that id
// (a string like "11,22,33,54"), split it, and count rows per channel id.
rdd
  .map(e => {
    val cells = e._2.listCells()
    val id = HbaseCommon.getLongValueByQualifier(cells, "content_id", false)
    // channelIds like "11,22,33,54"
    val channelIds = channelOfNewsMap.get(id.toLong).getOrElse("")
    channelIds.toString.split(",")
  })
  .flatMap(e => e) // flatten each Array[String] of channel ids into individual elements
  .map(e => {
    (e, 1L)
  })
  .reduceByKey(_ + _)
  .collect()
  .foreach(e => {
    val channelId = e._1
    val count = e._2
    println("channelId:" + channelId + " ,count:" + count)
  })
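Since map followed by flatMap(e => e) is just a flatMap, the pipeline above can be written more compactly. A sketch of the equivalent version, assuming the same rdd, HbaseCommon helper, and channelOfNewsMap as above:

```scala
// Equivalent pipeline, with the split folded directly into flatMap.
rdd
  .flatMap(e => {
    val cells = e._2.listCells()
    val id = HbaseCommon.getLongValueByQualifier(cells, "content_id", false)
    channelOfNewsMap.get(id.toLong).getOrElse("").toString.split(",")
  })
  .map(c => (c, 1L))
  .reduceByKey(_ + _)
  .collect()
  .foreach { case (channelId, count) => println("channelId:" + channelId + " ,count:" + count) }
```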