我是用来缓存的操作符-----Cache

作者: javalong | 来源:发表于2018-04-22 16:47 被阅读26次
Cache操作符功能介绍

缓存前面操作通过onNext所传递下来的参数,在下一次subscribe订阅时,直接跳过前面的步骤,直接执行后面步骤。

用途

单纯用文字介绍Cache的功能,大家可能很难理解。大家可以想象下,比如http请求,我的前面操作是有延时的,如果没有用cache,每次subscribe都会去真正发起http�请求。但是如果在尾部添加cache操作符的话,那么下一次subscribe的时候,其实就是直接从缓存中获取,而不会发起真正的http请求。

来一段小代码
  var handler = Handler()
        //延时后 cache的作用会更明显
//        observable = Observable.create(object : Observable.OnSubscribe<String> {
//            override fun call(t: Subscriber<in String>) {
//                t.onNext("Test1")
//                t.onNext("Test2")
//                t.onNext("Test3")
//            }
//        }).delay(4, TimeUnit.SECONDS).cache()
        observable = Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("Test1")
                t.onNext("Test2")
                t.onNext("Test3")
            }
        }).cache()

        btSub.setOnClickListener({
            observable?.subscribe({ msg ->
                handler.post(Runnable {
                    tvContent.text = tvContent.text.toString() + "\n" + msg

                })

            })
        })

这里有一段注释的代码,是使用了delay,使用了delay之后cache的作用会更加明显,第一次点击按钮触发subscribe订阅后,会延迟4秒后,在界面上Test1,Test2,Test3,第二次,第三次...界面上都会立刻打印Test1,Test2,Test3,但是由于我还没有为大家介绍过delay操作符,所以我这里也先不过多介绍,在以后的文章中我们会为大家进行delay的源码分析。
有了前面这么一大串的介绍后,我想大家对cache操作符有了一定的了解,那么我们就带着自己心中的疑问和猜想,按照demo的顺序一步步跟入源码。


看看源代码

Observable

public final Observable<T> cache() {
        return CachedObservable.from(this);
    }

CachedObservable

public static <T> CachedObservable<T> from(Observable<? extends T> source) {
        return (CachedObservable<T>)from(source, 16);
    }
public static <T> CachedObservable<T> from(Observable<? extends T> source, int capacityHint) {
        if (capacityHint < 1) {
            throw new IllegalArgumentException("capacityHint > 0 required");
        }
        CacheState<T> state = new CacheState<T>(source, capacityHint);
        CachedSubscribe<T> onSubscribe = new CachedSubscribe<T>(state);
        return new CachedObservable<T>(onSubscribe, state);
    }

个人理解:因为前面其实已经介绍到了,cache其实是把前面传递下来的参数,保存起来了,那么肯定是要有个数组或者列表的,这里我看到capacityHint,我就认为CacheState应该有可能是这个容器。
这倒不是最重要的,在Observable的子类中,其实我们最重要看的就是2个东西OnSubscribe,Subscriber明白了这2个东西,基本上对这个操作符就了解的差不多了

CachedObservable.CacheState

static final class CacheState<T> extends LinkedArrayList implements Observer<T> {
...
}

CachedObservable.CachedSubscribe

static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
  ...
}

OnSubscribe很明显,就是CachedSubscribe
那么CacheState是不是Subscriber?,在这里CacheState实现了Observer接口
Observer

public interface Observer<T> {    
   void onCompleted();   
   void onError(Throwable e);    
   void onNext(T t);
}

前面我们几篇文章都介绍了,Subscriber其实就是onNext..等方法的具体实现。所以我们可以直接把CacheState当作是Subscriber来看待,这样会很好的帮助我们理解。

 public abstract class Subscriber<T> implements Observer<T>, Subscription {
    ...
}

个人理解:Subscriber和Observer关系
Subscriber实现了Observer接口,Subscriber可以subscribe,unsubscribe,还有onStart方法。
在RxJava操作符中,其实我们关心的是SubscrberonNext方法和OnSubscribecall方法,能搞定这2个方法,那么我们就对这个操作符有了很深的了解了。

既然我们已经找到了2个重要的对象,那么我们继续往下深入。
Observable.cache调用后得到了一个CachedObservable,然后点击按钮调用subscribe
Observable

...
 public final Subscription subscribe(final Action1<? super T> onNext) {
        if (onNext == null) {
            throw new IllegalArgumentException("onNext can not be null");
        }

        Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
        Action0 onCompleted = Actions.empty();
        return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
    }
