spark工作原理:分布式,内存运行,迭代式
- spark和MapReduce最大的不同在于它是迭代式的:
MapReduce 只有map和reduce两个阶段
spark的计算模型可以分为n个阶段
RDD
RDD是从HDFS或者HIVE中来的,他是一种元素集合,可以被分为多个partition
容错性:如果计算出错,那么RDD会自动从失败节点中恢复。
弹性:RDD的弹性在于如果你的数据有10w,但是partition只有5w,那么rdd就会将一部分数据写入磁盘,而对用户而言这是透明的,所以这就是RDD的弹性所在。
创建RDD
- 程序中的集合创建,然后用parallelize
- 本地文件创建
- HDFS创建
spark的两种操作分别为transformation和action,其中transformation有lazy特性,如果程序中只有transformation那么即使执行spark,该操作也不会执行,这样spark就能对底层进行优化了。而action会触发一个job。reduceByKey也是transformation算子,foreach是action算子。也就是说整个transformation都在driver上执行,只有action之后,才会提交task到executor上面,之后executor再去hdfs上面读取数据,然后executor再依次执行transformation算子。P40
transformation:map filter flatmap groupByKey reduceByKey sortByKey join cogroup
action: reduce collect count take saveAsTextFile countByKey foreach
foreach是在远程集群执行的,所以性能要比collect,countByKey高很多
RDD持久化:
例如要分别执行两次count,但是第二次的count执行的时候由于第一次之后rdd被其他的数据占用,而导致第二次count又要重新计算一次,又要重新从hdfs中读出,非常浪费资源,所以就要把RDD放到缓存中持久化。
spark在shuffle是也会自动进行持久化,主要是为了节点的失败避免重新计算整个过程
cache()和persist()需要在创建textfile之后直接调用,不然没有效果
MEMORY_ONLY_SER消耗的空间比MEMORY_ONLY小,因为直接将java对象序列化,但是反序列要消耗cpu
MEMORY_ONLY_2则是将序列号保存到别的节点可以恢复失败的节点
共享变量
如果一个算子要用到外部变量,那么这个变量就会被拷贝到task中,task想要共享这个变量是不可能的。所以spark提供了两种,Boardcast Variable 针对读 给每一个executor一份那么里面的所有task就可以共享这个变量了 和 Accumulator 针对写
sc.boardcast(factor)
driver中可以调用accumulate.value
二次排序
要实现ordered和serializable接口
Spark原理 P58
application:我们自己编写的spark
用spark-submit(standalone模式)提交application启动driverAction,
构造出sparkContext,
之后sparkContext构造出DAGScheduler和TaskScheduler,
TaskScheduler去找Master注册application,
之后Master用资源调度算法为Master启动Executor,(Master资源调度算法)
之后Executor反向注册到TaskScheduler所有的Executor注册完后,
Driver结束sparkContext的初始化,
每执行一个action就会创建一个job
job会提交给DAGScheduler
DAGScheduler会将Job划分为多个stage(stage划分算法)
每个stage创建一个TaskSet
然后把TaskSet给TaskScheduler
TaskScheduler把TaskSet中的每一个task提交给每个Executor(task分配算法)
Executor收到task用TaskRunner封装task,然后从线程池中取出一个线程池执行
TaskRunner将代码拷贝执行反序列化
Task有两种:ShuffleMapTask和ResultesultTask,最后一个stage是ResultesultTask其他的都是ShuffleMapTask
每个task处理一个partition
宽依赖和窄依赖
窄依赖Narrow Dependency:每个partition仅仅依赖于一个父partition
宽依赖Shuffle Dependency :相反
spark有三种提交模式
standalone
yarn-cluster :反向注册到一个NodeManager创建Driver
yarn-client:反向注册到本地客户端创建Driver
yarn-client 本地的driver会与yarn产生超大流量,但是本地可以看到所有的log,方便调试
yarn-cluster 用于生产,缺点在于调试不方便
SparkContext
Master
- 主备切换:基于文件系统或zookeeper
-
注册机制:worker driver application分别向master注册
- 状态改变
- 资源调度
application调度机制:threadOutApps和非threadOutApps
调用reduceByKey会触发隐式转换,去找rddToPairRDDFunctions将RDD转换成PairRDDFunctions
DAGScheduler P70
reduceByKey 有三个RDD MapPartitionRDD,shuffleRDD ,MapPartitionRDD,所以它可以是触发两个stage的原因
stage划分:如果上一个RDD是窄依赖就划分到当前的stage,如果是宽依赖就划分到新的stage
Task分配算法
遍历每一种本地化级别、
PROCESS_LOCAL:partition和task进入同一个executor
NODE_LOCAL:不在同一个executor,进程,但是在一个节点
NO_RREF没有本地化
RACK_LOCAL机架本地化
ANY任意
Executor
启动的Executor是CoarseGrainedExecutorBackend
Task
shuffle
- 如果内存缓存不够了,就会一点一点把内存写入磁盘,所以会导致过多的磁盘操作
- 与MapReduce不同,Spark默认不会对数据排序,所以spark能边拉去边聚合,但是MapReduce会,所以spark会快很多,但是spark不能直接处理value算子,需要要先用MapParititionRDD,再用map算子,所以会不方便
BlocklockManager
CacheManager
Checkpoint
持久化是将数据保存到BlockManager中,但是checkpoint改变了依赖。
持久化保存在内存或者磁盘中,所以容易丢失。
但是checkpoint保存在高容错的文件系统中,如HDFS,所以丢失的可能行比较低。
要多checkpoint的rdd先persist(DISK_ONLY),如果不这样,中间rdd由于没有持久化,在写入磁盘的时候需要重新计算一次
Spark性能调优
内存都花到哪里了?
- java的对象头占了16个字节,有些integer对象的头比本身的数据还要打
- string使用char数组保存的,还需要保存数组长度等信息,utf-16每个字符需要连个字节
- Hash和LinkedList这种内部使用链表的数据结构都是用Entry来保存的,Entry也是个对象
kryo序列化
java序列化大对象导致序列化缓慢,但是java序列化比较便捷
而kryo序列化速度快而且占用的内存空间小
val conf = new SparkConf().setMaster().setAppName()
conf.registerKryoClasses(Array(classOf[Counter])) //要注册的原因是全类名比较长,也容易消耗内存
val sc = new SparkConf(conf)
数据结构优化
优先使用数组而不是集合
将HashMap和List拼成特殊格式的字符串
用json代替多层嵌套对象
对多次操作RDD持久化或checkpoint
如果要持久化数据,放在blockmanager中,在可能丢失还要保持高性能,那么干脆第一次计算RDD就checkpoint
虚拟机优化
如果内存不够,会频繁GC,导致task工作线程停止,性能消耗很大
Executor划了60%的内存给RDD缓存,40%给task运行
所以可以改变这个参数,这样可以降低GC发生的频率
可以使用序列化优化rdd partition.
SparkConf().set("spark.storage.memoryFraction","0.5")
Eden去太小会迅速占满老年代,从而引起full gc导致性能下降,所以要调大Eden区,如HDFS的block块64M,序列化之后为3倍,一个Executor中有4个task,所以就要设置为500M
也可以调大survival的大小
广播共享优化
本地化调优 P93
spark.locality
reduceByKey的性能比groupByKey要好,因为他可以本地聚合,所以能用reduceByKey就用reduceByKey
Shuffle优化
用了consolidation还可以减少磁盘IO,如果原来有100ShuffleMapTask那就要用100个里面拉取,但是用了consolidation比如有10个 cpu core 那么就可以只用10个里面拉取
还可以用spark.reducer.maxSizeInFilght加大内存,这样也可以减少拉取次数
同样提高spark.shuffle.file.buffer,可以加大bucket的数量
spark.shuffle.io.maxRetries和spark.shuffle.io.retryWait增加拉取失败重复次数和重试间隔,因为有的时候task正在full gc,会导致task停止,所以可能拉取失败
Spark SQL
原来是hive,但是hive基于MapReduce太慢了,后来spark出了shark,但是shark底层是依赖hive的,所以最后spark出了Spark SQL
内存列存储,也就是一个列为一个对象,减少内存的消耗,避免了大量的GC
列存储的好处:可以跳过不符合的数据,都是同一种数据可以压缩,
RDD转换为Dataframe
- 用反射推断特定数据类型
- 动态创建一个元数据
SparkSQL
组内行号row_number()
udf User-Defined Functions
优化
如果数据量过大,那么尽量不要用collect,而用foreach
用缓存表
广播join表:小于某个阈值的表会被广播出去
spark streaming
updateStateByKey : 会从缓存中拿数据
transform:可以过滤广告黑名单
updateStateByKey 必须设置一个checkpoint目录,不然的话,如果宕机又要重新开始计算
Spark的容错机制是通过血缘关系来保证的
Kafka的Direct可以实现容错机制
spark和kafka
网友评论