看看谁最快的操作符-----Amb

作者: javalong | 来源:发表于2018-04-15 19:24 被阅读29次
Amb操作符功能介绍

参数传入多个Observable,哪个Observable中的OnSubscribe执行的快就执行就执行哪个,而其他Observable都被取消

用途

这个操作符,其实我不常用,但是用途大家可以自己想象下。
有多个异步操作,你也不知道哪个会先执行完毕,你只想在某一个操作执行完毕后,执行一次某个方法,那么这个操作符就刚好适用。

来一段小代码
...
  observable = Observable.amb(Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("Test1")
            }
        }), Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("Test2")
            }
        }))
...
 observable?.subscribe({ msg ->
            tvContent.text = tvContent.text.toString() + "\n" + msg
        })

这里我们创建了2个Observable传入amb操作符。
然后subscribe订阅消息。
demo很简单,就不过多介绍了,下面直接看源码分析。


看看源代码

Observable

 public static <T> Observable<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
        return unsafeCreate(OnSubscribeAmb.amb(o1, o2));
    }

 public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }

如果大家看过上一篇文章,会发现其实Observable.createObservable.unsafeCreate
是一致的,没有区别。
Observable

...
 public static <T> Observable<T> create(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
...
public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
...

从这几行代码中,我们可以看出,最重要的其实是这一行

OnSubscribeAmb.amb(o1, o2)

先不看源码,我们可以大致的了解,这句话的作用就是创建一个 OnSubscribe对象,根据上一篇文章的介绍,其实可以了解到我们最重要的就是看2个东西OnSubscribecall,还有就是Subscriber对象的onNext,onComplete...等方法

下面我们继续深入
OnSubscribeAmb

...
public static <T> OnSubscribe<T> amb(Observable<? extends T> o1, Observable<? extends T> o2) {
        List<Observable<? extends T>> sources = new ArrayList<Observable<? extends T>>();
        sources.add(o1);
        sources.add(o2);
        return amb(sources);
    }
...
public static <T> OnSubscribe<T> amb(final Iterable<? extends Observable<? extends T>> sources) {
        return new OnSubscribeAmb<T>(sources);
    }
...
public void call(final Subscriber<? super T> subscriber) {
        final Selection<T> selection = new Selection<T>();
        subscriber.add(Subscriptions.create(new Action0() {

            @Override
            public void call() {
                AmbSubscriber<T> c;
                if ((c = selection.get()) != null) {
                    c.unsubscribe();
                }
                unsubscribeAmbSubscribers(selection.ambSubscribers);
            }

        }));

        for (Observable<? extends T> source : sources) {
            if (subscriber.isUnsubscribed()) {
                break;
            }
            AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
            selection.ambSubscribers.add(ambSubscriber);
        
            AmbSubscriber<T> c;
            if ((c = selection.get()) != null) {
                // Already chose one, the rest can be skipped and we can clean up
                selection.unsubscribeOthers(c);
                return;
            }
            source.unsafeSubscribe(ambSubscriber);
        }
        // while subscribing unsubscription may have occurred so we clean up after
        if (subscriber.isUnsubscribed()) {
            unsubscribeAmbSubscribers(selection.ambSubscribers);
        }
      ...
    }

这段代码有点长,最主要的当然是call方法,其中amb重载方法中,主要是new 了就是new了一个OnSubscribe没什么好说的。

最终呢还是会执行call,所以我们来重点分析下call方法。

这里我又要提出一个个人的阅读源码的方法,如果已经了解了这个方法或者这个类的功能,那么就尝试着想象自己大概会怎么去实现。

比如这里,如果是我,我会这么去实现,遍历我的List< Observable >,然后一个个调用subscribe去运行,当第一个OnSubscribe调用完call后,把其他Observable都取消。

下面我们继续深入,call方法中有个Selection,我们来看下具体是什么东西

OnSubscribeAmb

  static final class Selection<T> extends AtomicReference<AmbSubscriber<T>> {
        final Collection<AmbSubscriber<T>> ambSubscribers = new ConcurrentLinkedQueue<AmbSubscriber<T>>();

        public void unsubscribeLosers() {
            AmbSubscriber<T> winner = get();
            if (winner != null) {
                unsubscribeOthers(winner);
            }
        }

        public void unsubscribeOthers(AmbSubscriber<T> notThis) {
            for (AmbSubscriber<T> other : ambSubscribers) {
                if (other != notThis) {
                    other.unsubscribe();
                }
            }
            ambSubscribers.clear();
        }

    }

前面介绍了了Amb操作符可能使用到的场景就是有很多的异步操作,那么当然也会有多线程的问题,所以这里使用了Atomic**,这里就不深入去介绍了,有兴趣的自己去百度。

我们可以看下Selection的主要作用

  1. 有一个变量ambSubscribers,用来存储Subscriber对象,在这具体的实现是AmbSubscriber
  2. 取消订阅其他的Subscriber
    看完了Selection,那么我们再回过去看call方法中的代码。
//第一部分
final Selection<T> selection = new Selection<T>();
//第二部分
 subscriber.add(Subscriptions.create...));
