Amb操作符功能介绍
参数传入多个Observable
,哪个Observable
中的OnSubscribe
执行的快就执行就执行哪个,而其他Observable
都被取消
用途
这个操作符,其实我不常用,但是用途大家可以自己想象下。
有多个异步操作,你也不知道哪个会先执行完毕,你只想在某一个操作执行完毕后,执行一次某个方法,那么这个操作符就刚好适用。
来一段小代码
...
observable = Observable.amb(Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("Test1")
}
}), Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("Test2")
}
}))
...
observable?.subscribe({ msg ->
tvContent.text = tvContent.text.toString() + "\n" + msg
})
这里我们创建了2个Observable
传入amb
操作符。
然后subscribe
订阅消息。
demo很简单,就不过多介绍了,下面直接看源码分析。
看看源代码
Observable
public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
return unsafeCreate(OnSubscribeAmb.amb(o1, o2));
}
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
如果大家看过上一篇文章,会发现其实Observable.create
和Observable.unsafeCreate
是一致的,没有区别。
Observable
...
public static <T> Observable<T> create(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
...
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
return new Observable<T>(RxJavaHooks.onCreate(f));
}
...
从这几行代码中,我们可以看出,最重要的其实是这一行
OnSubscribeAmb.amb(o1, o2)
先不看源码,我们可以大致的了解,这句话的作用就是创建一个 OnSubscribe
对象,根据上一篇文章的介绍,其实可以了解到我们最重要的就是看2个东西OnSubscribe
的call
,还有就是Subscriber
对象的onNext
,onComplete
...等方法
下面我们继续深入
OnSubscribeAmb
...
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
sources.add(o1);
sources.add(o2);
return amb(sources);
}
...
public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? extends T>> sources) {
return new OnSubscribeAmb<T>(sources);
}
...
public void call(final Subscriber<? super T> subscriber) {
final Selection<T> selection = new Selection<T>();
subscriber.add(Subscriptions.create(new Action0() {
@Override
public void call() {
AmbSubscriber<T> c;
if ((c = selection.get()) != null) {
c.unsubscribe();
}
unsubscribeAmbSubscribers(selection.ambSubscribers);
}
}));
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
break;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
AmbSubscriber<T> c;
if ((c = selection.get()) != null) {
// Already chose one, the rest can be skipped and we can clean up
selection.unsubscribeOthers(c);
return;
}
source.unsafeSubscribe(ambSubscriber);
}
// while subscribing unsubscription may have occurred so we clean up after
if (subscriber.isUnsubscribed()) {
unsubscribeAmbSubscribers(selection.ambSubscribers);
}
...
}
这段代码有点长,最主要的当然是call
方法,其中amb
重载方法中,主要是new 了就是new了一个OnSubscribe
没什么好说的。
最终呢还是会执行call
,所以我们来重点分析下call
方法。
这里我又要提出一个个人的阅读源码的方法,如果已经了解了这个方法或者这个类的功能,那么就尝试着想象自己大概会怎么去实现。
比如这里,如果是我,我会这么去实现,遍历我的List< Observable >,然后一个个调用subscribe
去运行,当第一个OnSubscribe
调用完call
后,把其他Observable
都取消。
下面我们继续深入,call方法中有个Selection
,我们来看下具体是什么东西
OnSubscribeAmb
static final class Selection<T> extends AtomicReference<AmbSubscriber<T>> {
final Collection<AmbSubscriber<T>> ambSubscribers = new ConcurrentLinkedQueue<AmbSubscriber<T>>();
public void unsubscribeLosers() {
AmbSubscriber<T> winner = get();
if (winner != null) {
unsubscribeOthers(winner);
}
}
public void unsubscribeOthers(AmbSubscriber<T> notThis) {
for (AmbSubscriber<T> other : ambSubscribers) {
if (other != notThis) {
other.unsubscribe();
}
}
ambSubscribers.clear();
}
}
前面介绍了了Amb操作符可能使用到的场景就是有很多的异步操作,那么当然也会有多线程的问题,所以这里使用了Atomic**
,这里就不深入去介绍了,有兴趣的自己去百度。
我们可以看下Selection
的主要作用
- 有一个变量
ambSubscribers
,用来存储Subscriber
对象,在这具体的实现是AmbSubscriber
。 - 取消订阅其他的
Subscriber
看完了Selection
,那么我们再回过去看call
方法中的代码。
//第一部分
final Selection<T> selection = new Selection<T>();
//第二部分
subscriber.add(Subscriptions.create...));
//第三部分
for (Observable<? extends T> source : sources) {}
//第四部分
if (subscriber.isUnsubscribed()) {
unsubscribeAmbSubscribers(selection.ambSubscribers);
}
//第五部分
subscriber.setProducer(...)
这样子标明代码后,其实我们只要一个一个去了解它的意思就好了
第一部分只是创建了一个Selection
,就不过多介绍了,下面我们来看第二部分。
第二部分
第二部分呢其实也非常的简单,只是往Subscriber
中添加了一个Subscription
Subscriber
...
public final void add(Subscription s) {
subscriptions.add(s);
}
@Override
public final void unsubscribe() {
subscriptions.unsubscribe();
}
...
Subscriber
中本身保存了一个List<Subscription>
,当这个Subscriber
取消注册的时候,把List<Subscription>
中的所有的Subscription
取消。
第三部分
这一部分,其实跟我前面所做的猜想是一致的。所以大家有时间的时候不妨也试一试,也是很锻炼的。
我们来详细的看下第三部分的代码。
for (Observable<? extends T> source : sources) {
if (subscriber.isUnsubscribed()) {
break;
}
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
selection.ambSubscribers.add(ambSubscriber);
AmbSubscriber<T> c;
if ((c = selection.get()) != null) {
selection.unsubscribeOthers(c);
return;
}
source.unsafeSubscribe(ambSubscriber);
}
源代码中本身有很多注释,为了节省空间,我把注释都去掉了,
在这段代码中,最重要的一句是
AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
前面我一直在强调,其实源码的阅读,最重要的是看OnSubscribe
和Subsciber
的子类。源代码中经常会在原来的OnSubscribe
和Subsciber
给你外面套上一层,你要是没仔细看的话,就不能完全看懂。
所以这里也是,使用了AmbSubscriber
给原来的Subsciber
包了一层,所以当我们subscribe
订阅消息的时候,是先调用了AmbSubscriber
中的onNext
等方法,然后才会调用了我们自己的Subscriber
。
下面一句是
if ((c = selection.get()) != null) {
selection.unsubscribeOthers(c);
return;
}
selection.get()
出来的是AmbSubscriber
对象,如果这个对象不为空,就说明,已经有一个OnSubscibe
执行完成了,这其实是我的一个结论,就前面的代码我们还无法分析出这个结论,但是这里我先把结论给出,在后面我们可以再回过来看。会更加明白。
最后一句
source.unsafeSubscribe(ambSubscriber);
source其实就是我们调用amb
操作符的时候传入的Observable
。调用用了unsafeSubscribe
方法。
Observable
public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
try {
subscriber.onStart();
RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
return RxJavaHooks.onObservableReturn(subscriber);
...
这个方法跟前面一篇文章提到的subscribe
是很像的,区别就在于没有另外再把参数Subscriber
包一层SafeSubscriber
。
所以最后也是一样的调用的是
observable. onSubscribe.call(subscriber)
而这里的subscriber
其实是AmbSubscriber
,onSubscribe
其实就是我们在demo中传入的OnSubscribe
对象。下面我们再来看下demo的代码
...
observable = Observable.amb(Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("Test1")
}
}), Observable.create(object : Observable.OnSubscribe<String> {
override fun call(t: Subscriber<in String>) {
t.onNext("Test2")
}
}))
...
demo中传入的OnSubscribe
对象中的call
方法写的很简单,直接调用Subscriber
中的onNext
方法。也就是AmbSubscriber
的onNext
方法
AmbSubscriber
...
public void onNext(T t) {
if (isSelected()) {
subscriber.onNext(t);
}
}
...
private boolean isSelected() {
if (chosen) {
return true;
}
if (selection.get() == this) {
// fast-path
chosen = true;
return true;
} else {
if (selection.compareAndSet(null, this)) {
selection.unsubscribeOthers(this);
chosen = true;
return true;
} else {
// we lost so unsubscribe ... and force cleanup again due to possible race conditions
selection.unsubscribeLosers();
return false;
}
}
}
}
...
AmbSubscriber
代码写的非常的简单,我这里简单做个翻译。
先判断是否被选中了,如果被选中了,就直接执行Subscriber
的onNext
,这里的Subscriber
也就是我们自己demo中写的onNext
的Subscriber
(其实是被源码包了一层的ActionSubscriber
,这不是重点)。
最重要的代码是这一段
if (selection.compareAndSet(null, this)) {
selection.unsubscribeOthers(this);
chosen = true;
return true;
}
selection.get()获取的AmbSubscriber
不是this,不是的话,就selection.compareAndSet(null, this)
,前面介绍了Selection
其实是一个Atomic**
的对象,这种对象有个常用的方法就是compareAndSet(null, this)
,意思就是如果selection
的值null就把this
设置为它的值,如果为null
返回true
否者返回false
。
所以这一段代码其实意思很好理解,第一个AmbSubscriber
调用了onNext
方法,就会把selection
的值设置为this,下一个AmbSubscriber
再来执行的话就晚了。并且在第一个AmbSubscriber
把selection
的值设置为this之后,还会对其他AmbSubscriber
取消订阅。
第四部分
直接可以看出,如果被取消订阅的话,就直接把就List< AmbSubscriber >
中所有的Subscriber
取消订阅,然后清空。这一步主要是防止用户可能会取消订阅。
第五部分
这几部分其实第三部分是最重要的,没有4,5部分,我们也对Amb
操作符有了深入的理解了。但是第五部分对理解RxJava
源码本身,有重要的作用。
OnSubscribeAmb
subscriber.setProducer(new Producer() {
@Override
public void request(long n) {
...
}
})
Subscriber
public void setProducer(Producer p) {
...
if (toRequest == NOT_SET) {
producer.request(Long.MAX_VALUE);
} else {
producer.request(toRequest);
}
}
}
其他不需要看,我只看到了producer.request
,也就是说,其实setProducer就是直接调用了request
方法,当然本身没那么简单,但是如果我们是第一次去理解的话,就按最简单的去理解,由浅入深,是很好的一种学习方式。
那么我们回过去再看
OnSubscribeAmb
subscriber.setProducer(new Producer() {
@Override
public void request(long n) {
AmbSubscriber<T> c;
if ((c = selection.get()) != null) {
// propagate the request to that single Subscriber that won
c.requestMore(n);
} else {
//propagate the request to all the amb subscribers
for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
if (!ambSubscriber.isUnsubscribed()) {
// make a best endeavours check to not waste requests
// if first emission has already occurred
if (selection.get() == ambSubscriber) {
ambSubscriber.requestMore(n);
// don't need to request from other subscribers because choice has been made
// and request has gone to choice
return;
} else {
ambSubscriber.requestMore(n);
}
}
}
}
}
});
AmbSubscriber
...
private void requestMore(long n) {
request(n);
}
...
可以看到AmbSubscriber
的requestMore
就是去调用父类Subscriber
的request
方法。
OnSubscribe
...
protected final void request(long n) {
if (n < 0) {
throw new IllegalArgumentException("number requested cannot be negative: " + n);
}
// if producer is set then we will request from it
// otherwise we increase the requested count by n
Producer producerToRequestFrom;
synchronized (this) {
if (producer != null) {
producerToRequestFrom = producer;
} else {
addToRequested(n);
return;
}
}
// after releasing lock (we should not make requests holding a lock)
producerToRequestFrom.request(n);
}
...
Subscriber
的request
方法呢 其实主要就做1件事情,
- addToRequested
private void addToRequested(long n) {
if (requested == NOT_SET) {
requested = n;
} else {
final long total = requested + n;
// check if overflow occurred
if (total < 0) {
requested = Long.MAX_VALUE;
} else {
requested = total;
}
}
}
其实就是记录了下总的请求数
- producerToRequestFrom.request(n);
由于我们现在demo很简单,只是单个操作符,所以如果跟入代码的话,可能就不会运行到这里,producerToRequestFrom大家可以看出来其实也是一个 Producer
。这就我们就先不深入,先留个大概的印象,RxJava
的源码有很多的这样的嵌套,OnSubscribe
再包一层OnSubscribe
,Subscriber
再包一层Subscriber
。
总结
其实总体下来,已经把Amb
操作符的源码过了一遍,但是很可能大家还存在一些问题。比如我自己,我刚开始在看的时候就发现了一个问题。就是第五部分代码执行完毕后会怎么样呢?
Amb
操作符在前面介绍了,很可能是在异步的情况下操作,也就是说当call
方法都执行完毕了,AmbSubscriber
的onNext
方法还是没有被执行,通过前面的代码我们可以看到,只有当第一个的AmbSubscriber
的onNext
方法执行后,其他AmbSubscriber
才会被取消订阅。所以如果是异步操作的话,多个AmbSubscriber
其实是一直存在于内存等待运行的。
所以在这里其实我们很容易看出,为什么我们最好要手动的去调用unsubscribe
方法,因为在异步的情况下,是很容易造成内存泄漏的。
由于这篇文章有很多地方引用了上一篇文章的内容,如果没有看过上一篇文章的话,请先阅读下上一篇文章,这样能更好理解
网友评论