...
public final Subscription subscribe(Subscriber<? super T> subscriber) {
        return Observable.subscribe(subscriber, this);
    }
...
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
        subscriber.onStart();
        if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
            RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
           ...

这里我贴出这块代码,其实在前面几篇文章我都贴过了,这里重新贴出希望大家加强记忆。调用subscribe方法,其实就是obsevable.onSubscibe.call(subscriber)
关键的地方来了,这里的obsevable,onSubscribe,subscriber分别是什么呢?
链式结构中,操作符使用的越多,到最后会越难判断,这里我们只有一个操作符,相对来说比较简单
这里的observable很明显就是CachedObservableobsevable.onSubscibe自然就是刚才我们分析得到的CachedSubscribe,如果你想当然的以为subscriber就是刚才的CacheState那就错了。
那么subscriber到底是什么呢?
我们从刚才的代码一步步往上推,其实就明白。
subscriber是一个参数由外部传入的,

...
 if (!(subscriber instanceof SafeSubscriber)) {
            subscriber = new SafeSubscriber<T>(subscriber);
        }
...

其实已经告诉我们,最终的subscriber其实是一个SafeSubscriber。而原来的subscriber当作构造参数保存在了SafeSubscriber中。

再往前看,看原来的subscriber是什么。

...
public final Subscription subscribe(final Action1<? super T> onNext) {
       ...
        return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
    }
...

很明显的就是,我们在demo中传入的Action被包装成了ActionSubscriber

看图说话

image.png

所以最终是这么一个subscriber

那么我们继续深入。

observable.onSubscriber.call(subscriber)

static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
 ...
        @Override
        public void call(Subscriber<? super T> t) {
            ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
            state.addProducer(rp);
            t.add(rp);
            t.setProducer(rp);
            if (!get() && compareAndSet(false, true)) {
                state.connect();
            }
        }
    }

我们一句一句来分析。

  1. 创建ReplayProducer对象
   static final class ReplayProducer<T> extends AtomicLong implements Producer, Subscription {
       ...
        public ReplayProducer(Subscriber<? super T> child, CacheState<T> state) {
            this.child = child;
            this.state = state;
        }
        @Override
        public void request(long n) {
        ...
        }
...

ReplayProducer是一个Producer,前面介绍过Producer,我们只需要看它request方法就好了。我们先等下看它的request

  1. ReplayProducer添加到CacheState对象中
 public void addProducer(ReplayProducer<T> p) {
            synchronized (connection) {
                ReplayProducer<?>[] a = producers;
                int n = a.length;
                ReplayProducer<?>[] b = new ReplayProducer<?>[n + 1];
                System.arraycopy(a, 0, b, 0, n);
                b[n] = p;
                producers = b;
            }
        }

这里用的是数组的形式,来添加一个新元素,比较简单,就不过多介绍了。

  1. ReplyProducer当作SubscriptionSubscriber绑定在一起。
public final void add(Subscription s) {
        subscriptions.add(s);
    }
...
@Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }
...

可以理解为,当subscriber.unsubscribe,对应的Producer也取消了订阅。这种操作在前面几个操作符中也有出现。

  1. 调用Producerrequest方法
    setProducer的操作呢,我们在前面也介绍过,就是直接调用Producer的request方法

注意:由于我们现在只使用一个操作符,比较简单,先这么理解,后续我们讲多个操作符混合使用就可能会出现比较复杂的情况,由浅入深,慢慢理解。

...
public void request(long n) {
            for (;;) {
                long r = get();
                if (r < 0) {
                    return;
                }
                long u = r + n;
                if (u < 0) {
                    u = Long.MAX_VALUE;
                }
                if (compareAndSet(r, u)) {
                    replay();
                    return;
                }
            }
        }
 public void replay() {
            synchronized (this) {
                if (emitting) {
                    missed = true;
                    return;
                }
                emitting = true;
            }
            boolean skipFinal = false;
            try {
                final Subscriber<? super T> child = this.child;

                for (;;) {

                    long r = get();

                    if (r < 0L) {
                        skipFinal = true;
                        return;
                    }
                    int s = state.size();
                    if (s != 0) {
                      //这段代码当读提出来分析
                        ...
  
                    }
                    synchronized (this) {
                        if (!missed) {
                            emitting = false;
                            skipFinal = true;
                            return;
                        }
                        missed = false;
                    }
                }
            } finally {
                if (!skipFinal) {
                    synchronized (this) {
                        emitting = false;
                    }
                }
            }
        }
    }
