美文网首页
Spark Core

Spark Core

作者: 436048bfc6a1 | 来源:发表于2019-01-15 22:35 被阅读0次

    1.spark core
    1.1 学习方法

    对于关键类的关键方法一定看相应的注释
    

    1.2 什么是RDD

    A Resilient Distributed Dataset (RDD), 
    the basic abstraction in Spark.
    
    弹性的分布式数据集(RDD),是spark中最基本的抽象
    
    解释
    1. 弹性指的是:
        eg. RDDA =map=> RDDB =map=> RDDC
              当RDDC的机器failure, 可以从RDDB计算出RDDC
             所以弹性指的是从错误恢复的特性
    
    represents an immutable, partitioned collection of elements 
    that can be operated on in parallel
    
    代表一个不可变的,可以并行操作的分区的元素的集合
    
    解释:
    (1) 不可变:
          A => map => B, 但是在这里A和B不一样 
    (2) 分区的集合
          eg. List(1, 2, 3, 4, 5, 6, 7, 8, 9, 10)
                1, 2, 3     在第一台机器上
                4, 5, 6     在第二台机器上
                7, 8, 9, 10 在第三台机器上
    
    This class contains the basic operations available on all RDDs
    such as `map`, `filter`, and `persist`
    
    class RDD 包含了在所有RDD上可用的基本操作
    如map, filter和persist
    
    In addition, org.apache.spark.rdd.PairRDDFunctions
    contains operations available only on RDDs of key-value
    pairs, such as `groupByKey` and `join`;
    
    除此之外,org.apache.spark.rdd.PairRDDFunctions
    包含了只在key-value形式的RDD上才可以使用的操作,
    如groupby和join
    
    解释:
    (1) 如 select from * A join B on a.id = b.id;
    如果在spark里执行类型这种语句,id就是key,其他则为value
    
    Internally, each RDD is characterized by five main properties:
    - A list of partitions
    - A function for computing each split
    - A list of dependencies on other RDDs
    - Optionally, a Partitioner for key-value RDDs 
      (e.g. to say that the RDD is hash-partitioned)
    - Optionally, a list of preferred locations to compute each split on 
      (e.g. block locations foran HDFS file)
    
    内部上说,每个RDD特性都由以下五个主要性质决定的
    - 有一组分区
    - 有函数可以作用到每个分区上
    - 有一组作用在其他RDDs上的依赖
    - 可选,对于key-value RDDs有一个分区器
    - 可选,有对于计算每个split都有一组喜好的位置
    
    解释
    - 一个RDD是由分区构成的
    - 对RDD所使用的函数其实是作用到其所有分区上
      eg .rdd.map() <==>rdd.split1.map() + rdd.split2.map()
      在spark里partition <==> task, 有几个partition就有几个task
    - eg. RDDA =map=> RDDB =map=> RDDC
          RDDC依赖于RDDB和RDDA
    - 对于一个有三副本的文件,其存在三台机器上
      file1: hadoop001 => replica1
             hadoop002 => replica2
             hadoop0003 => replica3
      最好的是将计算放在这三个机器上,但是有时由于三台机器繁忙,没有资源进行计算,此时有两种选择,拷贝数据或者等待
      但是最好选择等待,以为拷贝数据花费时间更多,消耗资源更大,所以
      移动计算 > 移动数据
    

    1.3 源码解释

    abstract class RDD[T: ClassTag](
        @transient private var _sc: SparkContext,
        @transient private var deps: Seq[Dependency[_]]
      ) extends Serializable with Loggin
    
    解释
    1. RDD是抽象类,需要实现
    2. 继承了序列化,因为要进行网络传输
    3. Serializable 和 Loggin 都是trait,正常写法是 with Serializable with Loggin
       但是由于scala特性,第一个trait可以写extends,不用写with
    
    下面是RDD的其中一个实现类HadoopRDD
    
    class HadoopRDD[K, V](
        sc: SparkContext,
        broadcastedConf: Broadcast[SerializableConfiguration],
        initLocalJobConfFuncOpt: Option[JobConf => Unit],
        inputFormatClass: Class[_ <: InputFormat[K, V]],
        keyClass: Class[K],
        valueClass: Class[V],
        minPartitions: Int)
      extends RDD[(K, V)](sc, Nil) with Logging
    

    1.3.1 源码中体现RDD的五大特性

       Implemented by subclasses to return the set of partitions in this RDD. This method will only
       be called once, so it is safe to implement a time-consuming computation in it.
       The partitions in this array must satisfy the following property:
       rdd.partitions.zipWithIndex.forall { case (partition, index) => partition.index == index }
    
    protected def getPartitions: Array[Partition]
    对应RDD的第一个特性
    
    Implemented by subclasses to compute a given partition.
    
    def compute(split: Partition, context: TaskContext): Iterator[T]
    对应的是RDD第二个特性
    
    Implemented by subclasses to return how this RDD depends on parent RDDs. This method will only
    be called once, so it is safe to implement a time-consuming computation in it.
    
    //Seq[Dependency[_]代表集合,指多个Dependencies
    protected def getDependencies: Seq[Dependency[_]] = deps
    对应RDD第三个特性
    

    1.3.2 在RDD的实现类是如何体现的(HadoopRDD)

     //返回分区数组
      override def getPartitions: Array[Partition] = {
        val jobConf = getJobConf()
        // add the credentials here as this can be called before SparkContext initialized
        SparkHadoopUtil.get.addCredentials(jobConf)
        try {
          val allInputSplits = getInputFormat(jobConf).getSplits(jobConf, minPartitions)
          val inputSplits = if (ignoreEmptySplits) {
            allInputSplits.filter(_.getLength > 0)
          } else {
            allInputSplits
          }
          val array = new Array[Partition](inputSplits.size)
          for (i <- 0 until inputSplits.size) {
            array(i) = new HadoopPartition(id, i, inputSplits(i))
          }
          array
        } catch {
          case e: InvalidInputException if ignoreMissingFiles =>
            logWarning(s"${jobConf.get(FileInputFormat.INPUT_DIR)} doesn't exist and no" +
                s" partitions returned from this path.", e)
            Array.empty[Partition]
        }
      }
    

    官网原文1

    Spark revolves around the concept of a resilient distributed dataset (RDD), 
    which is a fault-tolerant collection of elements that can be operated on in parallel.
    There are two ways to create RDDs: parallelizing an existing collection in your driver program, 
    or referencing a dataset in an external storage system, 
    such as a shared filesystem, HDFS, HBase, 
    or any data source offering a Hadoop InputFormat.
    

    翻译1

    RDD的概念是spark重要的元素。
    RDD就是一个可以并行操作的容错的集合。
    有两种方式可以创建RDD
    在driver program中并行化一个现有的collection
    在外部存储系统(如HDFS、HBASE或者其它能提供InputFormat的数据源)refer一个dataset
    

    解释1

    第一种在测试时候使用
    

    源代码1

      def parallelize[T: ClassTag](
          //seq可以理解为数组
          seq: Seq[T],
         //numSlices有默认值,默认值为defaultParallelism
          numSlices: Int = defaultParallelism): RDD[T] = withScope {
        assertNotStopped()
        new ParallelCollectionRDD[T](this, seq, numSlices, Map[Int, Seq[String]]())
      }
    

    源代码1注释

    Methods for creating RDDs
    Distribute a local Scala collection to form an RDD
    
    创建RDD的方法
    为了生成RDD,提供本地的scala集合
    

    源代码2

    //ParallelCollectionRDD继承与RDD
    //说明ParallelCollectionRDD也是RDD
    private[spark] class ParallelCollectionRDD[T: ClassTag](
        sc: SparkContext,
        @transient private val data: Seq[T],
        numSlices: Int,
        locationPrefs: Map[Int, Seq[String]])
        extends RDD[T](sc, Nil) {......}
    

    源代码注释2

    Return an array that contains all of the elements in this RDD
    
    返回一个包含在此RDD的所有元素的array
    

    源代码3

      def collect(): Array[T] = withScope {
        val results = sc.runJob(this, (iter: Iterator[T]) => iter.toArray)
        Array.concat(results: _*)
      }
    
    源代码注释3
    Read a text file from HDFS, a local file system (available on all nodes),
    or any Hadoop-supported file system URI, and return it as an RDD of Strings.
    
    从HDFS或者本地文件系统(在所有节点上都可用)
    注: 也就是说要保证所有节点上相同路径相同的数据
    

    源代码4

      def textFile(
          path: String,
          minPartitions: Int = defaultMinPartitions): RDD[String] = withScope {
        assertNotStopped()
        hadoopFile(path, classOf[TextInputFormat], classOf[LongWritable], classOf[Text],
          minPartitions).map(pair => pair._2.toString).setName(path)
      }
    

    代码1

    function main() {
      if $cygwin; then
        # Workaround for issue involving JLine and Cygwin
        # (see http://sourceforge.net/p/jline/bugs/40/).
        # If you're using the Mintty terminal emulator in Cygwin, may need to set the
        # "Backspace sends ^H" setting in "Keys" section of the Mintty options
        # (see https://github.com/sbt/sbt/issues/562).
        stty -icanon min 1 -echo > /dev/null 2>&1
        export SPARK_SUBMIT_OPTS="$SPARK_SUBMIT_OPTS -Djline.terminal=unix"
        "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
        stty icanon echo > /dev/null 2>&1
      else
        export SPARK_SUBMIT_OPTS
        "${SPARK_HOME}"/bin/spark-submit --class org.apache.spark.repl.Main --name "Spark shell" "$@"
      fi
    }
    
    

    代码解释1

    #在此设置默认参数
    #调用spark-submit函数
    ${SPARK_HOME}"/bin/spark-submit
    --class org.apache.spark.repl.Main 
    --name "Spark shell" 
    "$@"
    

    example1


    complete_jobs.png
    上图中,显示调用一次collect,完成1个job
    

    注意1

    如果spark是一个有三台机器组成的集群,集群的机器名分别为
    HDFS1
    HDFS2
    HDFS3
    
    如果使用textFile()读取文件,必须要保证该文件在三台机器上都存在
    
    否则的话会报org.apache.hadoop.mapred.InvalidInputException: Input path does not exist错误
    

    官网原文及翻译1

    If using a path on the local filesystem, 
    the file must also be accessible at the same path on worker nodes. 
    Either copy the file to all workers or use a network-mounted shared file system.
    
    如果使用本地文件系统,该文件必须能在其他的节点上的相同路径上能被访问
    要么将文件拷贝到所有的工作节点上
    要么使用网络共享文件系统(如HDFS)
    
    All of Spark’s file-based input methods, including textFile, 
    support running on directories, compressed files, and wildcards as well. 
    For example, you can use textFile("/my/directory"), 
    textFile("/my/directory/*.txt"), 
    and textFile("/my/directory/*.gz").
    
    所有spark的基于文件的input方法,包括textFile()方法,
    支持读取目录,压缩文件并且支持正则表达式
    
    The textFile method also takes an optional second argument 
    for controlling the number of partitions of the file. 
    By default, 
    Spark creates one partition for each block of the file 
    (blocks being 128MB by default in HDFS), 
    but you can also ask for a higher number of partitions by passing a larger value. 
    Note that you cannot have fewer partitions than blocks.
    
    textFile方法可以接受第二个可选的参数
    该参数能够控制文件分区的数量,即minPartitions
    默认情况下,Spark能够为文件的每个block创建一个分区
    (在HDFS上,一个block是128M)
    但是可以通过传递一个更大的值来要求更多的分区数量
    也就是说并行度提高了,同一时刻处理数据的job多了,
    在资源足够情况下,性能也就越高
    
    不能让分区数比block小
    
    但是分区数可以比block小,但是会造成每一个task处理的数据就更多了,
    因为会发生一个文件会被拆分成很大的+很小的
    
    !!!暂时不知道会造成什么问题,以后解决
    (默认情况下,eg.一个有10个block的文件可以被拆分为10个分区
    如果分区数比block小,partition1是整个block1+一小部分block2
    一个task运行一个partition,此时partition就会比原来的大
    )
    
    Apart from text files, 
    Spark’s Scala API also supports several other data formats:
    
    除了text file,spark也支持其他的数据类型
    
    SparkContext.wholeTextFiles lets you read a directory containing multiple small text files, 
    and returns each of them as (filename, content) pairs. 
    This is in contrast with textFile, 
    which would return one record per line in each file. 
    Partitioning is determined by data locality which, 
    in some cases, may result in too few partitions.
    For those cases, wholeTextFiles provides an optional second argument 
    for controlling the minimal number of partitions.
    
    SparkContext.wholeTextFiles让你读一个包含多个小的text文件的目录
    已(filename, content)形式返回一个key-value对。
    这个是与textFiles进行对比,它的一个record就是任意文件的一行
    分区是由数据位置决定的,可能会造成分区过小
    所以,wholeTextFiles 提供另一个参数来控制最小分区
    
    For [SequenceFiles], use SparkContext’s `sequenceFile[K, V]` method
    where `K` and `V` are the types of key and values in the file
    
    对于sequenceFile,使用SparkContext的sequenceFile[K, V]方法
    其中k是key, v是value
    
    For other Hadoop InputFormats, you can use the SparkContext.hadoopRDD method
    
    对于其他Hadoop的InputFormats,
    可以使用SparkContext.hadoopRDD 方法
    
    RDD.saveAsObjectFile and SparkContext.objectFile support 
    saving an RDD in a simple format consisting of serialized Java objects
    
    RDD.saveAsObjectFile和SparkContext.objectFile
    支持
    已由连续的java object所组成的简单形式来保存RDD
    

    源代码5

          path: String,
          minPartitions: Int = defaultMinPartitions): RDD[(String, String)] = withScope {
        assertNotStopped()
        val job = NewHadoopJob.getInstance(hadoopConfiguration)
        // Use setInputPaths so that wholeTextFiles aligns with hadoopFile/textFile in taking
        // comma separated files as input. (see SPARK-7155)
        NewFileInputFormat.setInputPaths(job, path)
        val updateConf = job.getConfiguration
        new WholeTextFileRDD(
          this,
          classOf[WholeTextFileInputFormat],
          classOf[Text],
          classOf[Text],
          updateConf,
          minPartitions).map(record => (record._1.toString, record._2.toString)).setName(path)
      }
    

    源代码注释4

    Read a directory of text files from HDFS, 
    a local file system (available on all nodes), 
    or any Hadoop-supported file system URI. 
    Each file is read as a single record and returned in a
    key-value pair, 
    where the key is the path of each file, the value is the content of each file.
    
    读取一个HDFS(或本地文件系统或任意Hadoop支持的文件系统的URI)上的text文件的目录
    每一个文件当读取时都会被当成单个记录
    返回是一个key-value对
    key就是每个文件的路径,value就是每个文件的内容
    
    Small files are preferred, large file is also allowable, but may cause bad performance
    小文件适合用(但是不常用),大文件也会被允许使用,但是会造成性能上的损失
    

    官网原文及翻译2

    RDDs support two types of operations: transformations, 
    which create a new dataset from an existing one, 
    and actions, 
    which return a value to the driver program after running a computation on the dataset. 
    For example, map is a transformation that passes each dataset element 
    through a function and returns a new RDD representing the results. 
    On the other hand, reduce is an action that aggregates all the elements of the RDD 
    using some function and returns the final result to the driver program 
    (although there is also a parallel reduceByKey that returns a distributed dataset).
    
    RDD支持两种类型的操作:
    transformations
    从已有的数据集创建新的数据集
    也就是说
      RDDA ==transformation==>RDDB
      transformation包括map、filter等
      RDD在代码层面上来看就是一个类
    
    actions
    在经过在数据集上计算之后,返回给driver program一个值
    例如: map是一个transformation, 通过一个函数来讲数据集的每个元素传入
    返回一个代表了结果集的新的RDD
    
    在Spark上的transformation是lazy的
    也就是说当在RDD上进行transformation, 是不启动task
      因为对于RDD的transformation,是pipeline操作的
      这也符合RDD特性之一: dependency
      所以需要等待transformation操作都输入后, 才启动task对其进行pipeline操作
      但是当执行action操作,就必须立刻启动task执行操作
      所以此时,启动一个task,进行transformation,然后进行action操作
    
    他们不会直接计算结果,相反,他们会记住应用在base dataset上的transformation
    transformation只有当action需要结果时才会计算
    并且将该结果返回给driver program
    
    此设计可以使spark高效的运行
    例如: datasets经过map之后的数据集用于reduce action上
    只有reduce的结果返回给driver program,而不是将mapped 数据集返回给driver program
      
    

    源代码6

      def sortBy[K](
          f: (T) => K,
          //默认是升序
          ascending: Boolean = true,
          numPartitions: Int = this.partitions.length)
          (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
        this.keyBy[K](f)
            .sortByKey(ascending, numPartitions)
            .values
      }
    

    术语解释1(重要)

    Application:
      User program built on Spark
      Consists of a driver program and executors on the cluster
    
    应用:
      构建在spark上的用户应用程序(eg.idea上的scala object)
      由在集群上的一个driver program和多个executors所组成
    
    Application jar
      A jar containing the user's Spark application
      In some cases users will want to create an "uber jar"--
      --containing their application along with its dependencies
      The user's jar should never include Hadoop or Spark libraries, --
      --however, these will be added at runtime.
    
    Application jar
      一个jar包含了用户的Spark应用程序
    
    Driver program
      The process running the main() function of the application-- 
      --and creating the SparkContext
    
    Driver program
      运行应用main()方法的进程,并且能创建SparkContext
      所以在main方法里创建SparkContext的程序就是driver program
    
    Cluster manager
      An external service for acquiring resources on the cluster 
      (e.g. standalone manager, Mesos, YARN)
    
      在集群上申请资源的外部的服务
    
      好处: 代码开发过程中不用关注代码运行在哪里
            运行各种模式下,其代码都是相同的
    
    Deploy mode
      Distinguishes where the driver process runs.
      In "cluster" mode, the framework launches the driver inside of the cluster
      In "client" mode, the submitter launches the driver outside of the cluster
    
    Deploy mode
      分辨driver process运行在哪里
      在集群模式, 框架在集群内启动框架
      在client模式, submitter在cluster外面启动driver
    
    Worker node
      Any node that can run application code in the cluster
    
    Worker node
      在集群上能够运行应用的node被称为Worker node
    
    Executor
      A process launched for an application on a worker node
      that runs tasks and keeps data in memory or disk storage across them
      Each application has its own executors
    
    Executor
       启动一个服务于worker node(eg. node manager)上的进程, 运行在container里
       运行tasks(map或filter),并且可以将数据放于内存中或者是跨节点的磁盘上。
       每个应用程序有其独立的executors
    
    Task 
      A unit of work that will be sent to one executor
    
    Task
      发送给executor的工作单元
    
    Job
      A parallel computation consisting of multiple tasks 
      that gets spawned in response to a Spark action (e.g. save, collect)  
    
    Job
      由多个task组成的一个并行计算
      一个action触发一个job
    
    Stage
      Each job gets divided into smaller sets of tasks 
      called stages that depend on each other
      similar to the map and reduce stages in MapReduce
    
    Stage
      1个job会被分成多个stage
      遇到shuffle就产生新的stage
    

    figure1


    spark开发流程.png

    相关文章

      网友评论

          本文标题:Spark Core

          本文链接:https://www.haomeiwen.com/subject/wovudqtx.html