美文网首页
【ElasticJob源码解析】事件

【ElasticJob源码解析】事件

作者: 农蓝 | 来源:发表于2018-02-03 01:25 被阅读0次

    ElasticJob有一套自己的事件发布机制,核心部件使用的Guava的EventBus,如果不是很了解,出门左转看我的关于Guava的EventBus文章;

    1,事件配置接口-JobEventConfiguration

    public interface JobEventConfiguration extends JobEventIdentity {
        JobEventListener createJobEventListener() throws JobEventListenerConfigurationException;
    }
    
    • 该接口非常简单,只是为了获取JobEventListener;
    • JobEventListener的本质就是观察者,它的实现类将会使用EventBus的register方法注册到事件总线中;

    2,事件的观察者-JobEventRdbListener

    先来看看这个接口中定义了什么;

    public interface JobEventListener extends JobEventIdentity {
        
        @Subscribe
        @AllowConcurrentEvents
        void listen(JobExecutionEvent jobExecutionEvent);
        
        @Subscribe
        @AllowConcurrentEvents
        void listen(JobStatusTraceEvent jobStatusTraceEvent);
    }
    
    • 接口中定义了两个方法,也就意味着ElasticJob只会对两个事件作出反应,这个接口没有拓展性,已经固定死只能处理两种事件,但是没有关系,ElasticJob只关心这两个事件,如果后期有更多的事件需要处理,只能在接口继续增加方法;
    • 这两个方法都使用了 @Subscribe 和 @AllowConcurrentEvents 注解,也就意味着,这两个方法可以在多线程中并发执行,需要注意的一点是,该接口的实现类上最好不要再写@Subscribe注解,否则,因为实现类的方法上没有 @AllowConcurrentEvents注解,而实现类上的注解又是优先被解读,就会导致解读不到父类方法上的注解,从而导致方法失去了@AllowConcurrentEvents注解,也就是不能在多线程中同时调用,多线程需要排队获取同步锁;

    再来看看该接口的实现类:

    public final class JobEventRdbListener extends JobEventRdbIdentity implements JobEventListener {
        
        private final JobEventRdbStorage repository;
        
        public JobEventRdbListener(final DataSource dataSource) throws SQLException {
            repository = new JobEventRdbStorage(dataSource);
        }
        
        @Override
        public void listen(final JobExecutionEvent executionEvent) {
            repository.addJobExecutionEvent(executionEvent);
        }
        
        @Override
        public void listen(final JobStatusTraceEvent jobStatusTraceEvent) {
            repository.addJobStatusTraceEvent(jobStatusTraceEvent);
        }
    }
    
    • 可以看到该实现类复写的方法上没有使用任何注解;
    • 该事件监听器主要用来持久化数据,使用到了JobEventRdbStorage类,该类在初始化时,会创建表,具体逻辑可自行看代码深入了解;

    3,事件总线-JobEventBus

    有了事件监听器(JobEventListener),并且有了能够生产事件监听器的工厂(JobEventConfiguration),那么下一步就是将事件监听器注册到事件总线中了,先来看一下ElasticJob的事件总线;

    public final class JobEventBus {
        private final JobEventConfiguration jobEventConfig;
        private final ExecutorServiceObject executorServiceObject;
        private final EventBus eventBus;
        private boolean isRegistered;
        
        public JobEventBus() {
            jobEventConfig = null;
            executorServiceObject = null;
            eventBus = null;
        }
        
        public JobEventBus(final JobEventConfiguration jobEventConfig) {
            this.jobEventConfig = jobEventConfig;
            executorServiceObject = new ExecutorServiceObject("job-event", Runtime.getRuntime().availableProcessors() * 2);
            eventBus = new AsyncEventBus(executorServiceObject.createExecutorService());
            register();
        }
        
        private void register() {
            try {
                eventBus.register(jobEventConfig.createJobEventListener());
                isRegistered = true;
            } catch (final JobEventListenerConfigurationException ex) {
                log.error("Elastic job: create JobEventListener failure, error is: ", ex);
            }
        }
        
        public void post(final JobEvent event) {
            if (isRegistered && !executorServiceObject.isShutdown()) {
                eventBus.post(event);
            }
        }
    }
    

    解读:

    • 首先来看一下空构造函数,所有值都为空,之所以这么写,是因为所有的事件的发布代码,都已经写到代码中,不可分离,此时又没有配置事件总线需要的数据,那么唯一的方法就是不发布事件,所以上面的post方法中,使用了isRegistered,当没有配置事件总线时,不发布任何事件;
    • 即使不是使用空构造函数,如果在构建事件总线时报错,那么也一并认为事件总线不可用,捕获异常,isRegistered还是false,事件还是不发布;
    • 这里的ExecutorServiceObject是获取线程池的工具类,默认线程数量为cpu核心的两倍;
    • 当成功构建事件总线时,将事件监听器注册到EventBus中,并设置isRegistered为true,表示已注册,事件可用;

    相关文章

      网友评论

          本文标题:【ElasticJob源码解析】事件

          本文链接:https://www.haomeiwen.com/subject/xbvyzxtx.html