//第三部分
 for (Observable<? extends T> source : sources) {}
//第四部分
if (subscriber.isUnsubscribed()) {
            unsubscribeAmbSubscribers(selection.ambSubscribers);
        }
//第五部分
subscriber.setProducer(...)

这样子标明代码后,其实我们只要一个一个去了解它的意思就好了
第一部分只是创建了一个Selection,就不过多介绍了,下面我们来看第二部分。

第二部分

第二部分呢其实也非常的简单,只是往Subscriber中添加了一个Subscription

Subscriber

...
 public final void add(Subscription s) {
        subscriptions.add(s);
    }

    @Override
    public final void unsubscribe() {
        subscriptions.unsubscribe();
    }
...

Subscriber中本身保存了一个List<Subscription>,当这个Subscriber取消注册的时候,把List<Subscription>中的所有的Subscription取消。

第三部分

这一部分,其实跟我前面所做的猜想是一致的。所以大家有时间的时候不妨也试一试,也是很锻炼的。
我们来详细的看下第三部分的代码。

for (Observable<? extends T> source : sources) {
            if (subscriber.isUnsubscribed()) {
                break;
            }
            AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);
            selection.ambSubscribers.add(ambSubscriber);
            AmbSubscriber<T> c;
            if ((c = selection.get()) != null) {
                selection.unsubscribeOthers(c);
                return;
            }
            source.unsafeSubscribe(ambSubscriber);
        }

源代码中本身有很多注释,为了节省空间,我把注释都去掉了,
在这段代码中,最重要的一句是

AmbSubscriber<T> ambSubscriber = new AmbSubscriber<T>(0, subscriber, selection);

前面我一直在强调,其实源码的阅读,最重要的是看OnSubscribeSubsciber的子类。源代码中经常会在原来的OnSubscribeSubsciber给你外面套上一层,你要是没仔细看的话,就不能完全看懂。

所以这里也是,使用了AmbSubscriber给原来的Subsciber包了一层,所以当我们subscribe订阅消息的时候,是先调用了AmbSubscriber中的onNext等方法,然后才会调用了我们自己的Subscriber

下面一句是

if ((c = selection.get()) != null) {
                selection.unsubscribeOthers(c);
                return;
            }

selection.get()出来的是AmbSubscriber对象,如果这个对象不为空,就说明,已经有一个OnSubscibe执行完成了,这其实是我的一个结论,就前面的代码我们还无法分析出这个结论,但是这里我先把结论给出,在后面我们可以再回过来看。会更加明白。

最后一句

source.unsafeSubscribe(ambSubscriber);

source其实就是我们调用amb操作符的时候传入的Observable。调用用了unsafeSubscribe方法。

Observable

public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            subscriber.onStart();
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
...

这个方法跟前面一篇文章提到的subscribe是很像的,区别就在于没有另外再把参数Subscriber包一层SafeSubscriber

所以最后也是一样的调用的是

observable. onSubscribe.call(subscriber)

而这里的subscriber其实是AmbSubscriber,onSubscribe其实就是我们在demo中传入的OnSubscribe对象。下面我们再来看下demo的代码

...
  observable = Observable.amb(Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("Test1")
            }
        }), Observable.create(object : Observable.OnSubscribe<String> {
            override fun call(t: Subscriber<in String>) {
                t.onNext("Test2")
            }
        }))
...

demo中传入的OnSubscribe对象中的call方法写的很简单,直接调用Subscriber中的onNext方法。也就是AmbSubscriberonNext方法

AmbSubscriber

...
public void onNext(T t) {
            if (isSelected()) {
                subscriber.onNext(t);
            }
        }
...
private boolean isSelected() {
            if (chosen) {
                return true;
            }
            if (selection.get() == this) {
                // fast-path
                chosen = true;
                return true;
            } else {
                if (selection.compareAndSet(null, this)) {
                    selection.unsubscribeOthers(this);
                    chosen = true;
                    return true;
                } else {
                    // we lost so unsubscribe ... and force cleanup again due to possible race conditions
                    selection.unsubscribeLosers();
                    return false;
                }
            }
        }
    }
...

AmbSubscriber代码写的非常的简单,我这里简单做个翻译。
先判断是否被选中了,如果被选中了,就直接执行SubscriberonNext,这里的Subscriber也就是我们自己demo中写的onNextSubscriber(其实是被源码包了一层的ActionSubscriber,这不是重点)。

最重要的代码是这一段

 if (selection.compareAndSet(null, this)) {
                    selection.unsubscribeOthers(this);
                    chosen = true;
                    return true;
                } 

selection.get()获取的AmbSubscriber不是this,不是的话,就selection.compareAndSet(null, this),前面介绍了Selection其实是一个Atomic**的对象,这种对象有个常用的方法就是compareAndSet(null, this),意思就是如果selection的值null就把this设置为它的值,如果为null 返回true否者返回false

