Dispatcher class diagram: fields
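- endpoints: a ConcurrentMap[String, MessageLoop] that stores the mapping from endpoint name to its MessageLoop.
- endpointRefs: a ConcurrentMap[RpcEndpoint, RpcEndpointRef] that stores the mapping from each RpcEndpoint to its RpcEndpointRef.
- stopped: a flag indicating whether the Dispatcher has been stopped.
- sharedLoop: the SharedMessageLoop instance that processes incoming messages for every endpoint that is not an IsolatedRpcEndpoint.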
When the RPC server side starts, it calls setupEndpoint:
override def setupEndpoint(name: String, endpoint: RpcEndpoint): RpcEndpointRef = {
dispatcher.registerRpcEndpoint(name, endpoint)
}
Under the hood this calls Dispatcher's registerRpcEndpoint method:
def registerRpcEndpoint(name: String, endpoint: RpcEndpoint): NettyRpcEndpointRef = {
// Build the RpcEndpointAddress
val addr = RpcEndpointAddress(nettyEnv.address, name)
// Create the endpointRef (the reference to this endpoint)
val endpointRef = new NettyRpcEndpointRef(nettyEnv.conf, addr, nettyEnv)
synchronized {
if (stopped) {
throw new IllegalStateException("RpcEnv has been stopped")
}
if (endpoints.containsKey(name)) {
throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
}
// This must be done before assigning RpcEndpoint to MessageLoop, as MessageLoop sets Inbox be
// active when registering, and endpointRef must be put into endpointRefs before onStart is
// called.
// Record the endpoint -> endpointRef mapping
endpointRefs.put(endpoint, endpointRef)
var messageLoop: MessageLoop = null
try {
messageLoop = endpoint match {
case e: IsolatedRpcEndpoint =>
new DedicatedMessageLoop(name, e, this)
case _ =>
// SharedMessageLoop creates an Inbox for the endpoint and records the name -> Inbox mapping
sharedLoop.register(name, endpoint)
sharedLoop
}
endpoints.put(name, messageLoop)
} catch {
case NonFatal(e) =>
endpointRefs.remove(endpoint)
throw e
}
}
endpointRef
}
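The registration steps above can be condensed into a small, self-contained sketch. Endpoint, EndpointRef and MiniDispatcher are hypothetical stand-ins (not Spark classes), a plain queue plays the role of the MessageLoop/Inbox, and failOnCreate is a made-up switch used only to trigger the rollback path. It illustrates the two ordering rules in registerRpcEndpoint: the ref is published before the loop is created, and it is removed again if creating the loop fails.

```scala
import java.util.concurrent.{ConcurrentHashMap, ConcurrentLinkedQueue}
import scala.util.control.NonFatal

// Hypothetical stand-ins for RpcEndpoint and RpcEndpointRef.
final class Endpoint(val name: String) // identity equality, like a real endpoint object
final case class EndpointRef(address: String, name: String)

final class MiniDispatcher(localAddress: String) {
  // endpoint name -> its "message loop" (here just a message queue)
  val endpoints = new ConcurrentHashMap[String, java.util.Queue[Any]]()
  // endpoint -> its reference
  val endpointRefs = new ConcurrentHashMap[Endpoint, EndpointRef]()
  private var stopped = false // guarded by `this`

  // Hypothetical switch that simulates a failure while creating the loop.
  var failOnCreate: Boolean = false

  def register(name: String, endpoint: Endpoint): EndpointRef = {
    val ref = EndpointRef(localAddress, name)
    synchronized {
      if (stopped) throw new IllegalStateException("RpcEnv has been stopped")
      if (endpoints.containsKey(name))
        throw new IllegalArgumentException(s"There is already an RpcEndpoint called $name")
      // Publish the ref first so that OnStart handling can already look it up.
      endpointRefs.put(endpoint, ref)
      try {
        if (failOnCreate) throw new RuntimeException("loop creation failed")
        val queue = new ConcurrentLinkedQueue[Any]()
        queue.add("OnStart") // a new Spark Inbox also enqueues OnStart first
        endpoints.put(name, queue)
      } catch {
        case NonFatal(e) =>
          endpointRefs.remove(endpoint) // roll back: no dangling ref is left behind
          throw e
      }
    }
    ref
  }

  def stop(): Unit = synchronized { stopped = true }
}
```

Doing the duplicate-name check and both map updates under one lock keeps two concurrent registrations of the same name from both succeeding.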
How the Dispatcher handles messages
Whichever way a message is posted, it ultimately goes through postMessage:
private def postMessage(
endpointName: String,
message: InboxMessage,
callbackIfStopped: (Exception) => Unit): Unit = {
val error = synchronized {
val loop = endpoints.get(endpointName)
if (stopped) {
Some(new RpcEnvStoppedException())
} else if (loop == null) {
Some(new SparkException(s"Could not find $endpointName."))
} else {
loop.post(endpointName, message)
None
}
}
// We don't need to call `onStop` in the `synchronized` block
error.foreach(callbackIfStopped)
}
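The key idea in postMessage is that the error is decided while holding the lock, but the callback only runs after the lock is released. A minimal sketch of that pattern, assuming a hypothetical MiniPoster whose per-endpoint "loops" are plain in-memory queues and which uses generic JDK exceptions instead of RpcEnvStoppedException and SparkException:

```scala
import scala.collection.mutable

// Hypothetical simplified postMessage: the error is decided inside the lock,
// but the callback only runs after the lock has been released.
final class MiniPoster {
  private val loops = mutable.Map.empty[String, mutable.Queue[String]] // guarded by `this`
  private var stopped = false                                           // guarded by `this`

  def add(name: String): Unit = synchronized { loops(name) = mutable.Queue.empty[String] }
  def stop(): Unit = synchronized { stopped = true }
  def pending(name: String): List[String] = synchronized { loops(name).toList }

  def postMessage(name: String, msg: String, callbackIfStopped: Exception => Unit): Unit = {
    val error: Option[Exception] = synchronized {
      if (stopped) Some(new IllegalStateException("RpcEnv already stopped"))
      else loops.get(name) match {
        case None        => Some(new NoSuchElementException(s"Could not find $name."))
        case Some(queue) => queue.enqueue(msg); None
      }
    }
    // As in Spark, invoke the callback outside the synchronized block.
    error.foreach(callbackIfStopped)
  }
}
```

Running the callback outside `synchronized` presumably keeps a slow callback from blocking other threads that are posting messages at the same time.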
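The key call is loop.post(endpointName, message). For an ordinary (non-isolated) endpoint this is SharedMessageLoop.post, which is simple: it looks up the endpoint's Inbox, appends the message to that Inbox, and then calls setActive, which offers the Inbox to the `active` LinkedBlockingQueue so that a worker thread will pick it up.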
override def post(endpointName: String, message: InboxMessage): Unit = {
val inbox = endpoints.get(endpointName)
inbox.post(message)
setActive(inbox)
}
protected final def setActive(inbox: Inbox): Unit = active.offer(inbox)
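The two-level queueing (messages queued per Inbox, Inboxes queued in `active`) can be sketched as follows. MiniInbox and MiniSharedLoop are hypothetical simplifications; only the post/setActive logic mirrors the Spark code above.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.collection.mutable

// Hypothetical per-endpoint inbox: just a synchronized message queue.
final class MiniInbox(val name: String) {
  private val messages = mutable.Queue.empty[String] // guarded by `this`
  def post(msg: String): Unit = synchronized { messages.enqueue(msg) }
  // Takes every pending message, roughly what one processing round does.
  def drain(): List[String] = synchronized {
    val out = messages.toList
    messages.clear()
    out
  }
}

// Hypothetical mini SharedMessageLoop: post = enqueue into the inbox,
// then offer the inbox to the shared `active` queue (setActive).
final class MiniSharedLoop {
  private val endpoints = new ConcurrentHashMap[String, MiniInbox]()
  val active = new LinkedBlockingQueue[MiniInbox]()

  def register(name: String): Unit = { endpoints.put(name, new MiniInbox(name)); () }

  def post(endpointName: String, message: String): Unit = {
    val inbox = endpoints.get(endpointName)
    inbox.post(message)
    setActive(inbox)
  }

  private def setActive(inbox: MiniInbox): Unit = { active.offer(inbox); () }
}
```

Because every post offers the Inbox again, the same Inbox can sit in `active` several times. That is harmless: the first worker to take it drains the pending messages, and later takes of the same Inbox find nothing to do (in Spark, Inbox.process returns when messages.poll() is null).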
SharedMessageLoop starts a thread pool and submits receiveLoopRunnable once per thread; its run method simply calls receiveLoop:
/** Thread pool used for dispatching messages. */
override protected val threadpool: ThreadPoolExecutor = {
val numThreads = getNumOfThreads(conf)
val pool = ThreadUtils.newDaemonFixedThreadPool(numThreads, "dispatcher-event-loop")
for (i <- 0 until numThreads) {
pool.execute(receiveLoopRunnable)
}
pool
}
protected val receiveLoopRunnable = new Runnable() {
override def run(): Unit = receiveLoop()
}
private def receiveLoop(): Unit = {
try {
while (true) {
try {
val inbox = active.take()
if (inbox == MessageLoop.PoisonPill) {
// Put PoisonPill back so that other threads can see it.
setActive(MessageLoop.PoisonPill)
return
}
inbox.process(dispatcher)
} catch {
case NonFatal(e) => logError(e.getMessage, e)
}
}
} catch {
case _: InterruptedException => // exit
case t: Throwable =>
try {
// Re-submit a receive task so that message delivery will still work if
// UncaughtExceptionHandler decides to not kill JVM.
threadpool.execute(receiveLoopRunnable)
} finally {
throw t
}
}
}
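The receive loop and its poison pill can be reproduced with plain JDK executors. This is a minimal sketch under these assumptions: work items are Runnables instead of Inboxes, PoisonPill is a sentinel compared by reference, non-fatal task exceptions are swallowed (Spark logs them), and Spark's resubmission of the loop after a fatal Throwable is left out. MiniReceiveLoop.run is a hypothetical helper that returns true once every worker thread has exited.

```scala
import java.util.concurrent.{CountDownLatch, Executors, LinkedBlockingQueue, TimeUnit}
import scala.util.control.NonFatal

// Hypothetical mini receive loop: worker threads take() items from a shared
// queue until they meet the poison pill sentinel.
object MiniReceiveLoop {
  // Sentinel compared by reference (eq), like MessageLoop.PoisonPill.
  val PoisonPill: Runnable = new Runnable { def run(): Unit = () }

  def run(numThreads: Int, tasks: Seq[Runnable]): Boolean = {
    val active = new LinkedBlockingQueue[Runnable]()
    val pool = Executors.newFixedThreadPool(numThreads)
    val exited = new CountDownLatch(numThreads)
    val receiveLoop: Runnable = new Runnable {
      def run(): Unit =
        try {
          var running = true
          while (running) {
            val item = active.take()
            if (item eq PoisonPill) {
              active.offer(PoisonPill) // put it back so the other workers see it too
              running = false
            } else {
              // A failing task must not kill the loop (Spark logs the error instead).
              try { item.run() } catch { case NonFatal(_) => () }
            }
          }
        } finally exited.countDown()
    }
    (0 until numThreads).foreach(_ => pool.execute(receiveLoop))
    tasks.foreach(t => active.offer(t))
    active.offer(PoisonPill) // ask all workers to stop once earlier items are taken
    val allExited = exited.await(5, TimeUnit.SECONDS)
    pool.shutdown()
    allExited
  }
}
```

Since the queue is FIFO and the pill is enqueued after all tasks, every task is taken before any worker sees the pill, so no work is lost on shutdown.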
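PoisonPill (MessageLoop.PoisonPill) is an empty Inbox object that serves only as a marker. To stop the Dispatcher's message loop, PoisonPill is fed to the receive loop: when a worker thread takes it from the `active` queue, it puts it back so that the other threads can see it too, then returns immediately, and the thread exits. For any other Inbox, the thread calls inbox.process(dispatcher) to handle its messages: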
/**
* Process stored messages.
*/
def process(dispatcher: Dispatcher): Unit = {
var message: InboxMessage = null
inbox.synchronized {
if (!enableConcurrent && numActiveThreads != 0) {
return
}
message = messages.poll()
if (message != null) {
numActiveThreads += 1
} else {
return
}
}
while (true) {
safelyCall(endpoint) {
message match {
case RpcMessage(_sender, content, context) =>
try {
endpoint.receiveAndReply(context).applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
} catch {
case e: Throwable =>
context.sendFailure(e)
// Throw the exception -- this exception will be caught by the safelyCall function.
// The endpoint's onError function will be called.
throw e
}
case OneWayMessage(_sender, content) =>
endpoint.receive.applyOrElse[Any, Unit](content, { msg =>
throw new SparkException(s"Unsupported message $message from ${_sender}")
})
case OnStart =>
endpoint.onStart()
if (!endpoint.isInstanceOf[ThreadSafeRpcEndpoint]) {
inbox.synchronized {
if (!stopped) {
enableConcurrent = true
}
}
}
case OnStop =>
val activeThreads = inbox.synchronized { inbox.numActiveThreads }
assert(activeThreads == 1,
s"There should be only a single active thread but found $activeThreads threads.")
dispatcher.removeRpcEndpointRef(endpoint)
endpoint.onStop()
assert(isEmpty, "OnStop should be the last message")
case RemoteProcessConnected(remoteAddress) =>
endpoint.onConnected(remoteAddress)
case RemoteProcessDisconnected(remoteAddress) =>
endpoint.onDisconnected(remoteAddress)
case RemoteProcessConnectionError(cause, remoteAddress) =>
endpoint.onNetworkError(cause, remoteAddress)
}
}
inbox.synchronized {
// "enableConcurrent" will be set to false after `onStop` is called, so we should check it
// every time.
if (!enableConcurrent && numActiveThreads != 1) {
// If we are not the only one worker, exit
numActiveThreads -= 1
return
}
message = messages.poll()
if (message == null) {
numActiveThreads -= 1
return
}
}
}
}
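The concurrency rules in Inbox.process are the subtle part. The sketch below keeps only those rules: OnStart is always the first message, a non-concurrent inbox lets at most one thread process it at a time (numActiveThreads), and enableConcurrent is switched on after OnStart only when allowConcurrent is true (standing in for "not a ThreadSafeRpcEndpoint"). MiniProcessInbox and its handler callback are hypothetical; failures are swallowed where Spark would call endpoint.onError through safelyCall, and the stopped/OnStop handling is omitted.

```scala
import scala.collection.mutable
import scala.util.control.NonFatal

// Hypothetical, simplified Inbox.process keeping only its concurrency rules.
final class MiniProcessInbox(handler: String => Unit, allowConcurrent: Boolean) {
  private val messages = mutable.Queue[String]("OnStart") // OnStart is always first
  private var enableConcurrent = false                    // guarded by `this`
  private var numActiveThreads = 0                        // guarded by `this`

  def post(msg: String): Unit = synchronized { messages.enqueue(msg) }

  def process(): Unit = {
    // Enter only if concurrent processing is enabled or nobody else is active.
    var message: Option[String] = synchronized {
      if ((!enableConcurrent && numActiveThreads != 0) || messages.isEmpty) None
      else { numActiveThreads += 1; Some(messages.dequeue()) }
    }
    while (message.isDefined) {
      val m = message.get
      try {
        handler(m) // the handler also sees "OnStart", standing in for onStart()
        if (m == "OnStart" && allowConcurrent) synchronized { enableConcurrent = true }
      } catch {
        case NonFatal(_) => () // Spark reports this to endpoint.onError via safelyCall
      }
      // Re-check every round: leave if we are not the only worker or nothing is left.
      message = synchronized {
        if ((!enableConcurrent && numActiveThreads != 1) || messages.isEmpty) {
          numActiveThreads -= 1
          None
        } else Some(messages.dequeue())
      }
    }
  }
}
```

With allowConcurrent = false, extra threads that call process() while another thread is active return at once, so the handler never runs concurrently and messages are handled in posting order.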
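This leaves the Dispatcher with a single responsibility: dispatching messages. How each message is actually handled is left to the Inbox, which concentrates on processing: it drains several messages in one pass, keeps processing thread-safe (numActiveThreads and enableConcurrent), handles exceptions (a failed RPC is reported back through context.sendFailure and the endpoint's onError is then invoked via safelyCall), and routes each message type to the matching endpoint callback.
Outbox is similar to Inbox; the difference lies in where a message goes. If the endpoint and the endpointRef live in the same RpcEnv (the same address, typically the same machine and process), the message is put into the Inbox; otherwise it is put into the Outbox to be sent to the remote side.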
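The Inbox/Outbox decision can be sketched as a single routing check. Addr and MiniRpcEnv are hypothetical; in Spark the check is made on the send path of NettyRpcEnv, which keeps one Outbox per remote address, but the names and types here are simplified.

```scala
import scala.collection.mutable

// Hypothetical address type standing in for Spark's RpcAddress.
final case class Addr(host: String, port: Int)

// Hypothetical mini RpcEnv: local messages go to the inbox side,
// remote ones to a per-address outbox.
final class MiniRpcEnv(val address: Addr) {
  val inbox = mutable.Queue.empty[String]                       // stands in for Dispatcher/Inbox
  val outboxes = mutable.Map.empty[Addr, mutable.Queue[String]] // one outbox per remote address

  def send(target: Addr, msg: String): Unit = {
    if (target == address) inbox.enqueue(msg)
    else outboxes.getOrElseUpdate(target, mutable.Queue.empty[String]).enqueue(msg)
    ()
  }
}
```

Keying outboxes by remote address means messages to the same peer stay in order and can share one connection, while local messages skip the network entirely.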