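TaskManager, Flink's other major component, is created in much the same way as the JobManager's ActorSystem, so that part is not analyzed in detail. This section instead follows the path from creating the TaskManager to establishing contact with the JobManager, focusing on how Flink designs its service discovery.
Taking ZooKeeper-based cluster management as an example, the overall flow is roughly as shown in the figure below: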
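Figure: TaskManager create
Creating a TaskManager first obtains the TM's address and port, then creates taskManagerActorSystem and uses it to create the TaskManager actor, along with leaderRetrievalService, the object that retrieves the leader JobManager. Once taskManagerActor has been created, the actor's preStart method starts leaderRetrievalService, which watches for changes to the leader JobManager's address: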
override def preStart(): Unit = {
  log.info(s"Starting TaskManager actor at ${self.path.toSerializationFormat}.")
  log.info(s"TaskManager data connection information: $connectionInfo")
  log.info(s"TaskManager has $numberOfSlots task slot(s).")

  // log the initial memory utilization
  if (log.isInfoEnabled) {
    log.info(MemoryLogger.getMemoryUsageStatsAsString(ManagementFactory.getMemoryMXBean))
  }

  try {
    leaderRetrievalService.start(this)
  } catch {
    case e: Exception =>
      log.error("Could not start leader retrieval service.", e)
      throw new RuntimeException("Could not start leader retrieval service.", e)
  }
}
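When the leader JobManager changes, leaderRetrievalService calls the TaskManager's notifyLeaderAddress method, which eventually sends a TriggerTaskManagerRegistration message to the TaskManager actor. In the handler for that message, the TaskManager actor registers itself with the JobManager actor: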
private def handleRegistrationMessage(message: RegistrationMessage): Unit = {
  message match {
    case TriggerTaskManagerRegistration
      ...
      ...
      val jobManager = context.actorSelection(jobManagerURL)
      jobManager ! decorateMessage(
        RegisterTaskManager(
          resourceID,
          connectionInfo,
          resources,
          numberOfSlots)
      )
      ...
}
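The snippet above elides most of the handler, so the following is only a minimal sketch of the idea, assuming a plain in-memory queue in place of the Akka mailbox. The class TaskManagerMailboxSketch and its fields are hypothetical and are not part of Flink. It shows the one design point worth noting: the notification callback does not register anything itself, it only enqueues a trigger, and the registration request is produced later when the actor processes its messages.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.UUID;

// Hypothetical stand-in for the TaskManager actor; not Flink's actual class.
class TaskManagerMailboxSketch {
    // Pending registration triggers, playing the role of the actor mailbox.
    private final Queue<String> mailbox = new ArrayDeque<>();
    // Registration requests "sent" to the JobManager, recorded as strings.
    final List<String> sentToJobManager = new ArrayList<>();
    private final String resourceId;
    private final int numberOfSlots;

    TaskManagerMailboxSketch(String resourceId, int numberOfSlots) {
        this.resourceId = resourceId;
        this.numberOfSlots = numberOfSlots;
    }

    // Called by the leader retrieval service; it only enqueues a trigger.
    // The session ID is ignored in this sketch.
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionId) {
        if (leaderAddress != null) { // no known leader, nothing to register with
            mailbox.add(leaderAddress);
        }
    }

    // Processes queued triggers; each one produces a registration request.
    void processMailbox() {
        String jobManagerUrl;
        while ((jobManagerUrl = mailbox.poll()) != null) {
            sentToJobManager.add("RegisterTaskManager(" + resourceId
                    + ", slots=" + numberOfSlots + ") -> " + jobManagerUrl);
        }
    }
}
```

Separating the callback from the handling this way means the thread that delivers the leader change never touches the actor's state directly; in the real system that separation is provided by sending a message to the actor.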
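The handling of the RegisterTaskManager message can be seen in the JobManager's handleMessage method and is not analyzed further here. Next, look at the design of LeaderRetrievalService, the component that watches the leader JobManager: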
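Figure: implementations of LeaderRetrievalService
The LeaderRetrievalService interface has two implementations, StandaloneLeaderRetrievalService and ZooKeeperLeaderRetrievalService. Taking ZooKeeperLeaderRetrievalService as the example, this class also implements the NodeCacheListener interface: it creates a NodeCache to watch a ZooKeeper node, and in its overridden nodeChanged() method it reads the leader JobManager's information and notifies the TaskManager.
When the data of the ZooKeeper node changes, ZooKeeperLeaderRetrievalService notifies its listener, which must implement the LeaderRetrievalListener interface. Here the listener is the TaskManager object, which implements LeaderRetrievalListener: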
/**
 * Classes which want to be notified about a changing leader by the {@link LeaderRetrievalService}
 * have to implement this interface.
 */
public interface LeaderRetrievalListener {

    /**
     * This method is called by the {@link LeaderRetrievalService} when a new leader is elected.
     *
     * @param leaderAddress The address of the new leader
     * @param leaderSessionID The new leader session ID
     */
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);

    void handleError(Exception exception);
}
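To make the listener contract concrete, here is a minimal sketch of how a listener is handed to a retrieval service through start and then called back. It redeclares the interface locally so that it compiles on its own. RecordingListener and FixedLeaderRetrievalService are hypothetical names invented for this example; the latter only resembles a standalone setup in spirit (one fixed leader, announced once) and is not Flink's StandaloneLeaderRetrievalService.

```java
import java.util.UUID;

// Local copy of the interface shown above, so the sketch compiles on its own.
interface LeaderRetrievalListener {
    void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID);
    void handleError(Exception exception);
}

// Hypothetical listener that just remembers the latest leader it was told about.
class RecordingListener implements LeaderRetrievalListener {
    String currentLeader;
    UUID currentSessionId;
    Exception lastError;

    @Override
    public void notifyLeaderAddress(String leaderAddress, UUID leaderSessionID) {
        currentLeader = leaderAddress;
        currentSessionId = leaderSessionID;
    }

    @Override
    public void handleError(Exception exception) {
        lastError = exception;
    }
}

// Hypothetical service with one fixed leader that is announced when started.
class FixedLeaderRetrievalService {
    private final String leaderAddress;
    private final UUID leaderSessionId;

    FixedLeaderRetrievalService(String leaderAddress, UUID leaderSessionId) {
        this.leaderAddress = leaderAddress;
        this.leaderSessionId = leaderSessionId;
    }

    // The listener is passed in here, mirroring leaderRetrievalService.start(this).
    void start(LeaderRetrievalListener listener) {
        if (listener == null) {
            throw new IllegalArgumentException("listener must not be null");
        }
        listener.notifyLeaderAddress(leaderAddress, leaderSessionId);
    }
}
```

Because the service only depends on the interface, the same listener works unchanged whether the leader comes from a fixed configuration or from a watched ZooKeeper node.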
The TaskManager instance is passed in through the start method of the LeaderRetrievalService object. In the nodeChanged() method overridden by ZooKeeperLeaderRetrievalService, the TaskManager's notifyLeaderAddress method is called:
if (!(Objects.equals(leaderAddress, lastLeaderAddress) &&
        Objects.equals(leaderSessionID, lastLeaderSessionID))) {
    LOG.debug(
        "New leader information: Leader={}, session ID={}.",
        leaderAddress,
        leaderSessionID);

    lastLeaderAddress = leaderAddress;
    lastLeaderSessionID = leaderSessionID;
    leaderListener.notifyLeaderAddress(leaderAddress, leaderSessionID);
}
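Here leaderListener is the TaskManager instance that was passed in. As the code shows, the method hands the leaderAddress and leaderSessionID stored in ZooKeeper to the TaskManager, and the TM uses them to trigger its registration with the JobManager.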
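The condition in that snippet means the listener is only notified when the address or the session ID actually differs from the last values seen. The following is a small stand-alone sketch of the same check, assuming a BiConsumer in place of the real listener; the class LeaderChangeFilter and its method onNodeChanged are hypothetical names for illustration only.

```java
import java.util.Objects;
import java.util.UUID;
import java.util.function.BiConsumer;

// Hypothetical helper that forwards leader information only when it changes.
class LeaderChangeFilter {
    private String lastLeaderAddress;
    private UUID lastLeaderSessionID;
    private final BiConsumer<String, UUID> listener;

    LeaderChangeFilter(BiConsumer<String, UUID> listener) {
        this.listener = listener;
    }

    // Returns true if the listener was notified, false if the data was unchanged.
    boolean onNodeChanged(String leaderAddress, UUID leaderSessionID) {
        if (Objects.equals(leaderAddress, lastLeaderAddress)
                && Objects.equals(leaderSessionID, lastLeaderSessionID)) {
            return false; // same leader as before, stay silent
        }
        lastLeaderAddress = leaderAddress;
        lastLeaderSessionID = leaderSessionID;
        listener.accept(leaderAddress, leaderSessionID);
        return true;
    }
}
```

Using Objects.equals keeps the comparison null-safe, which matters because either value may be absent when no leader information is available. Note that a new session ID with an unchanged address still counts as a change.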
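Summary
This section analyzed the TaskManager's initialization. It did not cover every component and focused only on how the TM registers with the JobManager. Correspondingly, the JM side implements the election of the leader JobManager, which will be covered later.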