Cache操作符功能介绍
缓存前面操作通过onNext
所传递下来的参数,在下一次subscribe
订阅时,直接跳过前面的步骤,直接执行后面步骤。
用途
单纯用文字介绍Cache的功能,大家可能很难理解。大家可以想象下,比如http请求,我的前面操作是有延时的,如果没有用cache
,每次subscribe
都会去真正发起http
�请求。但是如果在尾部添加cache
操作符的话,那么下一次subscribe
的时候,其实就是直接从缓存中获取,而不会发起真正的http
请求。
来一段小代码
var handler = Handler()
//延时后 cache的作用会更明显
// observable = Observable.create(object : Observable.OnSubscribe<String> {
// override fun call(t: Subscriber<in String>) {
// t.onNext("Test1")
// t.onNext("Test2")
// t.onNext("Test3")
// }
// }).delay(4, TimeUnit.SECONDS).cache()
observable = Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("Test1")
t.onNext("Test2")
t.onNext("Test3")
}
}).cache()
btSub.setOnClickListener({
observable?.subscribe({ msg ->
handler.post(Runnable {
tvContent.text = tvContent.text.toString() + "\n" + msg
})
})
})
这里有一段注释的代码,是使用了delay
,使用了delay
之后cache
的作用会更加明显,第一次点击按钮触发subscribe
订阅后,会延迟4秒后,在界面上Test1,Test2,Test3
,第二次,第三次...界面上都会立刻打印Test1,Test2,Test3
,但是由于我还没有为大家介绍过delay
操作符,所以我这里也先不过多介绍,在以后的文章中我们会为大家进行delay
的源码分析。
有了前面这么一大串的介绍后,我想大家对cache
操作符有了一定的了解,那么我们就带着自己心中的疑问和猜想,按照demo的顺序一步步跟入源码。
看看源代码
Observable
public final Observable<T> cache() {
return CachedObservable.from(this);
}
CachedObservable
public static <T> CachedObservable<T> from(Observable<? extends T> source) {
return (CachedObservable<T>)from(source, 16);
}
public static <T> CachedObservable<T> from(Observable<? extends T> source, int capacityHint) {
if (capacityHint < 1) {
throw new IllegalArgumentException("capacityHint > 0 required");
}
CacheState<T> state = new CacheState<T>(source, capacityHint);
CachedSubscribe<T> onSubscribe = new CachedSubscribe<T>(state);
return new CachedObservable<T>(onSubscribe, state);
}
个人理解:因为前面其实已经介绍到了,cache其实是把前面传递下来的参数,保存起来了,那么肯定是要有个数组或者列表的,这里我看到
capacityHint
,我就认为CacheState
应该有可能是这个容器。
这倒不是最重要的,在Observable
的子类中,其实我们最重要看的就是2个东西OnSubscribe,Subscriber明白了这2个东西,基本上对这个操作符就了解的差不多了
CachedObservable.CacheState
static final class CacheState<T> extends LinkedArrayList implements Observer<T> {
...
}
CachedObservable.CachedSubscribe
static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
...
}
OnSubscribe很明显,就是CachedSubscribe
。
那么CacheState
是不是Subscriber
?,在这里CacheState
实现了Observer
接口
Observer
public interface Observer<T> {
void onCompleted();
void onError(Throwable e);
void onNext(T t);
}
前面我们几篇文章都介绍了,Subscriber
其实就是onNext..
等方法的具体实现。所以我们可以直接把CacheState
当作是Subscriber
来看待,这样会很好的帮助我们理解。
public abstract class Subscriber<T> implements Observer<T>, Subscription {
...
}
个人理解:Subscriber和Observer关系
Subscriber实现了Observer接口,Subscriber可以subscribe
,unsubscribe
,还有onStart
方法。
在RxJava操作符中,其实我们关心的是Subscrber
的onNext
方法和OnSubscribe
的call
方法,能搞定这2个方法,那么我们就对这个操作符有了很深的了解了。
既然我们已经找到了2个重要的对象,那么我们继续往下深入。
Observable.cache调用后得到了一个CachedObservable
,然后点击按钮调用subscribe
Observable
...
public final Subscription subscribe(final Action1<? super T> onNext) {
if (onNext == null) {
throw new IllegalArgumentException("onNext can not be null");
}
Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
Action0 onCompleted = Actions.empty();
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
...
public final Subscription subscribe(Subscriber<? super T> subscriber) {
return Observable.subscribe(subscriber, this);
}
...
static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
...
subscriber.onStart();
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
...
这里我贴出这块代码,其实在前面几篇文章我都贴过了,这里重新贴出希望大家加强记忆。调用subscribe
方法,其实就是obsevable.onSubscibe.call(subscriber)
关键的地方来了,这里的obsevable,onSubscribe,subscriber
分别是什么呢?
链式结构中,操作符使用的越多,到最后会越难判断,这里我们只有一个操作符,相对来说比较简单
这里的observable
很明显就是CachedObservable
而obsevable.onSubscibe
自然就是刚才我们分析得到的CachedSubscribe
,如果你想当然的以为subscriber
就是刚才的CacheState
那就错了。
那么subscriber
到底是什么呢?
我们从刚才的代码一步步往上推,其实就明白。
subscriber
是一个参数由外部传入的,
...
if (!(subscriber instanceof SafeSubscriber)) {
subscriber = new SafeSubscriber<T>(subscriber);
}
...
其实已经告诉我们,最终的subscriber
其实是一个SafeSubscriber
。而原来的subscriber
当作构造参数保存在了SafeSubscriber
中。
再往前看,看原来的subscriber
是什么。
...
public final Subscription subscribe(final Action1<? super T> onNext) {
...
return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
}
...
很明显的就是,我们在demo中传入的Action
被包装成了ActionSubscriber
。
看图说话
所以最终是这么一个subscriber
。
那么我们继续深入。
observable.onSubscriber.call(subscriber)
static final class CachedSubscribe<T> extends AtomicBoolean implements OnSubscribe<T> {
...
@Override
public void call(Subscriber<? super T> t) {
ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
state.addProducer(rp);
t.add(rp);
t.setProducer(rp);
if (!get() && compareAndSet(false, true)) {
state.connect();
}
}
}
我们一句一句来分析。
- 创建
ReplayProducer
对象
static final class ReplayProducer<T> extends AtomicLong implements Producer, Subscription {
...
public ReplayProducer(Subscriber<? super T> child, CacheState<T> state) {
this.child = child;
this.state = state;
}
@Override
public void request(long n) {
...
}
...
ReplayProducer
是一个Producer
,前面介绍过Producer
,我们只需要看它request
方法就好了。我们先等下看它的request
。
- 把
ReplayProducer
添加到CacheState
对象中
public void addProducer(ReplayProducer<T> p) {
synchronized (connection) {
ReplayProducer<?>[] a = producers;
int n = a.length;
ReplayProducer<?>[] b = new ReplayProducer<?>[n + 1];
System.arraycopy(a, 0, b, 0, n);
b[n] = p;
producers = b;
}
}
这里用的是数组的形式,来添加一个新元素,比较简单,就不过多介绍了。
- 把
ReplyProducer
当作Subscription
和Subscriber
绑定在一起。
public final void add(Subscription s) {
subscriptions.add(s);
}
...
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
...
可以理解为,当subscriber.unsubscribe
,对应的Producer
也取消了订阅。这种操作在前面几个操作符中也有出现。
- 调用
Producer
的request
方法
setProducer的操作呢,我们在前面也介绍过,就是直接调用Producer
的request方法
注意:由于我们现在只使用一个操作符,比较简单,先这么理解,后续我们讲多个操作符混合使用就可能会出现比较复杂的情况,由浅入深,慢慢理解。
...
public void request(long n) {
for (;;) {
long r = get();
if (r < 0) {
return;
}
long u = r + n;
if (u < 0) {
u = Long.MAX_VALUE;
}
if (compareAndSet(r, u)) {
replay();
return;
}
}
}
public void replay() {
synchronized (this) {
if (emitting) {
missed = true;
return;
}
emitting = true;
}
boolean skipFinal = false;
try {
final Subscriber<? super T> child = this.child;
for (;;) {
long r = get();
if (r < 0L) {
skipFinal = true;
return;
}
int s = state.size();
if (s != 0) {
//这段代码当读提出来分析
...
}
synchronized (this) {
if (!missed) {
emitting = false;
skipFinal = true;
return;
}
missed = false;
}
}
} finally {
if (!skipFinal) {
synchronized (this) {
emitting = false;
}
}
}
}
}
...
request
方法中呢,其实主要调用了replay
方法。
里面最重要的一句是
int s = state.size();
if (s != 0) {
...
}
由于第一次调用subscribe
所以根本没有缓存东西,所以state.size()==0
,所以第一调用replay
方法基本上没有做什么操作。
- 第一次调用
subscribe
执行
if (!get() && compareAndSet(false, true)) {
state.connect();
}
我们先来看
!get() && compareAndSet(false, true)
,由于CachedSubscribe<T> extends AtomicBoolean
第一次进来get()==false
,然后再设置为true
,确保了下一次不调用state.connect
下面我们具体分析下connect方法,最重要的缓存操作在这里
...
public void connect() {
Subscriber<T> subscriber = new Subscriber<T>() {
@Override
public void onNext(T t) {
CacheState.this.onNext(t);
}
@Override
public void onError(Throwable e) {
CacheState.this.onError(e);
}
@Override
public void onCompleted() {
CacheState.this.onCompleted();
}
};
connection.set(subscriber);
source.unsafeSubscribe(subscriber);
isConnected = true;
}
...
这里呢,我们还是分步骤一步步来解析
- 创建一个
Subscriber
因为在前面,其实我们本身对cache
的功能已经有所了解,就是在第一次以后,再调用subscribe
其实就是直接把缓存数据拿过来直接传递下去。
那么在这里,其实我们就可以看出,这里新建的Subscriber
就是用来直接传递下去的Subscriber
。这里其实我们可以画个大致的示意图。
看图说话
第一次subscribe
走上面实现,第二次subscribe
直接走下面虚线,而中间的这个Subscriber
就是我们现在所提到的这个Subscriber
。第二次以后就直接越过了demo中的OnSubscribe
。
注意: 这里没有涉及到demo中的
Subscriber
,通过前面的分析我们知道demo中的Subscriber
已经被包装成了SafeSubscriber
保存在ReplyProducer
中的child
变量里
- 保存
subscriber
既然前面提到了,第一次之后的每次操作都会使用到这个subscriber
,那么我们肯定要把这个subscriber
保存起来。
public void set(Subscription s) {
...
state.update(s);
}
SequentialSubscription
...
public boolean update(Subscription next) {
for (;;) {
Subscription current = get();
if (current == Unsubscribed.INSTANCE) {
if (next != null) {
next.unsubscribe();
}
return false;
}
if (compareAndSet(current, next)) {
if (current != null) {
current.unsubscribe();
}
return true;
}
}
}
...
如我们所料,确实是把这个subscriber
保存起来了。
compareAndSet(current, next)
。
- 第一次
subscribe
,还是要调用前面的onSubscribe
的
source.unsafeSubscribe(subscriber);
其实就是
source.onSubscribe.call(subscriber);
source我们可以通过源码知道,就是创建CachedObservable
时传入的我们demo
中Observable.create()
得到的Observable
对象。
onSubscribe就是demo
中的OnSubscribe
。
下面我们还是重点,放在subscriber
上。
这里的subscriber
其实我们根据上面的代码很容易判断出就是刚刚new 出来的Subscriber
,并且在onNext
其实是直接调用CacheState
的onNext
。
CachedObservable.CacheState
...
public void onNext(T t) {
if (!sourceDone) {
Object o = NotificationLite.next(t);
add(o);
dispatch();
}
}
...
当我们在demo中OnSubscribe
调用一次subscriber.onNext
,其实就是进入CachedObservable.CacheState
的onNext
。
在这里我们再次一句一句分析。
- NotificationLite.next(t)
public static <T> Object next(T t) {
if (t == null) {
return ON_NEXT_NULL_SENTINEL;
} else {
return t;
}
}
很明显就是判断参数是否为null。
- add(o)
前面我们就提到了CacheState
用来保存上面传递下来的参数.
具体实现就是在这里,add(o)。
- dispatch()
void dispatch() {
ReplayProducer<?>[] a = producers;
for (ReplayProducer<?> rp : a) {
rp.replay();
}
}
很明显这里最重要的就是rp.replay()
我们先不管为什么这里为什么用了个for循环,我们先来看看rp.replay
到底做了什么。
public void replay() {
...
int s = state.size();
if (s != 0) {
...
if (NotificationLite.accept(child, o)) {
...
}
}
...
}
NotificationLite.accept
public static <T> boolean accept(Observer<? super T> o, Object n) {
...
o.onNext((T) n);
return false;
...
}
reply就是判断CacheState
的缓存是否已经有了,有了之后就直接调用child.onNext(o)
那么child
是什么呢?就是前面所分析的
这样在一层一层的调用onNext
最终到了我们自己的Action
。
第二次调用subscribe
第一次调用subscribe
的整个流程其实我们已经走完,下面我们直接简单的走一下第二次调用subscribe
的流程。
直接从CachedSubscribe
的call
方法看起。
public void call(Subscriber<? super T> t) {
ReplayProducer<T> rp = new ReplayProducer<T>(t, state);
state.addProducer(rp);
t.add(rp);
t.setProducer(rp);
if (!get() && compareAndSet(false, true)) {
state.connect();
}
}
- 又创建了一个
ReplayProducer
- 添加到
CacheState
中 - 把
ReplayProducer
加入到SafeSubscriber
中绑定在一起 - 调用
Producer
的request
方法,在这里其实就是调用ReplayProducer
的replay
方法(前面分析过了) - 因为第一次的时候,已经设置为
true
所以get()==true
直接跳过
所以综上分析,我们还是直接看ReplayProducer
的replay
方法。
public void replay() {
...
int s = state.size();
if (s != 0) {
...
if (NotificationLite.accept(child, o)) {
...
}
}
...
}
NotificationLite.accept
public static <T> boolean accept(Observer<? super T> o, Object n) {
...
o.onNext((T) n);
return false;
...
}
因为在第一次中我们已经在CacheState
中保存了所有发射的对象。以我们当前的demo
来看,state
中保存了Test1,Test2,Test3
3个字符串,然后就直接调用SafeSubsciber
的onNext方法。
总结
总的来说,cache
操作符与前2个操作符比较起来的话,在源码上其实看起来会更有难度,但是有了前2个操作符做铺垫,再来看,相对来说会比较容易。
附加
整体cache
的流程,其实我们已经有了大致的了解,但是刚才其实我们还是遗留了一个问题。我们再次来看这段代码。
void dispatch() {
ReplayProducer<?>[] a = producers;
for (ReplayProducer<?> rp : a) {
rp.replay();
}
}
这里为什么要用了一个for
循环呢?
其实我们从上面的图片中可以看出。重复输出了3次Test1,Test2,Test3
。
因为,我在demo中连续点击了3下SUBSCRIBE
按钮。
也就是调用了3次subscribe
,也就是往CacheState
中添加了
3个ReplayProducer
,所以上面调用dispatch
方法的时候,自然就for循环调用了3次replay
,然后就输出了3次。
网友评论