美文网首页
Spark概念及使用简介

Spark概念及使用简介

作者: 漂泊的胡萝卜 | 来源:发表于2018-09-06 17:12 被阅读0次

    Spark是什么?

    Spark是一个分布式计算平台(Hadoop是一个包含分布式计算、存储、管理的大数据生态系统),它对标的是Hadoop的MapReduce,那么与Hadoop的MapReduce相比,它具有什么优势呢?

    1. 更快
    2. 更容易使用
      除了Java之外,提供了Scala、Python、R的API;
    3. 好用的库
      基于Spark Core提供了Spark SQL、Spark Streaming、MLib、GraphX等库;
    4. 运行方便
      能够在生产环境下借助Spark生态,也可以在本地测试时独立运行。

    Spark架构

    spark架构图

    Spark的架构图如上,其核心是Spark Core,可以从HDFS、HBase获取数据;它的运行方式可以是本地运行、独立运行、借助Mesos、YARN等运行;基于Spark Core,提供了Spark SQL(类似Hive)、Spark Streaming(类似Storm)、MLib、GraphX。

    Spark的运行

    1.四种运行模式

    Spark的运行模式主要有Local,Standalone,yarn,mesos等,其中Local主要用于本地调试,而yarn是最常见的生产模式。

    2.如何运行

    Spark的bin目录下,有若干个启动脚本:
    pyspark:python交互式接口
    spark-class:java交互式接口
    spark-shell:scala交互式接口
    spark-submit:提交文件执行

    Spark关键概念——RDD

    1.RDD(Resilient Distributed Dataset) 弹性分布数据集概念

    它代表一个不可变、可分区、里面的元素可并行计算的集合。RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性。


    RDD示意图解.png
    RDD初始来源.png

    RDD特点如下:
    (1)一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。

    (2)一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

    (3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

    (4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

    (5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

    2. 生成RDD

    由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等。
    测试时,也可以由数组等生产RDD。

    3. RDD算子

    RDD包含两大类算子:

    1. 转化操作(Transformation),从已有数据集转化出新的数据集;
      常见的有:map、flatMap、filter、distinct、union、intersection、substract、cartesian、collect;
    2. 启动操作(Action),对数据集进行计算,并返回结果;
      常见的有:collect、count、countByValue、reduce、foreach。

    Transformation都是惰性求值的,也就是说它不会马上进行计算,仅仅是计算转换操作,当遇到Action操作时,才会触发计算。

    4. RDD的宽依赖和窄依赖

    由于RDD是粗粒度的操作数据集,每个Transformation操作都会生成一个新的RDD,所以RDD之间就会形成类似流水线的前后依赖关系;RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。如图所示显示了RDD之间的依赖关系。


    dependency.png

    从图中可知:

    窄依赖:是指每个父RDD的一个Partition最多被子RDD的一个Partition所使用,例如map、filter、union等操作都会产生窄依赖;(独生子女)

    宽依赖:是指一个父RDD的Partition会被多个子RDD的Partition所使用,例如groupByKey、reduceByKey、sortByKey等操作都会产生宽依赖;(超生)

    在这里我们是从父RDD的partition被使用的个数来定义窄依赖和宽依赖,因此可以用一句话概括下:如果父RDD的一个Partition被子RDD的一个Partition所使用就是窄依赖,否则的话就是宽依赖。因为是确定的partition数量的依赖关系,所以RDD之间的依赖关系就是窄依赖;由此我们可以得出一个推论:即窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖。
    一对固定个数的窄依赖的理解:即子RDD的partition对父RDD依赖的Partition的数量不会随着RDD数据规模的改变而改变;换句话说,无论是有100T的数据量还是1P的数据量,在窄依赖中,子RDD所依赖的父RDD的partition的个数是确定的,而宽依赖是shuffle级别的,数据量越大,那么子RDD所依赖的父RDD的个数就越多,从而子RDD所依赖的父RDD的partition的个数也会变得越来越多。

    Shuffle和Stage

    宽依赖会导致Shuffle,Shuffle即数据在分片上的重新分布,会产生网络传输是Spark执行过程中最耗时的部分。
    调度器会在产生宽依赖的地方(Shuffle)形成一个stage,同一个stage内的RDD操作会流式执行,不会发生数据迁移。


    stage.png

    数据集持久化

    Spark中的持久化概念与数据库中的持久化概念稍有区别:Spark中数据集的持久化更偏向于缓存的概念,将中间结果保存下来,以便后续步骤重复利用,避免了一些耗时步骤的反复运行。
    Spark提供了多种持久化级别

    持久化级别 说明
    MEMORY_ONLY 将RDD以非序列化的Java对象存储在JVM中。 如果没有足够的内存存储RDD,则某些分区将不会被缓存,每次需要时都会重新计算。 这是默认级别。
    MEMORY_AND_DISK 将RDD以非序列化的Java对象存储在JVM中。如果数据在内存中放不下,则溢写到磁盘上.需要时则会从磁盘上读取
    MEMORY_ONLY_SER (Java and Scala) 将RDD以序列化的Java对象(每个分区一个字节数组)的方式存储.这通常比非序列化对象(deserialized objects)更具空间效率,特别是在使用快速序列化的情况下,但是这种方式读取数据会消耗更多的CPU。
    MEMORY_AND_DISK_SER (Java and Scala) 与MEMORY_ONLY_SER类似,但如果数据在内存中放不下,则溢写到磁盘上,而不是每次需要重新计算它们。
    DISK_ONLY 将RDD分区存储在磁盘上。
    MEMORY_ONLY_2, MEMORY_AND_DISK_2等 与上面的储存级别相同,只不过将持久化数据存为两份,备份每个分区存储在两个集群节点上。
    OFF_HEAP(实验中) 与MEMORY_ONLY_SER类似,但将数据存储在堆内存中。 这需要启用堆内存。

    共享变量(累加器和共享变量)

    Spark中因为算子中的真正逻辑是发送到Executor中去运行的,所以当Executor中需要引用外部变量时,需要使用广播变量累加器相当于统筹大变量,常用于计数,统计。

    广播变量:一些公共规则、公共配置可以使用广播
    广播变量预先存储在Driver上,当Task需要广播变量时,它会查看它所在的Executor是否包含,若无所在Executor的BlockManager向Driver获取广播变量。
    广播变量更新时,要先删除,再声明新的广播变量。

    val conf = new SparkConf()
    conf.setMaster("local").setAppName("brocast")
    val sc = new SparkContext(conf)
    val list = List("hello xasxt")
    val broadCast = sc.broadcast(list)
    val lineRDD = sc.textFile("./words.txt")
    lineRDD.filter { x => broadCast.value.contains(x) }.foreach { println}
    sc.stop()
    

    具体示例如上图,不使用广播变量,则Driver会把该变量发给每个Task。(在Driver定义,在Executor使用)

    累加器:
    累加器通常用于统计,在Driver定义,由Executor更新。

    import org.apache.spark.{SparkConf, SparkContext}
     
    object AccumulatorOperator {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        conf.setMaster("local").setAppName("accumulator")
        val sc = new SparkContext(conf)
        val accumulator = sc.accumulator(0)
        sc.textFile("./records.txt",2).foreach {//两个变量
          x =>{accumulator.add(1)
          println(accumulator)}}
        println(accumulator.value)
        sc.stop()
      }
    }
    

    参考文献:
    https://www.cnblogs.com/qingyunzong/p/8899715.html#_label0_0(RDD介绍,超详细)
    https://blog.csdn.net/databatman/article/details/53023818#4shuffle-%E5%92%8C-stage(shuffle讲的不错,调优讲的不错)
    https://www.cnblogs.com/liuliliuli2017/p/6782687.html(共享变量)
    https://www.cnblogs.com/LHWorldBlog/p/8424681.html(广播变量和累加器)

    相关文章

      网友评论

          本文标题:Spark概念及使用简介

          本文链接:https://www.haomeiwen.com/subject/eddtgftx.html