Analyzing the end-to-end call chain of a flink-on-yarn per-job Flink cluster (flink-1.10)
flink-on-yarn job submission flow
- ./flink run -d -m yarn-cluster ./flinkExample.jar
1. vim bin/flink
exec $JAVA_RUN $JVM_ARGS "${log_setting[@]}" -classpath "`manglePathList "$CC_CLASSPATH:$INTERNAL_HADOOP_CLASSPATHS"`" org.apache.flink.client.cli.CliFrontend "$@"
2. This gives the entry class:
org.apache.flink.client.cli.CliFrontend[main]
// 3. load the custom command lines
final List<CustomCommandLine> customCommandLines = loadCustomCommandLines(
    configuration,
    configurationDirectory);

// Command line interface of the YARN session, with a special initialization here
// to prefix all options with y/yarn.
final String flinkYarnSessionCLI = "org.apache.flink.yarn.cli.FlinkYarnSessionCli";
try {
    customCommandLines.add(
        loadCustomCommandLine(flinkYarnSessionCLI,
            configuration,
            configurationDirectory,
            "y",
            "yarn"));
} catch (NoClassDefFoundError | Exception e) {
    LOG.warn("Could not load CLI class {}.", flinkYarnSessionCLI, e);
}
parseParameters()->run()->executeProgram()-> PackagedProgram.java program.invokeInteractiveModeForExecution()->
3. This calls into the main class of the user code:
callMainMethod(mainClass, args)
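In essence, callMainMethod locates the user jar's main method via reflection and invokes it. A minimal sketch of that reflective call (the class name MainInvoker is illustrative; PackagedProgram's real checks on visibility, static-ness, and signature are omitted):

import java.lang.reflect.Method;

public class MainInvoker {
    public static void invokeMain(String className, String[] args) throws Exception {
        Class<?> mainClass = Class.forName(className);              // e.g. the class named in the jar manifest
        Method mainMethod = mainClass.getMethod("main", String[].class);
        mainMethod.invoke(null, (Object) args);                     // static method, so the receiver is null
    }
}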
-
User code
flink streaming application
env.execute()
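For concreteness, a minimal user program of the kind submitted above could look like this (a hypothetical FlinkExample, not the actual jar); the env.execute() call at the end triggers the client-side flow traced below:

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class FlinkExample {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        env.fromElements("flink", "on", "yarn")
            // a user-defined function; StreamMap#processElement invokes it per record (see below)
            .map(new MapFunction<String, String>() {
                @Override
                public String map(String value) {
                    return value.toUpperCase();
                }
            })
            .print();

        // this call kicks off the StreamGraph -> JobGraph translation and the deployment below
        env.execute("flink example");
    }
}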
-
flink-client side code flow
flink-streaming-java StreamExecutionEnvironment.java
execute(): builds the StreamGraph
flink-client AbstractJobClusterExecutor.java
execute()
1. StreamGraph -> JobGraph (a minimal sketch of this translation is shown after this list)
2. deployJobCluster
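A small sketch of that client-side translation, using the public Flink 1.10 APIs getStreamGraph()/getJobGraph() (the pipeline itself is just a placeholder):

import org.apache.flink.api.common.functions.MapFunction;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.graph.StreamGraph;

public class GraphTranslationDemo {
    public static void main(String[] args) {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.fromElements(1, 2, 3)
            .map(new MapFunction<Integer, Integer>() {
                @Override
                public Integer map(Integer value) {
                    return value * 2;
                }
            })
            .print();

        // step 1: the fluent API calls are translated into a StreamGraph ...
        StreamGraph streamGraph = env.getStreamGraph();
        // ... which the client then compiles into the JobGraph shipped to the cluster
        JobGraph jobGraph = streamGraph.getJobGraph();
        System.out.println(jobGraph.getVertices());
    }
}
-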
flink-yarn
FlinkYarnSessionCli[main] -> run -> yarnClusterDescriptor.deploySessionCluster(clusterSpecification) -> YarnClusterDescriptor.java deploySessionCluster -> deployInternal
return deployInternal(
    clusterSpecification,
    "Flink session cluster",
    getYarnSessionClusterEntrypoint(),
    null,
    false);
Note: this is where the ClusterEntrypoint class is obtained; after the AM container is started, that class's main method is invoked to bring up the cluster.
YarnSessionClusterEntrypoint.java[main] -> ClusterEntrypoint.runClusterEntrypoint(yarnSessionClusterEntrypoint) -> clusterEntrypoint.startCluster() -> ClusterEntrypoint.java runCluster(configuration)
1. Environment check: verify the required Hadoop environment (HADOOP_CONF_DIR or YARN_CONF_DIR).
2. startAppMaster
This method will block until the ApplicationMaster/JobManager have been deployed on YARN.
It returns the ApplicationReport, from which the cluster client is built.
private ApplicationReport startAppMaster(
        Configuration configuration,
        String applicationName,
        String yarnClusterEntrypoint,
        JobGraph jobGraph,
        YarnClient yarnClient,
        YarnClientApplication yarnApplication,
        ClusterSpecification clusterSpecification)
yarnClient.submitApplication(appContext);
The yarn-client submits the application master request to the YARN ResourceManager ("Submitting application master"; the AppMaster and the JobManager run as two threads in one process). A RestClusterClient for the Flink cluster is returned to the flink-client. At this point the application request has been handed off to YARN's ResourceManager.
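Stripped of Flink specifics, the same submission against the raw YARN client API looks roughly like the sketch below; the launch command and resource numbers are placeholders, not Flink's actual AM spec:

import java.util.Collections;

import org.apache.hadoop.yarn.api.records.ApplicationId;
import org.apache.hadoop.yarn.api.records.ApplicationSubmissionContext;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.YarnClient;
import org.apache.hadoop.yarn.client.api.YarnClientApplication;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class YarnSubmitDemo {
    public static void main(String[] args) throws Exception {
        YarnClient yarnClient = YarnClient.createYarnClient();
        yarnClient.init(new YarnConfiguration());
        yarnClient.start();

        YarnClientApplication app = yarnClient.createApplication();
        ApplicationSubmissionContext appContext = app.getApplicationSubmissionContext();
        appContext.setApplicationName("flink-per-job-demo");

        // the AM launch command; Flink points this at a ClusterEntrypoint main class
        ContainerLaunchContext amContainer = Records.newRecord(ContainerLaunchContext.class);
        amContainer.setCommands(Collections.singletonList("<java command launching the entrypoint>"));
        appContext.setAMContainerSpec(amContainer);
        appContext.setResource(Resource.newInstance(1024, 1)); // AM memory (MB) and vcores

        ApplicationId appId = yarnClient.submitApplication(appContext); // hand off to the YARN RM
        System.out.println("submitted " + appId);
    }
}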
-
YARN flow: this part, launching the AM container on a NodeManager, is entirely YARN's own behavior.
YarnClientImpl.java
RMAppManager.java submitApplication()
this.rmContext.getDispatcher().getEventHandler().handle(new RMAppEvent(applicationId, RMAppEventType.START));
This mainly creates an RMAppEvent (START) that drives the RM's state machine to allocate and launch the AM container.
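The handle() call above follows YARN's asynchronous event-dispatch pattern: events are queued, and a single dispatcher thread routes them to the registered handlers/state machines. A toy illustration of the pattern (plain Java, not YARN's actual AsyncDispatcher):

import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class MiniEventDispatcher {
    private final BlockingQueue<String> eventQueue = new LinkedBlockingQueue<>();

    // producer side, analogous to getEventHandler().handle(new RMAppEvent(appId, START))
    public void handle(String event) {
        eventQueue.add(event);
    }

    // consumer side: a single dispatcher thread drives the registered handler
    public void startDispatching(Consumer<String> handler) {
        Thread t = new Thread(() -> {
            try {
                while (true) {
                    handler.accept(eventQueue.take());
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });
        t.setDaemon(true);
        t.start();
    }
}
-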
flink-on-yarn per-job cluster startup flow analysis
The entry point loaded and run in the AM container is the main() method of YarnJobClusterEntrypoint.java
JobMaster startup flow
The executable entry point for the Yarn Application Master Process for a single Flink job.
ClusterEntrypoint.runClusterEntrypoint(yarnJobClusterEntrypoint)->
ClusterEntrypoint.java-> runClusterEntrypoint()->clusterEntrypoint.startCluster()->runCluster()->
clusterComponent = dispatcherResourceManagerComponentFactory.create(
    configuration,
    ioExecutor,
    commonRpcService,
    haServices,
    blobServer,
    heartbeatServices,
    metricRegistry,
    archivedExecutionGraphStore,
    new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryServiceRpcService()),
    this);
DefaultDispatcherResourceManagerComponentFactory.java
public DispatcherResourceManagerComponent create(
        Configuration configuration,
        Executor ioExecutor,
        RpcService rpcService,
        HighAvailabilityServices highAvailabilityServices,
        BlobServer blobServer,
        HeartbeatServices heartbeatServices,
        MetricRegistry metricRegistry,
        ArchivedExecutionGraphStore archivedExecutionGraphStore,
        MetricQueryServiceRetriever metricQueryServiceRetriever,
        FatalErrorHandler fatalErrorHandler) {
    // the resource manager
    resourceManager = resourceManagerFactory.createResourceManager(
        configuration,
        ResourceID.generate(),
        rpcService,
        highAvailabilityServices,
        heartbeatServices,
        fatalErrorHandler,
        new ClusterInformation(hostname, blobServer.getPort()),
        webMonitorEndpoint.getRestBaseUrl(),
        resourceManagerMetricGroup);

    // note: this is where the Dispatcher REST endpoint / RPC services get started
    log.debug("Starting Dispatcher REST endpoint.");
    webMonitorEndpoint.start();

    log.debug("Starting Dispatcher.");
    dispatcherRunner = dispatcherRunnerFactory.createDispatcherRunner(
        highAvailabilityServices.getDispatcherLeaderElectionService(),
        fatalErrorHandler,
        new HaServicesJobGraphStoreFactory(highAvailabilityServices),
        ioExecutor,
        rpcService,
        partialDispatcherServices);

    log.debug("Starting ResourceManager.");
    resourceManager.start();
    resourceManagerRetrievalService.start(resourceManagerGatewayRetriever);

    // start the dispatcher leader-retrieval service; leader election itself is implemented
    // with ZooKeeper's open-source Curator client, Flink uses the LeaderLatch recipe
    dispatcherLeaderRetrievalService.start(dispatcherGatewayRetriever); // -> ZooKeeperLeaderRetrievalService.java start()

    return new DispatcherResourceManagerComponent(
        dispatcherRunner,
        resourceManager,
        dispatcherLeaderRetrievalService,
        resourceManagerRetrievalService,
        webMonitorEndpoint);
}
The Dispatcher, ResourceManager, and WebMonitorEndpoint component services are all started within the same process.
This method starts the resourcemanager and the dispatcher; the Dispatcher class is the entry point through which clients submit jobs (see the Dispatcher javadoc), specifically its submitJob method.
JobMaster is responsible for executing a single JobGraph. JobManager belongs to the old runtime framework; JobMaster is the new runtime framework introduced by the community's FLIP-6, and it is the one actually in effect now.
- Analyzing the ResourceManager component alongside the jobMaster
public class YarnResourceManager extends ActiveResourceManager<YarnWorkerNode> implements AMRMClientAsync.CallbackHandler, NMClientAsync.CallbackHandler
This component implements YARN's AMRMClientAsync.CallbackHandler interface; once containers have been allocated, the callback
onContainersAllocated is invoked -> requiredContainers.forEach(this::startTaskExecutorInContainer)
which starts the TaskManager containers ->
ContainerLaunchContext taskExecutorLaunchContext = Utils.createTaskExecutorContext(
    flinkConfig,
    yarnConfig,
    env,
    taskManagerParameters,
    taskManagerDynamicProperties,
    currDir,
    YarnTaskExecutorRunner.class, // the class run inside the TaskManager container
    log);
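For reference, the AMRMClientAsync.CallbackHandler contract that YarnResourceManager fulfills looks like the following skeleton (the class name and method bodies here are illustrative only):

import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerStatus;
import org.apache.hadoop.yarn.api.records.NodeReport;
import org.apache.hadoop.yarn.client.api.async.AMRMClientAsync;

public class DemoAllocationHandler implements AMRMClientAsync.CallbackHandler {
    @Override
    public void onContainersAllocated(List<Container> containers) {
        // this is the hook where YarnResourceManager builds the TaskExecutor
        // launch context (Utils.createTaskExecutorContext above) and starts it
        containers.forEach(c -> System.out.println("allocated: " + c.getId()));
    }

    @Override
    public void onContainersCompleted(List<ContainerStatus> statuses) {
        // replacement containers for failed TaskManagers are requested here
    }

    @Override public void onShutdownRequest() {}
    @Override public void onNodesUpdated(List<NodeReport> updatedNodes) {}
    @Override public void onError(Throwable e) {}
    @Override public float getProgress() { return 0; }
}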
- Analyzing the dispatcher component
In Flink, the JobMaster, ResourceManager, Dispatcher, and WebMonitorEndpoint all provide ZooKeeper-based high availability.
Continuing from the "Starting Dispatcher" code above: createDispatcherRunner -> DefaultDispatcherRunnerFactory.java createDispatcherRunner -> DefaultDispatcherRunner.java create() -> DispatcherRunnerLeaderElectionLifecycleManager.createFor(dispatcherRunner, leaderElectionService) -> return new DispatcherRunnerLeaderElectionLifecycleManager<>(dispatcherRunner, leaderElectionService) -> leaderElectionService.start(dispatcherRunner) -> ZooKeeperLeaderElectionService.java start(): leaderLatch.addListener(this); leaderLatch.start()
This starts the election (a minimal Curator LeaderLatch sketch follows below).
Once leadership is won, ZooKeeperLeaderElectionService isLeader() -> leaderContender.grantLeadership(issuedLeaderSessionID) -> JobManagerRunnerImpl (implements JobManagerRunner) grantLeadership() -> verifyJobSchedulingStatusAndStartJobManager
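A minimal, self-contained Curator LeaderLatch example showing the same listener/start wiring (the ZooKeeper address and latch path are placeholders):

import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.recipes.leader.LeaderLatch;
import org.apache.curator.framework.recipes.leader.LeaderLatchListener;
import org.apache.curator.retry.ExponentialBackoffRetry;

public class LeaderLatchDemo {
    public static void main(String[] args) throws Exception {
        CuratorFramework client = CuratorFrameworkFactory.newClient(
            "localhost:2181", new ExponentialBackoffRetry(1000, 3));
        client.start();

        LeaderLatch leaderLatch = new LeaderLatch(client, "/demo/leaderlatch");
        leaderLatch.addListener(new LeaderLatchListener() {
            @Override
            public void isLeader() {
                // ZooKeeperLeaderElectionService's isLeader() similarly calls
                // leaderContender.grantLeadership(issuedLeaderSessionID)
                System.out.println("granted leadership");
            }

            @Override
            public void notLeader() {
                System.out.println("leadership revoked");
            }
        });
        leaderLatch.start(); // mirrors leaderLatch.start() in the chain above

        Thread.sleep(10_000); // keep the process alive long enough to observe the callback
        leaderLatch.close();
        client.close();
    }
}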
This is where we find how the JobMaster gets started: grantLeadership.
The implementing classes include JobManagerRunner, ResourceManager, DefaultDispatcherRunner, and WebMonitorEndpoint; each one's grantLeadership contains the startup logic for the corresponding component.
private CompletableFuture<Void> verifyJobSchedulingStatusAndStartJobManager(UUID leaderSessionId) {
    final CompletableFuture<JobSchedulingStatus> jobSchedulingStatusFuture = getJobSchedulingStatus();

    return jobSchedulingStatusFuture.thenCompose(
        jobSchedulingStatus -> {
            if (jobSchedulingStatus == JobSchedulingStatus.DONE) {
                return jobAlreadyDone();
            } else {
                return startJobMaster(leaderSessionId);
            }
        });
}
This uses JDK 1.8's CompletableFuture.thenCompose (a minimal sketch follows after this chain). It fetches the HDFS path of the job.graph file from ZooKeeper and reads it back into a JobGraph -> JobManagerRunnerImpl.java startJobMaster -> return startJobMaster(leaderSessionId)
-> startFuture = jobMasterService.start(new JobMasterId(leaderSessionId));
-> JobMaster.java start()
-> startJobExecution(newJobMasterId)
-> startJobMasterServices
-> resetAndStartScheduler
-> build the ExecutionGraph from the JobGraph:
final SchedulerNG newScheduler = createScheduler(newJobManagerJobMetricGroup)
-> JobMaster.java createScheduler -> SchedulerNGFactory -> createInstance -> LegacySchedulerFactory.java createInstance -> LegacyScheduler -> SchedulerBase.java createAndRestoreExecutionGraph
createExecutionGraph(currentJobManagerJobMetricGroup, shuffleMaster, partitionTracker): the ExecutionGraph is now created
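A self-contained sketch of that thenCompose pattern (the enum and printed messages are illustrative):

import java.util.concurrent.CompletableFuture;

public class ThenComposeDemo {
    enum JobSchedulingStatus { PENDING, DONE }

    public static void main(String[] args) {
        CompletableFuture<JobSchedulingStatus> statusFuture =
            CompletableFuture.supplyAsync(() -> JobSchedulingStatus.PENDING);

        // thenCompose chains a future-returning function onto the first future,
        // flattening CompletableFuture<CompletableFuture<Void>> into CompletableFuture<Void>
        CompletableFuture<Void> result = statusFuture.thenCompose(status -> {
            if (status == JobSchedulingStatus.DONE) {
                return CompletableFuture.completedFuture(null);        // jobAlreadyDone()
            } else {
                return CompletableFuture.runAsync(
                    () -> System.out.println("startJobMaster(...)"));  // startJobMaster(leaderSessionId)
            }
        });

        result.join();
    }
}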
- JobMaster.java -> schedulerAssignedFuture.thenRun(this::startScheduling)
schedulerNG.startScheduling() -> SchedulerBase.java startScheduling() -> startSchedulingInternal()
-> LegacyScheduler.java startSchedulingInternal() -> ExecutionGraph.java executionGraph.scheduleForExecution()
-> final CompletableFuture<Void> newSchedulingFuture = SchedulingUtils.schedule(
       scheduleMode,
       getAllExecutionVertices(),
       this);
scheduleLazy() -> ExecutionVertex.scheduleForExecution() -> Execution.scheduleForExecution() -> deploy() -> taskManagerGateway.submitTask(deployment, rpcTimeout)
-> RpcTaskManagerGateway.submitTask(TaskDeploymentDescriptor tdd, Time timeout)
-> TaskExecutor.submitTask() -> task.startTaskThread() -> Task.java (run on its executingThread) executingThread.start() -> run() -> doRun() -> AbstractInvokable invokable.invoke()
run(): The core work method that bootstraps the task and executes its code.
AbstractInvokable invokable.invoke(): The TaskManager invokes the {@link #invoke()} method when executing a task.
-> invoke() -> runMailboxLoop(): this starts processing the data consumed from the Source and feeding it into downstream operators, i.e. executing the producer logic of each ExecutionVertex -> IntermediateResult (the intermediate result set) -> ExecutionEdge (the consumer)
Analyzing runMailboxLoop
OneInputStreamTask.java extends StreamTask

public OneInputStreamTask(
        Environment env,
        @Nullable TimerService timeProvider) {
    super(env, timeProvider);
}

protected StreamTask(
        Environment environment,
        @Nullable TimerService timerService,
        Thread.UncaughtExceptionHandler uncaughtExceptionHandler,
        StreamTaskActionExecutor.SynchronizedStreamTaskActionExecutor actionExecutor,
        TaskMailbox mailbox) {
    super(environment);
    ...
    this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor);
    ...
}

// This method implements the default action of the task (e.g. processing one event from the input)
protected void processInput(MailboxDefaultAction.Controller controller) {
    InputStatus status = inputProcessor.processInput();
}
StreamOneInputProcessor#processInput
@Override
public InputStatus processInput() throws Exception {
    // note the call here
    InputStatus status = input.emitNext(output);

    if (status == InputStatus.END_OF_INPUT) {
        synchronized (lock) {
            operatorChain.endHeadOperatorInput(1);
        }
    }
    return status;
}
StreamTaskNetworkInput.java -> emitNext(DataOutput<T> output)
-> processElement(deserializationDelegate.getInstance(), output)

private void processElement(StreamElement recordOrMark, DataOutput<T> output) throws Exception {
    if (recordOrMark.isRecord()) {
        output.emitRecord(recordOrMark.asRecord());
    } else if (recordOrMark.isWatermark()) {
        statusWatermarkValve.inputWatermark(recordOrMark.asWatermark(), lastChannel);
    } else if (recordOrMark.isLatencyMarker()) {
        output.emitLatencyMarker(recordOrMark.asLatencyMarker());
    } else if (recordOrMark.isStreamStatus()) {
        statusWatermarkValve.inputStreamStatus(recordOrMark.asStreamStatus(), lastChannel);
    } else {
        throw new UnsupportedOperationException("Unknown type of StreamElement");
    }
}
-> OneInputStreamTask.java emitRecord(StreamRecord<IN> record)

public void emitRecord(StreamRecord<IN> record) throws Exception {
    synchronized (lock) {
        numRecordsIn.inc();
        operator.setKeyContextElement1(record);
        operator.processElement(record);
    }
}
StreamMap.java
processElement -> StreamMap.java processElement(StreamRecord<IN> element)
-> output.collect(element.replace(userFunction.map(element.getValue())))
The concrete processElement implementations are Flink's individual operators.
Here, finally, is where the user-defined function gets executed; with that, the entire job execution path is accounted for.
- A second pass over the loop: StreamTask.runMailboxLoop() -> MailboxProcessor.runMailboxLoop()
Note the line that runs the default action, which is also where the user udf really executes:
mailboxDefaultAction.runDefaultAction(defaultActionContext)
The wiring that sets this up:
this.mailboxProcessor = new MailboxProcessor(this::processInput, mailbox, actionExecutor)
MailboxProcessor.java#runMailboxLoop

public void runMailboxLoop() throws Exception {
    final TaskMailbox localMailbox = mailbox;

    Preconditions.checkState(
        localMailbox.isMailboxThread(),
        "Method must be executed by declared mailbox thread!");

    assert localMailbox.getState() == TaskMailbox.State.OPEN : "Mailbox must be opened!";

    final MailboxController defaultActionContext = new MailboxController(this);

    while (processMail(localMailbox)) {
        // note: this executes the default action, i.e. where the user udf really runs
        mailboxDefaultAction.runDefaultAction(defaultActionContext); // lock is acquired inside default action as needed
    }
}
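To make the loop's shape explicit, here is a toy version of the mailbox pattern in plain Java (not Flink's actual classes): control mail is drained first, then the default action processes input.

import java.util.concurrent.ConcurrentLinkedQueue;

public class MiniMailboxLoop {
    private final ConcurrentLinkedQueue<Runnable> mailbox = new ConcurrentLinkedQueue<>();
    private volatile boolean running = true;

    // analogous to MailboxProcessor#runMailboxLoop: mail first, then the default action
    void runMailboxLoop(Runnable defaultAction) {
        while (running) {
            Runnable mail;
            while ((mail = mailbox.poll()) != null) {
                mail.run(); // e.g. checkpoint triggers, timers, cancellation
            }
            defaultAction.run(); // e.g. StreamTask#processInput: handle one input element
        }
    }

    public static void main(String[] args) {
        MiniMailboxLoop loop = new MiniMailboxLoop();
        loop.mailbox.add(() -> System.out.println("mail: trigger checkpoint"));
        loop.mailbox.add(() -> loop.running = false); // a "stop" letter ends the loop
        loop.runMailboxLoop(() -> System.out.println("default action: process one record"));
    }
}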
Summary:
1. The AM container starts first; inside it the internal ResourceManager is started (it requests resources from the YARN RM, after which the TaskManager containers are launched on remote nodes), then the WebMonitorEndpoint, and finally the Dispatcher component.
2. In the flink-cluster-on-yarn mode: once the Dispatcher is up, it starts the JobMaster, which reads the job.graph file and turns it into an ExecutionGraph; the JobMaster then runs allocateResourcesForExecution and deploys the job to the remote TaskManagers.
3. There is also the REST path (LeaderRetrievalHandler, netty channelRead0): the client starts a JobMaster by submitting a jar. JobSubmitHandler.java handleRequest receives the uploaded jar, configuration files, etc., then submits the JobGraph via Dispatcher.java submitJob; the eventual implementation is again JobManagerRunnerImpl.java.