美文网首页
RxJava3.0源码解读

RxJava3.0源码解读

作者: Li李萌 | 来源:发表于2021-01-25 21:55 被阅读0次

    最近我准备在年后离职,所以就看了看RxJava的源码,相信我会加入到年后的求职大军中23333。其实现在突然离开苏州还有点不舍,从18年来到苏州我在这里呆了2-3年了,去过很多地方,有很多的朋友都在这里,也对这个公司很熟悉。现在开始做年后的面试准备,我会和大家一起看看源码,今天我们就从RxJava开始。之前RxJava用过很多次但是没有怎么看过它的源码,今天就好好看。

    首先我们要弄清楚RxJava中的几个类:

    • Observable:被订阅者,是事件的来源,通过Emitter发射数据给Observer。
    • Observer:订阅者,通过注册(onSubscribe)过程传给被订阅者,订阅者监听开始订阅,监听订阅过程中会把Disposable传给订阅者,然后在被订阅者中的发射器(Emitter)发射数据给订阅者(Observer)。
    • Disposable:释放器,通常有两种方式会返回Disposable,一个是在Observer的onSubscribe方法回调回来,第二个是在subscribe订阅方法传consumer的时候会返回。
    • Emitter:发射器,在发射器中会接收下游的订阅者(Observer),然后在发射器相应的方法把数据传给订阅者(Observer)。
    • Scheduler:调度器,用于切换线程,不同的调度器(Scheduler)可以将代码放入到不同线程去执行和观察。
    • Consumer:消费器,消费器其实是Observer的一种变体,Observer的每一个方法都会对应一个Consumer,比如Observer的onNext、onError、onComplete、onSubscribe都会对应一个Consumer。

    1、操作符create源码分析

    下面是一个简单的实例代码:

    
    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                    Log.d(TAG, "onSubscribe: ");
                }
    
                @Override
                public void onNext(@io.reactivex.rxjava3.annotations.NonNull Integer s) {
                    Log.d(TAG, "onNext: " + s);
                }
    
                @Override
                public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                    Log.d(TAG, "onError: " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            });
    
    

    Observable.create()方法的实现是这样子的

     public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
            Objects.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
        }
    

    里面new了一个ObservableCreate对象传到了RxJavaPlugins.onAssembly()中,而RxJavaPlugins.onAssembly()返回了什么呢?我们看一下下面的代码:

        @NonNull
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    

    这段代码中因为我们没有初始化f所以它就是null,所以RxJavaPlugins.onAssembly()返回的就是传入的对象的本身,返回的就是source

    2、subscribe()操作符的源码分析

    以下代码有省略,我只放出来关键部分的代码

    public abstract class Observable<@NonNull T> implements ObservableSource<T> {
    
        public final void subscribe(@NonNull Observer<? super T> observer) {
            observer = RxJavaPlugins.onSubscribe(this, observer); 
            subscribeActual(observer);
        }
    
        protected abstract void subscribeActual(@NonNull Observer<? super T> observer);
    }
    

    其中RxJavaPlugins.onSubscribe()和上面的RxJavaPlugins.onAssembly()一样,返回的就是我们传入的observersubscribeActual()调用的就是我们上游的 Observable.create()创建的ObservableCreate对象,它是Observable的子类。大家记住这个subscribeActual()方法它是之后我们要讲的其它操作符的关键。我们再看看subscribeActual()具体实现中做了什么?

    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
    
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<>(observer);
            observer.onSubscribe(parent);
            source.subscribe(parent);
        }
    
        static final class CreateEmitter<T>
            extends AtomicReference<Disposable>
            implements ObservableEmitter<T>, Disposable {
            final Observer<? super T> observer;
    
            CreateEmitter(Observer<? super T> observer) {
                this.observer = observer;
            }
    
            @Override
            public void onNext(T t) {
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    
            @Override
            public void onError(Throwable t) {
                if (!tryOnError(t)) {
                    RxJavaPlugins.onError(t);
                }
            }
    
            @Override
            public boolean tryOnError(Throwable t) {
                if (!isDisposed()) {
                    try {
                        observer.onError(t);
                    } finally {
                        dispose();
                    }
                    return true;
                }
                return false;
            }
    
            @Override
            public void onComplete() {
                if (!isDisposed()) {
                    try {
                        observer.onComplete();
                    } finally {
                        dispose();
                    }
                }
            }
        }
    }
    

    subscribeActual()实际上它创建了一个发射器new CreateEmitter<>(observer)并且把我们在下游创建的Observer对面给传入进去了。而这个source就是我们一开始Observable.create()时我们自己创建的匿名对象,如下代码所示:

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            })
    

    实际上source.subscribe(parent)就会调用到我们new的匿名对象里面,我们调用发射器的时候如emitter.onNext(1)就会执行发射器的中的onNext(),下面代码中的observer就是我们在subscribe()订阅时我们创建的Observer(观察者)匿名对象

    
     @Override
            public void onNext(T t) {
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    

    小总结:

    • 当我们Observable.create()时就会new ObservableCreate<>(source)这个source就是我们创建的匿名对象,ObservableCreate作为它的成员变量保存起来。
    • 我们去调用subscribe()方法时,会执行上游的Observable中的subscribeActual()方法,而我们上游创建的是ObservableCreate对象,所以接下来它会执行ObservableCreate类中的subscribeActual(observer)方法。
      -subscribeActual()方法会把我们创建的Observer对象传入到CreateEmitter(发射器)中。并且执行source.subscribe(parent)。此时就会调用我们Observable.create()时创建的匿名对象中的subscribe()方法。
    • 当我们调用emitter.onNext(1)就会调用我们创建的Observer中的onNext()方法

    3、map操作符源码解读

    我们先看一段示例代码:

            Observable<Integer> createObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            });
            Observable<String> mapObservable = createObservable.map(new Function<Integer, String>() {
                @Override
                public String apply(Integer integer) throws Throwable {
                    return String.valueOf(integer);
                }
            });
            Observer<String> observer = new Observer<String>() {
                @Override
                public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                    Log.d(TAG, "onSubscribe: ");
                }
    
                @Override
                public void onNext(@io.reactivex.rxjava3.annotations.NonNull String s) {
                    Log.d(TAG, "onNext: " + s);
                }
    
                @Override
                public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                    Log.d(TAG, "onError: " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            };
            mapObservable.subscribe(observer);
    

    通过上面我们知道create()操作符会创建一个ObservableCreate对象,同理map操作符会创建一个ObservableMap,他们都是被观察者都继承了Observable,他们能完成不同的功能其实就是通过subscribeActual()方法实现的。下面我们将具体来看一下。

        public final <R> Observable<R> map(@NonNull Function<? super T, ? extends R> mapper) {
            Objects.requireNonNull(mapper, "mapper is null");
            return RxJavaPlugins.onAssembly(new ObservableMap<>(this, mapper));
        }
    

    在调用map()时会创建一个ObservableMap对象。这个this就是createObservable对象,mapper就是我们创建的匿名对象。同理当我们调用subscribe()订阅方法的时候就会执行ObservableMap类中的subscribeActual()方法。

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
     final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);//上游的被观察者对象,如ObservableCreate
            this.function = function;//我们在map()方法中传入的匿名对象
        }
    
        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));//创建一个观察者对象,再订阅上游的被观察者
        }
        static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
            final Function<? super T, ? extends U> mapper;//我们在map()方法中传入的匿名对象
    
            MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
                super(actual);
                this.mapper = mapper;
            }
    
            @Override
            public void onNext(T t) {
                if (done) {//当执行了onComplete()或者onError()就不再执行
                    return;
                }
    
                if (sourceMode != NONE) {
                    downstream.onNext(null);
                    return;
                }
    
                U v;
    
                try {
                  //调用了我们创建的匿名对象中的applp()方法
                    v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
              //downstream:下游的Observer对象
                downstream.onNext(v);
            }
    
            @Override
            public int requestFusion(int mode) {
                return transitiveBoundaryFusion(mode);
            }
        }
    }
    

    subscribeActual()方法中创建了一个观察者对象(MapObserver),同时去订阅上游的被观察者(ObservableCreate)。

    小总结

    • map操作符在subscribeActual()中创建了一个观察者对象MapObserver,通过subscribe()去订阅上游的被观察者。
    • 当上游的被观察者ObservableCreate发生变化之后(如调用了emitter.onNext(1))就会执行MapObserver类中的onNext()方法。
    • 然后再调用我们传入的Function对象中的apply()方法。并将返回(v)值通过onNext()传入到下游的Observer(我们创建的匿名对象)中的onNext()方法中。

    4、subscribeOn操作符(线程切换)

    如下示例代码,我们通过subscribeOn()操作符去切换线程,从而使被观察者在哪个线程去执行。

            Observable<Integer> createObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(@io.reactivex.rxjava3.annotations.NonNull ObservableEmitter<Integer> emitter) throws Throwable {
                    emitter.onNext(1);
                    emitter.onNext(2);
                    emitter.onNext(3);
                    emitter.onComplete();
                }
            });
            
            Observable<Integer> observableSubscribeOn = createObservable.subscribeOn(Schedulers.io());
    
            Observer<Integer> observer = new Observer<Integer>() {
                @Override
                public void onSubscribe(@io.reactivex.rxjava3.annotations.NonNull Disposable d) {
                    Log.d(TAG, "onSubscribe: ");
                }
    
                @Override
                public void onNext(@io.reactivex.rxjava3.annotations.NonNull Integer s) {
                    Log.d(TAG, "onNext: " + s);
                }
    
                @Override
                public void onError(@io.reactivex.rxjava3.annotations.NonNull Throwable e) {
                    Log.d(TAG, "onError: " + e.getMessage());
                }
    
                @Override
                public void onComplete() {
                    Log.d(TAG, "onComplete: ");
                }
            };
            observableSubscribeOn.subscribe(observer);
    

    下面我们看一下subscribeOn()的具体实现:

        public final Observable<T> subscribeOn(@NonNull Scheduler scheduler) {
            Objects.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<>(this, scheduler));
        }
    

    subscribeOn()中创建了一个ObservableSubscribeOn(被观察者)的对象

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            //创建一个新的观察者(SubscribeOnObserver)并将下游的观察者observer传入进去
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
    
            //调用下游的observer的onSubscribe方法,这里我们可以看见,下游的onSubscribe还是在当前线程执行的
            observer.onSubscribe(parent);
            //这句代码的本质就是将一个实现了Runnable接口的SubscribeTask放到线程池去执行
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
        static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
            final Observer<? super T> downstream;
    
            final AtomicReference<Disposable> upstream;
    
            SubscribeOnObserver(Observer<? super T> downstream) {
                this.downstream = downstream;
                this.upstream = new AtomicReference<>();
            }
    
            @Override
            public void onSubscribe(Disposable d) {
                DisposableHelper.setOnce(this.upstream, d);
            }
    
            @Override
            public void onNext(T t) {
                downstream.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
            }
    
            @Override
            public void onComplete() {
                downstream.onComplete();
            }
    
            @Override
            public void dispose() {
                DisposableHelper.dispose(upstream);
                DisposableHelper.dispose(this);
            }
    
            @Override
            public boolean isDisposed() {
                return DisposableHelper.isDisposed(get());
            }
    
            void setDisposable(Disposable d) {
                DisposableHelper.setOnce(this, d);
            }
        }
    
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    }
    

    当执行了subscribeActual()方法时里面包装了一个SubscribeOnObserver(观察者),并将下游的observer作为成员变量。并创建一个SubscribeTask对象,将这个对象交由scheduler去执行。scheduler就是Schedulers.io()SubscribeTask 本身就一个实现了 Runnable接口的类,当线程开始实行的时候就会执行run()方法中的 source.subscribe(parent),所以我们才可以将其放到IO线程去执行。剩下的onNext(),__ onError()__,onComplete()类似都是调用下游的observer(观察者对象)。
    从上面的源代码中我们明白了,不管subscribeOn()执行了多少次只会以第一次为准。

    下面我们在看一下scheduler

        public static Scheduler io() {
            return RxJavaPlugins.onIoScheduler(IO);
        }
    

    当我们在调用Schedulers.io()时返回是一个IO的成员变成,它其实是在静态代码块中进行初始化的。

        static {
            SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    
            COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    
            //初始化IO
            IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
            TRAMPOLINE = TrampolineScheduler.instance();
    
            NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
        }
    

    下面的initIoScheduler()就是返回一个SchedulercallRequireNonNull(defaultScheduler);方法的实质就是获取Scheduler,最后通过new了一个IoScheduler对象。

        @NonNull
        public static Scheduler initIoScheduler(@NonNull Supplier<Scheduler> defaultScheduler) {
            Objects.requireNonNull(defaultScheduler, "Scheduler Supplier can't be null");
            Function<? super Supplier<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
            if (f == null) {
                return callRequireNonNull(defaultScheduler);
            }
            return applyRequireNonNull(f, defaultScheduler);
        }
    
     static final class IOTask implements Supplier<Scheduler> {
            @Override
            public Scheduler get() {
                return IoHolder.DEFAULT;
            }
        }
    
       static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
        }
    

    小总结

    • subscribeOn(Schedulers.io())的实质就是将被观察者(Observable)放入到线程池中去执行订阅source.subscribe(parent)

    observeOn操作符源码解读

     Observable<Integer> observableObserveOn = observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());
    

    通过上面我们关于subscribeOn()源码解读我们知道其实质就是将subscribe()放入到子线程去执行,所以也能猜到observeOn()就是将onNext()放入到主线程去执行,下面我们来看一下源码。

        public final Observable<T> observeOn(@NonNull Scheduler scheduler, boolean delayError, int bufferSize) {
            Objects.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<>(this, scheduler, delayError, bufferSize));
        }
    

    observeOn()里面创建了一个ObservableObserveOn对象,它同样也是一个被观察者继承至Observable,并且显示了Runnable接口。

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                //通过我们传入的scheduler,获取Worker 
                Scheduler.Worker w = scheduler.createWorker();
                //订阅是在当前线程执行的
                source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
            }
        }
    
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
        implements Observer<T>, Runnable {
       ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
                this.downstream = actual;
                this.worker = worker;
                this.delayError = delayError;
                this.bufferSize = bufferSize;
            }
    
            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
            @Override
            public void onError(Throwable t) {
                if (done) {
                    RxJavaPlugins.onError(t);
                    return;
                }
                error = t;
                done = true;
                schedule();
            }
    
            @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                schedule();
            }
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
            @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
      }
    }
    

    ObserveOnObserver收到上游的被观察者信息,调用onNext(),__ onError()__等方法时,就会将其放入到线程中去执行。关于drainFused()drainNormal()的本质就下游的onNext()方法放入到主线程去执行(因为这两个方法行数有点多,我没有放入到上面的代码中去)。

    小总结

    • 我们看见observeOn()操作符和前面的一样都是将其封装成一个继承至ObservableObservableObserveOn对象。
    • 在其内部通过subscribeActual()订阅上游的被观察者,并且将下游的Observer(观察者)和Scheduler.Worker包装到它的静态内部类中(ObserveOnObserver)。
    • 当上游的被观察发生变化的时候就会调用schedule()方法,将下游的onNext()onError()等回调放入到observeOn()中传入的线程中去执行。

    最后

    RxJava的本质就是观察者模式,不同的操作符都是返回一个被观察者(Observable),并且将下游的观察者封装到另一个Observer中(如:将下游的Observer封装到MapObserver中),操作符能完成特定的功能是因为特定的ObservableObserversubscribeActual()onNext()onSubscribe(),onError()等方法中完成了具体的实现。

    相关文章

      网友评论

          本文标题:RxJava3.0源码解读

          本文链接:https://www.haomeiwen.com/subject/fhfmzktx.html