美文网首页
Flink1.10基于工厂模式的任务提交与SPI机制

Flink1.10基于工厂模式的任务提交与SPI机制

作者: longLiveData | 来源:发表于2020-06-08 09:38 被阅读0次

    本文仅为笔者平日学习记录之用,侵删
    原文:https://mp.weixin.qq.com/s/t9Z2d1Pay4EiJBIU1dNrsw

    Flink任务执行模式包含了yarn-session、standalone、per-job、local, 在1.10中又增加k8s的执行模式,那么在任务提交过程中如何根据不同的执行模式进行任务提交呢?主要通过两个接口来实现:PipelineExecutorFactory 与PipelineExecutor。PipelineExecutorFactory用于在不同模式下创建不同的PipelineExecutor, 用于提交任务,PipelineExecutorFactory表示的一个创建执行器工厂接口,PipelineExecutor 表示一个执行器接口,正如你所想这里使用的就是经典的工厂设计模式,在任务提交过程中会根据不同的提交模式, 使用不同的PipelineExecutorFactory创建不同的PipelineExecutor。

    public interface PipelineExecutorFactory {
       /**
        * Returns the name of the executor that this factory creates.
        */
       String getName();
       /**
          根据configuration判断是否满足当前的factory
        */
       boolean isCompatibleWith(final Configuration configuration);
       /**
        * 获取对应模式下的executor
        */
       PipelineExecutor getExecutor(final Configuration configuration);
    }
    

    PipelineExecutorFactory几个实现分别为:

    1. LocalExecutorFactory(local)
    2. RemoteExecutorFactory(standalone)
    3. YarnJobClusterExecutorFactory(per-job)
    4. YarnSessionClusterExecutorFactory(yarn-session)
    public interface PipelineExecutor {
    
       /**
        * 执行任务
        */
       CompletableFuture<JobClient> execute(final Pipeline pipeline, final Configuration configuration) throws Exception;
    }
    

    PipelineExecutor对应实现:

    1. RemoteExecutor(standalone)

    2. LocalExecutor(local)

    3. YarnJobClusterExecutor(per-job)

    4. YarnSessionClusterExecutor(yarn-session)

    那么具体是如何选择factory呢?由PipelineExecutorServiceLoader接口来完成,其只有一个实现类DefaultExecutorServiceLoader, 透过命名你可能会才想到这里面用到了ServiceLoader,你的猜想是正确的,它就是通过SPI机制去加载flink所提供的不同factory,在META-INF.services 下可以找到其对应的配置:

    DefaultExecutorServiceLoader.java部分源码

    //SPI机制
    private static final ServiceLoader<PipelineExecutorFactory> defaultLoader = ServiceLoader.load(PipelineExecutorFactory.class);
    
    //获取对应的factory
    public PipelineExecutorFactory getExecutorFactory(final Configuration configuration) {
       checkNotNull(configuration);
    
       final List<PipelineExecutorFactory> compatibleFactories = new ArrayList<>();
       final Iterator<PipelineExecutorFactory> factories = defaultLoader.iterator();
       while (factories.hasNext()) {
          try {
             final PipelineExecutorFactory factory = factories.next();
             //判断标准 根据任务启动配置
             if (factory != null && factory.isCompatibleWith(configuration)) {
                compatibleFactories.add(factory);
             }
          } catch (Throwable e) {
             if (e.getCause() instanceof NoClassDefFoundError) {
                LOG.info("Could not load factory due to missing dependencies.");
             } else {
                throw e;
             }
          }
       }
       //只能有一个factory符合要求
       if (compatibleFactories.size() > 1) {
          final String configStr =
                configuration.toMap().entrySet().stream()
                      .map(e -> e.getKey() + "=" + e.getValue())
                      .collect(Collectors.joining("\n"));
    
          throw new IllegalStateException("Multiple compatible client factories found for:\n" + configStr + ".");
       }
       return compatibleFactories.isEmpty() ? null : compatibleFactories.get(0);
    }
    

    ServiceLoader.load(PipelineExecutorFactory.class) 会从类路径的META-INF.services下找到PipelineExecutorFactory的全路径文件,然后实例化出所有的factory,通过PipelineExecutorFactory.isCompatibleWith找到匹配的factory。

    相关文章

      网友评论

          本文标题:Flink1.10基于工厂模式的任务提交与SPI机制

          本文链接:https://www.haomeiwen.com/subject/sqtltktx.html