...

request方法中呢,其实主要调用了replay方法。
里面最重要的一句是

int s = state.size();
if (s != 0) {
...
}

由于第一次调用subscribe所以根本没有缓存东西,所以state.size()==0,所以第一调用replay方法基本上没有做什么操作。

  1. 第一次调用subscribe执行
 if (!get() && compareAndSet(false, true)) {
                state.connect();
            }

我们先来看
!get() && compareAndSet(false, true),由于CachedSubscribe<T> extends AtomicBoolean

第一次进来get()==false,然后再设置为true,确保了下一次不调用state.connect

下面我们具体分析下connect方法,最重要的缓存操作在这里

...
public void connect() {
            Subscriber<T> subscriber = new Subscriber<T>() {
                @Override
                public void onNext(T t) {
                    CacheState.this.onNext(t);
                }
                @Override
                public void onError(Throwable e) {
                    CacheState.this.onError(e);
                }
                @Override
                public void onCompleted() {
                    CacheState.this.onCompleted();
                }
            };
            connection.set(subscriber);
            source.unsafeSubscribe(subscriber);
            isConnected = true;
        }
...

这里呢,我们还是分步骤一步步来解析

  1. 创建一个Subscriber
    因为在前面,其实我们本身对cache的功能已经有所了解,就是在第一次以后,再调用subscribe其实就是直接把缓存数据拿过来直接传递下去。

那么在这里,其实我们就可以看出,这里新建的Subscriber就是用来直接传递下去的Subscriber。这里其实我们可以画个大致的示意图。

看图说话

image.png

第一次subscribe走上面实现,第二次subscribe直接走下面虚线,而中间的这个Subscriber就是我们现在所提到的这个Subscriber。第二次以后就直接越过了demo中的OnSubscribe

注意: 这里没有涉及到demo中的Subscriber,通过前面的分析我们知道demo中的Subscriber已经被包装成了SafeSubscriber保存在ReplyProducer中的child变量里

  1. 保存subscriber
    既然前面提到了,第一次之后的每次操作都会使用到这个subscriber,那么我们肯定要把这个subscriber保存起来。
 public void set(Subscription s) {
      ...
        state.update(s);
    }

SequentialSubscription

...
public boolean update(Subscription next) {
        for (;;) {
            Subscription current = get();

            if (current == Unsubscribed.INSTANCE) {
                if (next != null) {
                    next.unsubscribe();
                }
                return false;
            }

            if (compareAndSet(current, next)) {
                if (current != null) {
                    current.unsubscribe();
                }
                return true;
            }
        }
    }
...

如我们所料,确实是把这个subscriber保存起来了。
compareAndSet(current, next)

  1. 第一次subscribe,还是要调用前面的onSubscribe
source.unsafeSubscribe(subscriber);

其实就是

source.onSubscribe.call(subscriber);

source我们可以通过源码知道,就是创建CachedObservable时传入的我们demoObservable.create()得到的Observable对象。
onSubscribe就是demo中的OnSubscribe
下面我们还是重点,放在subscriber上。

这里的subscriber其实我们根据上面的代码很容易判断出就是刚刚new 出来的Subscriber,并且在onNext其实是直接调用CacheStateonNext

CachedObservable.CacheState

...
public void onNext(T t) {
            if (!sourceDone) {
                Object o = NotificationLite.next(t);
                add(o);
                dispatch();
            }
        }
...

当我们在demo中OnSubscribe调用一次subscriber.onNext,其实就是进入CachedObservable.CacheStateonNext

在这里我们再次一句一句分析。

  1. NotificationLite.next(t)
public static <T> Object next(T t) {
        if (t == null) {
            return ON_NEXT_NULL_SENTINEL;
        } else {
            return t;
        }
    }

很明显就是判断参数是否为null。

  1. add(o)

前面我们就提到了CacheState用来保存上面传递下来的参数.
具体实现就是在这里,add(o)。

  1. dispatch()
 void dispatch() {
            ReplayProducer<?>[] a = producers;
            for (ReplayProducer<?> rp : a) {
                rp.replay();
            }
        }

很明显这里最重要的就是rp.replay()
我们先不管为什么这里为什么用了个for循环,我们先来看看rp.replay到底做了什么。

 public void replay() {
...
  int s = state.size();
  if (s != 0) {
      ...
    if (NotificationLite.accept(child, o)) {
      ...
    }
  }
...
}

NotificationLite.accept

