美文网首页
RxBus:用RxJava实现EventBus

RxBus:用RxJava实现EventBus

作者: 魔方宫殿 | 来源:发表于2016-10-10 11:55 被阅读0次

    项目引入了RxJava之后,想减少包的依赖,决定使用RxJava实现EventBus。先来一个简单的实现:

    public class RxBus {
        private final Subject<Object, Object> bus;
    
        private RxBus() {
            //把非线程安全的PublishSubject包装成线程安全的SerializedSubject
            bus = new SerializedSubject<>(PublishSubject.create());
        }
    
        public static RxBus getDefault() {
            return SingletonHolder.INSTANCE;
        }
    
        private static class SingletonHolder {
            public static volatile RxBus INSTANCE = new RxBus();
        }
    
        /**
         * 发送事件
         *
         * @param event 事件对象
         */
        public void post(Object event) {
            if (bus.hasObservers()) {
                bus.onNext(event);
            }
        }
    
        /**
         * 监听事件
         *
         * @param event 事件对象
         * @param <T>       事件类型
         * @return 特定类型的Observable
         */
        public <T> Observable<T> observe(Class<T> event) {
            return bus.ofType(event);
        }
    }
    

    发送事件:

    RxBus.getDefault().post(new TestEvent ());
    

    接收事件:

    Action1 action = new Action1<TestEvent>() {
        @Override
        public void call(TestEvent event) {
        // TODO 事件处理
        }
    };
    RxBus.getDefault().observe(TestEvent.class)
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(action);
    

    OK,完成!功能是完成了,不过使用的时候感觉没有EventBus简洁,不熟悉RxJava的可能也不好理解。稍微做下改动,加个Callback:

    public abstract class Callback<T> implements Action1<T> {
        /**
         * 监听到事件时回调接口
         *
         * @param t 返回结果
         */
        public abstract void onEvent(T t);
    
        @Override
        public void call(T t) {
            onEvent(t);
        }
    }
    

    添加subscribe方法:

    /**
     * 订阅事件
     *
     * @param eventType 事件对象
     * @param callback  事件回调
     * @param <T>       事件类型
     */
    public <T> Subscription subscribe(Class<T> eventType,Callback<T> callback) {
        return bus.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()) 
            .ofType(eventType)
            .subscribe(callback);
    }
    

    发送事件的地方不用动,接收事件的地方就清爽多了:

    RxBus.getDefault().subscribe(TestEvent.class, new Callback<TestEvent>() {
        @Override
        public void onEvent(TestEvent event) {
            // TODO 事件处理
        }
    });
    

    取消订阅

    接收事件方便清爽了,下面就要考虑下取消订阅的事了。Subscription可以通过unsubscribe()方法取消订阅,但是需要逐个取消。用过EventBus的可能习惯通过unregister统一取消订阅,可以如下修改:
    首先添加一个Map存储订阅对象数组:

    private Map<Object, List<Subscription>> subscriptions;
    

    修改订阅方法:

    /**
     * 订阅事件
     *
     * @param subscriber 订阅者
     * @param eventType 事件对象
     * @param callback  事件回调
     * @param <T>       事件类型
     */
    public <T> void subscribe(Object subscriber, Class<T> eventType, Callback<T> callback) {
        Subscription subscription = bus.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .ofType(eventType)
            .subscribe(callback);
        add(subscriber, subscription);
    }
    
    /**
     * 添加订阅协议
     *
     * @param subscriber   订阅者
     * @param subscription 订阅协议
     */
    private synchronized void add(Object subscriber, Subscription subscription) {
        if (subscriptions == null) {
            subscriptions = new HashMap<>();
        }
        List<Subscription> list = subscriptions.get(subscriber);
        if (list == null) {
            list = new ArrayList<>();
        }
        list.add(subscription);
        subscriptions.put(subscriber, list);
    }
    

    添加取消订阅的方法:

    /**
     * 取消事件订阅
     *
     * @param subscriber 订阅者
     */
    public synchronized void unsubscribe(Object subscriber) {
        if (subscriptions == null) return;
    
        List<Subscription> list = subscriptions.get(subscriber);
        if (list == null || list.isEmpty()) return;
    
        for (Subscription subscription : list) {
            subscription.unsubscribe();
        }
        list.clear();
        subscriptions.remove(subscriber);
    }
    
    /**
     * 取消所有订阅
     */
    public void unsubscribeAll() {
        Set<Map.Entry<Object, List<Subscription>>> set = subscriptions.entrySet();
        for (Map.Entry<Object, List<Subscription>> entry : set) {
            List<Subscription> list = entry.getValue();
            for (Subscription subscription : list) {
                 subscription.unsubscribe();
            }
            list.clear();
        }
        subscriptions = null;
    }
    

    OK,发送事件的地方还是不用动,接收事件修改如下:

    RxBus.getDefault().subscribe(this, TestEvent.class, new Callback<TestEvent>() {
        @Override
        public void onEvent(TestEvent event) {
            // TODO 事件处理
        }
    });
    

    取消事件订阅:

    RxBus.getDefault().unsubscribe(this);
    

    后续

    如果嫌订阅方法参数过多,可以通过反射方法获取事件类型。接受事件的地方就可以精简为:

    RxBus.getDefault().subscribe(this, new Callback<TestEvent>() {
        @Override
        public void onEvent(TestEvent event) {
            // TODO 事件处理
        }
    });
    

    订阅方法修改:

    /**
     * 订阅事件
     *
     * @param subscriber 订阅者
     * @param callback   事件回调
     * @param <T>        事件类型
     */
    public <T> void subscribe(Object subscriber, Callback<T> callback) {
        Class<T> eventType = RawType.getRawType(callback);
        Subscription subscription = bus.subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread()) 
            .ofType(eventType)
            .subscribe(callback);
        add(subscriber, subscription);
    }
    

    项目地址:https://github.com/sslcs/rxbus

    相关文章

      网友评论

          本文标题:RxBus:用RxJava实现EventBus

          本文链接:https://www.haomeiwen.com/subject/duupyttx.html