RxBus

作者: 721d739b6619 | 来源:发表于2017-11-25 21:42 被阅读68次

    RxBus这个词应该是RxJava和EventBus组合的派生词;其实功能就是通过RxJava实现EventBux类似的功能。
    这里先看看RxJava是基于什么背景出现和是什么?

    响应式编程

    什么是响应式编程

    和平常经常听说的面向对象编程和函数式编程一样,响应式编程(Reactive Programming)就是一个编程范式,但是与其他编程范式不同的是它是基于数据流和变化传播的。我们经常在程序中这样写

    A = B + C
    

    A被赋值为B和C的值。这时,如果我们改变B的值,A的值并不会随之改变。而如果我们运用一种机制,当B或者C的值发现变化的时候,A的值也随之改变,这样就实现了”响应式“。

    而响应式编程的提出,其目的就是简化类似的操作,因此它在用户界面编程领域以及基于实时系统的动画方面都有广泛的应用。另一方面,在处理嵌套回调的异步事件,复杂的列表过滤和变换的时候也都有良好的表现。

    RxJava知识背景

    RxJava官网 github地址
    官网介绍

    RxJava is a Java VM implementation of Reactive Extensions: a library
    for composing asynchronous and event-based programs by using
    observable sequences.

    扔物线老哥给出的解释:

    一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

    二字高度概括就是:异步

    官方还有这么一段话:

    It extends the observer pattern to support sequences of
    data/events and adds operators that allow you to compose
    sequences together declaratively while abstracting
    away concerns about things like low-level threading, synchronization,
    thread-safety and concurrent data structures

    它扩展了观察者模式以支持序列数据/事件,并添加运算符,让您编写
    在抽象的时候声明性地一起序列化,消除了对低级线程,同步,线程安全和并发数据结构。

    RxBus实现

    通过RxJava就可以实现EventBus的功能,看下面代码:

    public class RxBus {
    //单例  volatile为了下面的getDefault()方法正常运行即禁止指令重排序优化
                //详细看这个:http://www.jianshu.com/p/7053217b73cc
        private static volatile RxBus mDefaultInstance;
    
        private RxBus() {
        }
    
        public static RxBus getDefault() {
            if (mDefaultInstance == null) {
                synchronized (RxBus.class) {
                    if (mDefaultInstance == null) {
                        mDefaultInstance = new RxBus();
                    }
                }
            }
            return mDefaultInstance;
        }
    
        /**
         * Subject 事件
         * */
        private final Subject<Object, Object> _bus = new SerializedSubject<>(PublishSubject.create());
    
        public void send(Object o) {
            _bus.onNext(o);
        }
    
        /**
         * Observable 可观察者(即被观察者)
         * */
        public Observable<Object> toObservable() {
            return _bus;
        }
        /**
         * 根据传递的 eventType 类型返回特定类型(eventType)的 被观察者
         * @param eventType 事件类型
         * @param <T>
         * @return
         */
        public <T> Observable<T> toObservable(Class<T> eventType) {
            return _bus.ofType(eventType);
        }
    
        /**
         * 提供了一个新的事件,根据code进行分发
         * @param code 事件code
         * @param o
         */
        public void post(int code, Object o){
            _bus.onNext(new RxBusBaseMessage(code,o));
    
        }
    
    
        /**
         * 根据传递的code和 eventType 类型返回特定类型(eventType)的 被观察者
         * 对于注册了code为0,class为voidMessage的观察者,那么就接收不到code为0之外的voidMessage。
         * @param code 事件code
         * @param eventType 事件类型
         * @param <T>
         * @return
         */
        public <T> Observable<T> toObservable(final int code, final Class<T> eventType) {
            return _bus.ofType(RxBusBaseMessage.class)
                    .filter(new Func1<RxBusBaseMessage,Boolean>() {
                        @Override
                        public Boolean call(RxBusBaseMessage o) {
                            //过滤code和eventType都相同的事件
                            return o.getCode() == code && eventType.isInstance(o.getObject());
                        }
                    }).map(new Func1<RxBusBaseMessage,Object>() {
                        @Override
                        public Object call(RxBusBaseMessage o) {
                            return o.getObject();
                        }
                    }).cast(eventType);
        }
        /**
         * 判断是否有订阅者
         */
        public boolean hasObservers() {
            return _bus.hasObservers();
        }
    }
    

    核心思想

    其实RxJava要解决就是这类型的问题,所以顺理成章就诞生RxBus;
    其核心思想就是:

    • 构建一个事件序列(被观察者)


      创建事件
    • 在事件序列标识该事件(被观察者)


      image.png
    • 观察者或订阅者订阅属于自己标识的事件序列的部分和处理事件序列(观察者)


      观察者或订阅者订阅事件序列和处理事件序列
    • 过滤事件序列(根据业务需要设置可有可无)(观察者)


      image.png

    透过源码看看其实现

    现在看看源码整个流程,顺带熟悉当前火热的RxJava部分类

    创建事件序列

    private final Subject<Object, Object> _bus 
    = new SerializedSubject<>(PublishSubject.create());
    
    • SerializedSubject

    Wraps a {@link Subject} so that it is safe to call its various {@code on} methods from different threads.
    类解释就是一个包装类,可以通过不同线程调用各种方法

    • PublishSubject

    Subject that, once an {@link Observer} has subscribed, emits all subsequently observed items to the subscriber.
    一个事件,一旦{@link Observer}订阅,就会将所有随后观察到的item发送给订阅者

    image.png
    • 首先创建一个SubjectSubscriptionManager (事件订阅管理者)
    • state.onTerminated 值为事件的最终状态;现在将第一个Action处理给它。
    • new PublishSubject<T>(state, state);
      将事件订阅管理者SubjectSubscriptionManager 一个作为OnSubscribe接口给最终父类Observable实现即订阅事件的观察者,另一个为当前PublishSubject(事件)的SubjectSubscriptionManager(事件订阅管理者)
    image.png

    SerializedSubject 序列事件创建,一个给当前Subject,一个作为SerializedObserver创建对象的参数。

    发送序列事件(实际准确来时是事件的下一个Action即动作交由匹配该动作的订阅者处理)

    image.png

    这里的_bus.onNext()其实是在SubjectSubscriptionManager内部类中的SubjectObserver里面的参数Subscriber的onNext()

    image.png image.png

    这个actual.onNext(t)方法其实就是我们订阅者要处理的那个方法。

    即:


    image.png

    红色框框部分即上面所说的部分,典型的面向接口编程。
    到底他们是怎么关联起来的呢?看看下面的截图,就一清二楚了:

    PublishSubject创建的时候也创建了一个SubjectSubscriptionManager对象,就是通过此对象实现事件与订阅者关联的。
    SubjectSubscriptionManager实现OnSubscribe接口;

    image.png

    当Observable.subscribe调用,此接口的方法就会被调用:


    image.png

    而SubjectSubscriptionManager的call()方法:


    image.png

    这里的child其实就是Observable的ActionSubscriber:


    image.png

    我们订阅者一订阅就会等待合适的事件action调用onNext()方法,实现该方法的订阅者就会被执行onNext();

    整个流程就是这样,当然里面还有一些细节部分;如刚才的SubjectSubscriptionManager的call方法有些地方我还没有搞弄就别误人了。实际上里面就是操作Observer数组或队列的。

    最后送上关于该RxBus涉及到的Rx类的UML:


    RxBuxUML.png

    一句话总结我们每次处理的是事件序列的一次变化及Action的onNext()

    相关文章

      网友评论

        本文标题:RxBus

        本文链接:https://www.haomeiwen.com/subject/brpqbxtx.html