美文网首页
flume1.9源码分析(一)从编译到启动

flume1.9源码分析(一)从编译到启动

作者: CarsonCao | 来源:发表于2019-03-18 23:58 被阅读0次

    1. 源码编译

    1. 下载地址
      github选择branch 1.9(https://github.com/apache/flume/tree/flume-1.9

    git clone git@github.com:apache/flume.git

    1. 配置maven依赖库
      下载完源码之后按照maven项目导入到idea中,然后配置maven的依赖库。
      如果有国外代理可以不用配置,否则可将maven配置成国内的库,比如阿里云的maven库:
    <mirror>  
        <id>nexus-aliyun</id>
        <mirrorOf>*</mirrorOf>
        <name>Nexus aliyun</name>
        <url>http://maven.aliyun.com/nexus/content/groups/public</url>
    </mirror>
    <!--以上阿里云maven库的配置来自网络,作者未亲自验证,如遇问题请读者自行查资料解决。-->
    

    本人采用的是公司内部的maven库,依赖的包都比较全,编译很顺畅,没有出现网上遇到编译不通过的问题,所以此章节的配置请读者自行解决,网上有很多解决方案。

    1. 编译源码
      如下命令编译源码:
      mvn clean install -DskipTests
      编译完成会在每个模块下看到各自相应的target文件夹,里面有编译之后的jar包。
    [INFO] Reactor Summary:
    [INFO] 
    [INFO] Build Support ...................................... SUCCESS [  1.854 s]
    [INFO] Apache Flume ....................................... SUCCESS [ 11.992 s]
    [INFO] Flume NG SDK ....................................... SUCCESS [ 16.187 s]
    [INFO] Flume NG Hadoop Credential Store Config Filter ..... SUCCESS [  0.135 s]
    [INFO] Flume NG Config Filters API ........................ SUCCESS [  2.549 s]
    [INFO] Flume NG Configuration ............................. SUCCESS [  5.963 s]
    [INFO] Flume Auth ......................................... SUCCESS [  6.558 s]
    [INFO] Flume NG Core ...................................... SUCCESS [ 18.504 s]
    [INFO] Flume NG Sinks ..................................... SUCCESS [  0.145 s]
    [INFO] Flume NG HDFS Sink ................................. SUCCESS [  8.566 s]
    [INFO] Flume NG IRC Sink .................................. SUCCESS [  3.347 s]
    [INFO] Flume NG Channels .................................. SUCCESS [  0.115 s]
    [INFO] Flume NG JDBC channel .............................. SUCCESS [  5.174 s]
    [INFO] Flume NG file-based channel ........................ SUCCESS [ 11.072 s]
    [INFO] Flume NG Spillable Memory channel .................. SUCCESS [  3.804 s]
    [INFO] Flume NG Node ...................................... SUCCESS [ 10.520 s]
    [INFO] Flume NG Embedded Agent ............................ SUCCESS [  3.632 s]
    [INFO] Flume NG HBase Sink ................................ SUCCESS [  7.410 s]
    [INFO] Flume NG HBase2 Sink ............................... SUCCESS [  8.654 s]
    [INFO] Flume NG ElasticSearch Sink ........................ SUCCESS [  4.994 s]
    [INFO] Flume NG Morphline Solr Sink ....................... SUCCESS [  6.844 s]
    [INFO] Flume Shared Utils ................................. SUCCESS [  0.067 s]
    [INFO] Flume Shared Kafka ................................. SUCCESS [  2.755 s]
    [INFO] Flume Shared Kafka Test Utils ...................... SUCCESS [  3.389 s]
    [INFO] Flume Kafka Sink ................................... SUCCESS [  3.689 s]
    [INFO] Flume HTTP/S Sink .................................. SUCCESS [  3.416 s]
    [INFO] Flume NG Kite Dataset Sink ......................... SUCCESS [  5.341 s]
    [INFO] Flume NG Hive Sink ................................. SUCCESS [  4.872 s]
    [INFO] Flume Sources ...................................... SUCCESS [  0.091 s]
    [INFO] Flume Scribe Source ................................ SUCCESS [  4.130 s]
    [INFO] Flume JMS Source ................................... SUCCESS [  3.664 s]
    [INFO] Flume Twitter Source ............................... SUCCESS [  3.274 s]
    [INFO] Flume Kafka Source ................................. SUCCESS [  4.323 s]
    [INFO] Flume Taildir Source ............................... SUCCESS [  4.337 s]
    [INFO] flume-kafka-channel ................................ SUCCESS [  3.949 s]
    [INFO] Flume legacy Sources ............................... SUCCESS [  0.050 s]
    [INFO] Flume legacy Avro source ........................... SUCCESS [  3.007 s]
    [INFO] Flume legacy Thrift Source ......................... SUCCESS [  4.780 s]
    [INFO] Flume NG Environment Variable Config Filter ........ SUCCESS [  2.054 s]
    [INFO] flume-ng-hadoop-credential-store-config-filter ..... SUCCESS [  2.773 s]
    [INFO] Flume NG External Process Config Filter ............ SUCCESS [  2.402 s]
    [INFO] Flume NG Clients ................................... SUCCESS [  0.059 s]
    [INFO] Flume NG Log4j Appender ............................ SUCCESS [  6.521 s]
    [INFO] Flume NG Tools ..................................... SUCCESS [  2.914 s]
    [INFO] Flume NG distribution .............................. SUCCESS [ 11.804 s]
    [INFO] Flume NG Integration Tests ......................... SUCCESS [  3.099 s]
    [INFO] ------------------------------------------------------------------------
    [INFO] BUILD SUCCESS
    [INFO] ------------------------------------------------------------------------
    [INFO] Total time: 03:45 min
    [INFO] Finished at: 2019-01-21T11:54:33+08:00
    [INFO] Final Memory: 252M/1584M
    
    

    2. flume调试

    2.1 flume使用

    这里我们先简单复习下flume的使用,下面用一个最简单的例子做介绍。
    启动一个flume agent的命令需要指定调用的模块名称,可用的模块包括:help, agent, avro-client, tool, version等,启动一个agent的命令格式如下:
    bin/flume-ng agent -n $agent_name -c conf -f conf/flume-conf.properties.template

    首先新建一个agent的配置文件:

    # example.conf: A single-node Flume configuration
    
    # Name the components on this agent
    a1.sources = r1
    a1.sinks = k1
    a1.channels = c1
    
    # Describe/configure the source
    a1.sources.r1.type = netcat
    a1.sources.r1.bind = localhost
    a1.sources.r1.port = 44444
    
    # Describe the sink
    a1.sinks.k1.type = logger
    
    # Use a channel which buffers events in memory
    a1.channels.c1.type = memory
    a1.channels.c1.capacity = 1000
    a1.channels.c1.transactionCapacity = 100
    
    # Bind the source and sink to the channel
    a1.sources.r1.channels = c1
    a1.sinks.k1.channel = c1
    

    然后执行如下命令:
    bin/flume-ng agent --conf conf --conf-file ./conf/example.conf --name a1 -Dflume.root.logger=DEBUG,console
    其中,--conf 的简称为 -c--conf-file 的简称为 -f--name 的简称为 -n

    flume执行脚本主函数run_flume()中默认会执行 set -x 的语句,所以执行flume启动命令之后,可以在日志中发现脚本最终执行了如下命令:
    exec /usr/lib/jvm/java-8-oracle/bin/java -Xmx20m -Dflume.root.logger=INFO,console -cp '/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/conf:/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file ./conf/example.conf --name a1
    通过telnet工具与flume的source建立链接,发送字符串,flume在接收到数据之后,logger sink将接受到的数据打印在屏幕上。

    $ telnet localhost 44444
    Trying 127.0.0.1...
    Connected to localhost.localdomain (127.0.0.1).
    Escape character is '^]'.
    Hello world! <ENTER>
    OK
    
    2.2 程序入口

    flume-ng脚本:

    ################################
    # constants
    ################################
    
    FLUME_AGENT_CLASS="org.apache.flume.node.Application"
    FLUME_AVRO_CLIENT_CLASS="org.apache.flume.client.avro.AvroCLIClient"
    FLUME_VERSION_CLASS="org.apache.flume.tools.VersionInfo"
    FLUME_TOOLS_CLASS="org.apache.flume.tools.FlumeToolsMain"
    
    ...
    
    run_flume() {
      local FLUME_APPLICATION_CLASS
    
      if [ "$#" -gt 0 ]; then
        FLUME_APPLICATION_CLASS=$1
        shift
      else
        error "Must specify flume application class" 1
      fi
    
      if [ ${CLEAN_FLAG} -ne 0 ]; then
        set -x
      fi
      $EXEC $JAVA_HOME/bin/java $JAVA_OPTS $FLUME_JAVA_OPTS "${arr_java_props[@]}" -cp "$FLUME_CLASSPATH" \
          -Djava.library.path=$FLUME_JAVA_LIBRARY_PATH "$FLUME_APPLICATION_CLASS" $*
    }
    
    ...
    
    # finally, invoke the appropriate command
    if [ -n "$opt_agent" ] ; then
      run_flume $FLUME_AGENT_CLASS $args
    elif [ -n "$opt_avro_client" ] ; then
      run_flume $FLUME_AVRO_CLIENT_CLASS $args
    elif [ -n "${opt_version}" ] ; then
      run_flume $FLUME_VERSION_CLASS $args
    elif [ -n "${opt_tool}" ] ; then
      run_flume $FLUME_TOOLS_CLASS $args
    else
      error "This message should never appear" 1
    fi
    

    从脚本可以看出org.apache.flume.node.Application 为程序的主入口。

    在idea中利用Alt+7命令查看类的structure结构如下:


    Application类结构
    2.3 远程Debug

    为了便于对代码进行debug分析,下面介绍一下flume的远程debug的配置方法。总共分为两步:第一步,修改flume启动脚本;第二步,idea的debug配置中添加remote配置项。

    (1)修改flume启动脚本
    打开flume-ng启动文件,找到"JAVA_OPTS=",添加如下内容:

    flume-ng配置文件
    JAVA_OPTS="-Xmx20m -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y"
    

    (2)修改flume启动脚本
    在idea界面上依次点击"Run"->"debug..."->"Edit Configurations",点击左上角的加号,新增一个remote配置项,idea的默认端口号是5005,这里的端口号一定要跟flume配置的一致。

    idea端debug添加remote配置项

    配置完成,启动flume,会看到flume正在监听5005端口,此时启动idea调试。

    + exec /usr/lib/jvm/java-8-oracle/bin/java -Xmx20m -Xdebug -Xrunjdwp:transport=dt_socket,address=5005,server=y,suspend=y -Dflume.root.logger=DEBUG,console -cp '/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/conf:/home/caolch/Downloads/software_store/apache-flume-1.9.0-bin/lib/*:/lib/*' -Djava.library.path= org.apache.flume.node.Application --conf-file ./conf/example.conf --name a1
    Listening for transport dt_socket at address: 5005
    
    2.4 代码分析

    我们从main函数开始分析。
    首先是第一行初始化ssl的全局参数:

    public static void main(String[] args) {
    
        try {
          /*初始化ssl的全局参数,利用System.getEnv()和System.setProperty(), flume可利用ssl进行加解密传输*/
          SSLUtil.initGlobalSSLParameters();
    

    进入函数内部发现主要是调用initSysPropFromEnvVar函数将系统级别的关于ssl的参数放到Property中。关于 System.getEnv()System.getProperty() 的对比详见https://www.jianshu.com/p/dbe4795b61ac

     private static void initSysPropFromEnvVar(String sysPropName, String envVarName,
                                                String description) {
        if (System.getProperty(sysPropName) != null) {
          LOGGER.debug("Global SSL " + description + " has been initialized from system property.");
        } else {
          String envVarValue = System.getenv(envVarName);
          if (envVarValue != null) {
            System.setProperty(sysPropName, envVarValue);
            LOGGER.debug("Global SSL " + description +
                " has been initialized from environment variable.");
          } else {
            LOGGER.debug("No global SSL " + description + " specified.");
          }
        }
      }
    

    flume启动日志中也可以查看初始化ssl参数的过程:

    2019-02-24 12:05:55,036 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore path specified.
    2019-02-24 12:05:55,040 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore password specified.
    2019-02-24 12:05:55,041 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL keystore type specified.
    2019-02-24 12:05:55,041 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore path specified.
    2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore password specified.
    2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL truststore type specified.
    2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include protocols specified.
    2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude protocols specified.
    2019-02-24 12:05:55,046 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL include cipher suites specified.
    2019-02-24 12:05:55,047 (main) [DEBUG - org.apache.flume.util.SSLUtil.initSysPropFromEnvVar(SSLUtil.java:95)] No global SSL exclude cipher suites specified.
    
    

    下面继续看main函数,接下来是参数解析部分,先上代码。

      /*参数解析*/
          Options options = new Options();
    
          Option option = new Option("n", "name", true, "the name of this agent");
          option.setRequired(true);
          options.addOption(option);
    
          option = new Option("f", "conf-file", true,
              "specify a config file (required if -z missing)");
          option.setRequired(false);
          options.addOption(option);
    
          option = new Option(null, "no-reload-conf", false,
              "do not reload config file if changed");
          options.addOption(option);
    
          // Options for Zookeeper
          option = new Option("z", "zkConnString", true,
              "specify the ZooKeeper connection to use (required if -f missing)");
          option.setRequired(false);
          options.addOption(option);
    
          option = new Option("p", "zkBasePath", true,
              "specify the base path in ZooKeeper for agent configs");
          option.setRequired(false);
          options.addOption(option);
    
          option = new Option("h", "help", false, "display help text");
          options.addOption(option);
    
          CommandLineParser parser = new GnuParser();
          CommandLine commandLine = parser.parse(options, args);
    
          if (commandLine.hasOption('h')) {
            new HelpFormatter().printHelp("flume-ng agent", options, true);
            return;
          }
    
          String agentName = commandLine.getOptionValue('n');
          boolean reload = !commandLine.hasOption("no-reload-conf");
    
          boolean isZkConfigured = false;
          if (commandLine.hasOption('z') || commandLine.hasOption("zkConnString")) {
            isZkConfigured = true;
          }
    

    在调试的时候很容易发现,执行章节2.1的flume命令,main函数中收到的参数包括--conf-file和--name:
    args: --conf-file ./conf/example.conf --name a1

    flume的配置文件有两种获取方式,可从zookeeper或者文件中获取配置信息。每种方式都含有自动更新配置(重启所有组件)和不自动更新配置两种操作。

    参数zkConnString不为空时,会直接从zookeeper中获取配置信息,否则从文件中获取。如果加上配置参数 no-reload-conf,flume不会自动更新配置参数,默认不加这个参数flume会自动监听配置信息的变化并且利用eventBus触发重读配置文件并重新启动所有组件。

    以从文件获取配置信息,并且监听配置文件变化自动重启所有组件的情况来举例说明flume的调用顺序,代码如下:

    //...省略若干行
     boolean reload = !commandLine.hasOption("no-reload-conf");
    //...省略若干行
    List<LifecycleAware> components = Lists.newArrayList();
    
            if (reload) {
              EventBus eventBus = new EventBus(agentName + "-event-bus");
              PollingPropertiesFileConfigurationProvider configurationProvider =
                  new PollingPropertiesFileConfigurationProvider(
                      agentName, configurationFile, eventBus, 30);
              components.add(configurationProvider);
              application = new Application(components);
              eventBus.register(application);
            } 
    //...省略若干行
     application.start();
    

    如果含有参数no-reload-conf,则 reload=true

    以上代码用到了guava EventBus,guava的EventBus是观察者模式的一种优雅的解决方案,利用EventBus实现事件的发布和订阅,可以节省很多工作量。guava EventBus的原理和使用参见:https://www.jianshu.com/p/f8ba312904f4 。EventBus的观察者(事件订阅者)需要用@Subscribe 注释标注的函数来处理事件发布者发过来的事件。EventBus.register()用来注册观察者。
    在类Application中,我们可以找到事件处理方法handleConfigurationEvent(MaterializedConfiguration conf)

    /*guava EventBus中用@Subscribe标记,定义监听处理方法*/
      @Subscribe
      public void handleConfigurationEvent(MaterializedConfiguration conf) {
        try {
          lifecycleLock.lockInterruptibly();
          stopAllComponents();
          startAllComponents(conf);
        } catch (InterruptedException e) {
          logger.info("Interrupted while trying to handle configuration event");
          return;
        } finally {
          // If interrupted while trying to lock, we don't own the lock, so must not attempt to unlock
          if (lifecycleLock.isHeldByCurrentThread()) {
            lifecycleLock.unlock();
          }
        }
      }
    

    该方法调用stopAllComponents()和startAllComponents(conf)函数对所有的组件进行了重启。

    找到了事件的处理逻辑,那么往EventBus发送事件的发布者在哪里??
    带着问题,我们需要重新回到刚才的代码,可以看到在创建完 EventBus 对象之后,又new了一个类PollingPropertiesFileConfigurationProvider的对象,该类实现了接口LifecycleAware,flume中所有的组件都实现自该接口。

    PollingPropertiesFileConfigurationProvider继承关系
    最终,PollingPropertiesFileConfigurationProvider的对象被添加到全局属性List<LifecycleAware> components中,
     public Application(List<LifecycleAware> components) {
        this.components = components;
        supervisor = new LifecycleSupervisor();
      }
    

    然后调用Application的start()方法,对components进行启动。

      public void start() {
        lifecycleLock.lock();
        try {
          for (LifecycleAware component : components) {
            supervisor.supervise(component,
                new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
          }
        } finally {
          lifecycleLock.unlock();
        }
      }
    

    具体每个component是怎么启动的,我们可以深入到LifecycleSupervisor.supervise()函数中查看:

        MonitorRunnable monitorRunnable = new MonitorRunnable();
        monitorRunnable.lifecycleAware = lifecycleAware;
        monitorRunnable.supervisoree = process;
        monitorRunnable.monitorService = monitorService;
    
        supervisedProcesses.put(lifecycleAware, process);
    
        ScheduledFuture<?> future = monitorService.scheduleWithFixedDelay(
            monitorRunnable, 0, 3, TimeUnit.SECONDS);
        monitorFutures.put(lifecycleAware, future);
    
    

    通过ScheduleWithFixedDelay延时调用任务monitorRunnable,任务执行完之后,等待3s继续调度执行。
    MonitorRunnable的run函数中lifecycleAware.start()说明执行了传入组件的start()方法。

              switch (supervisoree.status.desiredState) {
                  case START:
                    try {
                      lifecycleAware.start();
    

    回到刚才的PollingPropertiesFileConfigurationProvider类中,我们发现在start()方法中,new了一个单线程执行器Executors.newSingleThreadScheduledExecutor(),然后每隔30s(interval=30s,Application类调用的时候传入)调度执行一次FileWatcherRunnable任务。

     public PollingPropertiesFileConfigurationProvider(String agentName,
          File file, EventBus eventBus, int interval) {
        super(agentName, file);
        this.eventBus = eventBus;
        this.file = file;
        this.interval = interval;
        counterGroup = new CounterGroup();
        lifecycleState = LifecycleState.IDLE;
      }
    
      @Override
      public void start() {
        LOGGER.info("Configuration provider starting");
    
        Preconditions.checkState(file != null,
            "The parameter file must not be null");
    
        executorService = Executors.newSingleThreadScheduledExecutor(
                new ThreadFactoryBuilder().setNameFormat("conf-file-poller-%d")
                    .build());
    
          /*新启动一个线程,监控到有文件变动就将getConfiguration加到eventBus中,eventBus有事件更新会调用Application类中
        * 用@Subscribe修饰的函数,也就是 public void handleConfigurationEvent(MaterializedConfiguration conf)
        *  eventBus.post(getConfiguration())将conf对象通过总线传给了handleConfigurationEvent去处理*/
        FileWatcherRunnable fileWatcherRunnable =
            new FileWatcherRunnable(file, counterGroup);
    
        executorService.scheduleWithFixedDelay(fileWatcherRunnable, 0, interval,
            TimeUnit.SECONDS);
    
        lifecycleState = LifecycleState.START;
    
        LOGGER.debug("Configuration provider started");
      }
    
    

    FileWatcherRunnable任务用于监控配置文件的变化,

     long lastModified = file.lastModified();
          if (lastModified > lastChange) {
    //省略若干行
    

    如果配置文件发生变化,则调用eventBus.post(getConfiguration())语句将事件发送到eventBus主线,eventBus负责调用观察者(Application)调用事件处理函数(handleConfigurationEvent(MaterializedConfiguration conf))处理事件。

    
     public class FileWatcherRunnable implements Runnable {
    
        private final File file;
        private final CounterGroup counterGroup;
    
        private long lastChange;
    
        public FileWatcherRunnable(File file, CounterGroup counterGroup) {
          super();
          this.file = file;
          this.counterGroup = counterGroup;
          this.lastChange = 0L;
        }
    
        @Override
        public void run() {
          LOGGER.debug("Checking file:{} for changes", file);
    
          counterGroup.incrementAndGet("file.checks");
    
          long lastModified = file.lastModified();
    
          if (lastModified > lastChange) {
            LOGGER.info("Reloading configuration file:{}", file);
    
            counterGroup.incrementAndGet("file.loads");
    
            lastChange = lastModified;
    
            try {
              eventBus.post(getConfiguration());
            } catch (Exception e) {
              LOGGER.error("Failed to load configuration data. Exception follows.",
                  e);
            } catch (NoClassDefFoundError e) {
              LOGGER.error("Failed to start agent because dependencies were not " +
                  "found in classpath. Error follows.", e);
            } catch (Throwable t) {
              // caught because the caller does not handle or log Throwables
              LOGGER.error("Unhandled error", t);
            }
          }
        }
      }
    

    到这里整个flume的启动过程就讲完了,有人会问,这里只启动了PollingPropertiesFileConfigurationProvider,并没有启动flume的channel、source和sink。其实在第一次启动的时候,lastModifiedlastChange这两个值是不相等的,

    //省略若干行
      this.lastChange = 0L;
    //省略若干行
      if (lastModified > lastChange) {
    //省略若干行
    

    就会触发eventBus,调用handleConfigurationEvent函数,handleConfigurationEvent函数中有语句startAllComponents(conf),里面有对channel、source和sink的启动语句,具体在下一篇文章里介绍。

    2.5回顾

    下面我们总结一下整个flume的调用顺序。
    Application->LifecycleSupervisor-(3s调度一次)>MonitorRunnable->PollingPropertiesFileConfigurationProvider-(30s调度一次)>FileWatcherRunnable->EventBus->Application
    期间我们看到一个调度器调度了另一个调度器,而且间隔几秒一次,为什么没有出现多个重复任务实例被调度起来?

     supervisor.supervise(component,
                new SupervisorPolicy.AlwaysRestartPolicy(), LifecycleState.START);
    

    我们看到在LifecycleSupervisor执行调度的时候传入了一个LifecycleState.START值,这个值便是下面代码(MonitorRunnable的run函数)中的desiredState

    //省略若干行
      if (!lifecycleAware.getLifecycleState().equals(
                  supervisoree.status.desiredState)) {
    
                logger.debug("Want to transition {} from {} to {} (failures:{})",
                    new Object[] { lifecycleAware, supervisoree.status.lastSeenState,
                        supervisoree.status.desiredState,
                        supervisoree.status.failures });
    
                switch (supervisoree.status.desiredState) {
                  case START:
                    try {
                      lifecycleAware.start();
    //省略若干行
    

    实现lifecycleAware接口的PollingPropertiesFileConfigurationProvider类在首次调用start()函数的时候,就已经将lifecycleState的值变为START:lifecycleState = LifecycleState.START;
    所以调度器在之后的调度过程中,由于if (!lifecycleAware.getLifecycleState().equals( supervisoree.status.desiredState))if条件不成立,便不会有新的任务被调度起来。PollingPropertiesFileConfigurationProvider任务只有一个线程实例,又由于调度FileWatcherRunnable的是一个单线程调度器,FileWatcherRunnable任务也只有一个线程实例。同理,各个channel、source和sink也都没有重复实例被调度起来。

    相关文章

      网友评论

          本文标题:flume1.9源码分析(一)从编译到启动

          本文链接:https://www.haomeiwen.com/subject/vutcjqtx.html