public static <T> boolean accept(Observer<? super T> o, Object n) {
         ...
            o.onNext((T) n);
            return false;
         ...
    }

reply就是判断CacheState的缓存是否已经有了,有了之后就直接调用child.onNext(o)

那么child是什么呢?就是前面所分析的

image.png

这样在一层一层的调用onNext最终到了我们自己的Action

第二次调用subscribe

第一次调用subscribe的整个流程其实我们已经走完,下面我们直接简单的走一下第二次调用subscribe的流程。

直接从CachedSubscribecall方法看起。

 public void call(Subscriber<? super T> t) {
            ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
            state.addProducer(rp);
            t.add(rp);
            t.setProducer(rp);
            if (!get() && compareAndSet(false, true)) {
                state.connect();
            }
        }
  1. 又创建了一个ReplayProducer
  2. 添加到CacheState
  3. ReplayProducer加入到SafeSubscriber中绑定在一起
  4. 调用Producerrequest方法,在这里其实就是调用ReplayProducerreplay方法(前面分析过了)
  5. 因为第一次的时候,已经设置为true所以get()==true直接跳过

所以综上分析,我们还是直接看ReplayProducerreplay方法。

 public void replay() {
...
  int s = state.size();
  if (s != 0) {
      ...
    if (NotificationLite.accept(child, o)) {
      ...
    }
  }
...
}

NotificationLite.accept

public static <T> boolean accept(Observer<? super T> o, Object n) {
         ...
            o.onNext((T) n);
            return false;
         ...
    }

因为在第一次中我们已经在CacheState中保存了所有发射的对象。以我们当前的demo来看,state中保存了Test1,Test2,Test33个字符串,然后就直接调用SafeSubsciber的onNext方法。

总结

总的来说,cache操作符与前2个操作符比较起来的话,在源码上其实看起来会更有难度,但是有了前2个操作符做铺垫,再来看,相对来说会比较容易。

附加

整体cache的流程,其实我们已经有了大致的了解,但是刚才其实我们还是遗留了一个问题。我们再次来看这段代码。

void dispatch() {
            ReplayProducer<?>[] a = producers;
            for (ReplayProducer<?> rp : a) {
                rp.replay();
            }
        }

这里为什么要用了一个for循环呢?

image.png

其实我们从上面的图片中可以看出。重复输出了3次Test1,Test2,Test3
因为,我在demo中连续点击了3下SUBSCRIBE按钮。
也就是调用了3次subscribe,也就是往CacheState中添加了
3个ReplayProducer,所以上面调用dispatch方法的时候,自然就for循环调用了3次replay,然后就输出了3次。


大家喜欢的话就点个赞哦,给我点鼓励嘛

相关文章

  • 我是用来缓存的操作符-----Cache

    Cache操作符功能介绍 缓存前面操作通过onNext所传递下来的参数,在下一次subscribe订阅时,直接跳过...

  • 类(三)-- cache_t分析

    类(一)-- 底层探索类(二)-- method归属类(三)-- cache_t分析 cache_t作用 用来缓存...

  • ecjia_cache函数使用

    通过ecjia_cache函数获取缓存实例 调用缓存方法 get 方法可以用来取出缓存中的项目,缓存不存在的话返回...

  • 方法缓存Cache_t分析

    缓存Cache_t bucket_t cache_t 缓存的是 方法 method_t: 缓存流程 缓存的入口是:...

  • iOS原理探索06--cache_t分析

    概要 前面文章我们分析了isa、bits,本文主要分析一下cache_t和类的关系。我们知道cache是用来缓存指...

  • WEB缓存初探

    WEB缓存初探 概念理解 缓存——缓存就是数据交换的缓冲区(称作Cache) 缓存 的作用说白了就是用来就近获取东...

  • WebView

    WebView是一个用来展示web页面的控件。 1.WebSetting 缓存模式: LOAD_CACHE_ONL...

  • Elasticsearch使用过滤器优化查询

    elasticsearch提供了一种特殊的缓存,即过滤器缓存(filter cache),用来储存过滤器的结果 欢...

  • HBase读流程

    Meta Cache是客户端缓存元数据的Block Cache是读缓存,缓存实际数据1)Clinet先访问Zook...

  • Redis的5个常见使用场景

    会话缓存(Session Cache) 最常用的一种使用Redis的情景是会话缓存(session cache)。...

网友评论

    本文标题:我是用来缓存的操作符-----Cache

    本文链接:https://www.haomeiwen.com/subject/kiqclftx.html