美文网首页sparkspark
spark 和 pyspark的理解

spark 和 pyspark的理解

作者: 社交达人叔本华 | 来源:发表于2019-03-09 13:46 被阅读0次

最近学习了spark 相关的内容,写个笔记记录一下自己目前对于spark的理解,方便以后查阅。在本文的写作过程中,主要参考了1.宽依赖与窄依赖的区别;2.spark中几个概念的梳理;[3.spark shuffle的理解](https://blog.csdn.net/zylove2010/article/details/79067149)这样三篇博客,写的非常好,建议大家都去看看。

1.简介

  Spark可以说是圈内最流行的几个大数据处理框架之一了,类似地位的可能还有storm之类的。其最大的优点就是能够几乎底层透明的完成分布式的计算,非常方便开发。

  Spark是可以搭建在很多平台上的,本文面对的环境就是比较常见的Spark+Hadoop(作为文件系统)+Hive(作为分布式数据库)的配置。

  本文重点想讲解的就是spark整个程序的生命周期(我们能够接触到的几个环节,还没有到太底层的需要看源码的程度)。

2.spark的核心数据结构-RDD

RDD是spark 的核心数据结构,spark 之所以能够做到把分布式做成近乎底层透明,正是依靠了RDD.RDD全称弹性分布式数据集(Resilient Distributed Datasets).

2.1 partition ,task 与 RDD

  抛开一切我们想想一下,假如说现在有一张很大很大(千万级)的数据表格要你处理,我们按传统的思维方式来搞的话,就是:操作表格嘛,那么我就用个dataframe(R语言和python的pandas包中都有此概念)来封装这个数据表呗,然后我不就是各种骚操作嗖嗖嗖,像什么计数用的count函数啦,排序用的sort函数啦,分组用的groupby函数啦。

  现在问题来啦!你好像忘记了我一个很重要的前提,这个表很大很大啊!你用dataframe来封装的前提是得把这些数据全部加载到内存啊。这显然是不现实的。那么我们就要想办法,最直观的办法就是Divide & Conquer。因为我们看看我们想做的这些操作,无论是计数,还是排序,还是分组,都是能够先分成小数据集,并行处理,然后再合并出结果的。因此我们的解决方案来啦,以计数为例,我们首先把这个大数据集分成多个小数据集(知识点,敲黑板!!这就是partition),每个小数据集我们启动一个子任务去让他做计数(知识点,敲黑板!!这就是task),每个子任务执行完毕之后再汇总成最终的结果。

RDD

  其实,spark 就是帮我们把上面的工作完成了。Instead of 手动的分割文件,手动的分配任务,手动的汇总结果,我们只需要把我们的数据封装成RDD数据类型,就能够像是在操作普通小数据集一样的完成常见的那几种数据操作。

2.2 进击的RDD-Dataframe简介

  在RDD的基础之上,spark又提出了升级版的数据结构-Dataframe,不过dataframe注意:这里的dataframe是pyspark中的叫法,在scala等语言中使用的dataset的名称

  那么什么是dataframe呢,简单的说你可以把他想象成数据库里的一张表,他有自己的column,row,还包括一些针对表的操作。如下面盗来的这张图所示:

dataframe示例-盗图自易佰教程

  spark中通过引入dataframe的数据结构带来了很多好处,在这里我们只重点说一说其中的两个:

  • 效率高。dataframe自带的一些操作都是经过优化的,能够以极为高效的方式完成任务。
  • 操作简便。经过又一层的封装,spark中的数据操作变得更加友好,上手很快,相比之下原来的针对rdd的操作可以称得上是非常原始了。

3.spark的生命周期

  spark 作为一个大数据处理的框架,具有自己完整的生命周期。

3.1 全生命周期

  闲言少叙,我在这里一句话串联一下整个第三节的脉络:
  一段程序在spark里叫做一个application,一个application会划分成很多个job(划分条件是action),一个job会划分成很多个stage(划分条件是shuffle),每个stage里所有被处理的数据会划分之后交给很多个子任务去处理(划分条件是partition)

spark生命周期图

3.2 拆解讲解 - job & action

  前文提到了,一个action的产生将会促使application切分Job。那么什么是action?简单来说就是spark中对于rdd的操作可以分成两类:tranformationaction。听到这两个名字,相信很多人已经明白了,transformation就是只是在做一些变形之类的操作,有点类似于hadoop里面的map,比如整体加个1啊什么的。而action是实际需要求值出结果的操作,比如说count什么的。

  这个概念有点像lazy evaluation的操作,估计和spark的正宗语言是scala有关。总之,就是不到万不得已不求值,求值就要切分job.

3.3 拆解讲解 - stage & shuffle

  现在说一说job内部的划分-stage。前面提到了,spark是不到万不得已不求值,求值才划分job,因此在一个job内部就完全是transforamtion的操作。

  但是,即使都是变换操作也是有不一样的,有的变换是一一对应的变换,比如说每个元素都加1;而有的变换则是涉及到整个RDD,比如groupby.这就是窄依赖宽依赖的变换。

宽依赖和窄依赖

  为什么突然整这么一个概念呢,记住一句话:宽依赖引发shuffle 操作,shuffle操作导致stage切分。 想一下,我们现在把每个rdd交给很多个小的task取执行了,大家各自执行各自的(并行),执行完了之后如果没啥问题接着走后面的操作,直到最后汇总,这种就是完全并行的操作,理想的情况。但是总有一些操作搅屎,它是全局的操作(宽依赖),它必须得等待前面分好的所有子任务全部执行完他才能执行,换句话说就是必须得先在他这里汇总一下。

  那么我们现在得出了一个结论:stage是spark种并行处理的最大单位。一个stage以内的各种操作都可以各自搞各自的,互不影响,从而最高的利用并行开发的效率。而出了stage就只能顺序执行所有操作了。

4.如何提高执行效率?——spark并发度的计算

  到这里,主要的内容都差不多讲完了,但是我写这篇文章最大的原因还没有说。其实就是我在使用pyspark的时候遇到的一个问题。简单来说就是程序老是崩,总是提醒我内存不足,我经过好几天的折腾才发现是自己设定的问题。也总结出来了一条经验,就是想提高spark的效率就可以从两个角度出发:

  • 修改配置内容,增加并发度。核心配置excutor.instances是spark处理器的个数(虚拟的可以多分配一些),excutor.cores是spark处理器的核心个数(虚拟的可以多分配一些)。
    spark = SparkSession.builder.enableHiveSupport(). \
        master("yarn"). \
        config('spark.executor.memory', '15g'). \
        config('spark.executor.cores', '10'). \
        config('spark.executor.instances', '20'). \
        config('spark.driver.memory', '20g'). \
        getOrCreate()
  • 修改RDD的partition,划分更小的task。前面提到的我的那个问题的本质原因不是并发度的问题,而是划分之后的任务还是太大了,交给每一个核心去处理内存都会扛不住,所以需要手动的划分(原本采用的是默认的划分,默认划分取决于你数据的输入,比如从hdf来的文件就是和你file split分片保持一致)。

    df.repartition.groupyBy("city").count().sortBy('count')

5.赠送内容:spark的输出进度条怎么解读

进度条

  最后一个问题,spark跑任务的时候的那个进度条里的数字都是啥玩意?相信很多人刚开始的时候都搞不太明白。简单说明一下:

  • stage:就是前面提到的job内部划分的stage,不多说了
  • 进度条尾端(X+Y)/Z:X是已经执行完的任务总数,基本上和数据的partition是保持一致的。Y是活跃的(准备好可以执行的)但是还没执行的的任务数。Z是总任务数。
  • 特殊现象:有时候会出现活跃任务数是负数!!这是什么情况?——这个负数就是执行失败的任务数,需要重新执行的。

相关文章

  • spark 和 pyspark的理解

    最近学习了spark 相关的内容,写个笔记记录一下自己目前对于spark的理解,方便以后查阅。在本文的写作过程中,...

  • Spark-pyspark

    pyspark介绍 pyspark是Spark官方提供的API接口,同时pyspark也是Spark中的一个程序。...

  • MAC Spark安装和环境变量设置

    本文使用的是Spark和python结合Spark的API组件pyspark安装Spark之前确保已有JDK环境和...

  • pySpark 中文API (2)

    pyspark.sql模块 模块上下文 Spark SQL和DataFrames的重要类: pyspark.sql...

  • PySpark初见

    PySpark PySpark 是 Spark 为 Python 开发者提供的 API。 子模块pyspark.s...

  • pyspark

    pyspark version 输出spark的版本 print("pyspark version"+str(sc...

  • Spark Python API Docs(part two)

    pyspark.sql module Module context Spark SQL和DataFrames中的重...

  • 4.pyspark.sql.Column

    Spark SQL和DataFrames重要的类有: pyspark.sql.SQLContext: DataFr...

  • 9.pyspark.sql.WindowSpec

    Spark SQL和DataFrames重要的类有: pyspark.sql.SQLContext: DataFr...

  • 10.pyspark.sql.FrameReader

    Spark SQL和DataFrames重要的类有: pyspark.sql.SQLContext: DataFr...

网友评论

    本文标题:spark 和 pyspark的理解

    本文链接:https://www.haomeiwen.com/subject/sfpspqtx.html