spark

作者: 徐振杰 | 来源:发表于2019-03-16 17:00 被阅读0次

    spark工作原理:分布式,内存运行,迭代式

    1. spark和MapReduce最大的不同在于它是迭代式的:
      MapReduce 只有map和reduce两个阶段
      spark的计算模型可以分为n个阶段

    RDD
    RDD是从HDFS或者HIVE中来的,他是一种元素集合,可以被分为多个partition

    容错性:如果计算出错,那么RDD会自动从失败节点中恢复。
    弹性:RDD的弹性在于如果你的数据有10w,但是partition只有5w,那么rdd就会将一部分数据写入磁盘,而对用户而言这是透明的,所以这就是RDD的弹性所在。

    创建RDD

    1. 程序中的集合创建,然后用parallelize
    2. 本地文件创建
    3. HDFS创建

    spark的两种操作分别为transformation和action,其中transformation有lazy特性,如果程序中只有transformation那么即使执行spark,该操作也不会执行,这样spark就能对底层进行优化了。而action会触发一个job。reduceByKey也是transformation算子,foreach是action算子。也就是说整个transformation都在driver上执行,只有action之后,才会提交task到executor上面,之后executor再去hdfs上面读取数据,然后executor再依次执行transformation算子。P40

    transformation:map filter flatmap groupByKey reduceByKey sortByKey join cogroup
    action: reduce collect count take saveAsTextFile countByKey foreach

    foreach是在远程集群执行的,所以性能要比collect,countByKey高很多

    RDD持久化:
    例如要分别执行两次count,但是第二次的count执行的时候由于第一次之后rdd被其他的数据占用,而导致第二次count又要重新计算一次,又要重新从hdfs中读出,非常浪费资源,所以就要把RDD放到缓存中持久化。
    spark在shuffle是也会自动进行持久化,主要是为了节点的失败避免重新计算整个过程
    cache()和persist()需要在创建textfile之后直接调用,不然没有效果
    MEMORY_ONLY_SER消耗的空间比MEMORY_ONLY小,因为直接将java对象序列化,但是反序列要消耗cpu
    MEMORY_ONLY_2则是将序列号保存到别的节点可以恢复失败的节点

    共享变量
    如果一个算子要用到外部变量,那么这个变量就会被拷贝到task中,task想要共享这个变量是不可能的。所以spark提供了两种,Boardcast Variable 针对读 给每一个executor一份那么里面的所有task就可以共享这个变量了 和 Accumulator 针对写
    sc.boardcast(factor)
    driver中可以调用accumulate.value

    二次排序
    要实现ordered和serializable接口

    Spark原理 P58
    application:我们自己编写的spark
    用spark-submit(standalone模式)提交application启动driverAction,
    构造出sparkContext,
    之后sparkContext构造出DAGScheduler和TaskScheduler,
    TaskScheduler去找Master注册application,
    之后Master用资源调度算法为Master启动Executor,(Master资源调度算法)
    之后Executor反向注册到TaskScheduler所有的Executor注册完后,
    Driver结束sparkContext的初始化,
    每执行一个action就会创建一个job
    job会提交给DAGScheduler
    DAGScheduler会将Job划分为多个stage(stage划分算法)
    每个stage创建一个TaskSet
    然后把TaskSet给TaskScheduler
    TaskScheduler把TaskSet中的每一个task提交给每个Executor(task分配算法)
    Executor收到task用TaskRunner封装task,然后从线程池中取出一个线程池执行
    TaskRunner将代码拷贝执行反序列化
    Task有两种:ShuffleMapTask和ResultesultTask,最后一个stage是ResultesultTask其他的都是ShuffleMapTask
    每个task处理一个partition

    宽依赖和窄依赖

    窄依赖Narrow Dependency:每个partition仅仅依赖于一个父partition
    宽依赖Shuffle Dependency :相反

    spark有三种提交模式
    standalone
    yarn-cluster :反向注册到一个NodeManager创建Driver
    yarn-client:反向注册到本地客户端创建Driver

    yarn-client 本地的driver会与yarn产生超大流量,但是本地可以看到所有的log,方便调试
    yarn-cluster 用于生产,缺点在于调试不方便

    SparkContext

    Master

    1. 主备切换:基于文件系统或zookeeper
    2. 注册机制:worker driver application分别向master注册


    3. 状态改变
    4. 资源调度
      application调度机制:threadOutApps和非threadOutApps

    调用reduceByKey会触发隐式转换,去找rddToPairRDDFunctions将RDD转换成PairRDDFunctions

    DAGScheduler P70
    reduceByKey 有三个RDD MapPartitionRDD,shuffleRDD ,MapPartitionRDD,所以它可以是触发两个stage的原因
    stage划分:如果上一个RDD是窄依赖就划分到当前的stage,如果是宽依赖就划分到新的stage

    Task分配算法
    遍历每一种本地化级别、
    PROCESS_LOCAL:partition和task进入同一个executor
    NODE_LOCAL:不在同一个executor,进程,但是在一个节点
    NO_RREF没有本地化
    RACK_LOCAL机架本地化
    ANY任意

    Executor
    启动的Executor是CoarseGrainedExecutorBackend

    Task

    shuffle

    1. 如果内存缓存不够了,就会一点一点把内存写入磁盘,所以会导致过多的磁盘操作
    2. 与MapReduce不同,Spark默认不会对数据排序,所以spark能边拉去边聚合,但是MapReduce会,所以spark会快很多,但是spark不能直接处理value算子,需要要先用MapParititionRDD,再用map算子,所以会不方便

    BlocklockManager
    CacheManager
    Checkpoint
    持久化是将数据保存到BlockManager中,但是checkpoint改变了依赖。
    持久化保存在内存或者磁盘中,所以容易丢失。
    但是checkpoint保存在高容错的文件系统中,如HDFS,所以丢失的可能行比较低。
    要多checkpoint的rdd先persist(DISK_ONLY),如果不这样,中间rdd由于没有持久化,在写入磁盘的时候需要重新计算一次

    Spark性能调优

    内存都花到哪里了?

    1. java的对象头占了16个字节,有些integer对象的头比本身的数据还要打
    2. string使用char数组保存的,还需要保存数组长度等信息,utf-16每个字符需要连个字节
    3. Hash和LinkedList这种内部使用链表的数据结构都是用Entry来保存的,Entry也是个对象

    kryo序列化
    java序列化大对象导致序列化缓慢,但是java序列化比较便捷
    而kryo序列化速度快而且占用的内存空间小
    val conf = new SparkConf().setMaster().setAppName()
    conf.registerKryoClasses(Array(classOf[Counter])) //要注册的原因是全类名比较长,也容易消耗内存
    val sc = new SparkConf(conf)

    数据结构优化
    优先使用数组而不是集合
    将HashMap和List拼成特殊格式的字符串
    用json代替多层嵌套对象

    对多次操作RDD持久化或checkpoint
    如果要持久化数据,放在blockmanager中,在可能丢失还要保持高性能,那么干脆第一次计算RDD就checkpoint

    虚拟机优化
    如果内存不够,会频繁GC,导致task工作线程停止,性能消耗很大
    Executor划了60%的内存给RDD缓存,40%给task运行
    所以可以改变这个参数,这样可以降低GC发生的频率
    可以使用序列化优化rdd partition.
    SparkConf().set("spark.storage.memoryFraction","0.5")

    Eden去太小会迅速占满老年代,从而引起full gc导致性能下降,所以要调大Eden区,如HDFS的block块64M,序列化之后为3倍,一个Executor中有4个task,所以就要设置为500M
    也可以调大survival的大小

    广播共享优化

    本地化调优 P93
    spark.locality

    reduceByKey的性能比groupByKey要好,因为他可以本地聚合,所以能用reduceByKey就用reduceByKey

    Shuffle优化
    用了consolidation还可以减少磁盘IO,如果原来有100ShuffleMapTask那就要用100个里面拉取,但是用了consolidation比如有10个 cpu core 那么就可以只用10个里面拉取
    还可以用spark.reducer.maxSizeInFilght加大内存,这样也可以减少拉取次数
    同样提高spark.shuffle.file.buffer,可以加大bucket的数量
    spark.shuffle.io.maxRetries和spark.shuffle.io.retryWait增加拉取失败重复次数和重试间隔,因为有的时候task正在full gc,会导致task停止,所以可能拉取失败

    Spark SQL
    原来是hive,但是hive基于MapReduce太慢了,后来spark出了shark,但是shark底层是依赖hive的,所以最后spark出了Spark SQL
    内存列存储,也就是一个列为一个对象,减少内存的消耗,避免了大量的GC
    列存储的好处:可以跳过不符合的数据,都是同一种数据可以压缩,

    RDD转换为Dataframe

    1. 用反射推断特定数据类型
    2. 动态创建一个元数据

    SparkSQL
    组内行号row_number()
    udf User-Defined Functions

    优化
    如果数据量过大,那么尽量不要用collect,而用foreach
    用缓存表
    广播join表:小于某个阈值的表会被广播出去


    spark streaming
    updateStateByKey : 会从缓存中拿数据
    transform:可以过滤广告黑名单

    updateStateByKey 必须设置一个checkpoint目录,不然的话,如果宕机又要重新开始计算

    Spark的容错机制是通过血缘关系来保证的
    Kafka的Direct可以实现容错机制

    spark和kafka


    相关文章

      网友评论

          本文标题:spark

          本文链接:https://www.haomeiwen.com/subject/glyrmqtx.html