1.spark运行结构
1.application 包括driver和分布在不同集群不同节点上的executor代码
2.driver指的是application中的main函数并且创建的sparkcontext,sparkcontext作用是创建运行环境
spark的运行环境是什么?
一个CoarseGrainedExecutorBackend只有一个executor
spark on yarn

2.序列化问题(闭包问题)与线程安全问题
在driver端初始化了一个object或class实例,要在executor运行,必须实现序列化接口
如果实例是object类型,则每个executor共享一个,如果是class类型,及new了一个实例,则一个task一个实例
在函数中初始化实例,如果是单例的,则一个进程(executor)只有一个实例,如果是class类型,看调用的算子,如果是map,则没来一条数据就new 一个,如果是mappartition,则一个分区一个实例
多个线程共用一个变量,会出现线程安全问题,例如:
object DateUntil {
private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def parse(st: String): Long = {
val date = format.parse(st)
date.getTime
}
}
加锁改进:
object DateUntil {
private val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
def parse(st: String): Long = synchronized {
val date = format.parse(st)
date.getTime
}
}
效果不好,改进:mappartition算子,一个分区一个simpledateformat
rdd.mapPartitions(data => {
val format = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss")
data.map(txt => {
val date = format.parse(txt)
date.getTime
})
})
3.shuffle

在join之前调用groupbykey,下游的数据明确知道要拉取的数据的分区,就没有shuffle,就无需划分stage,就是窄依赖.

网友评论