Overview
The main entry point for Spark functionality. A SparkContext represents the connection to a Spark cluster and can be used to create RDDs, accumulators, and broadcast variables on that cluster.
Only one SparkContext may be active per JVM. This limitation may eventually be removed; see SPARK-2243 for details.
Source Code Analysis
A) SparkContext Overview
// The call site where this SparkContext was constructed.
private val creationSite: CallSite = Utils.getCallSite()

/**
 * When called inside a class in the spark package, returns the name of the user code class
 * (outside the spark package) that called into Spark, as well as which Spark method they called.
 * This is used, for example, to tell users where in their code each RDD got created.
 *
 * @param skipClass Function that is used to exclude non-user-code classes.
 */
def getCallSite(skipClass: String => Boolean = sparkInternalExclusionFunction): CallSite = {
  // Keep crawling up the stack trace until we find the first function not inside of the spark
  // package. We track the last (shallowest) contiguous Spark method. This might be an RDD
  // transformation, a SparkContext function (such as parallelize), or anything else that leads
  // to instantiation of an RDD. We also track the first (deepest) user method, file, and line.
  var lastSparkMethod = "<unknown>"
  var firstUserFile = "<unknown>"
  var firstUserLine = 0
  var insideSpark = true
  val callStack = new ArrayBuffer[String]() :+ "<unknown>"

  Thread.currentThread.getStackTrace().foreach { ste: StackTraceElement =>
    // When running under some profilers, the current stack trace might contain some bogus
    // frames. This is intended to ensure that we don't crash in these situations by
    // ignoring any frames that we can't examine.
    if (ste != null && ste.getMethodName != null
      && !ste.getMethodName.contains("getStackTrace")) {
      if (insideSpark) {
        if (skipClass(ste.getClassName)) {
          lastSparkMethod = if (ste.getMethodName == "<init>") {
            // Spark method is a constructor; get its class name
            ste.getClassName.substring(ste.getClassName.lastIndexOf('.') + 1)
          } else {
            ste.getMethodName
          }
          callStack(0) = ste.toString // Put last Spark method on top of the stack trace.
        } else {
          if (ste.getFileName != null) {
            firstUserFile = ste.getFileName
            if (ste.getLineNumber >= 0) {
              firstUserLine = ste.getLineNumber
            }
          }
          callStack += ste.toString
          insideSpark = false
        }
      } else {
        callStack += ste.toString
      }
    }
  }

  // Finally (abridged here) build the short form ("lastSparkMethod at firstUserFile:firstUserLine")
  // and the long form (the collected stack, truncated to spark.callstack.depth, default 20),
  // and return them wrapped in a CallSite.
  val callStackDepth = sys.props.getOrElse("spark.callstack.depth", "20").toInt
  val shortForm = s"$lastSparkMethod at $firstUserFile:$firstUserLine"
  val longForm = callStack.take(callStackDepth).mkString("\n")
  CallSite(shortForm, longForm)
}
Function description: this walks the current call stack. The shallowest contiguous frame that belongs to Spark (or the Scala core) is placed at the top of callStack, and its method name is stored in lastSparkMethod. The first user-code frame found below it is then appended to callStack; its line number is stored in firstUserLine and its file name in firstUserFile. Finally a CallSite case class is returned, holding the short form ("lastSparkMethod at firstUserFile:firstUserLine") and the long form, i.e. the call stack truncated to a default depth of 20 frames.
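For context, the CallSite produced here is what surfaces in the Spark UI and in RDD debug output. A minimal sketch, assuming sc is an active SparkContext (the file name and line number in the comment are hypothetical):

val rdd = sc.parallelize(1 to 10)
// The short form, e.g. "parallelize at Demo.scala:7", is what the Spark UI and toDebugString display.
println(rdd.toDebugString)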
// If true, log warnings instead of throwing exceptions when multiple SparkContexts are active
private val allowMultipleContexts: Boolean =
config.getBoolean("spark.driver.allowMultipleContexts", false)
SparkContext.markPartiallyConstructed(this, allowMultipleContexts)
Function description: by default only one SparkContext instance exists (controlled by the property spark.driver.allowMultipleContexts). A user who needs multiple SparkContext instances can set this property to true. The method markPartiallyConstructed guarantees the instance's uniqueness and marks it as being under construction.
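A minimal sketch of the property that this check reads (illustrative only; in practice it should stay false, since multiple contexts in one JVM are unsupported):

val conf = new SparkConf()
  .set("spark.driver.allowMultipleContexts", "true") // log a warning instead of throwing on a second active SparkContext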
Next the SparkConf is cloned and its settings are validated:
_conf = config.clone()
_conf.validateSettings()
if (!_conf.contains("spark.master")) {
throw new SparkException("A master URL must be set in your configuration")
}
if (!_conf.contains("spark.app.name")) {
throw new SparkException("An application name must be set in your configuration")
}
As the checks above show, spark.master and spark.app.name must be set when creating a SparkContext; otherwise an exception is thrown.
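A minimal sketch of a configuration that satisfies both checks (the values are illustrative):

import org.apache.spark.{SparkConf, SparkContext}

val conf = new SparkConf()
  .setMaster("local[*]")            // sets spark.master
  .setAppName("sparkcontext-demo")  // sets spark.app.name
val sc = new SparkContext(conf)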
B) Creating the Execution Environment SparkEnv
def isLocal: Boolean = Utils.isLocalMaster(_conf)
//An asynchronous listener bus for Spark events
private[spark] def listenerBus: LiveListenerBus = _listenerBus
//Create the Spark execution environment (cache, map output tracker, etc)
_env = createSparkEnv(_conf, isLocal, listenerBus)
In the code above, _conf is the SparkConf cloned earlier, isLocal indicates whether the master URL denotes local mode, and listenerBus is the LiveListenerBus, which uses the listener pattern to dispatch the various Spark events. SparkEnv creation covers the following components (a short access sketch follows the list):
1. Create the security manager SecurityManager
2. Create the Akka-based distributed messaging system ActorSystem
3. Create the map-output tracker MapOutputTracker
4. Instantiate the ShuffleManager
5. Create the ShuffleMemoryManager
6. Create the block transfer service BlockTransferService
7. Create the BlockManagerMaster
8. Create the block manager BlockManager
9. Create the broadcast manager BroadcastManager
10. Create the cache manager CacheManager
11. Create the HTTP file server HttpFileServer
12. Create the metrics system MetricsSystem
13. Create the SparkEnv
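A minimal sketch of how these components can be reached once the environment exists, through the SparkEnv.get accessor (the members shown are assumed to be present; exact membership varies across Spark versions):

import org.apache.spark.SparkEnv

val env = SparkEnv.get                       // the environment built by createSparkEnv
val blockManager = env.blockManager          // block storage (step 8)
val mapOutputTracker = env.mapOutputTracker  // map-output tracking for shuffles (step 3)
val shuffleManager = env.shuffleManager      // SortShuffleManager by default (step 4)
val broadcastManager = env.broadcastManager  // broadcast variables (step 9)
val metricsSystem = env.metricsSystem        // metrics (step 12)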
C) Security Manager SecurityManager
The SecurityManager configures permissions and accounts. When running on YARN, a secret key needs to be generated for authentication; finally, a default password authenticator instance is installed for the current JVM.
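A minimal sketch of the configuration that SecurityManager reacts to (the secret value is illustrative; on YARN the secret is generated automatically):

val conf = new SparkConf()
  .set("spark.authenticate", "true")             // enable authentication between Spark processes
  .set("spark.authenticate.secret", "my-secret") // shared secret, needed outside of YARN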
D) The Akka-based Distributed Messaging System
Scala takes the view that the Java approach to concurrency, threads sharing data and guarding it with locks, is fragile: it invites lock contention, hurts concurrent performance, and can even deadlock. In Scala's actor model you define a type that extends Actor and provide an act method, much as you would implement Runnable and its run method in Java; but act is never invoked directly. Instead, data is passed by sending messages, for example with the ! operator:
Actor ! message
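The paragraph above describes the older scala.actors style (extending Actor and providing act). With the Akka classic API that Spark used at the time, an equivalent minimal, self-contained sketch looks like this (the actor and system names are hypothetical):

import akka.actor.{Actor, ActorSystem, Props}

class EchoActor extends Actor {
  def receive: Receive = {
    case msg: String => println(s"received: $msg") // messages are handled one at a time, no shared locks
  }
}

val system = ActorSystem("demo")
val echo = system.actorOf(Props[EchoActor], "echo")
echo ! "hello"       // fire-and-forget send instead of calling a method directly
system.terminate()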
E) MapOutputTracker
Every map task and every reduce task has a unique identifier, mapId and reduceId respectively. The input of a reduce task may be the output of several map tasks; the reduce task fetches the corresponding blocks from each map task, and this process is called shuffle. Each shuffle has a shuffleId.
MapOutputTrackerMaster internally maintains mapStatuses: TimeStampedHashMap[Int, Array[MapStatus]] to track the output of every map task, where the key is the shuffleId.
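A conceptual sketch of that shuffleId -> Array[MapStatus] mapping (FakeMapStatus is a hypothetical stand-in, not Spark's MapStatus):

import scala.collection.mutable

case class FakeMapStatus(blockManagerHost: String, blockSizesByReducer: Array[Long])

val mapStatuses = mutable.Map.empty[Int, Array[FakeMapStatus]] // key: shuffleId
mapStatuses(0) = Array(
  FakeMapStatus("host-a", Array(10L, 20L)), // output of map task 0, block sizes for reducers 0 and 1
  FakeMapStatus("host-b", Array(5L, 15L))   // output of map task 1
)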
F) Instantiating the ShuffleManager
The ShuffleManager is responsible for shuffling block data, both local and remote. The implementation is instantiated via reflection, and the default is SortShuffleManager. (In earlier releases the property spark.shuffle.manager could also be set to "hash" to select HashShuffleManager, but as the code below shows, in this version both registered short names resolve to SortShuffleManager.)
SortShuffleManager writes map output to local disk through an index-based shuffle block resolver (IndexShuffleBlockManager in older releases, later renamed IndexShuffleBlockResolver), which in turn uses the BlockManager's DiskBlockManager, and it writes an index file keyed by shuffleId and mapId.
// Let the user specify short names for shuffle managers
val shortShuffleMgrNames = Map(
"sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName,
"tungsten-sort" -> classOf[org.apache.spark.shuffle.sort.SortShuffleManager].getName)
val shuffleMgrName = conf.get("spark.shuffle.manager", "sort")
val shuffleMgrClass =
shortShuffleMgrNames.getOrElse(shuffleMgrName.toLowerCase(Locale.ROOT), shuffleMgrName)
val shuffleManager = instantiateClass[ShuffleManager](shuffleMgrClass)
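Given the short-name mapping above, choosing the shuffle implementation comes down to one configuration key; a minimal sketch (the custom class name in the comment is hypothetical):

val conf = new SparkConf()
  .set("spark.shuffle.manager", "sort") // or "tungsten-sort"; both resolve to SortShuffleManager here
// A fully qualified class name is also accepted, e.g. "com.example.MyShuffleManager".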
G) Block Transfer Service BlockTransferService
By default this is NettyBlockTransferService, which uses Netty's asynchronous event-driven networking to provide a client and server for fetching sets of blocks from remote nodes.
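A sketch of configuration keys that tune this Netty-based transfer path (the values are illustrative):

val conf = new SparkConf()
  .set("spark.shuffle.io.maxRetries", "5")            // retries for failed remote block fetches
  .set("spark.shuffle.io.numConnectionsPerPeer", "2") // connections reused per remote host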
H) Creating the TaskScheduler
// We need to register "HeartbeatReceiver" before "createTaskScheduler" because Executor will
// retrieve "HeartbeatReceiver" in the constructor. (SPARK-6640)
_heartbeatReceiver = env.rpcEnv.setupEndpoint(
HeartbeatReceiver.ENDPOINT_NAME, new HeartbeatReceiver(this))
// Create and start the scheduler
val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
_schedulerBackend = sched
_taskScheduler = ts
_dagScheduler = new DAGScheduler(this)
_heartbeatReceiver.ask[Boolean](TaskSchedulerIsSet)
// start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's
// constructor
_taskScheduler.start()
createTaskScheduler takes sc, the master URL, and deployMode, and returns a (SchedulerBackend, TaskScheduler) pair.
The master URL is first matched against a set of regular expressions:
master match {
  case "local" =>
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, 1)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_N_REGEX(threads) =>
    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    // local[*] estimates the number of cores on the machine; local[N] uses exactly N threads.
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    if (threadCount <= 0) {
      throw new SparkException(s"Asked to run locally with $threadCount threads")
    }
    val scheduler = new TaskSchedulerImpl(sc, MAX_LOCAL_TASK_FAILURES, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_N_FAILURES_REGEX(threads, maxFailures) =>
    def localCpuCount: Int = Runtime.getRuntime.availableProcessors()
    // local[*, M] means the number of cores on the computer with M failures
    // local[N, M] means exactly N threads with M failures
    val threadCount = if (threads == "*") localCpuCount else threads.toInt
    val scheduler = new TaskSchedulerImpl(sc, maxFailures.toInt, isLocal = true)
    val backend = new LocalSchedulerBackend(sc.getConf, scheduler, threadCount)
    scheduler.initialize(backend)
    (backend, scheduler)

  case SPARK_REGEX(sparkUrl) =>
    val scheduler = new TaskSchedulerImpl(sc)
    val masterUrls = sparkUrl.split(",").map("spark://" + _)
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    (backend, scheduler)

  case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave) =>
    // Check to make sure memory requested <= memoryPerSlave. Otherwise Spark will just hang.
    val memoryPerSlaveInt = memoryPerSlave.toInt
    if (sc.executorMemory > memoryPerSlaveInt) {
      throw new SparkException(
        "Asked to launch cluster with %d MB RAM / worker but requested %d MB/worker".format(
          memoryPerSlaveInt, sc.executorMemory))
    }
    val scheduler = new TaskSchedulerImpl(sc)
    val localCluster = new LocalSparkCluster(
      numSlaves.toInt, coresPerSlave.toInt, memoryPerSlaveInt, sc.conf)
    val masterUrls = localCluster.start()
    val backend = new StandaloneSchedulerBackend(scheduler, sc, masterUrls)
    scheduler.initialize(backend)
    backend.shutdownCallback = (backend: StandaloneSchedulerBackend) => {
      localCluster.stop()
    }
    (backend, scheduler)

  case masterUrl =>
    val cm = getClusterManager(masterUrl) match {
      case Some(clusterMgr) => clusterMgr
      case None => throw new SparkException("Could not parse Master URL: '" + master + "'")
    }
    try {
      val scheduler = cm.createTaskScheduler(sc, masterUrl)
      val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
      cm.initialize(scheduler, backend)
      (backend, scheduler)
    } catch {
      case se: SparkException => throw se
      case NonFatal(e) =>
        throw new SparkException("External scheduler cannot be instantiated", e)
    }
}
}
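A sketch of master URL forms that the pattern match above recognizes (host names and numbers are illustrative):

new SparkConf().setMaster("local")                     // one thread, MAX_LOCAL_TASK_FAILURES
new SparkConf().setMaster("local[4]")                  // LOCAL_N_REGEX: exactly 4 threads ("local[*]" uses all cores)
new SparkConf().setMaster("local[4, 3]")               // LOCAL_N_FAILURES_REGEX: 4 threads, up to 3 task failures
new SparkConf().setMaster("spark://master-host:7077")  // SPARK_REGEX: standalone cluster
new SparkConf().setMaster("local-cluster[2, 1, 1024]") // LOCAL_CLUSTER_REGEX: 2 workers, 1 core and 1024 MB each
new SparkConf().setMaster("yarn")                      // falls through to the external cluster manager branch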