Application总共10 core的话,使用5个Executor每个2核要优于10个Executor每个1核,Executor启动需要时间
1 广播变量
当Executor中使用到了Driver的变量,不使用广播变量,有多少task就有多少变量副本;使用广播变量,每个Executor只有 一个副本
2 累加器
/**
* 累加器 accumulator
* 1.累加器只能在driver端定义 初始化
* 2.累加器取值.value只能在driver端
* 分布式没有全局变量的概念
*/
object PBroadcast {
def main(args: Array[String]): Unit = {
val conf = new SparkConf().setMaster("local").setAppName("test")
val sc = new SparkContext(conf)
// 初始值可以是对象(涉及自定义累加器)或者字符串
val accumulator: Accumulator[Int] = sc.accumulator(0)
val rdd = sc.textFile("./words")
// 使用变量统计行数
// var i = 0
rdd.map(s=>{
// executor端执行
// i不回收到driver
// i += 1
accumulator.add(1)
// 分区中取值无意义
// println(accumulator)
s
}).collect()
// i为driver端
// println(i)
println(accumulator.value)
}
}
3 SparkShell
# 启动
./spark-shell --master spark://node-01:7877
# Spark context available as sc.
sc.textFile("hdfs://node-01:9820/spark/data/words").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect()
# 指定日志目录
./spark-shell --master spark://node-01:7877 --conf spark.eventLog.enabled=true --conf spark.eventLog.dir=hdfs://node-01:8020/spark/log
# 配置文件中修改
cp spark-defaults.conf.template spark-defaults.conf
vi spark-defaults.conf
--------------
spark.eventLog.enabled true
spark.eventLog.dir hdfs://node-01:8020/spark/log
# 配置历史日志
spark.history.fs.logDirectory hdfs://node-01:8020/spark/log
# 日志压缩
spark.eventLog.compress true
--------------
# node-03启动日志服务器
./start-history-server.sh
4 shuffle
reduceByKey把每个key的value聚合,结果每个key对应一个value
shuffle read
上个stage相同的key写入同一分区文件,分区数取决于reduce task的并行度
shuffle write
reduce task寻找属于自己的分区文件
hash shuffle
map task按key的hash值写入buffer缓冲区,buffer 32k,每个buffer溢写磁盘小文件,shuffle完才聚合
文件数 = M * R
文件多 -> 频繁GC OOM 跨节点连接断掉几率增大
优化
每个core的executor共用buffer
文件数 = C * R
sort shuffle
为了解决hash shuffle小文件多的问题而引入
文件数 = 2 * M
sort shuffle by pass
不排序
5 shuffle寻址
![](https://img.haomeiwen.com/i5880229/ee577aa16ed479af.png)
MapOutputTracker
管理磁盘小文件,主(Driver)从(Executor)关系
BlockManager
管理块,主从
BlockManagerMaster(Driver)
- DiskStore 管理磁盘
- MemoryStore 管理内存
- ConnectionManager 连接其他BlockManager
- BlockTransferService 拉取数据
BlockManagerWorker(Executor)
- DiskStore 管理磁盘
- MemoryStore 管理内存
- ConnectionManager 连接其他BlockManager
- BlockTransferService 拉取数据
6 内存管理
静态内存管理
spark1.6之前是静态内存管理
![](https://img.haomeiwen.com/i5880229/3e9549da0c7b169e.png)
shuffle第一次内存会oom
shuffle第二次内存不够不会oom,会溢写磁盘
解决oom
- 减少数据拉取
- 增大executor总内存
统一内存管理
![](https://img.haomeiwen.com/i5880229/7e6b7d66ef3acc58.png)
shuffle和持久化内存可以互相借用
spark1.6默认统一内存管理
6 shuffle调优
建议提交任务时设置
增大shuffle buffer
增大read拉取的buffer
增加拉取重试次数
加长重试等待间隔
设置shuffle机制 hash | sort,是否排序(by pass)
hash shuffle合并文件
7 master ha
两种方式: FileSystem ZK
master挂掉不影响已经运行的task,但不能再提交任务
FileSystem需要手动切换恢复,推荐ZK
# node-01修改配置 conf/spark-env.sh
---------
export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=node-02:2181,node-03:2181,node-04:2181 -Dspark.deploy.zookeeper.dir=/var/spark"
---------
# 配置发送到node-02 node-03
# node-02配置master地址 conf/spark-env.sh
------------
# master地址
export SPARK_MASTER_IP=node-02
------------
# node-01启动集群
# node-02单独启动master
./sbin/start-master.sh
# kill node-01的master可以看到node-02由standby变为active
# ha提交任务时需要指定两台机器
./spark-submit --master spark://node-01:7877,node-02:7877 ...
# master切换不影响已经运行的task
# client模式连不上master会自动切换
网友评论