美文网首页
RxJava 连接操作符

RxJava 连接操作符

作者: 三流之路 | 来源:发表于2018-06-09 22:18 被阅读0次

    ReactiveX 系列文章目录


    cache/cacheWithInitialCapacity

    看注释意思是将所有数据按原来的顺序缓存起来,就是不知道观察者什么时候订阅,什么时候解除订阅,所以缓存起来,以后直接用。

    val list = arrayListOf(1,2,3)
    val ob = Observable.fromIterable(list)
    //      .cache()
    list.clear()
    ob.subscribe(observerInt)
    

    假设向上面这般用法,无论有没有 cache,Observer 收到的都是只有一个 onComplete。

    val list = arrayListOf(1,2,3)
    val ob = Observable.fromIterable(list)
            .cache()
    ob.subscribe(observerInt)
    list.clear()
    ob.subscribe(observerInt)
    

    现在如果没有 cache,第一个会收到三次 onNext 和一次 onComplete,clear 之后由于数据清空,只会收到 onComplete。而有了 cache,两个订阅得到的结果都是三次 onNext 和一次 onComplete。

    这说明是在有了一个观察者订阅之后,会把被观察者发射的数据缓存起来,这适合多个观察者存在时,其它还没有立刻订阅的观察者也能通过缓存拿到最初的数据。


    cacheWithInitialCapacity 的参数表示内部用的缓冲区大小,对外界使用没区别,cache 方法用的是 16.

    publish

    将普通的 Observable 变成可连接的 ConnectableObservable,它不会在被订阅时发射数据,而是直到使用了connect 操作符时才开始。用这种方法,可以控制在任何时候让 Observable 开始发射数据。

    public final ConnectableObservable<T> publish()
    // 用 Function 转换源 Observable 发射的数据
    public final <R> Observable<R> publish(Function<? super Observable<T>, ? extends ObservableSource<R>> selector)
    
    val ob = Observable.just(1,2,3)
            .doOnSubscribe { Log.e("RX", "onSubscribe")}
            .publish()
    Log.e("RX", "subscribe")
    ob.subscribe(observerInt)
    Thread.sleep(2000)
    ob.connect()
    

    2 秒后执行 connect 在 onSubscribe。

    replay

    ConnectableObservable 和普通的 Observable 最大的区别就是,调用 connect 操作符开始发射数据,后面的订阅者会丢失之前发射过的数据。

    var ob = Observable.interval(1, 100, TimeUnit.MILLISECONDS).take(6)
    ob = ob.publish()
    
    ob.subscribe{ Log.e("RX", "observer 1 onNext $it") }
    ob.connect()
    
    Thread.sleep(400)
    ob.subscribe{ Log.e("RX", "observer 2 onNext $it") }
    ob.connect()
    

    日志:

    observer 1 onNext 0
    observer 1 onNext 1
    observer 1 onNext 2
    observer 1 onNext 3
    observer 1 onNext 4
    observer 1 onNext 5
    observer 2 onNext 5
    

    可见 observer2 丢了 0-4,使用 replay 返回的 ConnectableObservable 会缓存订阅者订阅之前已经发射的数据,可以指定缓存的大小或者时间,这样能避免耗费太多内存。

    public final ConnectableObservable<T> replay()
    public final ConnectableObservable<T> replay(final int bufferSize)
    public final ConnectableObservable<T> replay(int bufferSize, long time, TimeUnit unit)
    public final ConnectableObservable<T> replay(final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler)
    public final ConnectableObservable<T> replay(final int bufferSize, final Scheduler scheduler)
    public final ConnectableObservable<T> replay(long time, TimeUnit unit)
    public final ConnectableObservable<T> replay(final long time, final TimeUnit unit, final Scheduler scheduler)
    public final ConnectableObservable<T> replay(final Scheduler scheduler)
    
    public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector)
    public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize)
    public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, int bufferSize, long time, TimeUnit unit)
    public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final long time, final TimeUnit unit, final Scheduler scheduler)
    public final <R> Observable<R> replay(final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final int bufferSize, final Scheduler scheduler)
    public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, long time, TimeUnit unit)
    public final <R> Observable<R> replay(Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final long time, final TimeUnit unit, final Scheduler scheduler)
    public final <R> Observable<R> replay(final Function<? super Observable<T>, ? extends ObservableSource<R>> selector, final Scheduler scheduler)
    

    重载方法很多,但大致可分为两类,一类返回 ConnectableObservable,一类有参数 selector,可以变换源 Observable 发射的数据,然后将这些数据放到一个 Observable 中,方法返回 Observable。

    缓存 2 个数

    ob = ob.replay(2)
    

    日志

    observer 1 onNext 0
    observer 1 onNext 1
    observer 1 onNext 2
    observer 1 onNext 3
    observer 1 onNext 4
    observer 2 onNext 3
    observer 2 onNext 4
    observer 1 onNext 5
    observer 2 onNext 5
    

    可见 observer2 还取到了被缓存的 3 和 4。

    缓存 300ms 内的数据

    ob = ob.replay(300, TimeUnit.MILLISECONDS)
    

    日志:

    observer 1 onNext 0
    observer 1 onNext 1
    observer 1 onNext 2
    observer 1 onNext 3
    observer 1 onNext 4
    observer 2 onNext 2
    observer 2 onNext 3
    observer 2 onNext 4
    observer 1 onNext 5
    observer 2 onNext 5
    

    收到了前 300ms 缓存的 3,4,5。


    其中第二类看源码内部也调用了第一种的 replay,subscribe 时内部会自动执行 connect。

    val ob2 = ob.replay({
        it.map { it*10 }
    }, 2)
    
    ob2.subscribe{ Log.e("RX", "observer 1 onNext $it") }
    Thread.sleep(400)
    ob2.subscribe{ Log.e("RX", "----------------observer 2 onNext $it") }
    

    但两个观察者都收到了所有数据,和想象不同。

    它不像第一类,它是每次 subscribe 时内部都对普通的 Observable 执行第一类的 replay,再往内部走是 new 了一个 ConnectableObservable。所以两次 subscribe 内部用的是两个 ConnectableObservable 对象。

    暂不清楚它的应用场景在哪里。

    share

    public final Observable<T> share() {
        return publish().refCount();
    }
    

    refCount() 把 ConnectableObservable 变为一个普通的 Observable 但又保持了 ConnectableObservable 的特性。如果出现第一个 Observer,它就会自动调用 connect(),如果所有的 Observer 全部 dispose,那么它也会停止接受上游 Observable 的数据。

    相关文章

      网友评论

          本文标题:RxJava 连接操作符

          本文链接:https://www.haomeiwen.com/subject/ovuxeftx.html