美文网首页
放弃RxBus,拥抱RxJava(二):Observable究竟

放弃RxBus,拥抱RxJava(二):Observable究竟

作者: W_BinaryTree | 来源:发表于2017-04-02 23:22 被阅读5881次

    上篇简单讲到了一些关于Event/Rx bus的优缺点。并且提到了如何“正确”使用RxJava,而不是使用RxBus来自己重新发明轮子。

    放弃RxBus,拥抱RxJava(一):为什么避免使用EventBus/RxBus

    其中也讲到了一个简单使用 create() 方法来进行封装Observable。但也留下了许多坑,比如内存泄漏,不能Multicast(多个Subscriber订阅同一个Observable) 等问题。所以这篇,我们接着通过这个例子,来具体了解下,如何封装Observable。

    1. Observable提供的静态方法都做了什么?

    首先我们来简单看一下Observable的静态方法,just/from/create都怎么为你提供Observable。
    我们先看just:

    public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
    

    我们暂时不需要纠结 RxJavaPlugins.onAssembly() 这个方法。比较重要的是 just(T item) 方法会为你提供一个 ObservableJust<T>(item) 的实例,而这个 ObservableJust 类,就是一个RxJava内部的实现类。
    在 RxJava 2.x 中 Observable 是一个抽象类,只有一个抽象方法,subscribeActual(Observer observer);(但是Observable的源码足足有13518行!!!)

    public abstract class Observable<T> implements ObservableSource<T>{
      //implemented methods
      
      protected abstract void subscribeActual(Observer<? super T> observer);
      
      //other implements/operators
    }
    

    那么ObservableJust这个类究竟什么样呢?

    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    
        private final T value;
        public ObservableJust(final T value) {
            this.value = value;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> s) {
            ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
            s.onSubscribe(sd);
            sd.run();
        }
    
        @Override
        public T call() {
            return value;
        }
    }
    

    我们首先看到构造方法里,直接把value赋给了ObservableJust的成员。这也就是为什么Observable.just()里的代码会直接运行,而不是像create()方法,有Subscriber时候才能运行(Observable.create的初始化方法在subscribeAcutal里执行)。
    再来看看两个item的just(T item1,T item2):

    public static <T> Observable<T> just(T item1, T item2) {
        ObjectHelper.requireNonNull(item1, "The first item is null");
        ObjectHelper.requireNonNull(item2, "The second item is null");
    
        return fromArray(item1, item2);
    }
    

    诶?怎么画风突变?不是ObservableJust了?其实除了只有一个item的just,其他的just方法也都是调用了这个fromArray。那我们来看看这个fromArray:

    public static <T> Observable<T> fromArray(T... items) {
        //NullCheck
        return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
    }
    

    前面一些check我们忽略,这里我们发现一些熟悉的身影了ObservableFromArray<T>(items)。又一个Observable的实现类。

    public final class ObservableFromArray<T> extends Observable<T> {
        final T[] array;
        public ObservableFromArray(T[] array) {
            this.array = array;
        }
        @Override
        public void subscribeActual(Observer<? super T> s) {
            FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
            s.onSubscribe(d);
            d.run();
        }
    
        static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
          //implements
        }
    }
    

    是不是更熟悉?其实Observable几乎所有的静态方法和操作符都是这样,甚至包括一些著名的RxJava库比如RxBinding,也都是使用这种封装方法。内部实现Observable的subscribeActual()方法。对外只提供静态方法来为你生成Observable。为什么这么做,我们来了解一下subscribeActual()方法。

    2. subscribeActual() 究竟是什么?

    subscribeActual()其实就是Observable和Observer沟通的桥梁。这个Observer(Subscriber)就是你在Observable.subscribe()方法里写的那个类,或者是Consumer(只处理onNext方法)。

    public final void subscribe(Observer<? super T> observer) {
            //NullCheck&Apply plugin
            subscribeActual(observer);
    
    }
    

    我们看到其实这个方法除了Check和Apply就只有这一行subscribeActual(observer),连接了Observable和Observer。所以我们知道了,subscribeActual()方法里的代码,只有在subscribe()调用后,才回调用。

    那么他们是如何链接的呢?其实很简单,根据你的逻辑一句一句的调用observer.onXX()方法就可以了。比如刚才我们看到的ObservableJust:

    @Override
    public void run() {
        if (get() == START && compareAndSet(START, ON_NEXT)) {
            observer.onNext(value);
            if (get() == ON_NEXT) {
                lazySet(ON_COMPLETE);
                observer.onComplete();
            }
        }
    }
    

    再比如我们的ObservableFromArray:

    void run() {
        T[] a = array;
        int n = a.length;
    
        for (int i = 0; i < n && !isDisposed(); i++) {
            T value = a[i];
            if (value == null) {
                actual.onError(new NullPointerException("The " + i + "th element is null"));
                return;
            }
            actual.onNext(value);
        }
        if (!isDisposed()) {
            actual.onComplete();
        }
    }
    

    复杂点的例子,比如如何封装button的OnClick事件:

    @Override protected void subscribeActual(Observer<? super Object> observer) {
      if (!checkMainThread(observer)) {
        return;
      }
      Listener listener = new Listener(view, observer);
      observer.onSubscribe(listener);
      view.setOnClickListener(listener);
    }
    
    static final class Listener extends MainThreadDisposable implements OnClickListener {
      private final View view;
      private final Observer<? super Object> observer;
    
      Listener(View view, Observer<? super Object> observer) {
        this.view = view;
        this.observer = observer;
      }
    
      @Override public void onClick(View v) {
        if (!isDisposed()) {
          observer.onNext(Notification.INSTANCE);
        }
      }
    
      @Override protected void onDispose() {
        view.setOnClickListener(null);
      }
      }
    }
    

    但是细心的同学应该看到了,每个subscribeActual()方法里,都会有 observer.onSubscribe(disposable)这句。那么这句又是做什么的呢?根据Observable Contract,onSubscribe是告知已经准备好接收item。而且通过这个方法将Disposable传回给Subscriber。
    Disposable其实就是控制你取消订阅的。他只有两个方法 dispose() 取消订阅,和 isDisposed() 来通知是否已经取消了订阅。
    取消订阅时,要根据需求释放资源。
    在subscribeActual()里逻辑要严谨,比如onComplete()之后不要有onNext()。需要注意的点很多,所以可能这也就是为什么RxJava推荐用户使用静态方法生成Observable吧。如果有兴趣,可以直接阅读

    Observable Contract

    3 Observable.create()

    create()方法是一个历史遗留问题了。由于这个命名,很多人都觉得Observable.create()不就应该是生成Obseravble最先想到的方法吗? 在 RxJava 1.x 这是错误的,Observable.create()在 1.x 版本几乎饱受诟病。不是他不好,而是他太难操控。 RxJava一定要遵循Observable Contract才会按照预期执行,而使用create()你可以完全无视这个规则。你可以在onComplete之后继续发送onNext事件,下游仍会收到事件。如果在1.x想正确的使用Observable.create()你必须首先了解几乎所有的规则。所以一直以来 RxJava 1.x 版本使用Observable.create是不推荐的。(在新版的RxJava 1.3中,create()方法已经标记@deprecated

    在经历了1.x的失败后,RxJava 2.x 提供了安全的create()方法。他通过ObservableEmitter作为中间人,代替处理。使得即便你在Emitter中没有参照ObservableContract,下游仍会按照预期的进行。

    4 关于操作符

    我们上文说到的just,from,create等等是生成Observable的操作符,那么如map,filter等等的操作符会有什么区别吗?
    我们来看下源码:
    map:

    public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        ObjectHelper.requireNonNull(mapper, "mapper is null");
        return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
    }
    

    filter:

    public final Observable<T> filter(Predicate<? super T> predicate) {
        ObjectHelper.requireNonNull(predicate, "predicate is null");
        return RxJavaPlugins.onAssembly(new ObservableFilter<T>(this, predicate));
    }
    

    我们看到,这个区别就是在生成新Observable的时候,会需要两个参数,一个是这个Observable本身,也就是代码中的this,另一个就是需要进行操作的接口实现(当然也有更多参数的比如Schduler等等,大同小异,不再赘述)。而这个Observable本身,也就是我们口中常说的上游。上游下游是根据操作符的来说,对于一个操作符,在这个操作符之前的就是上游,而这个操作符之后的就是下游。
    比如我们的map:

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    }
    

    source就是我们的上游。而这个MapObserver就是我们的中间人(其实也算是操作符本身),将数据根据需求,处理后发给下游。
    操作符原理非常复杂,map可以说是最简单的了。如果有兴趣我推荐可以看一下publish(selector)等等复杂的操作符。更深入理解操作符。当然,有毅力的同学也可以关注RxJava 主要负责人的系列博客(纯英文,而且很难懂,不是英语难懂,是原理很难懂)。

    Advanced Reactive Java

    关于lift

    读过扔物线大神文章入门的同学应该对lift有一个了解。RxJava 1.x 几乎所有操作符都是基于lift完成的。但是RxJava 2.x 可以说几乎看不到lift。 目前lift仅仅作为提供自定义操作符的一个接口(虽然更推荐使用简单好用的compose,因为lift需要复写七个抽象方法。)。
    最后再说一下几点:

    • Flowable:Floawble其实在实现上和Observable类似,区别是Observable同过 Disposable控制取消订阅。而Flowable同过Subscription。其中还需要request()方法控制流量。具体关于这个问题,我推荐这篇文章

    给初学者的RxJava2.0教程

    总结:

    • 我们从源码分析角度来说,RxJava 2.x 也是同过subscribeActual来链接Observable和Observer(Subscriber)。本质上和Listener没什么太大区别。但是,RxJava的确是诸多一线Java/Android开发者的结晶。丰富的操作符,线程调度等等诸多优势。而且保证类型安全。这里再次感谢他们,毕竟我们还是站在他们肩膀上编程

    小彩蛋:关于Reactive Streams 和 RxJava

    其实 Reactive Programming在Java上的实现不止 RxJava 一个。比较出名的还有Project Reactor和 google 的 agera 等等。 但是综合考虑,无论是性能,扩展性上RxJava在Android平台上是最优秀的。 由于都在JVM上,大家都决定统一接口所以推出 Reactive Streams定义了这一套的几个基本接口:

    包括了 :

    //对应RxJava中的Flowable
    public interface Publisher<T> {
        public void subscribe(Subscriber<? super T> s);
    }
    
    //RxJava并没有直接对应,而是各种形式的实现类。
    public interface Subscriber<T> {
        public void onSubscribe(Subscription s);
        public void onNext(T t);
        public void onError(Throwable t);
        public void onComplete();
    }
    
    //同上,RxJava在flowable中直接使用Subscription
    public interface Subscription {
        public void request(long n);
        public void cancel();
    }
    
    //Flowable版本的Subject
    public interface Processor<T, R> extends Subscriber<T>, Publisher<R> {
    }
    

    正因为这四个接口的命名关系。本在RxJava 1.x 的Observable改名为Flowable。而RxJava 2.x的 Observable是完全没有backpressure支持。因为起名冲突的原因,将本来的Subscription改为Disposable,Subscriber改为Observer。

    相关文章

      网友评论

          本文标题:放弃RxBus,拥抱RxJava(二):Observable究竟

          本文链接:https://www.haomeiwen.com/subject/ldtbottx.html