Spark API - RDD

Author: Grey____ | Published 2018-11-03 19:29

Reposted from: Spark笔记:RDD基本操作(上)


An RDD (Resilient Distributed Dataset) is Spark's distributed collection of elements. It can be built directly from local Scala collections, so the examples below construct their data with the List and Array types.

/* Create an RDD with makeRDD */
/* from a List */
val rdd01 = sc.makeRDD(List(1,2,3,4,5,6))
val r01 = rdd01.map { x => x * x }
println(r01.collect().mkString(","))
/* from an Array */
val rdd02 = sc.makeRDD(Array(1,2,3,4,5,6))
val r02 = rdd02.filter { x => x < 5 }
println(r02.collect().mkString(","))

/* Create an RDD with parallelize (second argument = number of partitions) */
/* from a List */
val rdd03 = sc.parallelize(List(1,2,3,4,5,6), 1)
val r03 = rdd03.map { x => x + 1 }
println(r03.collect().mkString(","))
/* from an Array */
val rdd04 = sc.parallelize(Array(1,2,3,4,5,6), 1)
val r04 = rdd04.filter { x => x > 3 }
println(r04.collect().mkString(","))
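Both entry points build the same kind of RDD: in the Scala API, makeRDD called with a Seq behaves the same as parallelize, and the optional second argument controls the number of partitions. A small sketch, assuming the same SparkContext sc as above:

val byMakeRDD     = sc.makeRDD(List(1, 2, 3, 4, 5, 6))        // partition count = default parallelism
val byParallelize = sc.parallelize(List(1, 2, 3, 4, 5, 6), 3) // explicitly 3 partitions

// getNumPartitions shows how the data was split; the contents are the same either way.
println(byMakeRDD.getNumPartitions)
println(byParallelize.getNumPartitions)        // 3
println(byParallelize.collect().mkString(",")) // 1,2,3,4,5,6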

RDD Transformations

map(): takes a function, applies it to every element of the RDD, and returns a new RDD
flatMap(): takes a function, applies it to every element, splits each element into an iterator of results, and returns a new RDD with those results flattened together (a short map vs. flatMap sketch follows this list)
filter(): takes a predicate function and returns a new RDD keeping only the elements that satisfy it
distinct(): takes no arguments; returns a new RDD with duplicate elements removed
union(): takes another RDD and returns a new RDD containing all elements of both
intersection(): takes another RDD and returns the elements common to both
subtract(): takes another RDD and removes from the original RDD the elements that also appear in the argument RDD
cartesian(): takes another RDD and returns the Cartesian product of the two RDDs
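For concreteness, here is a minimal map vs. flatMap sketch on an in-memory RDD (self-contained apart from the SparkContext sc used throughout this post; the flatMap example further below reads from a file instead):

val lines = sc.parallelize(List("a,b", "c,d,e"))
// map produces exactly one output element per input element (here: one Array[String] per line)
println(lines.map(line => line.split(",")).count())                      // 2
// flatMap flattens those arrays into a single RDD of individual tokens
println(lines.flatMap(line => line.split(",")).collect().mkString(","))  // a,b,c,d,e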

RDD Actions

collect(): returns all elements of the RDD to the driver
count(): returns the number of elements in the RDD
countByValue(): returns the number of times each value occurs in the RDD
reduce(): aggregates all RDD elements in parallel, for example to compute a sum
fold(0)(func): same as reduce, but takes an initial (zero) value (see the note after this list)
aggregate(zeroValue)(seqOp, combOp): like reduce, but the result type may differ from the RDD's element type
foreach(func): applies the given function to every element of the RDD
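One caveat about fold: the zero value is applied once per partition and once more when the per-partition results are merged, so a zero value that is not neutral for the operation changes the result. A minimal sketch, assuming only the sc used throughout this post:

val nums = sc.parallelize(List(1, 2, 3), 2)  // two partitions
println(nums.reduce(_ + _))    // 6
println(nums.fold(0)(_ + _))   // 6: 0 is neutral for addition
println(nums.fold(10)(_ + _))  // 36: 10 is added in each of the 2 partitions and once more when merging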

RDD Transformation Code Examples

import org.apache.spark.rdd.RDD

val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
val rddFile:RDD[String] = sc.textFile(path, 1)  // `path` points to a text file

val rdd01:RDD[Int] = sc.makeRDD(List(1,3,5,3))
val rdd02:RDD[Int] = sc.makeRDD(List(2,4,5,1))

/* map operation */
println("======map operation======")
println(rddInt.map(x => x + 1).collect().mkString(","))
println("======map operation======")
/* filter operation */
println("======filter operation======")
println(rddInt.filter(x => x > 4).collect().mkString(","))
println("======filter operation======")
/* flatMap operation */
println("======flatMap operation======")
println(rddFile.flatMap { x => x.split(",") }.first())
println("======flatMap operation======")
/* distinct operation */
println("======distinct operation======")
println(rddInt.distinct().collect().mkString(","))
println(rddStr.distinct().collect().mkString(","))
println("======distinct operation======")
/* union operation */
println("======union operation======")
println(rdd01.union(rdd02).collect().mkString(","))
println("======union operation======")
/* intersection operation */
println("======intersection operation======")
println(rdd01.intersection(rdd02).collect().mkString(","))
println("======intersection operation======")
/* subtract operation */
println("======subtract operation======")
println(rdd01.subtract(rdd02).collect().mkString(","))
println("======subtract operation======")
/* cartesian operation */
println("======cartesian operation======")
println(rdd01.cartesian(rdd02).collect().mkString(","))
println("======cartesian operation======")

RDD Action Code Examples

val rddInt:RDD[Int] = sc.makeRDD(List(1,2,3,4,5,6,2,5,1))
val rddStr:RDD[String] = sc.parallelize(Array("a","b","c","d","b","a"), 1)
 
/* count operation */
println("======count operation======")
println(rddInt.count())
println("======count operation======")
/* countByValue operation */
println("======countByValue operation======")
println(rddInt.countByValue())
println("======countByValue operation======")
/* reduce operation */
println("======reduce operation======")
println(rddInt.reduce((x, y) => x + y))
println("======reduce operation======")
/* fold operation */
println("======fold operation======")
println(rddInt.fold(0)((x, y) => x + y))
println("======fold operation======")
/* aggregate operation: compute (sum, count) in one pass; the result type (Int, Int) differs from the element type Int */
println("======aggregate operation======")
val res:(Int,Int) = rddInt.aggregate((0, 0))(
  (acc, v) => (acc._1 + v, acc._2 + 1),  // seqOp: add one element into the per-partition accumulator
  (a, b) => (a._1 + b._1, a._2 + b._2))  // combOp: merge two per-partition accumulators
println(res._1 + "," + res._2)           // 29,9 (sum and count of the 9 elements)
println("======aggregate operation======")
/* foreach operation */
println("======foreach operation======")
rddStr.foreach { x => println(x) }  // foreach returns Unit, so wrapping it in println is not useful
println("======foreach operation======")

First notes from day-to-day work:

    import org.apache.hadoop.hbase.CellUtil
    import org.apache.hadoop.hbase.util.Bytes

    // print the value length and the string value of the first cell of each row
    rdd.map(e => e._2.listCells().get(0).getValueLength).foreach(println(_))
    rdd.map(e => {
      Bytes.toString(CellUtil.cloneValue(e._2.listCells().get(0)))
    }).foreach(println(_))

    // count rows grouped by the first 13 characters of the first cell's value
    val rdd2 = rdd.filter(e => e._2.listCells().get(0).getValueLength > 13).map(e => {
      val cells = e._2.listCells()
      val content = Bytes.toString(CellUtil.cloneValue(cells.get(0)))
      (content.substring(0, 13), 1L)
    }).reduceByKey(_ + _).collect()
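For context, the rdd above is an RDD of (ImmutableBytesWritable, Result) pairs read from HBase. A minimal sketch of how such an RDD is typically obtained via newAPIHadoopRDD (the table name "news" is a hypothetical placeholder, not taken from the original job):

    import org.apache.hadoop.hbase.HBaseConfiguration
    import org.apache.hadoop.hbase.client.Result
    import org.apache.hadoop.hbase.io.ImmutableBytesWritable
    import org.apache.hadoop.hbase.mapreduce.TableInputFormat

    val hbaseConf = HBaseConfiguration.create()
    hbaseConf.set(TableInputFormat.INPUT_TABLE, "news")  // hypothetical table name
    val rdd = sc.newAPIHadoopRDD(
      hbaseConf,
      classOf[TableInputFormat],
      classOf[ImmutableBytesWritable],
      classOf[Result])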

Serialization exception when calling rdd.collect() on data fetched from HBase

Problem:

18/11/11 17:14:15 ERROR Executor: Exception in task 0.0 in stage 1.0 (TID 1)
java.io.NotSerializableException: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 38 39 31 36 33)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (38 39 31 36 33,keyvalues={111210/cf1:assigner/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:author/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:canComment/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:channelNames/1541926899148/Put/vlen=12/seqid=0, 111210/cf1:content/1541926899148/Put/vlen=862/seqid=0, 111210/cf1:createTime/1541926899148/Put/vlen=23/seqid=0, 111210/cf1:creator/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:creatorName/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:editor/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:modifyTime/1541926899148/Put/vlen=22/seqid=0, 111210/cf1:newsType/1541926899148/Put/vlen=4/seqid=0, 111210/cf1:originTitle/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:publishTime/1541926899148/Put/vlen=23/seqid=0, 111210/cf1:publisher/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:publisherName/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:rejectTime/1541926899148/Put/vlen=21/seqid=0, 111210/cf1:responsibleEditor/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:showType/1541926899148/Put/vlen=3/seqid=0, 111210/cf1:source/1541926899148/Put/vlen=12/seqid=0, 111210/cf1:status/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:summary/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:tag/1541926899148/Put/vlen=6/seqid=0, 111210/cf1:title/1541926899148/Put/vlen=18/seqid=0, 111210/cf1:voicebroadcast/1541926899148/Put/vlen=1/seqid=0}))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 495)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:393)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)
18/11/11 17:14:15 ERROR TaskSetManager: Task 0.0 in stage 1.0 (TID 1) had a not serializable result: org.apache.hadoop.hbase.io.ImmutableBytesWritable
Serialization stack:
    - object not serializable (class: org.apache.hadoop.hbase.io.ImmutableBytesWritable, value: 38 39 31 36 33)
    - field (class: scala.Tuple2, name: _1, type: class java.lang.Object)
    - object (class scala.Tuple2, (38 39 31 36 33,keyvalues={111210/cf1:assigner/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:author/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:canComment/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:channelNames/1541926899148/Put/vlen=12/seqid=0, 111210/cf1:content/1541926899148/Put/vlen=862/seqid=0, 111210/cf1:createTime/1541926899148/Put/vlen=23/seqid=0, 111210/cf1:creator/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:creatorName/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:editor/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:modifyTime/1541926899148/Put/vlen=22/seqid=0, 111210/cf1:newsType/1541926899148/Put/vlen=4/seqid=0, 111210/cf1:originTitle/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:publishTime/1541926899148/Put/vlen=23/seqid=0, 111210/cf1:publisher/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:publisherName/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:rejectTime/1541926899148/Put/vlen=21/seqid=0, 111210/cf1:responsibleEditor/1541926899148/Put/vlen=15/seqid=0, 111210/cf1:showType/1541926899148/Put/vlen=3/seqid=0, 111210/cf1:source/1541926899148/Put/vlen=12/seqid=0, 111210/cf1:status/1541926899148/Put/vlen=1/seqid=0, 111210/cf1:summary/1541926899148/Put/vlen=0/seqid=0, 111210/cf1:tag/1541926899148/Put/vlen=6/seqid=0, 111210/cf1:title/1541926899148/Put/vlen=18/seqid=0, 111210/cf1:voicebroadcast/1541926899148/Put/vlen=1/seqid=0}))
    - element of array (index: 0)
    - array (class [Lscala.Tuple2;, size 495); not retrying

Solution: specify the serializer in SparkConf. ImmutableBytesWritable does not implement java.io.Serializable, so the default Java serializer cannot ship the collected (key, value) pairs back to the driver; switching to Kryo avoids this (alternatively, map the ImmutableBytesWritable key to something serializable, such as a String, before calling collect()).
val sparkConf = new SparkConf().setAppName("test").setMaster("local").set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
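Optionally, the classes involved can also be registered with Kryo. A sketch of the same configuration with registration (the class list is illustrative):

import org.apache.spark.SparkConf

val sparkConf = new SparkConf()
  .setAppName("test")
  .setMaster("local")
  .set("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .registerKryoClasses(Array(
    classOf[org.apache.hadoop.hbase.io.ImmutableBytesWritable],
    classOf[org.apache.hadoop.hbase.client.Result]))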


Using flatMap to expand each RDD element into multiple elements

rdd
      .map(e => {
        val cells = e._2.listCells()
        val id = HbaseCommon.getLongValueByQualifier(cells, "content_id", false)

        // channelIds like "11,22,33,54"
        val channelIds = channelOfNewsMap.get(id.toLong).getOrElse("")
        channelIds.split(",")
      })
      .flatMap(e => e)
      .map(e => {
        (e, 1L)
      })
      .reduceByKey(_ + _)
      .collect()
      .foreach(e => {
        val channelId = e._1
        val count = e._2
        println("channelId:" + channelId + " ,count:" + count)
      })
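The separate map followed by flatMap(e => e) can also be merged into a single flatMap. A sketch of the equivalent pipeline (rdd, HbaseCommon and channelOfNewsMap are the same project-specific names as above):

    rdd
      .flatMap(e => {
        val cells = e._2.listCells()
        val id = HbaseCommon.getLongValueByQualifier(cells, "content_id", false)
        // expand each row into its comma-separated channel ids
        channelOfNewsMap.get(id.toLong).getOrElse("").split(",")
      })
      .map(channelId => (channelId, 1L))
      .reduceByKey(_ + _)
      .collect()
      .foreach { case (channelId, count) => println("channelId:" + channelId + " ,count:" + count) }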