所以这一段代码其实意思很好理解,第一个AmbSubscriber调用了onNext方法,就会把selection的值设置为this,下一个AmbSubscriber再来执行的话就晚了。并且在第一个AmbSubscriberselection的值设置为this之后,还会对其他AmbSubscriber取消订阅。

第四部分

直接可以看出,如果被取消订阅的话,就直接把就List< AmbSubscriber >中所有的Subscriber取消订阅,然后清空。这一步主要是防止用户可能会取消订阅。

第五部分

这几部分其实第三部分是最重要的,没有4,5部分,我们也对Amb操作符有了深入的理解了。但是第五部分对理解RxJava源码本身,有重要的作用。

OnSubscribeAmb

 subscriber.setProducer(new Producer() {
            @Override
            public void request(long n) {
            ...
            }
})

Subscriber

public void setProducer(Producer p) {
       ...
            if (toRequest == NOT_SET) {
                producer.request(Long.MAX_VALUE);
            } else {
                producer.request(toRequest);
            }
        }
    }

其他不需要看,我只看到了producer.request,也就是说,其实setProducer就是直接调用了request方法,当然本身没那么简单,但是如果我们是第一次去理解的话,就按最简单的去理解,由浅入深,是很好的一种学习方式。

那么我们回过去再看

OnSubscribeAmb

subscriber.setProducer(new Producer() {

            @Override
            public void request(long n) {
                AmbSubscriber<T> c;
                if ((c = selection.get()) != null) {
                    // propagate the request to that single Subscriber that won
                    c.requestMore(n);
                } else {
                    //propagate the request to all the amb subscribers
                    for (AmbSubscriber<T> ambSubscriber: selection.ambSubscribers) {
                        if (!ambSubscriber.isUnsubscribed()) {
                            // make a best endeavours check to not waste requests
                            // if first emission has already occurred
                            if (selection.get() == ambSubscriber) {
                                ambSubscriber.requestMore(n);
                                // don't need to request from other subscribers because choice has been made
                                // and request has gone to choice
                                return;
                            } else {
                                ambSubscriber.requestMore(n);
                            }
                        }
                    }
                }
            }
        });

AmbSubscriber

...
private void requestMore(long n) {
            request(n);
        }
...

可以看到AmbSubscriberrequestMore就是去调用父类Subscriberrequest方法。

OnSubscribe

...
 protected final void request(long n) {
        if (n < 0) {
            throw new IllegalArgumentException("number requested cannot be negative: " + n);
        }

        // if producer is set then we will request from it
        // otherwise we increase the requested count by n
        Producer producerToRequestFrom;
        synchronized (this) {
            if (producer != null) {
                producerToRequestFrom = producer;
            } else {
                addToRequested(n);
                return;
            }
        }
        // after releasing lock (we should not make requests holding a lock)
        producerToRequestFrom.request(n);
    }
...

Subscriberrequest方法呢 其实主要就做1件事情,

  1. addToRequested
  private void addToRequested(long n) {
        if (requested == NOT_SET) {
            requested = n;
        } else {
            final long total = requested + n;
            // check if overflow occurred
            if (total < 0) {
                requested = Long.MAX_VALUE;
            } else {
                requested = total;
            }
        }
    }

其实就是记录了下总的请求数

  1. producerToRequestFrom.request(n);

由于我们现在demo很简单,只是单个操作符,所以如果跟入代码的话,可能就不会运行到这里,producerToRequestFrom大家可以看出来其实也是一个 Producer。这就我们就先不深入,先留个大概的印象,RxJava的源码有很多的这样的嵌套,OnSubscribe再包一层OnSubscribe,Subscriber再包一层Subscriber

总结

其实总体下来,已经把Amb操作符的源码过了一遍,但是很可能大家还存在一些问题。比如我自己,我刚开始在看的时候就发现了一个问题。就是第五部分代码执行完毕后会怎么样呢?

Amb操作符在前面介绍了,很可能是在异步的情况下操作,也就是说当call方法都执行完毕了,AmbSubscriberonNext方法还是没有被执行,通过前面的代码我们可以看到,只有当第一个的AmbSubscriberonNext方法执行后,其他AmbSubscriber才会被取消订阅。所以如果是异步操作的话,多个AmbSubscriber其实是一直存在于内存等待运行的。

所以在这里其实我们很容易看出,为什么我们最好要手动的去调用unsubscribe方法,因为在异步的情况下,是很容易造成内存泄漏的。

由于这篇文章有很多地方引用了上一篇文章的内容,如果没有看过上一篇文章的话,请先阅读下上一篇文章,这样能更好理解

我是最简单的操作符-----Create

相关文章

网友评论

  • wsxiaoluob:厉害啊,之前不懂这一块的东西,看了这篇文章简直茅塞顿开,作者写的真是太好了,很详细解释的很到位。并且还有源码的思考,简直太完美了!!
  • ZZombiee:不明觉厉
    javalong:@ZZombiee 不要附和的太明显

本文标题:看看谁最快的操作符-----Amb

本文链接:https://www.haomeiwen.com/subject/hrzdkftx.html