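Dispatcher Startup
The Dispatcher startup flow is deeply nested. It mainly starts the Dispatcher component and, along the way, brings up the JobMaster. A detailed diagram of the nested calls is attached for reference: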

Picking up from the previous article, DefaultDispatcherResourceManagerComponentFactory creates the dispatcherRunner:
dispatcherRunner =
dispatcherRunnerFactory.createDispatcherRunner(
highAvailabilityServices.getDispatcherLeaderElectionService(),
fatalErrorHandler,
new HaServicesJobGraphStoreFactory(highAvailabilityServices),
ioExecutor,
rpcService,
partialDispatcherServices);
After several layers of nested calls, a DispatcherRunnerLeaderElectionLifecycleManager object is eventually created:
private DispatcherRunnerLeaderElectionLifecycleManager(
T dispatcherRunner, LeaderElectionService leaderElectionService) throws Exception {
this.dispatcherRunner = dispatcherRunner;
this.leaderElectionService = leaderElectionService;
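/**
* han_pf
* Run leader election. On success, the election service calls grantLeadership on the
* leaderContender, which here is the dispatcherRunner.
* All three main components started by the JobManager go through leader election.
*/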
leaderElectionService.start(dispatcherRunner);
}

Once the election succeeds, DefaultDispatcherRunner.grantLeadership(UUID leaderSessionID) is called to run the post-election logic.
DefaultDispatcherRunner:
@Override
public void grantLeadership(UUID leaderSessionID) {
runActionIfRunning(
() -> {
LOG.info(
"{} was granted leadership with leader id {}. Creating new {}.",
getClass().getSimpleName(),
leaderSessionID,
DispatcherLeaderProcess.class.getSimpleName());
startNewDispatcherLeaderProcess(leaderSessionID);
});
}
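The callback relationship between the election service and its contender can be sketched in isolation. This is a minimal illustration, not Flink's code: Contender, SingleNodeElectionService, and ToyDispatcherRunner are hypothetical stand-ins, and the toy service grants leadership immediately instead of waiting for a real election.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;

public class LeaderElectionSketch {

    /** Hypothetical stand-in for a leader contender. */
    interface Contender {
        void grantLeadership(UUID leaderSessionId);
    }

    /** Hypothetical election service that elects whoever starts it, right away. */
    static class SingleNodeElectionService {
        void start(Contender contender) {
            // A real service would wait for an external system to confirm leadership.
            contender.grantLeadership(UUID.randomUUID());
        }
    }

    /** Toy runner that records what it does once leadership is granted. */
    static class ToyDispatcherRunner implements Contender {
        final List<String> events = new ArrayList<>();

        @Override
        public void grantLeadership(UUID leaderSessionId) {
            events.add("leadership granted");
            events.add("start new leader process");
        }
    }

    static List<String> run() {
        ToyDispatcherRunner runner = new ToyDispatcherRunner();
        new SingleNodeElectionService().start(runner);
        return runner.events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The point of the shape is that the runner never decides for itself when to start: it only reacts to grantLeadership.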
DefaultDispatcherRunner creates a new DispatcherLeaderProcess and then starts it:
private void startNewDispatcherLeaderProcess(UUID leaderSessionID) {
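/**
* han_pf
* Stop the existing leader process; this is an asynchronous operation.
*/
stopDispatcherLeaderProcess();
/**
* han_pf
* Create the new one. This only constructs the object; the start method comes next.
*/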
dispatcherLeaderProcess = createNewDispatcherLeaderProcess(leaderSessionID);
final DispatcherLeaderProcess newDispatcherLeaderProcess = dispatcherLeaderProcess;
FutureUtils.assertNoException(
previousDispatcherLeaderProcessTerminationFuture.thenRun(
newDispatcherLeaderProcess::start));
}
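The hand-over in startNewDispatcherLeaderProcess relies on chaining the new process's start onto the previous process's termination future. A minimal sketch of that ordering with plain CompletableFuture follows; the event strings and the class name are illustrative assumptions, not Flink code.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class LeaderProcessHandoverSketch {

    static List<String> run() {
        List<String> events = new ArrayList<>();

        // Completes once the old leader process has finished shutting down.
        CompletableFuture<Void> previousTermination = new CompletableFuture<>();

        // The new process is created right away, but its start is only registered here.
        events.add("new process created");
        CompletableFuture<Void> started =
                previousTermination.thenRun(() -> events.add("new process started"));

        // Simulate the old process finishing; this triggers the registered start.
        events.add("old process terminated");
        previousTermination.complete(null);

        started.join();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Creating the object eagerly while deferring start keeps two leader processes from running at the same time.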
The code above ends up calling the onStart method of the concrete subclass of AbstractDispatcherLeaderProcess:
@Override
public final void start() {
runIfStateIs(State.CREATED, this::startInternal);
}
private void startInternal() {
log.info("Start {}.", getClass().getSimpleName());
state = State.RUNNING;
/**
* han_pf
* Run the subclass's onStart method. There are two implementations,
* SessionDispatcherLeaderProcess and JobDispatcherLeaderProcess.
* This walkthrough uses session mode, so we look at SessionDispatcherLeaderProcess.
*/
onStart();
}
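The start/startInternal/onStart split is a state-guarded template method: start() only proceeds from the CREATED state, and the subclass supplies onStart(). A minimal sketch under that reading follows; ToyLeaderProcess and its startCount counter are hypothetical names used only for illustration.

```java
public class StateGuardSketch {

    enum State { CREATED, RUNNING, STOPPED }

    abstract static class ToyLeaderProcess {
        private final Object lock = new Object();
        private State state = State.CREATED;
        int startCount = 0;

        /** Public entry point; does nothing unless the process is still CREATED. */
        final void start() {
            runIfStateIs(State.CREATED, this::startInternal);
        }

        private void startInternal() {
            state = State.RUNNING;
            startCount++;
            onStart(); // subclass-specific startup logic
        }

        private void runIfStateIs(State expected, Runnable action) {
            synchronized (lock) {
                if (state == expected) {
                    action.run();
                }
            }
        }

        protected abstract void onStart();
    }

    /** Calls start() twice and reports how often the startup logic actually ran. */
    static int run() {
        ToyLeaderProcess process = new ToyLeaderProcess() {
            @Override
            protected void onStart() {
                // no-op for the sketch
            }
        };
        process.start();
        process.start(); // ignored: state is already RUNNING
        return process.startCount;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```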
SessionDispatcherLeaderProcess:
@Override
protected void onStart() {
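/**
* han_pf
* Start the jobGraphStore service. This really starts a listener, so that JobGraphs can be
* recovered and resumed when leadership moves to another node.
*/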
startServices();
onGoingRecoveryOperation =
recoverJobsAsync()
/**
* han_pf
* createDispatcherIfRunning is the core method
*/
.thenAccept(this::createDispatcherIfRunning)
.handle(this::onErrorIfRunning);
}
/**
* han_pf
* Creation continues down the chain
*/
private void createDispatcher(Collection<JobGraph> jobGraphs) {
final DispatcherGatewayService dispatcherService =
dispatcherGatewayServiceFactory.create(
DispatcherId.fromUuid(getLeaderSessionId()), jobGraphs, jobGraphStore);
completeDispatcherSetup(dispatcherService);
}
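The onStart chain (recover asynchronously, then create the dispatcher, then handle any error) can be sketched with plain CompletableFuture. This is an illustration only: the job names, the failRecovery switch, and the rootMessage helper are assumptions introduced for the example.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class RecoverThenCreateSketch {

    static String run(boolean failRecovery) {
        List<String> log = new ArrayList<>();

        // Step 1: recover jobs asynchronously (may fail).
        CompletableFuture<List<String>> recovered =
                CompletableFuture.supplyAsync(
                        () -> {
                            if (failRecovery) {
                                throw new IllegalStateException("store unavailable");
                            }
                            return Arrays.asList("job-1", "job-2");
                        });

        // Step 2: create the dispatcher from the recovered jobs.
        // Step 3: handle() sees either a normal completion or the failure of any earlier step.
        CompletableFuture<String> result =
                recovered
                        .thenAccept(jobs -> log.add("dispatcher created with " + jobs.size() + " jobs"))
                        .handle((ignored, error) ->
                                error == null ? log.get(0) : "fatal: " + rootMessage(error));

        return result.join();
    }

    /** Unwraps CompletionException layers to reach the original message. */
    static String rootMessage(Throwable t) {
        while (t.getCause() != null) {
            t = t.getCause();
        }
        return t.getMessage();
    }

    public static void main(String[] args) {
        System.out.println(run(false));
        System.out.println(run(true));
    }
}
```

Because handle() sits at the end of the chain, a failure in recovery skips dispatcher creation and goes straight to the error path.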
Finally, DefaultDispatcherGatewayServiceFactory.create is called to create and start the Dispatcher RpcEndpoint:
public AbstractDispatcherLeaderProcess.DispatcherGatewayService create(
DispatcherId fencingToken,
Collection<JobGraph> recoveredJobs,
JobGraphWriter jobGraphWriter) {
final Dispatcher dispatcher;
try {
dispatcher =
dispatcherFactory.createDispatcher(
rpcService,
fencingToken,
recoveredJobs,
(dispatcherGateway, scheduledExecutor, errorHandler) ->
new NoOpDispatcherBootstrap(),
PartialDispatcherServicesWithJobGraphStore.from(
partialDispatcherServices, jobGraphWriter));
} catch (Exception e) {
throw new FlinkRuntimeException("Could not create the Dispatcher rpc endpoint.", e);
}
/**
* han_pf
* The actual start
*/
dispatcher.start();
return DefaultDispatcherGatewayService.from(dispatcher);
}
Dispatcher is an RpcEndpoint: after it has been constructed and start() has been called, the RPC framework calls back its onStart() method.
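The lifecycle idea (construction does nothing visible, start() hands onStart() to the endpoint's own main thread) can be sketched as follows. ToyEndpoint is a hypothetical class, not Flink's RpcEndpoint, and the sketch waits for the callback only to make the event order deterministic.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class EndpointLifecycleSketch {

    /** Hypothetical endpoint whose callbacks all run on one dedicated thread. */
    abstract static class ToyEndpoint {
        private final ExecutorService mainThread = Executors.newSingleThreadExecutor();

        /** Schedules the onStart callback on the endpoint's main thread. */
        final CompletableFuture<Void> start() {
            return CompletableFuture.runAsync(this::onStart, mainThread);
        }

        final void stop() {
            mainThread.shutdown();
        }

        protected abstract void onStart();
    }

    static List<String> run() {
        List<String> events = new CopyOnWriteArrayList<>();
        ToyEndpoint endpoint = new ToyEndpoint() {
            @Override
            protected void onStart() {
                events.add("onStart callback");
            }
        };
        events.add("constructed");   // nothing has run yet
        endpoint.start().join();     // the sketch blocks here only for a stable ordering
        events.add("start() completed");
        endpoint.stop();
        return events;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```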

Dispatcher:
@Override
public void onStart() throws Exception {
try {
/**
* han_pf
* Register services (metrics)
*/
startDispatcherServices();
} catch (Throwable t) {
final DispatcherException exception =
new DispatcherException(
String.format("Could not start the Dispatcher %s", getAddress()), t);
onFatalError(exception);
throw exception;
}
/**
* han_pf
* Recover existing jobs,
* start the dispatcher,
* bring up the JobMaster
*/
startRecoveredJobs();
this.dispatcherBootstrap =
this.dispatcherBootstrapFactory.create(
getSelfGateway(DispatcherGateway.class),
this.getRpcService().getScheduledExecutor(),
this::onFatalError);
}
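onStart() first registers the metrics service and then calls startRecoveredJobs() to recover existing jobs. The Dispatcher also accepts jobs newly submitted by clients and, based on the job information, brings up a new JobMaster that is responsible for running each job.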
private void runRecoveredJob(final JobGraph recoveredJob) {
checkNotNull(recoveredJob);
try {
runJob(recoveredJob, ExecutionType.RECOVERY);
} catch (Throwable throwable) {
onFatalError(
new DispatcherException(
String.format(
"Could not start recovered job %s.", recoveredJob.getJobID()),
throwable));
}
}
/**
* han_pf
* When a client submits a new job, the request is handled by a WebMonitorEndpoint handler and
* forwarded to the Dispatcher, and it also ends up in this method.
*/
private void runJob(JobGraph jobGraph, ExecutionType executionType) throws Exception {
Preconditions.checkState(!runningJobs.containsKey(jobGraph.getJobID()));
long initializationTimestamp = System.currentTimeMillis();
JobManagerRunner jobManagerRunner =
createJobManagerRunner(jobGraph, initializationTimestamp);
// TODO: one jobManagerRunner per job
runningJobs.put(jobGraph.getJobID(), jobManagerRunner);
......
}
JobManagerRunner createJobManagerRunner(JobGraph jobGraph, long initializationTimestamp)
throws Exception {
final RpcService rpcService = getRpcService();
/**
* han_pf
* This only creates the object and assigns the factories it needs: jobMasterServiceFactory,
* SlotPoolServiceSchedulerFactory, jobManagerLeaderElectionService, and so on.
* The important part is the start() method.
* The JobManagerRunner implementation is JobMasterServiceLeadershipRunner.
*/
JobManagerRunner runner =
jobManagerRunnerFactory.createJobManagerRunner(
jobGraph,
configuration,
rpcService,
highAvailabilityServices,
heartbeatServices,
jobManagerSharedServices,
new DefaultJobManagerJobMetricGroupFactory(jobManagerMetricGroup),
fatalErrorHandler,
initializationTimestamp);
runner.start();
return runner;
}
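The bookkeeping in runJob boils down to "one runner per job ID, and a job that is already running is rejected". A minimal sketch of that invariant follows; ToyDispatcher and ToyRunner are hypothetical classes that use plain strings as job IDs.

```java
import java.util.HashMap;
import java.util.Map;

public class RunningJobsSketch {

    /** Hypothetical per-job runner. */
    static class ToyRunner {
        final String jobId;
        boolean started;

        ToyRunner(String jobId) {
            this.jobId = jobId;
        }

        void start() {
            started = true;
        }
    }

    /** Hypothetical dispatcher that tracks exactly one runner per job ID. */
    static class ToyDispatcher {
        private final Map<String, ToyRunner> runningJobs = new HashMap<>();

        ToyRunner runJob(String jobId) {
            // Mirrors the precondition: the same job must not be started twice.
            if (runningJobs.containsKey(jobId)) {
                throw new IllegalStateException("Job " + jobId + " is already running");
            }
            ToyRunner runner = new ToyRunner(jobId);
            runner.start();
            runningJobs.put(jobId, runner);
            return runner;
        }

        int runningJobCount() {
            return runningJobs.size();
        }
    }

    /** Returns true if submitting the same job ID twice is rejected. */
    static boolean rejectsDuplicate() {
        ToyDispatcher dispatcher = new ToyDispatcher();
        dispatcher.runJob("job-a");
        try {
            dispatcher.runJob("job-a");
            return false;
        } catch (IllegalStateException expected) {
            return true;
        }
    }

    public static void main(String[] args) {
        ToyDispatcher dispatcher = new ToyDispatcher();
        dispatcher.runJob("job-a");
        dispatcher.runJob("job-b");
        System.out.println(dispatcher.runningJobCount() + " jobs running");
        System.out.println("duplicate rejected: " + rejectsDuplicate());
    }
}
```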
JobMasterServiceLeadershipRunner:
@Override
public void start() throws Exception {
LOG.debug("Start leadership runner for job {}.", getJobID());
/**
* han_pf
* The familiar leader election call again
*/
leaderElectionService.start(this);
}
@Override
public void grantLeadership(UUID leaderSessionID) {
runIfStateRunning(
() -> startJobMasterServiceProcessAsync(leaderSessionID),
"starting a new JobMasterServiceProcess");
}
@GuardedBy("lock")
private void startJobMasterServiceProcessAsync(UUID leaderSessionId) {
sequentialOperation =
sequentialOperation.thenRun(
() ->
runIfValidLeader(
leaderSessionId,
ThrowingRunnable.unchecked(
() ->
/**
* han_pf
* Key step: create and start the JobMaster
*/
verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(
leaderSessionId)),
"verify job scheduling status and create JobMasterServiceProcess"));
handleAsyncOperationError(sequentialOperation, "Could not start the job manager.");
}
@GuardedBy("lock")
private void verifyJobSchedulingStatusAndCreateJobMasterServiceProcess(UUID leaderSessionId)
throws FlinkException {
final RunningJobsRegistry.JobSchedulingStatus jobSchedulingStatus =
getJobSchedulingStatus();
if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE) {
jobAlreadyDone();
} else {
createNewJobMasterServiceProcess(leaderSessionId);
}
}
@GuardedBy("lock")
private void createNewJobMasterServiceProcess(UUID leaderSessionId) throws FlinkException {
Preconditions.checkState(jobMasterServiceProcess.closeAsync().isDone());
......
......
/**
* han_pf
* Another chain of creation begins; the JobMaster still has not been started at this point.
* Creation continues in DefaultJobMasterServiceProcessFactory
*/
jobMasterServiceProcess = jobMasterServiceProcessFactory.create(leaderSessionId);
........
}
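The combination of sequentialOperation.thenRun(...) and runIfValidLeader(...) means that queued work runs in order and is skipped if the leader session it was queued for is no longer current. A minimal sketch of that behavior follows; ToyLeadershipRunner and the gate future are assumptions made for the example, not Flink classes.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;

public class SequentialLeaderOpsSketch {

    static class ToyLeadershipRunner {
        private final Object lock = new Object();
        private final List<UUID> createdFor = new ArrayList<>();
        private CompletableFuture<Void> sequentialOperation;
        private UUID currentLeaderSession;

        ToyLeadershipRunner(CompletableFuture<Void> initialOperation) {
            this.sequentialOperation = initialOperation;
        }

        void grantLeadership(UUID leaderSessionId) {
            synchronized (lock) {
                currentLeaderSession = leaderSessionId;
                // Queue the work behind whatever was queued before.
                sequentialOperation =
                        sequentialOperation.thenRun(
                                () -> runIfValidLeader(
                                        leaderSessionId,
                                        () -> createdFor.add(leaderSessionId)));
            }
        }

        private void runIfValidLeader(UUID expected, Runnable action) {
            synchronized (lock) {
                // Skip work that belongs to an outdated leader session.
                if (expected.equals(currentLeaderSession)) {
                    action.run();
                }
            }
        }

        List<UUID> snapshot() {
            synchronized (lock) {
                return new ArrayList<>(createdFor);
            }
        }
    }

    /** Grants leadership twice before any queued work runs; only the latest should act. */
    static boolean onlyLatestLeaderCreatesProcess() {
        CompletableFuture<Void> gate = new CompletableFuture<>();
        ToyLeadershipRunner runner = new ToyLeadershipRunner(gate);
        UUID first = UUID.randomUUID();
        UUID second = UUID.randomUUID();
        runner.grantLeadership(first);
        runner.grantLeadership(second);
        gate.complete(null); // releases both queued operations, in order
        List<UUID> created = runner.snapshot();
        return created.size() == 1 && created.get(0).equals(second);
    }

    public static void main(String[] args) {
        System.out.println(onlyLatestLeaderCreatesProcess());
    }
}
```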
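Leader election in Flink is implemented with CuratorFramework. After a successful election, the leaderContender's grantLeadership() method is ultimately called. The following main leaderContenders in the JobManager all go through leader election.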

DefaultJobMasterServiceProcessFactory:
@Override
public JobMasterServiceProcess create(UUID leaderSessionId) {
return new DefaultJobMasterServiceProcess(
jobId,
leaderSessionId,
jobMasterServiceFactory,
cause -> createArchivedExecutionGraph(JobStatus.FAILED, cause));
}
DefaultJobMasterServiceProcess:
public DefaultJobMasterServiceProcess(
JobID jobId,
UUID leaderSessionId,
JobMasterServiceFactory jobMasterServiceFactory,
Function<Throwable, ArchivedExecutionGraph> failedArchivedExecutionGraphFactory) {
this.jobId = jobId;
this.leaderSessionId = leaderSessionId;
this.jobMasterServiceFuture =
jobMasterServiceFactory.createJobMasterService(leaderSessionId, this);
......
// asynchronous operation
}
DefaultJobMasterServiceFactory:
@Override
public CompletableFuture<JobMasterService> createJobMasterService(
UUID leaderSessionId, OnCompletionActions onCompletionActions) {
return CompletableFuture.supplyAsync(
FunctionUtils.uncheckedSupplier(
() -> internalCreateJobMasterService(leaderSessionId, onCompletionActions)),
executor);
}
private JobMasterService internalCreateJobMasterService(
UUID leaderSessionId, OnCompletionActions onCompletionActions) throws Exception {
/**
* han_pf
* Finally it starts. JobMaster is also an RpcEndpoint; once it has been constructed and
* start() has been called, its lifecycle method onStart() is invoked.
*/
final JobMaster jobMaster =
new JobMaster(
rpcService,
JobMasterId.fromUuidOrNull(leaderSessionId),
jobMasterConfiguration,
ResourceID.generate(),
jobGraph,
haServices,
slotPoolServiceSchedulerFactory,
jobManagerSharedServices,
heartbeatServices,
jobManagerJobMetricGroupFactory,
onCompletionActions,
fatalErrorHandler,
userCodeClassloader,
shuffleMaster,
lookup ->
new JobMasterPartitionTrackerImpl(
jobGraph.getJobID(), shuffleMaster, lookup),
new DefaultExecutionDeploymentTracker(),
DefaultExecutionDeploymentReconciler::new,
initializationTimestamp);
jobMaster.start();
return jobMaster;
}
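With that, the JobMaster is finally started. JobMaster is also an RpcEndpoint, so once it has been constructed and start() has been called, its lifecycle method onStart() runs. The main execution logic lives in onStart():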
JobMaster:
@Override
protected void onStart() throws JobMasterException {
try {
startJobExecution();
} catch (Exception e) {
final JobMasterException jobMasterException =
new JobMasterException("Could not start the JobMaster.", e);
handleJobMasterError(jobMasterException);
throw jobMasterException;
}
}
/**
* han_pf
* The core consists of two steps:
* 1. Start the two key services, taskManagerHeartbeatManager and resourceManagerHeartbeatManager
* 2. Schedule the job
*/
private void startJobExecution() throws Exception {
validateRunsInMainThread();
JobShuffleContext context = new JobShuffleContextImpl(jobGraph.getJobID(), this);
shuffleMaster.registerJob(context);
/**
* han_pf
* Actually start the JobMaster services
*/
startJobMasterServices();
log.info(
"Starting execution of job '{}' ({}) under job master id {}.",
jobGraph.getName(),
jobGraph.getJobID(),
getFencingToken());
/**
* han_pf
* Start the scheduler
*/
startScheduling();
}
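The main JobMaster method, startJobExecution(), does two important things:
1. As the master for the job's execution, it creates the heartbeat manager for the TaskManagers and starts the slotPoolService, which establishes the connection to the ResourceManager leader.
2. It starts the job scheduler.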
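JobMaster has two main member fields: slotPoolService, which drives resource allocation, and schedulerNG, which drives job scheduling and execution.
After the JM starts, the SlotPool connects to the ResourceManager. SchedulerNG schedules the job and requests slots from the scheduler; the scheduler, following its configured strategy, requests resources from the SlotPool; once the request succeeds, the ExecutionVertex is deployed.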
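The "request a slot, deploy on success" flow can be sketched with a toy slot pool. This is a simplified illustration under stated assumptions: ToySlotPool, its fixed slot count, and the outcome strings are hypothetical, and real slot requests are asynchronous and go through the ResourceManager.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CompletableFuture;

public class SlotThenDeploySketch {

    /** Hypothetical pool with a fixed number of slots, handed out in order. */
    static class ToySlotPool {
        private final int totalSlots;
        private int nextSlot = 0;

        ToySlotPool(int totalSlots) {
            this.totalSlots = totalSlots;
        }

        CompletableFuture<Integer> requestSlot() {
            CompletableFuture<Integer> request = new CompletableFuture<>();
            if (nextSlot < totalSlots) {
                request.complete(nextSlot++);
            } else {
                request.completeExceptionally(new IllegalStateException("no free slot"));
            }
            return request;
        }
    }

    /** Requests one slot per vertex and deploys only where the request succeeded. */
    static List<String> schedule(int totalSlots, List<String> vertices) {
        ToySlotPool pool = new ToySlotPool(totalSlots);
        List<String> outcome = new ArrayList<>();
        for (String vertex : vertices) {
            String result =
                    pool.requestSlot()
                            .handle((slot, error) ->
                                    error == null
                                            ? vertex + " deployed to slot " + slot
                                            : vertex + " not deployed: " + error.getMessage())
                            .join();
            outcome.add(result);
        }
        return outcome;
    }

    public static void main(String[] args) {
        System.out.println(schedule(1, Arrays.asList("map", "sink")));
    }
}
```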
The default SchedulerNG implementation, DefaultScheduler, generates the executionGraph while it is being created.
JobMaster constructor:
public JobMaster(
RpcService rpcService,
JobMasterId jobMasterId,
JobMasterConfiguration jobMasterConfiguration,
ResourceID resourceId,
JobGraph jobGraph,
HighAvailabilityServices highAvailabilityService,
SlotPoolServiceSchedulerFactory slotPoolServiceSchedulerFactory,
JobManagerSharedServices jobManagerSharedServices,
HeartbeatServices heartbeatServices,
JobManagerJobMetricGroupFactory jobMetricGroupFactory,
OnCompletionActions jobCompletionActions,
FatalErrorHandler fatalErrorHandler,
ClassLoader userCodeLoader,
ShuffleMaster<?> shuffleMaster,
PartitionTrackerFactory partitionTrackerFactory,
ExecutionDeploymentTracker executionDeploymentTracker,
ExecutionDeploymentReconciler.Factory executionDeploymentReconcilerFactory,
long initializationTimestamp)
throws Exception {
resourceManagerLeaderRetriever =
highAvailabilityServices.getResourceManagerLeaderRetriever();
this.slotPoolService =
checkNotNull(slotPoolServiceSchedulerFactory).createSlotPoolService(jid);
this.shuffleMaster = checkNotNull(shuffleMaster);
this.schedulerNG =
createScheduler(
slotPoolServiceSchedulerFactory,
executionDeploymentTracker,
jobManagerJobMetricGroup,
jobStatusListener);
}
DefaultSchedulerFactory:
@Override
public SchedulerNG createInstance(
......
return new DefaultScheduler(
log,
jobGraph,
ioExecutor,
jobMasterConfiguration,
schedulerComponents.getStartUpAction(),
new ScheduledExecutorServiceAdapter(futureExecutor),
userCodeLoader,
new CheckpointsCleaner(),
checkpointRecoveryFactory,
jobManagerJobMetricGroup,
schedulerComponents.getSchedulingStrategyFactory(),
FailoverStrategyFactoryLoader.loadFailoverStrategyFactory(jobMasterConfiguration),
restartBackoffTimeStrategy,
new DefaultExecutionVertexOperations(),
new ExecutionVertexVersioner(),
schedulerComponents.getAllocatorFactory(),
initializationTimestamp,
mainThreadExecutor,
jobStatusListener,
executionGraphFactory,
shuffleMaster,
rpcTimeout);
}
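In the end, the executionGraph is generated in the SchedulerBase constructor. The details of how the executionGraph is built will be covered in a later, dedicated analysis of how the four kinds of graphs are generated.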
SchedulerBase:
/**
* han_pf
* Build the executionGraph
*/
this.executionGraph =
createAndRestoreExecutionGraph(
completedCheckpointStore,
checkpointsCleaner,
checkpointIdCounter,
initializationTimestamp,
mainThreadExecutor,
jobStatusListener);
Analysis of schedulerNG.startScheduling()
DefaultScheduler:
@Override
protected void startSchedulingInternal() {
log.info(
"Starting scheduling with scheduling strategy [{}]",
schedulingStrategy.getClass().getName());
transitionToRunning();
schedulingStrategy.startScheduling();
}
PipelinedRegionSchedulingStrategy:
private void maybeScheduleRegion(
final SchedulingPipelinedRegion region,
final Map<ConsumedPartitionGroup, Boolean> consumableStatusCache) {
if (!areRegionInputsAllConsumable(region, consumableStatusCache)) {
return;
}
checkState(
areRegionVerticesAllInCreatedState(region),
"BUG: trying to schedule a region which is not in CREATED state");
final List<ExecutionVertexDeploymentOption> vertexDeploymentOptions =
SchedulingStrategyUtils.createExecutionVertexDeploymentOptions(
regionVerticesSorted.get(region), id -> deploymentOption);
/**
* han_pf
* Allocate slots and deploy
*/
schedulerOperations.allocateSlotsAndDeploy(vertexDeploymentOptions);
}
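The first check in maybeScheduleRegion (a region is scheduled only when all of its inputs are consumable) can be sketched on its own. ToyRegion and the partition names below are hypothetical, and the sketch leaves out the CREATED-state check and the actual deployment.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

public class RegionSchedulingSketch {

    /** Hypothetical region: a name plus the partitions it needs as input. */
    static class ToyRegion {
        final String name;
        final List<String> inputPartitions;

        ToyRegion(String name, List<String> inputPartitions) {
            this.name = name;
            this.inputPartitions = inputPartitions;
        }
    }

    /** Returns the names of regions whose inputs are all consumable. */
    static List<String> schedulableRegions(
            List<ToyRegion> regions, Set<String> consumablePartitions) {
        List<String> result = new ArrayList<>();
        for (ToyRegion region : regions) {
            if (consumablePartitions.containsAll(region.inputPartitions)) {
                result.add(region.name);
            }
        }
        return result;
    }

    static List<String> demo() {
        List<ToyRegion> regions =
                Arrays.asList(
                        new ToyRegion("source", Collections.<String>emptyList()),
                        new ToyRegion("map", Arrays.asList("p1")),
                        new ToyRegion("sink", Arrays.asList("p2")));
        // Only partition p1 has been produced so far.
        return schedulableRegions(regions, new HashSet<>(Arrays.asList("p1")));
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

A region with no inputs, such as a source, is always eligible, which is why scheduling can begin from the sources.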
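To sum up: while the Dispatcher is being created it recovers existing jobs, and it also accepts new jobs submitted by clients. It then brings up a JobMaster, which is what actually manages a job. The JobMaster establishes heartbeats with the ResourceManager and the TaskManagers, and uses these connections to request the slots a job needs and to deploy tasks into those slots. Slot allocation and task deployment will be analyzed in detail separately. I hope this article helps you trace the Dispatcher startup process and its main responsibilities; the core details can be covered in later dedicated analyses.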