美文网首页
[大白装逼]YEventBus事件总线的实现

[大白装逼]YEventBus事件总线的实现

作者: lewis_v | 来源:发表于2018-03-25 11:53 被阅读20次

    Y事件总线:基于java的Observe和Observable实现的事件总线
    github地址:https://github.com/lewis-v/YEventBus

    使用方式

    导入依赖

    Add it in your root build.gradle at the end of repositories:

        allprojects {
            repositories {
                ...
                maven { url 'https://jitpack.io' }
            }
        }
    

    Add the dependency

        dependencies {
                compile 'com.github.lewis-v:YEventBus:1.0.0'
        }
    

    使用方式

    定义事件类TestEvent2继承于IEvent,并注册事件

    YEventBus.getInstance().subscriber(TestEvent2.class, new YObserver<TestEvent2>() {//订阅事件,处理的所在的线程与分发的线程一致
                @Override
                public void onSuccess(TestEvent2 event) {
                    Log.i(TAG,event.toString());
                }
    
                @Override
                public void onFail(Exception e) {
                    Log.e(TAG,e.getMessage());
                }
            });
            
    YEventBus.getInstance().subscriber(TestEvent.class, new YMainThreadObserver<TestEvent>() {//订阅事件,会在主线程中处理
                @Override
                public void onSuccess(TestEvent event) {
                    Log.i(TAG,event.toString());
                }
    
                @Override
                public void onFail(Exception e) {
                    Log.e(TAG,e.getMessage());
                }
            });
    

    发布事件

     YEventBus.getInstance().postMainEvent(TestEvent.class,new TestEvent(TAG));//发布在主线程分发的事件
     
     YEventBus.getInstance().postEvent(TestEvent.class,new TestEvent(TAG));//发布在子线程分发的事件
    

    取消订阅

    YEventBus.getInstance().unSubscriber(TestEvent.class,observer);//取消某事件下的某个订阅者的订阅
    YEventBus.getInstance().unSubscriberEvent(TestEvent.class);//取消TestEvent整个系列事件的订阅
    YEventBus.getInstance().unSubscriberAll();//取消所有事件的订阅
    

    具体实现

    Observable与Observer

    首先是使用java的Observable,在发布事件时需要先setChanged()在进行发布,否者是发布不了的

    public class YObservable extends Observable {
    
        public <T extends IEvent> void postEvent(T data){
            setChanged();
            notifyObservers(data);
        }
    }
    

    然后是java的Observer,这里实现了OnGetEvent接口,主要是要在本来的Observer接口上加上成功与失败的调用方法,其中Observer接口需要实现updata方法,此方法是在事件分发时调用的方法

    interface OnGetEvent<E extends IEvent> extends Observer{
        void onSuccess(E event);
        void onFail(Exception e);
    }
    

    YObserver控制了事件的实际处理及异常的获取

    public abstract class YObserver<E extends IEvent> implements OnGetEvent<E> {
    
        @Override
        public void update(Observable o, Object arg) {
            try {
                onSuccess((E) arg);
            }catch (Exception e){
                onFail(e);
            }
        }
    }
    

    这里除了提供YObserver,还提供了YMainThreadObserver,此Observer的事件处理会在主线程中进行,添加此类的意义是,可以再发布时指定在主线程,也可以在订阅的时候指定在主线程,当然在订阅的时候指定的优先级比发布的时候指定优先级高.

    public abstract class YMainThreadObserver<E extends IEvent> implements OnGetEvent<E>{
        @Override
        public void update(final Observable o, final Object arg) {
            ThreadSchedule.getMainHandle().post(new Runnable() {
                @Override
                public void run() {
                    try {
                        onSuccess((E) arg);
                    }catch (Exception e){
                        onFail(e);
                    }
                }
            });
        }
    }
    

    Observable管理类

    YObservableManager用于管理Observable,内部定义了ConcurrentHashMap来存储Observable,其键值为对应事件的Class,在订阅和取消订阅会对map进行插入或遍历

    public class YObservableManager {
        private ConcurrentHashMap<Class,YObservable> mObservableMap;
        private IEventHandle handle;
    
    
        public YObservableManager() {
            mObservableMap = new ConcurrentHashMap<>();
            init();
        }
    
        public YObservableManager(ConcurrentHashMap<Class, YObservable> mObservableMap) {
            this.mObservableMap = mObservableMap;
            init();
        }
    
        public void init(){
            handle = new YEventHandle();
        }
    
        /**
         * 设置自定义的事件分发处理
         * @param handle
         */
        public void setHandle(IEventHandle handle) {
            this.handle = handle;
        }
    
        /**
         * 发布消息
         * @param event
         * @param <T>
         */
        public <T extends IEvent> void postEvent(Class<T> event,T data){
            YObservable observables = mObservableMap.get(event);
            if (handle == null){
                init();
            }
            handle.postEvent(observables,data);
        }
    
        /**
         * 发布主线程消息
         * @param event
         * @param <T>
         */
        public <T extends IEvent> void postMainEvent(Class<T> event,T data){
            YObservable observables = mObservableMap.get(event);
            if (handle == null){
                init();
            }
            handle.postMainEvent(observables,data);
        }
    
        /**
         * 订阅事件
         * @param event
         * @param observer
         * @param <T>
         */
        public <T extends IEvent> void subscriber(Class<T> event,OnGetEvent<T> observer){
            if (mObservableMap.containsKey(event)){
                mObservableMap.get(event).addObserver(observer);
            }else {
                YObservable observable = new YObservable();
                observable.addObserver(observer);
                mObservableMap.put(event, observable);
            }
        }
    
        /**
         * 解除订阅
         * @param event
         * @param observer
         * @param <T>
         */
        public <T extends IEvent> void unSubscriber(Class<T> event,YObserver<T> observer){
            if (mObservableMap.containsKey(event)){
                mObservableMap.get(event).deleteObserver(observer);
            }
        }
    
        /**
         * 解除一个事件系列的订阅
         * @param event
         */
        public void unSubscriberEvent(Class<? extends IEvent> event){
            if (mObservableMap.containsKey(event)) {
                mObservableMap.get(event).deleteObservers();
                mObservableMap.remove(event);
            }
        }
    
        /**
         * 解除所有事件订阅
         */
        public void unSubscriberAll(){
            for (Map.Entry<Class,YObservable> entry : mObservableMap.entrySet()){
                YObservable value = entry.getValue();
                if (value != null){
                    value.deleteObservers();
                }
            }
            mObservableMap.clear();
        }
    
        /**
         * 释放资源
         */
        public void destroy(){
            handle.destroy();
            handle = null;
            unSubscriberAll();
        }
    }
    

    事件的发布

    上述代码中,在发布消息的时候会调用IEventHandle的postEvent,其实际的实现为

    public class YEventHandle implements IEventHandle{
        private ExecutorService executorServiceHandle;//处理线程池
    
        public YEventHandle() {
            init();
        }
        private void init(){
            executorServiceHandle = Executors.newFixedThreadPool(2*Runtime.getRuntime().availableProcessors());
        }
    
        /**
         * 发布消息
         * @param observable
         * @param data
         * @param <T>
         * @throws InterruptedException
         */
        @Override
        public <T extends IEvent> void postEvent(YObservable observable, T data) {
            handle(observable,data);
        }
    
        /**
         * 发布主线程处理消息
         * @param observable
         * @param data
         * @param <T>
         */
        @Override
        public <T extends IEvent> void postMainEvent(YObservable observable, T data) {
            handleInMain(observable,data);
        }
    
        /**
         * 处理
         * @param observable
         * @param data
         * @param <T>
         */
        private <T extends IEvent>  void handle(final YObservable observable, final T data){
            executorServiceHandle.execute(new Runnable() {
                @Override
                public void run() {
                    if (observable != null) {
                        observable.postEvent(data);
                    }
                }
            });
        }
    
        /**
         * 在主线程处理
         * @param observable
         * @param data
         * @param <T>
         */
        private <T extends IEvent>  void handleInMain(final YObservable observable, final T data){
            executorServiceHandle.execute(new Runnable() {
                @Override
                public void run() {
                    if (observable != null) {
                        ThreadSchedule.getMainHandle().post(new Runnable() {
                            @Override
                            public void run() {
                                observable.postEvent(data);
                            }
                        });
    
                    }
                }
            });
        }
    
        /**
         * 释放资源
         */
        @Override
        public void destroy() {
            executorServiceHandle.shutdownNow();
            executorServiceHandle = null;
        }
    
    
    }
    

    事件的分发处理,会在一个线程池里进行,线程池的大小为Cpu核心数的2倍,当事件过多时会在线程池的队列中等待,需要注意的是对事件的处理尽量不要做太耗时的任务,不然把线程池中的所有线程都阻塞了会导致整个事件总线阻塞,后面的时间将无法继续发布.

    小结结

    Y事件总线的实现只要是使用了java的Observable和Observer,其内部也是使用一个Vector类保存Observer,在发布的时候,遍历这里列表进行发布,这也是设计模式中的观察与被观察者的模式.

    相关文章

      网友评论

          本文标题:[大白装逼]YEventBus事件总线的实现

          本文链接:https://www.haomeiwen.com/subject/lwehcftx.html