Flink 1.10 Job Submission Process Analysis (Part 2)

Author: longLiveData | Published 2020-06-09 11:04

    This post is a personal study note. Original article: https://mp.weixin.qq.com/s/MWBoBPVhiB4VgpchtR6_nQ

    Flink 1.10 Job Submission Process Analysis (Part 1) covered the flow from flink run up to the point where the job is about to be submitted to the cluster. Flink uses a different PipelineExecutor for each submission mode; this article analyzes the flow of submitting a job to a YARN cluster in yarn-per-job mode. (Note: the analysis is based on Flink 1.10.1.)

    YarnJobClusterExecutor

    Continuing from the previous article: the job is ultimately handed to a PipelineExecutor to execute. Which PipelineExecutor is chosen depends on the submission mode, i.e. on the execution.target parameter; for yarn-per-job an executor of type YarnJobClusterExecutor is selected.

    public class YarnJobClusterExecutor extends AbstractJobClusterExecutor<ApplicationId, YarnClusterClientFactory> {
       public static final String NAME = "yarn-per-job";
       public YarnJobClusterExecutor() {
          super(new YarnClusterClientFactory());
       }
    }
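
    How this executor gets picked: the client looks up an executor factory whose name matches the execution.target value. The following is a simplified sketch of that matching, modeled on Flink 1.10's YarnJobClusterExecutorFactory but trimmed to the two essential methods, so treat it as illustration rather than the exact source:

    public class YarnJobClusterExecutorFactory implements PipelineExecutorFactory {
       @Override
       public boolean isCompatibleWith(@Nonnull final Configuration configuration) {
          // matches when execution.target equals "yarn-per-job"
          return YarnJobClusterExecutor.NAME.equalsIgnoreCase(configuration.get(DeploymentOptions.TARGET));
       }

       @Override
       public PipelineExecutor getExecutor(@Nonnull final Configuration configuration) {
          return new YarnJobClusterExecutor();
       }
    }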
    

    Its implementation is simple. The important part is the YarnClusterClientFactory created in its constructor, which is used to create the YarnClusterDescriptor containing the information needed to submit to YARN: the YarnClient, the YARN configuration, the target YARN queue, and so on. YarnJobClusterExecutor extends the abstract job-cluster executor AbstractJobClusterExecutor, and execute is likewise carried out by AbstractJobClusterExecutor:

    public class AbstractJobClusterExecutor<ClusterID, ClientFactory extends ClusterClientFactory<ClusterID>> implements PipelineExecutor {

       private static final Logger LOG = LoggerFactory.getLogger(AbstractJobClusterExecutor.class);
       // this is the YarnClusterClientFactory
       private final ClientFactory clusterClientFactory;

       public AbstractJobClusterExecutor(@Nonnull final ClientFactory clusterClientFactory) {
          this.clusterClientFactory = checkNotNull(clusterClientFactory);
       }

       // performs the job submission
       // pipeline is the StreamGraph
       public CompletableFuture<JobClient> execute(@Nonnull final Pipeline pipeline, @Nonnull final Configuration configuration) throws Exception {
          // convert the StreamGraph into a JobGraph
          final JobGraph jobGraph = ExecutorUtils.getJobGraph(pipeline, configuration);
          // create the YarnClusterDescriptor holding the submission information
          try (final ClusterDescriptor<ClusterID> clusterDescriptor = clusterClientFactory.createClusterDescriptor(configuration)) {
             // wrap the configuration in an ExecutionConfigAccessor
             final ExecutionConfigAccessor configAccessor = ExecutionConfigAccessor.fromConfiguration(configuration);
             // describes the resources the job needs: memory sizes, parallelism
             final ClusterSpecification clusterSpecification = clusterClientFactory.getClusterSpecification(configuration);
             // deploy the job
             final ClusterClientProvider<ClusterID> clusterClientProvider = clusterDescriptor
                   .deployJobCluster(clusterSpecification, jobGraph,
                                     // whether to use detached mode
                                     configAccessor.getDetachedMode());
             LOG.info("Job has been submitted with JobID " + jobGraph.getJobID());

             return CompletableFuture.completedFuture(
                   new ClusterClientJobClientAdapter<>(clusterClientProvider, jobGraph.getJobID()));
          }
       }
    }
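
    For completeness, here is a hedged usage sketch of the CompletableFuture<JobClient> returned above (the executor, streamGraph and configuration variables are assumed to already exist; this is illustration, not Flink source):

    // illustration only: submit and inspect the resulting JobClient
    CompletableFuture<JobClient> jobClientFuture = executor.execute(streamGraph, configuration);
    JobClient jobClient = jobClientFuture.get();
    // the JobID assigned to the submitted job
    System.out.println("submitted job: " + jobClient.getJobID());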
    

    ClusterSpecification describes the resource sizes the job needs on the cluster. To understand how these values are derived, it is worth reading the memory management section of the official Flink 1.10 documentation. The job is finally handed to the YarnClusterDescriptor to deploy.
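
    As a rough illustration (assuming the Flink 1.10 builder API; the exact setters may differ slightly across versions), a ClusterSpecification can be thought of as:

    // sketch: the resource description carried by a ClusterSpecification
    ClusterSpecification clusterSpecification = new ClusterSpecification.ClusterSpecificationBuilder()
          .setMasterMemoryMB(1024)       // JobMaster (AppMaster) memory
          .setTaskManagerMemoryMB(1728)  // memory per TaskManager
          .setSlotsPerTaskManager(1)     // slots per TaskManager
          .createClusterSpecification();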

    The Deploy Process

    The deploy step is where the interaction with YARN happens; clusterDescriptor.deployJobCluster calls the internal deployInternal method:

    private ClusterClientProvider<ApplicationId> deployInternal(
          ClusterSpecification clusterSpecification,
          String applicationName,
          String yarnClusterEntrypoint,
          @Nullable JobGraph jobGraph,
          boolean detached) throws Exception {
        //..... performs some checks here: whether the YARN queue exists, configuration checks,
        //validation of the resource sizes, and so on
       ApplicationReport report = startAppMaster(
             flinkConfiguration,
             applicationName,
             yarnClusterEntrypoint,
             jobGraph,
             yarnClient,
             yarnApplication,
             validClusterSpecification);
    
       //....
    }
    

    The most important call is startAppMaster, which starts an AppMaster process on YARN. yarnClusterEntrypoint is the entry class of that process, i.e. the startup entry class of the JobMaster: org.apache.flink.yarn.entrypoint.YarnJobClusterEntrypoint. This class name is visible in the process list on the cluster machines, so whenever you see this process you know it is the JobMaster. The startAppMaster flow is fairly long, so it is broken down piece by piece here:

    private ApplicationReport startAppMaster(
          Configuration configuration,
          String applicationName,
          String yarnClusterEntrypoint,
          JobGraph jobGraph,
          YarnClient yarnClient,
          YarnClientApplication yarnApplication,
          ClusterSpecification clusterSpecification) throws Exception {
    
       // ------------------ Initialize the file systems -------------------------
    
       org.apache.flink.core.fs.FileSystem.initialize(
             configuration,
             PluginUtils.createPluginManagerFromRootFolder(configuration));
    
       //get homeDir, the path where the jars and log configs are uploaded, normally on HDFS
       //its path is /user/hadoop (hadoop being the current user)
       final FileSystem fs = FileSystem.get(yarnConfiguration);
       final Path homeDir = fs.getHomeDirectory();
       //the submission context describing the application to YARN
       ApplicationSubmissionContext appContext = yarnApplication.getApplicationSubmissionContext();
       // files that will be uploaded to HDFS and added to the classpath
       Set<File> systemShipFiles = new HashSet<>(shipFiles.size());
       // files that will only be uploaded to HDFS, but not added to the classpath
       Set<File> shipOnlyFiles = new HashSet<>();
       for (File file : shipFiles) {
          systemShipFiles.add(file.getAbsoluteFile());
       }
    
       final String logConfigFilePath = configuration.getString(YarnConfigOptionsInternal.APPLICATION_LOG_CONFIG_FILE);
       if (logConfigFilePath != null) {
          systemShipFiles.add(new File(logConfigFilePath));
       }
       //add the files under FLINK_HOME/lib to systemShipFiles; files specified via -yt end up here as well
       addLibFoldersToShipFiles(systemShipFiles);
    
       //add the files under FLINK_HOME/plugins to shipOnlyFiles
       addPluginsFoldersToShipFiles(shipOnlyFiles);
    
       final ApplicationId appId = appContext.getApplicationId();
    
       // ZooKeeper HA related configuration
       String zkNamespace = getZookeeperNamespace();
       // no user specified cli argument for namespace?
       if (zkNamespace == null || zkNamespace.isEmpty()) {
          // namespace defined in config? else use applicationId as default.
          zkNamespace = configuration.getString(HighAvailabilityOptions.HA_CLUSTER_ID, String.valueOf(appId));
          setZookeeperNamespace(zkNamespace);
       }
    
       configuration.setString(HighAvailabilityOptions.HA_CLUSTER_ID, zkNamespace);
    
       if (HighAvailabilityMode.isHighAvailabilityModeActivated(configuration)) {
          // activate re-execution of failed applications
          appContext.setMaxAppAttempts(
                configuration.getInteger(
                      YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                      YarnConfiguration.DEFAULT_RM_AM_MAX_ATTEMPTS));
    
          activateHighAvailabilitySupport(appContext);
       } else {
          // set number of application retries to 1 in the default case
          appContext.setMaxAppAttempts(
                configuration.getInteger(
                      YarnConfigOptions.APPLICATION_ATTEMPTS.key(),
                      1));
       }
    
      //userJarFiles: the user jars
       final Set<File> userJarFiles = (jobGraph == null)
             // not per-job submission
             ? Collections.emptySet()
             // add user code jars from the provided JobGraph
             : jobGraph.getUserJars().stream().map(f -> f.toUri()).map(File::new).collect(Collectors.toSet());
    
       //distributed-cache files that need to be uploaded to HDFS, typically used for file sharing
       if (jobGraph != null) {
          for (Map.Entry<String, DistributedCache.DistributedCacheEntry> entry : jobGraph.getUserArtifacts().entrySet()) {
             org.apache.flink.core.fs.Path path = new org.apache.flink.core.fs.Path(entry.getValue().filePath);
             // only upload local files
             if (!path.getFileSystem().isDistributedFS()) {
                Path localPath = new Path(path.getPath());
                Tuple2<Path, Long> remoteFileInfo =
                   Utils.uploadLocalFileToRemote(fs, appId.toString(), localPath, homeDir, entry.getKey());
                jobGraph.setUserArtifactRemotePath(entry.getKey(), remoteFileInfo.f0.toString());
             }
          }
    
          jobGraph.writeUserArtifactEntriesToConfiguration();
       }
    
       //resource files needed to start the AppMaster; they will be downloaded from HDFS
       final Map<String, LocalResource> localResources = new HashMap<>(2 + systemShipFiles.size() + userJarFiles.size());
       // security setup for accessing HDFS (these paths are later used for delegation tokens)
       final List<Path> paths = new ArrayList<>(2 + systemShipFiles.size() + userJarFiles.size());
       // resource files needed to start the TaskExecutors
       StringBuilder envShipFileList = new StringBuilder();
    
       //several uploadAndRegisterFiles calls here upload systemShipFiles, shipOnlyFiles and the user jars to HDFS
    
       if (userJarInclusion == YarnConfigOptions.UserJarInclusion.ORDER) {
          systemClassPaths.addAll(userClassPaths);
       }
    
       // normalize classpath by sorting
       Collections.sort(systemClassPaths); //sort the system classpath entries
       Collections.sort(userClassPaths); //sort the user classpath entries
    
       // classPathBuilder: accumulates the classpath information
       StringBuilder classPathBuilder = new StringBuilder();
         /*
          * build the classpath: ship-file jars, user jars, log4j and the yaml config file
          */
    
       final Path yarnFilesDir = getYarnFilesDir(appId);
       FsPermission permission = new FsPermission(FsAction.ALL, FsAction.NONE, FsAction.NONE);
       fs.setPermission(yarnFilesDir, permission); // set permission for path.
    
       /*
        * a series of security-related settings here
        */
    
      //the java command to execute, which starts YarnJobClusterEntrypoint
       final ContainerLaunchContext amContainer = setupApplicationMasterContainer(
             yarnClusterEntrypoint,
             hasLogback,
             hasLog4j,
             hasKrb5,
             clusterSpecification.getMasterMemoryMB());
    
       if (UserGroupInformation.isSecurityEnabled()) {
          // set HDFS delegation tokens when security is enabled
          LOG.info("Adding delegation token to the AM container.");
          Utils.setTokensFor(amContainer, paths, yarnConfiguration);
       }
    
       amContainer.setLocalResources(localResources);
       fs.close();
    
       // Setup CLASSPATH and environment variables for ApplicationMaster
       final Map<String, String> appMasterEnv = new HashMap<>();
       /**
        * set the environment variables in appMasterEnv; they are used when starting
        * YarnJobClusterEntrypoint, e.g. classpath, hadoop user, appId, etc.
        */
    
       amContainer.setEnvironment(appMasterEnv);
    
        // plus a series of settings for the submission queue, the YARN application name and similar configuration
    
       // add a hook to clean up in case deployment fails
       Thread deploymentFailureHook = new DeploymentFailureHook(yarnApplication, yarnFilesDir);
       Runtime.getRuntime().addShutdownHook(deploymentFailureHook);
       LOG.info("Submitting application master " + appId);
       //submit the application
       yarnClient.submitApplication(appContext);
    
       /**
        *  poll the application status
        */
    }
    

    This flow is fairly long; to summarize, the main steps are:

    1. Upload shipFiles, plugins, user jars, log config files, flink-conf.yaml, job.graph and other files to HDFS.

    2. Build what startup needs: the classpath, ZooKeeper HA configuration, security settings, the JobMaster launch command, and so on.

    3. Submit the application to YARN.

    Once the application has started successfully on YARN, you can find a file named launch_container.sh in the JobMaster's working directory; it contains all of the environment variable settings and the launch command prepared in startAppMaster.

    Summary

    This article covered the yarn-per-job submission flow. Combined with the previous two articles, you should now have a grasp of how to implement job submission through the API. In my view two points matter most: first, parse and prepare the configuration correctly; second, choose a suitable PipelineExecutor to submit the job.
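
    As a closing illustration, here is a hedged sketch of those two points in code. It mirrors what the CLI does internally rather than a stable public API, it assumes a StreamGraph named streamGraph has already been built, and the flink-conf directory path is a placeholder:

    // 1. parse and prepare the configuration, including execution.target
    Configuration configuration = GlobalConfiguration.loadConfiguration("/path/to/conf"); // placeholder path
    configuration.setString(DeploymentOptions.TARGET, YarnJobClusterExecutor.NAME); // "yarn-per-job"

    // 2. pick a matching PipelineExecutor and submit
    PipelineExecutor executor = new YarnJobClusterExecutor();
    CompletableFuture<JobClient> jobClientFuture = executor.execute(streamGraph, configuration);
    JobClient jobClient = jobClientFuture.get();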
