RDD (Part 2)

Author: 436048bfc6a1 | Published 2019-03-09 16:36
    1. The RDD class: source code analysis

    1.1 RDD source code

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Logging {......}
    

    1.2 Notes on the RDD class

    (1) RDD is an abstract class, so concrete RDDs must implement it
    (2) It extends Serializable because RDD objects are shipped across the network
    (3) Serializable and Logging are both traits;
        the usual form would be "with Serializable with Logging",
        but in Scala the first parent listed after extends needs no "with"
    

    1.3 How the RDD class reflects the five key properties of an RDD (mainly the first three)

    (1) First property (a list of partitions)

    Comment

    Implemented by subclasses to return the set of partitions in this RDD. This method will only be called once, so it is safe to implement a time-consuming computation in it. The partitions in this array must satisfy the following property: rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }

    Source

    protected def getPartitions: Array[Partition]
    

    (2) Second property (a function for computing each split)

    Comment

    Implemented by subclasses to compute a given partition.

    Source

    def compute(split: Partition, context: TaskContext): Iterator[T]
    

    (3) Third property (a list of dependencies on other RDDs)

    Comment

    Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only be called once, so it is safe to implement a time-consuming computation in it.

    Source

    // Seq[Dependency[_]] is a collection: an RDD can have multiple dependencies
    protected def getDependencies: Seq[Dependency[_]] = deps
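    To tie the three properties together, here is a minimal sketch of a custom RDD. SeqRDD, SeqPartition and the slicing logic are illustrative, not from the Spark source: the class returns its partitions from getPartitions, computes one partition in compute, and passes Nil as its dependencies.

    import scala.reflect.ClassTag

    import org.apache.spark.{Partition, SparkContext, TaskContext}
    import org.apache.spark.rdd.RDD

    // Hypothetical partition type: it only carries its index.
    class SeqPartition(override val index: Int) extends Partition

    // A toy RDD over a local Seq, split into numSlices partitions.
    class SeqRDD[T: ClassTag](sc: SparkContext, data: Seq[T], numSlices: Int)
      extends RDD[T](sc, Nil) {   // Nil: no parent RDDs, hence no dependencies

      // First property: the list of partitions, with partition.index == position
      override def getPartitions: Array[Partition] =
        (0 until numSlices).map(i => new SeqPartition(i)).toArray

      // Second property: how to compute a single partition
      override def compute(split: Partition, context: TaskContext): Iterator[T] = {
        val chunkSize = math.max(1, (data.size + numSlices - 1) / numSlices)
        val chunks = data.grouped(chunkSize).toSeq
        if (split.index < chunks.size) chunks(split.index).iterator else Iterator.empty
      }

      // Third property: getDependencies is inherited and returns deps (Nil here)
    }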
    

    1.4 HadoopRDD

    1.4.1 Source

    class HadoopRDD[K, V](
        sc: SparkContext,
        broadcastedConf: Broadcast[SerializableConfiguration],
        initLocalJobConfFuncOpt: Option[JobConf => Unit],
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int)
      extends RDD[(K, V)](sc, Nil) with Logging{
        // Return the array of partitions for this RDD
        override def getPartitions: Array[Partition] = {
          val jobConf = getJobConf()
          SparkHadoopUtil.get.addCredentials(jobConf)
          try {
            val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
            val inputSplits = if (ignoreEmptySplits) {
              allInputSplits.filter(_.getLength > 0)
            } else {
              allInputSplits
            }
            val array = new Array[Partition](inputSplits.size)
            for (i <- 0 until inputSplits.size) {
              array(i) = new HadoopPartition(id, i, inputSplits(i))
            }
            array
          } catch {
            case e: InvalidInputException if ignoreMissingFiles =>
              logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
                s" partitions returned from this path.", e)
              Array.empty[Partition]
          }
        }
    }
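    For context, a small usage sketch (the input path is hypothetical): sc.textFile builds a HadoopRDD internally, so the partition count of the resulting RDD comes from the input splits computed in getPartitions above.

    val lines = sc.textFile("hdfs:///tmp/input.txt", 4)   // minPartitions = 4; path is hypothetical
    println(lines.getNumPartitions)                       // one partition per (non-empty) input split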
    
    2. RDD operations

    RDDs support two types of operations:
    

    transformations, which create a new dataset from an existing one

    Note:
      RDDA ==(transformation)==> RDDB
      Typical transformations include map, filter, etc.
      At the code level, an RDD is simply a class.
    

    actions, which return a value to the driver program after running a computation on the dataset
    

    For example, map is a transformation that passes each dataset element through a function and returns a new RDD representing the results.
    

    On the other hand, reduce is an action that aggregates all the elements of the RDD using some function and returns the final result to the driver program (although there is also a parallel reduceByKey that returns a distributed dataset).
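    A small sketch of that distinction (the sample data is made up): reduce brings a single value back to the driver, while reduceByKey yields another distributed RDD.

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)))
    val total = pairs.map(_._2).reduce(_ + _)   // action: the Int 6 is returned to the driver
    val byKey = pairs.reduceByKey(_ + _)        // transformation: an RDD[(String, Int)], still distributed
    byKey.collect()                             // Array((a,4), (b,2)) -- order may vary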
    

    2.1 Notes on transformations in Spark

    Transformations in Spark are lazy.

    (1) Applying a transformation to an RDD does not launch any task.
        Transformations on an RDD are pipelined, which matches one of the RDD properties: dependencies.
        Only when an action is executed must tasks be launched right away; at that point
        a task is started, the transformations run, and then the action runs.
    (2) Transformations do not compute their results immediately. Instead,
        they just remember the transformations applied to the base dataset,
        and compute them only when an action needs a result to return to the driver program.
    (3) This design lets Spark run efficiently.
        e.g. a dataset produced by map is fed into a reduce action:
        only the result of reduce is returned to the driver program,
        rather than the whole mapped dataset (see the sketch below).
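    The laziness described above, as a sketch (data.txt is a hypothetical input file):

    val lines = sc.textFile("data.txt")            // nothing is read yet
    val lineLengths = lines.map(_.length)          // transformation: only recorded in the lineage
    val totalLength = lineLengths.reduce(_ + _)    // action: a job runs now; only this Int reaches
                                                   // the driver, not the mapped dataset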
    
    
    3. Transformations

    3.1 map

    Return a new distributed dataset formed by passing each element of the source through a function func.
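    A one-line illustration on made-up data:

    sc.parallelize(1 to 5).map(_ * 2).collect()   // Array(2, 4, 6, 8, 10)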
    

    3.2 filter

    Return a new dataset formed by selecting those elements of the source on which func returns true.
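    For example, on made-up data:

    sc.parallelize(1 to 10).filter(_ % 2 == 0).collect()   // Array(2, 4, 6, 8, 10)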
    

    3.3 flatMap

    Similar to map, but each input item can be mapped to 0 or more output items (so func should return a Seq rather than a single item).
    

    Example: wordCount

    // assuming a is an RDD[String]; reduceByKey completes the word count
    a.flatMap(x => x.split(" ")).map((_, 1)).reduceByKey(_ + _).collect
    

    3.4 mapPartitions

    Similar to map, but runs separately on each partition (block) of the RDD, so func must be of type Iterator<T> => Iterator<U> when running on an RDD of type T.
    

    Example

    Suppose an RDD has 100 partitions and each partition holds 10,000 elements. If every element opens a connection inside map, that is 1,000,000 connections; mapPartitions works on whole partitions, so only 100 connections are needed (see the sketch below).
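    A sketch of that connection scenario; createConnection and lookup are hypothetical helpers standing in for, say, a database client:

    val enriched = rdd.mapPartitions { iter =>
      val conn = createConnection()                     // hypothetical: one connection per partition (100 in total)
      val out = iter.map(x => lookup(conn, x)).toList   // materialise so the connection can be closed safely
      conn.close()                                      // hypothetical close
      out.iterator
    }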

    3.5 coalesce (worth a closer look)

    Decrease the number of partitions in the RDD to numPartitions. Useful for running operations more efficiently after filtering down a large dataset.

    Explanation: RDD.filter().filter().filter().filter().filter() => newRDD
    After such a chain, some partitions of the RDD may hold little or no data,
    so newRDD.coalesce(5) can be used to shrink the partition count (see the sketch below).
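    A sketch of that situation with made-up numbers:

    val big = sc.parallelize(1 to 1000000, 100)   // 100 partitions
    val tiny = big.filter(_ % 100000 == 0)        // most partitions are now almost empty
    val compact = tiny.coalesce(5)                // shrink to 5 partitions, no shuffle needed
    compact.getNumPartitions                      // 5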
    
    

    coalesce source

      def coalesce(numPartitions: Int, shuffle: Boolean = false,
                   partitionCoalescer: Option[PartitionCoalescer] = Option.empty)
                  (implicit ord: Ordering[T] = null)
          : RDD[T] = withScope {......}
    

    coalesce source comment

    Return a new RDD that is reduced into numPartitions partitions.
    

    3.6 repartition

    Return a new RDD that has exactly numPartitions partitions.
    

    repartition source

    def repartition(numPartitions: Int)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
        coalesce(numPartitions, shuffle = true)
    }
    

    As the source shows, repartition calls coalesce with shuffle = true under the hood; a quick comparison follows.
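    A comparison sketch with made-up partition counts:

    val rdd = sc.parallelize(1 to 100, 4)
    rdd.coalesce(2).getNumPartitions      // 2: fewer partitions, no shuffle
    rdd.coalesce(8).getNumPartitions      // 4: without a shuffle, coalesce cannot add partitions
    rdd.repartition(8).getNumPartitions   // 8: repartition is coalesce(8, shuffle = true)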

    4. Actions

    4.1 collect
    Note

    A very large result set should not be viewed with collect, because collect brings the whole
    result back to the driver as an array. Workarounds:
      (1) use take() to look at just the first few elements
      (2) write the result out to HDFS instead (see the sketch below)
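    A sketch of both workarounds (the output path is hypothetical):

    val rdd = sc.parallelize(1 to 1000000)
    rdd.take(5)                                    // Array(1, 2, 3, 4, 5): only a few elements reach the driver
    rdd.saveAsTextFile("hdfs:///tmp/rdd-output")   // write the full result to HDFS instead of collecting it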
    

    4.2 take

    Take the first num elements of the RDD

    4.3 count

    Return the number of elements in the RDD.

    4.4 reduce()

    Example: reduce(_ + _), i.e. the RDD's elements are combined pairwise by addition.

    4.5 first

    4.6 top
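    A quick sketch of these small actions (count, reduce, first, top) on a made-up RDD:

    val nums = sc.parallelize(Seq(3, 1, 4, 1, 5))
    nums.count()         // 5
    nums.reduce(_ + _)   // 14
    nums.first()         // 3: the first element
    nums.top(2)          // Array(5, 4): the two largest elements, in descending order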

    4.7 foreach

    Run a function func on each element of the dataset.
    

    Note

    When writing data out to a database, use foreachPartition;
    foreachPartition is itself an action (see the sketch below)
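    A sketch of that pattern; createConnection and insertRecord are hypothetical stand-ins for a real database client:

    rdd.foreachPartition { iter =>
      val conn = createConnection()   // hypothetical: one connection per partition, opened on the executor
      iter.foreach(record => insertRecord(conn, record))
      conn.close()
    }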
    

    4.8 sortBy

    Code

    a.sortBy(x=>x,false).collect
    

    Source

    def sortBy[K](
          f: (T) => K,
          // ascending order by default
          ascending: Boolean = true,
          numPartitions: Int = this.partitions.length)
          (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
        this.keyBy[K](f)
            .sortByKey(ascending, numPartitions)
            .values
    }
    
