Rxjava

作者: Jon_Snow09 | 来源:发表于2019-04-29 19:36 被阅读0次

    开始接触Rxjava是在16年年底,当时由于自身水平的局限,并没有理解到这个框架的优秀与强大。现在两年多时间过去了,辗转了几个项目之后,从最初只知道与Retrofit配合进行网络请求,到现在已经可以灵活的把Rxjava运用到几乎所有的异步操作中,下面我想结合Rxjava的源码,谈谈如何使用这个框架。

    首先,我们来看看Rxjava最基础的用法。

    Flowable.create(new FlowableOnSubscribe<Integer>() {
        @Override
        public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
            int i = 0;
    
            while (i < 10) {
                SystemClock.sleep(1000);
                emitter.onNext(i++);
            }
    
            emitter.onComplete();
        }
    }, BackpressureStrategy.BUFFER).subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    s.request(Long.MAX_VALUE);
                }
    
                @Override
                public void onNext(Integer integer) {
                    Log.e(TAG, "onNext: " + integer);
                }
    
                @Override
                public void onError(Throwable t) {
                }
    
                @Override
                public void onComplete() {
                    Log.e(TAG, "onComplete: ");
                }
            });
    

    这段代码非常简单,开启了一个子线程,每隔一秒发射一个int数出来,回调方法触发后把这个int数打印到控制台,那么我们开始查看源码,看看整个过程到底发生了什么。

    1.create方法

    public static <T> Flowable<T> create(FlowableOnSubscribe<T> source, BackpressureStrategy mode) {
        ObjectHelper.requireNonNull(source, "source is null");
        ObjectHelper.requireNonNull(mode, "mode is null");
        return RxJavaPlugins.onAssembly(new FlowableCreate<T>(source, mode));
    }
    

    create方法的源码如下,现在我们把不重要的部分忽略,就可以剥离出关键的代码:

    • BackpressureStrategy是Rxjava2的新特性,忽略掉
    • ObjectHelper的静态方法,是用来判断是不是null的,忽略掉
    • onAssembly方法各位可以点过去看看,它的作用是「如果当前给Rxjava设置过一个Function,则此时把这个Funtion应用给Flowable」,不在我们此次研究的范围之内,忽略掉

    这样一来,结论就很清晰了,我们需要关注的就是:

    • FlowableOnSubscribe
    • FlowableCreate

    第一个关键点:

    public interface FlowableOnSubscribe<T> {
    
        /**
         * Called for each Subscriber that subscribes.
         * @param emitter the safe emitter instance, never null
         * @throws Exception on error
         */
        void subscribe(@NonNull FlowableEmitter<T> emitter) throws Exception;
    }
    

    FlowableOnSubscribe是一个接口,包含一个方法,这个方法在subscriber的subscribe方法执行时被调用

    第二个关键点:

    public final class FlowableCreate<T> extends Flowable<T> {
    
        final FlowableOnSubscribe<T> source;
    
        final BackpressureStrategy backpressure;
    
        public FlowableCreate(FlowableOnSubscribe<T> source, BackpressureStrategy backpressure){
            this.source = source;
            this.backpressure = backpressure;
        }
    
        @Override
        public void subscribeActual(Subscriber<? super T> t) {
            BaseEmitter<T> emitter;
    
            switch (backpressure) {
            case MISSING: {
                emitter = new MissingEmitter<T>(t);
                break;
            }
            case ERROR: {
                emitter = new ErrorAsyncEmitter<T>(t);
                break;
            }
            case DROP: {
                emitter = new DropAsyncEmitter<T>(t);
                break;
            }
            case LATEST: {
                emitter = new LatestAsyncEmitter<T>(t);
                break;
            }
            default: {
                emitter = new BufferAsyncEmitter<T>(t, bufferSize());
                break;
            }
            }
    
            t.onSubscribe(emitter);
            try {
                source.subscribe(emitter);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                emitter.onError(ex);
            }
        }
    }
    

    可以看到,FlowableCreate继承Flowable类,我们先来简单的研究一下Flowable这个类:

    Flowable是整个Rxjava中最重要的类,它实现了Publisher接口,这个接口中包含的其实就是Flowable最核心的subscribe方法,我们看看subscribe的源码:

    public final void subscribe(FlowableSubscriber<? super T> s) {
        ObjectHelper.requireNonNull(s, "s is null");
        try {
            Subscriber<? super T> z = RxJavaPlugins.onSubscribe(this, s);
    
            ObjectHelper.requireNonNull(z, "The RxJavaPlugins.onSubscribe hook returned a null FlowableSubscriber. Please check the handler provided to RxJavaPlugins.setOnFlowableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
    
            subscribeActual(z);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Subscription has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);
    
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
    

    所以,subscribe方法基本等于subscribeActual方法,而subscribeActual方法是抽象的,也是整个Flowable中唯一一个抽象方法

    我们再回头看FlowableCreate,它继承了Flowable方法,并且实现了subscribeActual方法,那么具体是怎样实现的呢,在上面的源码中可以看到,其实这个方法做了三件事:

    1. 用Subscriber对象创建出了一个emitter对象
    2. 执行了t.onSubscribe(emitter)这句代码。这个就比较有意思了,根据这句代码执行的位置,我们不难得出结论:subscriber的onSubscibe这个回调,和其他的三个回调有本质区别,首先,这个回调是在当前线程执行的,不受线程切换的影响,其次,这个方法是在subscrbeActural中直接调用了,也就是说不受子线程中代码执行的影响。
    3. 执行了source.subscribe(emitter),也就是create时实现的那个接口的方法,换句话说,如果不调用Flowable的subscribe方法,最开始FlowableOnSubscribe接口中的那个方法中的代码是不会执行的

    分析到了这里,我们可以给create方法下一个结论:

    create方法,传入了一个FlowableOnSubscribe接口的子类对象,通过这个对象实现了Flowable的subscribeActual方法,从而得到了一个Flowable的对象。

    2.线程是如何切换的

    a.如何切换订阅的线程

    下面是第一段源码

    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler) {
            //第一句判断scheduler是否为空
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //然后调用另一个重载方法,第二个参数是「当前Flowable对象是否属于刚刚创建的Flowable」
        return subscribeOn(scheduler, !(this instanceof FlowableCreate));
    }
    

    调用的重载方法里面如何处理的呢

    public final Flowable<T> subscribeOn(@NonNull Scheduler scheduler, boolean requestOn) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //用传入的scheduler对象将当前Flowable对象转换成FlowableSubscribeOn对象
        return RxJavaPlugins.onAssembly(new FlowableSubscribeOn<T>(this, scheduler, requestOn));
    }
    

    那么FlowableSubscribeOn是怎么一回事呢,首先,这个类继承AbstractFlowableWithUpstream

    abstract class AbstractFlowableWithUpstream<T, R> extends Flowable<R> implements HasUpstreamPublisher<T> {
    
        /**
         * The upstream source Publisher.
         */
        protected final Flowable<T> source;
    
        /**
         * Constructs a FlowableSource wrapping the given non-null (verified)
         * source Publisher.
         * @param source the source (upstream) Publisher instance, not null (verified)
         */
        AbstractFlowableWithUpstream(Flowable<T> source) {
            this.source = ObjectHelper.requireNonNull(source, "source is null");
        }
    
        @Override
        public final Publisher<T> source() {
            return source;
        }
    }
    

    我们分析一下这个类,AbstractFlowableWithUpstream这个类有两个泛型,第二个泛型是给父类使用的,第一个泛型则是给自己的一个成员变量使用的,这个成员变量也是一个Flowable。

    总结一下:AbstractFlowableWithUpstream继承Flowable,但是它额外持有一个Flowable对象作为成员变量,两个Flowable分别对应两个泛型,AbstractFlowableWithUpstream自身的泛型是当前要输出的数据类型,持有的成员变量Flowable的代表的是上游的数据类型,所以AbstractFlowableWithUpstream应该理解为一个带有上游Flowable的Flowable。

    所以回头看subscribeOn方法,其实是创建了一个新的Flowable对象,原先的Flowable对象作为成员变量保留在新的Flowable中。

    这里最后一个需要注意的点,是FlowableSubscribeOn类,它的subscribeActual方法代码如下

    @Override
    public void subscribeActual(final Subscriber<? super T> s) {
        Scheduler.Worker w = scheduler.createWorker();
        final SubscribeOnSubscriber<T> sos = new SubscribeOnSubscriber<T>(s, w, source, nonScheduledRequests);
        s.onSubscribe(sos);
        
        w.schedule(sos);
    }
    

    这里有一个SubscribeOnSubscriber,它是FlowableSubscribeOn的一个静态内部类,源码如下

    static final class SubscribeOnSubscriber<T> extends AtomicReference<Thread>
    implements FlowableSubscriber<T>, Subscription, Runnable {
    
        private static final long serialVersionUID = 8094547886072529208L;
    
        final Subscriber<? super T> downstream;
    
        final Scheduler.Worker worker;
    
        final AtomicReference<Subscription> upstream;
    
        final AtomicLong requested;
    
        final boolean nonScheduledRequests;
    
        Publisher<T> source;
    
        SubscribeOnSubscriber(Subscriber<? super T> actual, Scheduler.Worker worker, Publisher<T> source, boolean requestOn) {
            this.downstream = actual;
            this.worker = worker;
            this.source = source;
            this.upstream = new AtomicReference<Subscription>();
            this.requested = new AtomicLong();
            this.nonScheduledRequests = !requestOn;
        }
    
        @Override
        public void run() {
            lazySet(Thread.currentThread());
            Publisher<T> src = source;
            source = null;
            src.subscribe(this);
        }
    
        @Override
        public void onSubscribe(Subscription s) {
            if (SubscriptionHelper.setOnce(this.upstream, s)) {
                long r = requested.getAndSet(0L);
                if (r != 0L) {
                    requestUpstream(r, s);
                }
            }
        }
    
        @Override
        public void onNext(T t) {
            downstream.onNext(t);
        }
    
        @Override
        public void onError(Throwable t) {
            downstream.onError(t);
            worker.dispose();
        }
    
        @Override
        public void onComplete() {
            downstream.onComplete();
            worker.dispose();
        }
    
        @Override
        public void request(final long n) {
            if (SubscriptionHelper.validate(n)) {
                Subscription s = this.upstream.get();
                if (s != null) {
                    requestUpstream(n, s);
                } else {
                    BackpressureHelper.add(requested, n);
                    s = this.upstream.get();
                    if (s != null) {
                        long r = requested.getAndSet(0L);
                        if (r != 0L) {
                            requestUpstream(r, s);
                        }
                    }
                }
            }
        }
    
        void requestUpstream(final long n, final Subscription s) {
            if (nonScheduledRequests || Thread.currentThread() == get()) {
                s.request(n);
            } else {
                worker.schedule(new Request(s, n));
            }
        }
    
        @Override
        public void cancel() {
            SubscriptionHelper.cancel(upstream);
            worker.dispose();
        }
    
        static final class Request implements Runnable {
            final Subscription upstream;
            final long n;
    
            Request(Subscription s, long n) {
                this.upstream = s;
                this.n = n;
            }
    
            @Override
            public void run() {
                upstream.request(n);
            }
        }
    }
    

    首先,SubscribeOnSubscriber实现了三个接口,分别是FlowableSubscriber,Subscription和Runnable,实现Subscription接口,是为了作为s.onSubscribe(sos)的参数,在subscribe方法执行时被调用,实现Runnable,是为了作为w.schedule(sos)的参数,也就是把run方法中的代码放到指定线程中执行,那么我们看看run方法中是什么代码,没错,是上游的Flowable的subscribe方法,也就是FlowableCreate的subscribe方法,也就是我们最开始分析的FlowableOnSubscribe中的subscribe方法中的代码。

    这里还有一个细节:FlowableSubscribeOn的subscribeActual方法中我们调用了s.onSubscribe(sos),经过前面的分析,我们也知道上游的Flowable的subscribeActual方法中也有调用Subscriber的onSubscribe方法,那么这个方法会被调用两次吗,其实是不会的,因为在FlowableSubscribeOn的subscribeActual方法中,SubscribeOnSubscriber其实是一个特殊的Subscriber它实现的接口如下:

    public interface FlowableSubscriber<T> extends Subscriber<T> {
    
        /**
         * Implementors of this method should make sure everything that needs
         * to be visible in {@link #onNext(Object)} is established before
         * calling {@link Subscription#request(long)}. In practice this means
         * no initialization should happen after the {@code request()} call and
         * additional behavior is thread safe in respect to {@code onNext}.
         *
         * {@inheritDoc}
         */
        @Override
        void onSubscribe(@NonNull Subscription s);
    }
    

    FlowableSubscriber继承了Subscriber,但是做了一点小小的改动,onSubscribe方法由public变成了默认的,也就意味着,上游的Flowable再去调用的时候,是不成功的。

    b.如何切换观察的线程

    这里我们追踪源码,会发现与上面有相似之处,也是新创建一个Flowable,当前flowable作为上游的flowable,当作参数保存在新创建的flowable中,那么这个新的flowable是如何改变观察线程的呢?

    我们来看看它的subscribeActual方法

    @Override
    public void subscribeActual(Subscriber<? super T> s) {
            //这里scheduler生成了一个worker对象
        Worker worker = scheduler.createWorker();
    
            //这里source(也就是上游的flowable)subscribe了新创建的ObserveOnSubscriber对象
        if (s instanceof ConditionalSubscriber) {
            source.subscribe(new ObserveOnConditionalSubscriber<T>(
                    (ConditionalSubscriber<? super T>) s, worker, delayError, prefetch));
        } else {
            source.subscribe(new ObserveOnSubscriber<T>(s, worker, delayError, prefetch));
        }
    }
    

    这里有一个非常重要的类,ObserveOnSubscriber,首先我们看看它的父类

    abstract static class BaseObserveOnSubscriber<T>
    extends BasicIntQueueSubscription<T>
    implements FlowableSubscriber<T>, Runnable {
        private static final long serialVersionUID = -8241002408341274697L;
    
        final Worker worker;
    
        final boolean delayError;
    
        final int prefetch;
    
        final int limit;
    
        final AtomicLong requested;
    
        Subscription upstream;
    
        SimpleQueue<T> queue;
    
        volatile boolean cancelled;
    
        volatile boolean done;
    
        Throwable error;
    
        int sourceMode;
    
        long produced;
    
        boolean outputFused;
    
        BaseObserveOnSubscriber(
                Worker worker,
                boolean delayError,
                int prefetch) {
            this.worker = worker;
            this.delayError = delayError;
            this.prefetch = prefetch;
            this.requested = new AtomicLong();
            this.limit = prefetch - (prefetch >> 2);
        }
    
        @Override
        public final void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode == ASYNC) {
                trySchedule();
                return;
            }
            if (!queue.offer(t)) {
                upstream.cancel();
    
                error = new MissingBackpressureException("Queue is full?!");
                done = true;
            }
            trySchedule();
        }
    
        @Override
        public final void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            trySchedule();
        }
    
        @Override
        public final void onComplete() {
            if (!done) {
                done = true;
                trySchedule();
            }
        }
    
        @Override
        public final void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(requested, n);
                trySchedule();
            }
        }
    
        @Override
        public final void cancel() {
            if (cancelled) {
                return;
            }
    
            cancelled = true;
            upstream.cancel();
            worker.dispose();
    
            if (getAndIncrement() == 0) {
                queue.clear();
            }
        }
    
        final void trySchedule() {
            if (getAndIncrement() != 0) {
                return;
            }
            worker.schedule(this);
        }
    
        @Override
        public final void run() {
            if (outputFused) {
                runBackfused();
            } else if (sourceMode == SYNC) {
                runSync();
            } else {
                runAsync();
            }
        }
    
        abstract void runBackfused();
    
        abstract void runSync();
    
        abstract void runAsync();
    
        final boolean checkTerminated(boolean d, boolean empty, Subscriber<?> a) {
            if (cancelled) {
                clear();
                return true;
            }
            if (d) {
                if (delayError) {
                    if (empty) {
                        cancelled = true;
                        Throwable e = error;
                        if (e != null) {
                            a.onError(e);
                        } else {
                            a.onComplete();
                        }
                        worker.dispose();
                        return true;
                    }
                } else {
                    Throwable e = error;
                    if (e != null) {
                        cancelled = true;
                        clear();
                        a.onError(e);
                        worker.dispose();
                        return true;
                    } else
                    if (empty) {
                        cancelled = true;
                        a.onComplete();
                        worker.dispose();
                        return true;
                    }
                }
            }
    
            return false;
        }
    
        @Override
        public final int requestFusion(int requestedMode) {
            if ((requestedMode & ASYNC) != 0) {
                outputFused = true;
                return ASYNC;
            }
            return NONE;
        }
    
        @Override
        public final void clear() {
            queue.clear();
        }
    
        @Override
        public final boolean isEmpty() {
            return queue.isEmpty();
        }
    }
    

    我们首先注意,这个Observer类的几个重要方法(onNext,onError,onComplete)中都没有做什么实际的逻辑,但是他们都调用了一个trySchedule方法。继续追踪,我们可以定位到run方法中,最后是在指定的线程调用了run方法,run方法中的几个方法都是抽象的,我们来看子类的实现,比如下面这个runSync方法

    @Override
    void runSync() {
        int missed = 1;
    
        final Subscriber<? super T> a = downstream;
        final SimpleQueue<T> q = queue;
    
        long e = produced;
    
        for (;;) {
    
            long r = requested.get();
    
            while (e != r) {
                T v;
    
                try {
                    v = q.poll();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    cancelled = true;
                    upstream.cancel();
                    //这里调用了下游subscriber的onError
                    a.onError(ex);
                    worker.dispose();
                    return;
                }
    
                if (cancelled) {
                    return;
                }
                if (v == null) {
                    cancelled = true;
                    //这里调用了下游subscriber的onComplete
                    a.onComplete();
                    worker.dispose();
                    return;
                }
    
                            //这里调用了下游subscriber的onNext
                a.onNext(v);
    
                e++;
            }
    
            if (cancelled) {
                return;
            }
    
            if (q.isEmpty()) {
                cancelled = true;
                a.onComplete();
                worker.dispose();
                return;
            }
    
            int w = get();
            if (missed == w) {
                produced = e;
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            } else {
                missed = w;
            }
        }
    }
    

    看到这里我们可以下结论了:这里通过封装了一个ObserveOnSubscriber类,每次调用这个类的onNext等等回调时,就会尝试在指定线程中触发下游的Subscriber的回调。

    3.subscribe方法

    前面已经分析过了,这里就不再赘述。

    相关文章

      网友评论

          本文标题:Rxjava

          本文链接:https://www.haomeiwen.com/subject/gamunqtx.html