我是用来缓存的操作符-----Cache

作者: javalong | 来源:发表于2018-04-22 16:47 被阅读26次
    Cache操作符功能介绍

    缓存前面操作通过onNext所传递下来的参数,在下一次subscribe订阅时,直接跳过前面的步骤,直接执行后面步骤。

    用途

    单纯用文字介绍Cache的功能,大家可能很难理解。大家可以想象下,比如http请求,我的前面操作是有延时的,如果没有用cache,每次subscribe都会去真正发起http�请求。但是如果在尾部添加cache操作符的话,那么下一次subscribe的时候,其实就是直接从缓存中获取,而不会发起真正的http请求。

    来一段小代码
      var handler = Handler()
            //延时后 cache的作用会更明显
    //        observable = Observable.create(object : Observable.OnSubscribe<String> {
    //            override fun call(t: Subscriber<in String>) {
    //                t.onNext("Test1")
    //                t.onNext("Test2")
    //                t.onNext("Test3")
    //            }
    //        }).delay(4, TimeUnit.SECONDS).cache()
            observable = Observable.create(object : Observable.OnSubscribe<String> {
                override fun call(t: Subscriber<in String>) {
                    t.onNext("Test1")
                    t.onNext("Test2")
                    t.onNext("Test3")
                }
            }).cache()
    
            btSub.setOnClickListener({
                observable?.subscribe({ msg ->
                    handler.post(Runnable {
                        tvContent.text = tvContent.text.toString() + "\n" + msg
    
                    })
    
                })
            })
    

    这里有一段注释的代码,是使用了delay,使用了delay之后cache的作用会更加明显,第一次点击按钮触发subscribe订阅后,会延迟4秒后,在界面上Test1,Test2,Test3,第二次,第三次...界面上都会立刻打印Test1,Test2,Test3,但是由于我还没有为大家介绍过delay操作符,所以我这里也先不过多介绍,在以后的文章中我们会为大家进行delay的源码分析。
    有了前面这么一大串的介绍后,我想大家对cache操作符有了一定的了解,那么我们就带着自己心中的疑问和猜想,按照demo的顺序一步步跟入源码。


    看看源代码

    Observable

    public final Observable<T> cache() {
            return CachedObservable.from(this);
        }
    

    CachedObservable

    public static <T> CachedObservable<T> from(Observable<? extends T> source) {
            return (CachedObservable<T>)from(source, 16);
        }
    public static <T> CachedObservable<T> from(Observable<? extends T> source, int capacityHint) {
            if (capacityHint < 1) {
                throw new IllegalArgumentException("capacityHint > 0 required");
            }
            CacheState<T> state = new CacheState<T>(source, capacityHint);
            CachedSubscribe<T> onSubscribe = new CachedSubscribe<T>(state);
            return new CachedObservable<T>(onSubscribe, state);
        }
    

    个人理解:因为前面其实已经介绍到了,cache其实是把前面传递下来的参数,保存起来了,那么肯定是要有个数组或者列表的,这里我看到capacityHint,我就认为CacheState应该有可能是这个容器。
    这倒不是最重要的,在Observable的子类中,其实我们最重要看的就是2个东西OnSubscribe,Subscriber明白了这2个东西,基本上对这个操作符就了解的差不多了

    CachedObservable.CacheState

    static final class CacheState<T> extends LinkedArrayList implements Observer<T> {
    ...
    }
    

    CachedObservable.CachedSubscribe

    static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
      ...
    }
    

    OnSubscribe很明显,就是CachedSubscribe
    那么CacheState是不是Subscriber?,在这里CacheState实现了Observer接口
    Observer

    public interface Observer<T> {    
       void onCompleted();   
       void onError(Throwable e);    
       void onNext(T t);
    }
    

    前面我们几篇文章都介绍了,Subscriber其实就是onNext..等方法的具体实现。所以我们可以直接把CacheState当作是Subscriber来看待,这样会很好的帮助我们理解。

     public abstract class Subscriber<T> implements Observer<T>, Subscription {
        ...
    }
    

    个人理解:Subscriber和Observer关系
    Subscriber实现了Observer接口,Subscriber可以subscribe,unsubscribe,还有onStart方法。
    在RxJava操作符中,其实我们关心的是SubscrberonNext方法和OnSubscribecall方法,能搞定这2个方法,那么我们就对这个操作符有了很深的了解了。

    既然我们已经找到了2个重要的对象,那么我们继续往下深入。
    Observable.cache调用后得到了一个CachedObservable,然后点击按钮调用subscribe
    Observable

    ...
     public final Subscription subscribe(final Action1<? super T> onNext) {
            if (onNext == null) {
                throw new IllegalArgumentException("onNext can not be null");
            }
    
            Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
            Action0 onCompleted = Actions.empty();
            return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
        }
    ...
    public final Subscription subscribe(Subscriber<? super T> subscriber) {
            return Observable.subscribe(subscriber, this);
        }
    ...
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
    ...
            subscriber.onStart();
            if (!(subscriber instanceof SafeSubscriber)) {
                subscriber = new SafeSubscriber<T>(subscriber);
            }
                RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
               ...
    

    这里我贴出这块代码,其实在前面几篇文章我都贴过了,这里重新贴出希望大家加强记忆。调用subscribe方法,其实就是obsevable.onSubscibe.call(subscriber)
    关键的地方来了,这里的obsevable,onSubscribe,subscriber分别是什么呢?
    链式结构中,操作符使用的越多,到最后会越难判断,这里我们只有一个操作符,相对来说比较简单
    这里的observable很明显就是CachedObservableobsevable.onSubscibe自然就是刚才我们分析得到的CachedSubscribe,如果你想当然的以为subscriber就是刚才的CacheState那就错了。
    那么subscriber到底是什么呢?
    我们从刚才的代码一步步往上推,其实就明白。
    subscriber是一个参数由外部传入的,

    ...
     if (!(subscriber instanceof SafeSubscriber)) {
                subscriber = new SafeSubscriber<T>(subscriber);
            }
    ...
    

    其实已经告诉我们,最终的subscriber其实是一个SafeSubscriber。而原来的subscriber当作构造参数保存在了SafeSubscriber中。

    再往前看,看原来的subscriber是什么。

    ...
    public final Subscription subscribe(final Action1<? super T> onNext) {
           ...
            return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
        }
    ...
    

    很明显的就是,我们在demo中传入的Action被包装成了ActionSubscriber

    看图说话

    image.png

    所以最终是这么一个subscriber

    那么我们继续深入。

    observable.onSubscriber.call(subscriber)

    static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
     ...
            @Override
            public void call(Subscriber<? super T> t) {
                ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
                state.addProducer(rp);
                t.add(rp);
                t.setProducer(rp);
                if (!get() && compareAndSet(false, true)) {
                    state.connect();
                }
            }
        }
    

    我们一句一句来分析。

    1. 创建ReplayProducer对象
       static final class ReplayProducer<T> extends AtomicLong implements Producer, Subscription {
           ...
            public ReplayProducer(Subscriber<? super T> child, CacheState<T> state) {
                this.child = child;
                this.state = state;
            }
            @Override
            public void request(long n) {
            ...
            }
    ...
    

    ReplayProducer是一个Producer,前面介绍过Producer,我们只需要看它request方法就好了。我们先等下看它的request

    1. ReplayProducer添加到CacheState对象中
     public void addProducer(ReplayProducer<T> p) {
                synchronized (connection) {
                    ReplayProducer<?>[] a = producers;
                    int n = a.length;
                    ReplayProducer<?>[] b = new ReplayProducer<?>[n + 1];
                    System.arraycopy(a, 0, b, 0, n);
                    b[n] = p;
                    producers = b;
                }
            }
    

    这里用的是数组的形式,来添加一个新元素,比较简单,就不过多介绍了。

    1. ReplyProducer当作SubscriptionSubscriber绑定在一起。
    public final void add(Subscription s) {
            subscriptions.add(s);
        }
    ...
    @Override
        public final void unsubscribe() {
            subscriptions.unsubscribe();
        }
    ...
    

    可以理解为,当subscriber.unsubscribe,对应的Producer也取消了订阅。这种操作在前面几个操作符中也有出现。

    1. 调用Producerrequest方法
      setProducer的操作呢,我们在前面也介绍过,就是直接调用Producer的request方法

    注意:由于我们现在只使用一个操作符,比较简单,先这么理解,后续我们讲多个操作符混合使用就可能会出现比较复杂的情况,由浅入深,慢慢理解。

    ...
    public void request(long n) {
                for (;;) {
                    long r = get();
                    if (r < 0) {
                        return;
                    }
                    long u = r + n;
                    if (u < 0) {
                        u = Long.MAX_VALUE;
                    }
                    if (compareAndSet(r, u)) {
                        replay();
                        return;
                    }
                }
            }
     public void replay() {
                synchronized (this) {
                    if (emitting) {
                        missed = true;
                        return;
                    }
                    emitting = true;
                }
                boolean skipFinal = false;
                try {
                    final Subscriber<? super T> child = this.child;
    
                    for (;;) {
    
                        long r = get();
    
                        if (r < 0L) {
                            skipFinal = true;
                            return;
                        }
                        int s = state.size();
                        if (s != 0) {
                          //这段代码当读提出来分析
                            ...
      
                        }
                        synchronized (this) {
                            if (!missed) {
                                emitting = false;
                                skipFinal = true;
                                return;
                            }
                            missed = false;
                        }
                    }
                } finally {
                    if (!skipFinal) {
                        synchronized (this) {
                            emitting = false;
                        }
                    }
                }
            }
        }
    ...
    

    request方法中呢,其实主要调用了replay方法。
    里面最重要的一句是

    int s = state.size();
    if (s != 0) {
    ...
    }
    

    由于第一次调用subscribe所以根本没有缓存东西,所以state.size()==0,所以第一调用replay方法基本上没有做什么操作。

    1. 第一次调用subscribe执行
     if (!get() && compareAndSet(false, true)) {
                    state.connect();
                }
    

    我们先来看
    !get() && compareAndSet(false, true),由于CachedSubscribe<T> extends AtomicBoolean

    第一次进来get()==false,然后再设置为true,确保了下一次不调用state.connect

    下面我们具体分析下connect方法,最重要的缓存操作在这里

    ...
    public void connect() {
                Subscriber<T> subscriber = new Subscriber<T>() {
                    @Override
                    public void onNext(T t) {
                        CacheState.this.onNext(t);
                    }
                    @Override
                    public void onError(Throwable e) {
                        CacheState.this.onError(e);
                    }
                    @Override
                    public void onCompleted() {
                        CacheState.this.onCompleted();
                    }
                };
                connection.set(subscriber);
                source.unsafeSubscribe(subscriber);
                isConnected = true;
            }
    ...
    

    这里呢,我们还是分步骤一步步来解析

    1. 创建一个Subscriber
      因为在前面,其实我们本身对cache的功能已经有所了解,就是在第一次以后,再调用subscribe其实就是直接把缓存数据拿过来直接传递下去。

    那么在这里,其实我们就可以看出,这里新建的Subscriber就是用来直接传递下去的Subscriber。这里其实我们可以画个大致的示意图。

    看图说话

    image.png

    第一次subscribe走上面实现,第二次subscribe直接走下面虚线,而中间的这个Subscriber就是我们现在所提到的这个Subscriber。第二次以后就直接越过了demo中的OnSubscribe

    注意: 这里没有涉及到demo中的Subscriber,通过前面的分析我们知道demo中的Subscriber已经被包装成了SafeSubscriber保存在ReplyProducer中的child变量里

    1. 保存subscriber
      既然前面提到了,第一次之后的每次操作都会使用到这个subscriber,那么我们肯定要把这个subscriber保存起来。
     public void set(Subscription s) {
          ...
            state.update(s);
        }
    

    SequentialSubscription

    ...
    public boolean update(Subscription next) {
            for (;;) {
                Subscription current = get();
    
                if (current == Unsubscribed.INSTANCE) {
                    if (next != null) {
                        next.unsubscribe();
                    }
                    return false;
                }
    
                if (compareAndSet(current, next)) {
                    if (current != null) {
                        current.unsubscribe();
                    }
                    return true;
                }
            }
        }
    ...
    

    如我们所料,确实是把这个subscriber保存起来了。
    compareAndSet(current, next)

    1. 第一次subscribe,还是要调用前面的onSubscribe
    source.unsafeSubscribe(subscriber);
    

    其实就是

    source.onSubscribe.call(subscriber);
    

    source我们可以通过源码知道,就是创建CachedObservable时传入的我们demoObservable.create()得到的Observable对象。
    onSubscribe就是demo中的OnSubscribe
    下面我们还是重点,放在subscriber上。

    这里的subscriber其实我们根据上面的代码很容易判断出就是刚刚new 出来的Subscriber,并且在onNext其实是直接调用CacheStateonNext

    CachedObservable.CacheState

    ...
    public void onNext(T t) {
                if (!sourceDone) {
                    Object o = NotificationLite.next(t);
                    add(o);
                    dispatch();
                }
            }
    ...
    

    当我们在demo中OnSubscribe调用一次subscriber.onNext,其实就是进入CachedObservable.CacheStateonNext

    在这里我们再次一句一句分析。

    1. NotificationLite.next(t)
    public static <T> Object next(T t) {
            if (t == null) {
                return ON_NEXT_NULL_SENTINEL;
            } else {
                return t;
            }
        }
    

    很明显就是判断参数是否为null。

    1. add(o)

    前面我们就提到了CacheState用来保存上面传递下来的参数.
    具体实现就是在这里,add(o)。

    1. dispatch()
     void dispatch() {
                ReplayProducer<?>[] a = producers;
                for (ReplayProducer<?> rp : a) {
                    rp.replay();
                }
            }
    

    很明显这里最重要的就是rp.replay()
    我们先不管为什么这里为什么用了个for循环,我们先来看看rp.replay到底做了什么。

     public void replay() {
    ...
      int s = state.size();
      if (s != 0) {
          ...
        if (NotificationLite.accept(child, o)) {
          ...
        }
      }
    ...
    }
    

    NotificationLite.accept

    public static <T> boolean accept(Observer<? super T> o, Object n) {
             ...
                o.onNext((T) n);
                return false;
             ...
        }
    

    reply就是判断CacheState的缓存是否已经有了,有了之后就直接调用child.onNext(o)

    那么child是什么呢?就是前面所分析的

    image.png

    这样在一层一层的调用onNext最终到了我们自己的Action

    第二次调用subscribe

    第一次调用subscribe的整个流程其实我们已经走完,下面我们直接简单的走一下第二次调用subscribe的流程。

    直接从CachedSubscribecall方法看起。

     public void call(Subscriber<? super T> t) {
                ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
                state.addProducer(rp);
                t.add(rp);
                t.setProducer(rp);
                if (!get() && compareAndSet(false, true)) {
                    state.connect();
                }
            }
    
    1. 又创建了一个ReplayProducer
    2. 添加到CacheState
    3. ReplayProducer加入到SafeSubscriber中绑定在一起
    4. 调用Producerrequest方法,在这里其实就是调用ReplayProducerreplay方法(前面分析过了)
    5. 因为第一次的时候,已经设置为true所以get()==true直接跳过

    所以综上分析,我们还是直接看ReplayProducerreplay方法。

     public void replay() {
    ...
      int s = state.size();
      if (s != 0) {
          ...
        if (NotificationLite.accept(child, o)) {
          ...
        }
      }
    ...
    }
    

    NotificationLite.accept

    public static <T> boolean accept(Observer<? super T> o, Object n) {
             ...
                o.onNext((T) n);
                return false;
             ...
        }
    

    因为在第一次中我们已经在CacheState中保存了所有发射的对象。以我们当前的demo来看,state中保存了Test1,Test2,Test33个字符串,然后就直接调用SafeSubsciber的onNext方法。

    总结

    总的来说,cache操作符与前2个操作符比较起来的话,在源码上其实看起来会更有难度,但是有了前2个操作符做铺垫,再来看,相对来说会比较容易。

    附加

    整体cache的流程,其实我们已经有了大致的了解,但是刚才其实我们还是遗留了一个问题。我们再次来看这段代码。

    void dispatch() {
                ReplayProducer<?>[] a = producers;
                for (ReplayProducer<?> rp : a) {
                    rp.replay();
                }
            }
    

    这里为什么要用了一个for循环呢?

    image.png

    其实我们从上面的图片中可以看出。重复输出了3次Test1,Test2,Test3
    因为,我在demo中连续点击了3下SUBSCRIBE按钮。
    也就是调用了3次subscribe,也就是往CacheState中添加了
    3个ReplayProducer,所以上面调用dispatch方法的时候,自然就for循环调用了3次replay,然后就输出了3次。


    大家喜欢的话就点个赞哦,给我点鼓励嘛

    相关文章

      网友评论

        本文标题:我是用来缓存的操作符-----Cache

        本文链接:https://www.haomeiwen.com/subject/kiqclftx.html