美文网首页
Spark-RDD介绍

Spark-RDD介绍

作者: edwin1993 | 来源:发表于2018-06-05 17:29 被阅读0次

    RDD

    1 RDD介绍
    • Driver program:
      包含程序的main()方法,RDDs的定义和操作。
      管理节点,我们称为executors。
    • SparkContext:
      Driver programs 通过SparkContext对象访问Spark
      SparkContext对象代表和一个集群的链接。
      在shell中SparkContext是自动创建的,就是sc变量。

    • RDDs
      Resilient distributed datasets(弹性分布式数据集)。
      并行的分布在整个集群中。
      RDDs是spark分发数据和计算的基础抽象类。
      一个RDD是一个不可改变的分布式集合对象。
      spark中所有的计算都是通过RDD的生成、转换、操作、进行的。
      RDD内部由很多partition(分片)组成。

    • 分片
      每个分片包含一部分数据,partitions可以在集群不同的结点上计算。
      分片是Spark并行处理的单元,Sparks顺序的、并行的处理分片。

    RDD的创建方法

    把存在的集合传递给SparkContext的parallelize()方法,一般测试使用。

    val rdd = sc.parallelize(Array(1,2,3,4),4)
    //参数1:待处理集合
    //参数2:分区个数
    

    需要注意rdd的操作都在worker机上,因此输出rdd的元素将在worker机的标准输出上进行,驱动节点上不会运行,故直接在程序print是没有输出的。需要先对各个worker上的内容进行collect

    另一种是加载外部文件

    val rddText = sc.textFile("filepath")
    
    Scala基础语法
    • 变量声明:
      val 或者 var
      val变量不可修改,一旦分配就不能重新指向。
      var分配后可以重新指向相同类型的值。

    • 匿名函数和类型推断:
      lines.filter(line =>line.contains("world")
      括号内是匿名函数,接受一个参数line
      使用line这个String类型上的方法contains,并返回结果
      line的类型无需指定,会推断出来。

    2 RDD的基本操作Transformation

    Transformation意思是转换。
    从之前的RDD构建一个新的RDD,如map()和filter()。

    逐元素运算
    • map()
      接收函数,将函数应用到RDD的每个元素,返回新的RDD。

    • filter()
      接收函数,返回只包含满足filter函数的元素的新RDD。

    • flatMap()
      对每个输入的元素,输出多个元素。将RDD的元素压扁后输出新的RDD。

    \\读入文件
    scala> val input = sc.textFile("edwintest/inputfile2.txt")
    
    scala> input.collect().foreach(print)
    one two threefour five sixseven eight nightten 
                        
    \\通过map转为(元素,1)格式             
    scala> val mapInput = input.map(word=>(word,1))
    scala> mapInput.collect().foreach(print)
    (one two three,1)(four five six,1)(seven eight night,1)(ten,1)              
    
     \\通过flatMap将split后的元素压缩
    scala> val inputSplit = input.flatMap(line=>line.split(" "))
    scala> inputSplit.collect.foreach(print)
    onetwothreefourfivesixseveneightnightten  
    
    
    集合运算

    RDDs支持数学的集合计算,如并集交集等。

    scala> val rdd1 = sc.parallelize(Array("1","2","3","1","4","3"))
    
    scala> val rdd2 = sc.parallelize(Array("1","2","5","6"))
    
    \\去重
    scala> val rdd_distinct = rdd1.distinct()
    scala> rdd_distinct.collect().foreach(print)
    4231              
    
    \\并
    scala> val rdd_union = rdd1.union(rdd2)
    scala> rdd_union.collect().foreach(println)
    1231431256 
    
    \\交
    scala> val rdd_inter = rdd1.intersection(rdd2)
    scala> rdd_inter.collect().foreach(print)
    21
    
    \\rdd1独有
    scala> val rdd_sub = rdd1.subtract(rdd2)
    scala> rdd_sub.collect().foreach(print)
    433  
    
    

    3 RDD的基本操作Action

    Action在RDD上计算出一个结果,并将结果返回给Driver program,如count(),save

    • reduce()
      接收一个函数,作用在RDD两个类型相同的元素上,返回新元素。可以实现RDD元素中的累加,计数和其它类型的聚集操作。
    scala>  val rdd = sc.parallelize(Array(1,2,3,3))
    rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[6] at parallelize at <console>:27
    
    scala> rdd.collect()
    res3: Array[Int] = Array(1, 2, 3, 3)
    
    scala> rdd.reduce((x,y)=>x+y)
    res4: Int = 9
    
    
    • collect()
      遍历整个RDD,向Driver program返回RDD的内容。
      返回的内容需要单机内容能够容纳。
      大数据情况下,使用saveAsTextFile() action等。

    • take(n)
      返回RDD的n个元素(尝试访问最少的partitions)
      返回结果无序,常用于测试使用

    scala> rdd.take(2)
    res5: Array[Int] = Array(1, 2)
    
    scala> rdd.take(3)
    res6: Array[Int] = Array(1, 2, 3)
    
    
    • top()
      排序(根据RDD中的数据比较器)
      也可以自定义比较器。
    scala> rdd.top(1)
    res7: Array[Int] = Array(3)                                                     
    
    scala> rdd.top(3)
    res8: Array[Int] = Array(3, 3, 2)
    
    
    • foreach()
      计算RDD中的每一个元素,但是不返回到本地
    4 RDD的特性
    • RDDs的血统图:
      Spark维护着RDDs之间的依赖关系和创建关系,叫做血统关系图。Spark通过血统关系图计算RDD的需求并对丢失的数据进行恢复。
    • 延迟计算
      Spark对RDDs的计算并不是在进行函数操作时进行计算的,而是在使用action时才计算。
      可以有效的减少数据传输。
      Spark内部记录metadata 表明transformation的操作已经响应。
      加载数据也会延迟计算,只有在数据需要时才会读入内存。

    • RDD.persist()
      RDD的持久化
      默认RDDs上面进行action时,Spark都重新计算RDDs
      如果想要重复使用一个RDD,则可以使用RDD.presist()
      unpresist()方法可以将缓存移除。

    参数决定优先度。

    6 KeyValue对RDDs
    • 创建KeyValue对RDDs
      使用map函数,返回key-value对
    scala> input.collect().foreach(print)
    one two threefour five sixseven eight nightten                                  
    scala> val rdd1 = input.map(line=>(line.split(" ")(0),line))
    scala> rdd1.collect().foreach(println)
    (one,one two three)                                                             
    (four,four five six)
    (seven,seven eight night)
    (ten,ten)
    
    
    • key-value对的一些常见操作

    现在keys和values已经变为属性,不需要加()

    • reduceByKey()
      将相同key的元素相加。
    scala> val rdd = sc.parallelize(Array((1,2),(3,4),(3,6)))
    scala> val rdd2 = rdd.reduceByKey((x,y)=>x+y)
    scala> rdd2.collect().foreach(println)
    (1,2)                                                                           
    (3,10)
    
    
    • groupByKey()
      将相同key的元素分为一组
    scala> val rdd3 = rdd.groupByKey()
    scala> rdd3.collect().foreach(println)
    (1,CompactBuffer(2))                                                            
    (3,CompactBuffer(4, 6))
    
    • combineByKey()
      (createCombiner,mergeValue,mergeCombiner,partitioner)
      非常重要且常用的函数,是最常用的基于key的聚合函数,返回类型可以和输入类型不一样。很多基于key的聚合函数都用到了它,如groupByKey()。

    combineByKey()
    遍历partition中的元素,元素的key。如果是新元素,则使用我们在参数中提供的createCombiner()函数,如果是这个partition中已经存在的key,就会使用mergeValue()函数。
    最后合集每个partition的时候,使用mergeCombiner()函数

    scala> val scores = sc.parallelize(Array(("jack",80.0),("jack",90.0),("jack",85.0),("mike",80.0),("mike",92.0),("mike",90.0)))
    scala> scores.collect().foreach(print)
    
    (jack,80.0)(jack,90.0)(jack,85.0)(mike,80.0)(mike,92.0)(mike,90.0) 
    
    val result = scores.combineByKey(score=>(1,score),(c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore),(c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2))
    
    scala> result.collect().foreach(print)
    (mike,(3,262.0))(jack,(3,255.0))  
    
    //计算平均数
    scala> val aver = result.map{case(name,(num,score))=>(name,score/num)}
    
    scala> aver.collect().foreach(print)
    (mike,87.33333333333333)(jack,85.0) 
    
    

    createCombiner()中定义的匿名函数,
    第一个参数:createCombiner
    score=>(1,score)
    score为分数,因为ByKey相关的函数传进的参数都是key后面的内容。

    第二个参数:mergeValue
    (c1:(Int,Double),newScore)=>(c1._1+1,c1._2+newScore)
    c1中Int为出现次数,Double为分数累加。c1为key-value形式,是createCombiner的结果。

    第三个参数:mergeCombiner
    (c1:(Int,Double),c2:(Int,Double))=>(c1._1+c2._1,c1._2+c2._2)
    将各个partition的mergeValue进行组合。

    相关文章

      网友评论

          本文标题:Spark-RDD介绍

          本文链接:https://www.haomeiwen.com/subject/odcpsftx.html