Spark笔记:RDD基本操作(上)

作者: Alukar | 来源:发表于2018-06-07 17:53 被阅读31次

    本文主要是讲解spark里RDD的基础操作。RDD是spark特有的数据模型,谈到RDD就会提到什么弹性分布式数据集,什么有向无环图,本文暂时不去展开这些高深概念,在阅读本文时候,大家可以就把RDD当作一个数组,这样的理解对我们学习RDD的API是非常有帮助的。本文所有示例代码都是使用scala语言编写的。

      Spark里的计算都是操作RDD进行,那么学习RDD的第一个问题就是如何构建RDD,构建RDD从数据来源角度分为两类:第一类是从内存里直接读取数据,第二类就是从文件系统里读取,当然这里的文件系统种类很多常见的就是HDFS以及本地文件系统了。

      第一类方式从内存里构造RDD,使用的方法:makeRDD和parallelize方法,如下代码所示:

    /* 使用makeRDD创建RDD */

    /* List */

    valrdd01=sc.makeRDD(List(1,2,3,4,5,6))

    valr01=rdd01.map { x => x * x }

    println(r01.collect().mkString(","))

    /* Array */

    valrdd02=sc.makeRDD(Array(1,2,3,4,5,6))

    valr02=rdd02.filter { x => x < 5}

    println(r02.collect().mkString(","))

    valrdd03=sc.parallelize(List(1,2,3,4,5,6), 1)

    valr03=rdd03.map { x => x + 1}

    println(r03.collect().mkString(","))

    /* Array */

    valrdd04=sc.parallelize(List(1,2,3,4,5,6), 1)

    valr04=rdd04.filter { x => x > 3}

    println(r04.collect().mkString(","))

    大家看到了RDD本质就是一个数组,因此构造数据时候使用的是List(链表)和Array(数组)类型。

      第二类方式是通过文件系统构造RDD,代码如下所示:

    valrdd:RDD[String] =sc.textFile("file:///D:/sparkdata.txt", 1)

    valr:RDD[String] =rdd.flatMap { x => x.split(",") }

    println(r.collect().mkString(","))

      这里例子使用的是本地文件系统,所以文件路径协议前缀是file://。

      构造了RDD对象了,接下来就是如何操作RDD对象了,RDD的操作分为转化操作(transformation)和行动操作(action),RDD之所以将操作分成这两类这是和RDD惰性运算有关,当RDD执行转化操作时候,实际计算并没有被执行,只有当RDD执行行动操作时候才会促发计算任务提交,执行相应的计算操作。区别转化操作和行动操作也非常简单,转化操作就是从一个RDD产生一个新的RDD操作,而行动操作就是进行实际的计算。

      下面是RDD的基础操作API介绍:

    操作类型函数名作用

    转化操作map()参数是函数,函数应用于RDD每一个元素,返回值是新的RDD

    flatMap()参数是函数,函数应用于RDD每一个元素,将元素数据进行拆分,变成迭代器,返回值是新的RDD

    filter()参数是函数,函数会过滤掉不符合条件的元素,返回值是新的RDD

    distinct()没有参数,将RDD里的元素进行去重操作

    union()参数是RDD,生成包含两个RDD所有元素的新RDD

    intersection()参数是RDD,求出两个RDD的共同元素

    subtract()参数是RDD,将原RDD里和参数RDD里相同的元素去掉

    cartesian()参数是RDD,求两个RDD的笛卡儿积

    行动操作collect()返回RDD所有元素

    count()RDD里元素个数

    countByValue()各元素在RDD中出现次数

    reduce()并行整合所有RDD数据,例如求和操作

    fold(0)(func)和reduce功能一样,不过fold带有初始值

    aggregate(0)(seqOp,combop)和reduce功能一样,但是返回的RDD数据类型和原RDD不一样

    foreach(func)对RDD每个元素都是使用特定函数

      下面是以上API操作的示例代码,如下:

      转化操作:

    valrddInt:RDD[Int] =sc.makeRDD(List(1,2,3,4,5,6,2,5,1))

    valrddStr:RDD[String] =sc.parallelize(Array("a","b","c","d","b","a"), 1)

    valrddFile:RDD[String] =sc.textFile(path, 1)

    valrdd01:RDD[Int] =sc.makeRDD(List(1,3,5,3))

    valrdd02:RDD[Int] =sc.makeRDD(List(2,4,5,1))

    /* map操作 */

    println("======map操作======")

    println(rddInt.map(x => x + 1).collect().mkString(","))

    println("======map操作======")

    /* filter操作 */

    println("======filter操作======")

    println(rddInt.filter(x => x > 4).collect().mkString(","))

    println("======filter操作======")

    /* flatMap操作 */

    println("======flatMap操作======")

    println(rddFile.flatMap { x => x.split(",") }.first())

    println("======flatMap操作======")

    /* distinct去重操作 */

    println("======distinct去重======")

    println(rddInt.distinct().collect().mkString(","))

    println(rddStr.distinct().collect().mkString(","))

    println("======distinct去重======")

    /* union操作 */

    println("======union操作======")

    println(rdd01.union(rdd02).collect().mkString(","))

    println("======union操作======")

    /* intersection操作 */

    println("======intersection操作======")

    println(rdd01.intersection(rdd02).collect().mkString(","))

    println("======intersection操作======")

    /* subtract操作 */

    println("======subtract操作======")

    println(rdd01.subtract(rdd02).collect().mkString(","))

    println("======subtract操作======")

    /* cartesian操作 */

    println("======cartesian操作======")

    println(rdd01.cartesian(rdd02).collect().mkString(","))

    println("======cartesian操作======")

    行动操作代码如下:

    valrddInt:RDD[Int] =sc.makeRDD(List(1,2,3,4,5,6,2,5,1))

    valrddStr:RDD[String] =sc.parallelize(Array("a","b","c","d","b","a"), 1)

    /* count操作 */

    println("======count操作======")

    println(rddInt.count())

    println("======count操作======")  

    /* countByValue操作 */

    println("======countByValue操作======")

    println(rddInt.countByValue())

    println("======countByValue操作======")

    /* reduce操作 */

    println("======countByValue操作======")

    println(rddInt.reduce((x ,y) => x + y))

    println("======countByValue操作======")

    /* fold操作 */

    println("======fold操作======")

    println(rddInt.fold(0)((x ,y) => x + y))

    println("======fold操作======")

    /* aggregate操作 */

    println("======aggregate操作======")

    valres:(Int,Int) =rddInt.aggregate((0,0))((x,y) => (x._1+ x._2,y),(x,y) => (x._1+ x._2,y._1+ y._2))

    println(res._1+ ","+ res._2)

    println("======aggregate操作======")

    /* foeach操作 */

    println("======foeach操作======")

    println(rddStr.foreach { x => println(x) })

    println("======foeach操作======")

      RDD操作暂时先学习到这里,剩下的内容在下一篇里再谈了,下面我要说说如何开发spark,安装spark的内容我后面会使用专门的文章进行讲解,这里我们假设已经安装好了spark,那么我们就可以在已经装好的spark服务器上使用spark-shell进行与spark交互的shell,这里我们直接可以敲打代码编写spark程序。但是spark-shell毕竟使用太麻烦,而且spark-shell一次只能使用一个用户,当另外一个用户要使用spark-shell就会把前一个用户踢掉,而且shell也没有IDE那种代码补全,代码校验的功能,使用起来很是痛苦。

      不过spark的确是一个神奇的框架,这里的神奇就是指spark本地开发调试非常简单,本地开发调试不需要任何已经装好的spark系统,我们只需要建立一个项目,这个项目可以是java的也可以是scala,然后我们将spark-assembly-1.6.1-hadoop2.6.0.jar这样的jar放入项目的环境里,这个时候我们就可以在本地开发调试spark程序了。

      大家请看我们装有scala插件的eclipse里的完整代码:

    packagecn.com.sparktest

    importorg.apache.spark.SparkConf

    importorg.apache.spark.SparkConf

    importorg.apache.spark.SparkContext

    importorg.apache.spark.rdd.RDD

    objectSparkTest {

      valconf:SparkConf =newSparkConf().setAppName("xtq").setMaster("local[2]")

      valsc:SparkContext =newSparkContext(conf)

      /**

       * 创建数据的方式--从内存里构造数据(基础)

       */

      defcreateDataMethod():Unit ={

        /* 使用makeRDD创建RDD */

        /* List */

        valrdd01=sc.makeRDD(List(1,2,3,4,5,6))

        valr01=rdd01.map { x => x * x }

        println("===================createDataMethod:makeRDD:List=====================")

        println(r01.collect().mkString(","))

        println("===================createDataMethod:makeRDD:List=====================")

        /* Array */

        valrdd02=sc.makeRDD(Array(1,2,3,4,5,6))

        valr02=rdd02.filter { x => x < 5}

        println("===================createDataMethod:makeRDD:Array=====================")

        println(r02.collect().mkString(","))

        println("===================createDataMethod:makeRDD:Array=====================")

        /* 使用parallelize创建RDD */

        /* List */

        valrdd03=sc.parallelize(List(1,2,3,4,5,6), 1)

        valr03=rdd03.map { x => x + 1}

        println("===================createDataMethod:parallelize:List=====================")

        println(r03.collect().mkString(","))

        println("===================createDataMethod:parallelize:List=====================")

        /* Array */

        valrdd04=sc.parallelize(List(1,2,3,4,5,6), 1)

        valr04=rdd04.filter { x => x > 3}

        println("===================createDataMethod:parallelize:Array=====================")

        println(r04.collect().mkString(","))

        println("===================createDataMethod:parallelize:Array=====================")

      }

      /**

       * 创建Pair Map

       */

      defcreatePairRDD():Unit ={

        valrdd:RDD[(String,Int)] =sc.makeRDD(List(("key01",1),("key02",2),("key03",3)))

        valr:RDD[String] =rdd.keys

        println("===========================createPairRDD=================================")

        println(r.collect().mkString(","))

        println("===========================createPairRDD=================================")

      }

      /**

       * 通过文件创建RDD

       * 文件数据:

       *    key01,1,2.3

              key02,5,3.7

          key03,23,4.8

          key04,12,3.9

          key05,7,1.3

       */

      defcreateDataFromFile(path:String):Unit ={

        valrdd:RDD[String] =sc.textFile(path, 1)

        valr:RDD[String] =rdd.flatMap { x => x.split(",") }

        println("=========================createDataFromFile==================================")

        println(r.collect().mkString(","))

        println("=========================createDataFromFile==================================")

      }

      /**

       * 基本的RDD操作

       */

      defbasicTransformRDD(path:String):Unit ={

        valrddInt:RDD[Int] =sc.makeRDD(List(1,2,3,4,5,6,2,5,1))

        valrddStr:RDD[String] =sc.parallelize(Array("a","b","c","d","b","a"), 1)

        valrddFile:RDD[String] =sc.textFile(path, 1)

        valrdd01:RDD[Int] =sc.makeRDD(List(1,3,5,3))

        valrdd02:RDD[Int] =sc.makeRDD(List(2,4,5,1))

        /* map操作 */

        println("======map操作======")

        println(rddInt.map(x => x + 1).collect().mkString(","))

        println("======map操作======")

        /* filter操作 */

        println("======filter操作======")

        println(rddInt.filter(x => x > 4).collect().mkString(","))

        println("======filter操作======")

        /* flatMap操作 */

        println("======flatMap操作======")

        println(rddFile.flatMap { x => x.split(",") }.first())

        println("======flatMap操作======")

        /* distinct去重操作 */

        println("======distinct去重======")

        println(rddInt.distinct().collect().mkString(","))

        println(rddStr.distinct().collect().mkString(","))

        println("======distinct去重======")

        /* union操作 */

        println("======union操作======")

        println(rdd01.union(rdd02).collect().mkString(","))

        println("======union操作======")

        /* intersection操作 */

        println("======intersection操作======")

        println(rdd01.intersection(rdd02).collect().mkString(","))

        println("======intersection操作======")

        /* subtract操作 */

        println("======subtract操作======")

        println(rdd01.subtract(rdd02).collect().mkString(","))

        println("======subtract操作======")

        /* cartesian操作 */

        println("======cartesian操作======")

        println(rdd01.cartesian(rdd02).collect().mkString(","))

        println("======cartesian操作======")   

      }

      /**

       * 基本的RDD行动操作

       */

      defbasicActionRDD():Unit ={

        valrddInt:RDD[Int] =sc.makeRDD(List(1,2,3,4,5,6,2,5,1))

        valrddStr:RDD[String] =sc.parallelize(Array("a","b","c","d","b","a"), 1)

        /* count操作 */

        println("======count操作======")

        println(rddInt.count())

        println("======count操作======")  

        /* countByValue操作 */

        println("======countByValue操作======")

        println(rddInt.countByValue())

        println("======countByValue操作======")

        /* reduce操作 */

        println("======countByValue操作======")

        println(rddInt.reduce((x ,y) => x + y))

        println("======countByValue操作======")

        /* fold操作 */

        println("======fold操作======")

        println(rddInt.fold(0)((x ,y) => x + y))

        println("======fold操作======")

        /* aggregate操作 */

        println("======aggregate操作======")

        valres:(Int,Int) =rddInt.aggregate((0,0))((x,y) => (x._1+ x._2,y),(x,y) => (x._1+ x._2,y._1+ y._2))

        println(res._1+ ","+ res._2)

        println("======aggregate操作======")

        /* foeach操作 */

        println("======foeach操作======")

        println(rddStr.foreach { x => println(x) })

        println("======foeach操作======")   

      }

      defmain(args:Array[String]):Unit ={

        println(System.getenv("HADOOP_HOME"))

        createDataMethod()

        createPairRDD()

        createDataFromFile("file:///D:/sparkdata.txt")

        basicTransformRDD("file:///D:/sparkdata.txt")

        basicActionRDD()

        /*打印结果*/

        /*D://hadoop

    ===================createDataMethod:makeRDD:List=====================

    1,4,9,16,25,36

    ===================createDataMethod:makeRDD:List=====================

    ===================createDataMethod:makeRDD:Array=====================

    1,2,3,4

    ===================createDataMethod:makeRDD:Array=====================

    ===================createDataMethod:parallelize:List=====================

    2,3,4,5,6,7

    ===================createDataMethod:parallelize:List=====================

    ===================createDataMethod:parallelize:Array=====================

    4,5,6

    ===================createDataMethod:parallelize:Array=====================

    ===========================createPairRDD=================================

    key01,key02,key03

    ===========================createPairRDD=================================

    key01,1,2.3,key02,5,3.7,key03,23,4.8,key04,12,3.9,key05,7,1.3

    =========================createDataFromFile==================================

    2,3,4,5,6,7,3,6,2

    ======map操作======

    ======filter操作======

    5,6,5

    ======filter操作======

    ======flatMap操作======

    key01

    ======flatMap操作======

    ======distinct去重======

    4,6,2,1,3,5

    ======distinct去重======

    ======union操作======

    1,3,5,3,2,4,5,1

    ======union操作======

    ======intersection操作======

    1,5

    ======intersection操作======

    ======subtract操作======

    3,3

    ======subtract操作======

    ======cartesian操作======

    (1,2),(1,4),(3,2),(3,4),(1,5),(1,1),(3,5),(3,1),(5,2),(5,4),(3,2),(3,4),(5,5),(5,1),(3,5),(3,1)

    ======cartesian操作======

    ======count操作======

    9

    ======count操作======

    ======countByValue操作======

    Map(5 -> 2, 1 -> 2, 6 -> 1, 2 -> 2, 3 -> 1, 4 -> 1)

    ======countByValue操作======

    ======countByValue操作======

    29

    ======countByValue操作======

    ======fold操作======

    29

    ======fold操作======

    ======aggregate操作======

    19,10

    ======aggregate操作======

    ======foeach操作======

    a

    b

    c

    d

    b

    a

    ======foeach操作======*/

      }

    }

      Spark执行时候我们需要构造一个SparkContenxt的环境变量,构造环境变量时候需要构造一个SparkConf对象,例如代码:setAppName("xtq").setMaster("local[2]")

      appName就是spark任务名称,master为local[2]是指使用本地模式,启动2个线程完成spark任务。

      在eclipse里运行spark程序时候,会报出如下错误:

    java.io.IOException:Could not locate executable null\bin\winutils.exe in the Hadoop binaries.

        at org.apache.hadoop.util.Shell.getQualifiedBinPath(Shell.java:355)

        at org.apache.hadoop.util.Shell.getWinUtilsPath(Shell.java:370)

        at org.apache.hadoop.util.Shell.(Shell.java:363)

        at org.apache.hadoop.util.StringUtils.(StringUtils.java:79)

        at org.apache.hadoop.security.Groups.parseStaticMapping(Groups.java:104)

        at org.apache.hadoop.security.Groups.(Groups.java:86)

        at org.apache.hadoop.security.Groups.(Groups.java:66)

        at org.apache.hadoop.security.Groups.getUserToGroupsMappingService(Groups.java:280)

        at org.apache.hadoop.security.UserGroupInformation.initialize(UserGroupInformation.java:271)

        at org.apache.hadoop.security.UserGroupInformation.ensureInitialized(UserGroupInformation.java:248)

        at org.apache.hadoop.security.UserGroupInformation.loginUserFromSubject(UserGroupInformation.java:763)

        at org.apache.hadoop.security.UserGroupInformation.getLoginUser(UserGroupInformation.java:748)

        at org.apache.hadoop.security.UserGroupInformation.getCurrentUser(UserGroupInformation.java:621)

        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)

        at org.apache.spark.util.Utils$$anonfun$getCurrentUserName$1.apply(Utils.scala:2160)

        at scala.Option.getOrElse(Option.scala:120)

        at org.apache.spark.util.Utils$.getCurrentUserName(Utils.scala:2160)

        at org.apache.spark.SparkContext.(SparkContext.scala:322)

        at cn.com.sparktest.SparkTest$.(SparkTest.scala:10)

        at cn.com.sparktest.SparkTest$.(SparkTest.scala)

        at cn.com.sparktest.SparkTest.main(SparkTest.scala)

      该错误不会影响程序的运算,但总是让人觉得不舒服,这个问题是因为spark运行依赖于hadoop,可是在window下其实是无法安装hadoop,只能使用cygwin模拟安装,而新版本的hadoop在windows下使用需要使用winutils.exe,解决这个问题很简单,就是下载一个winutils.exe,注意下自己操作系统是32位还是64位,找到对应版本,然后放置在这样的目录下:

      D:\hadoop\bin\winutils.exe

      然后再环境变量里定义HADOOP_HOME= D:\hadoop

      环境变量的改变要重启eclipse,这样环境变量才会生效,这个时候程序运行就不会报出错误了。

    想学习大数据或者想学习大数据的朋友,我整理了一套大数据的学习视频免费分享给大家,从入门到实战都有,大家可以加微信:Lxiao_28获取,还可以入微信群交流!(备注领取资料,真实有效)。

    相关文章

      网友评论

        本文标题:Spark笔记:RDD基本操作(上)

        本文链接:https://www.haomeiwen.com/subject/hdxnsftx.html