美文网首页
Retrofit2+rxjava2源码解析(二):rxjava2

Retrofit2+rxjava2源码解析(二):rxjava2

作者: CDF_cc7d | 来源:发表于2018-08-08 22:14 被阅读0次

    上一篇讲了retrofit2的原理,这一篇咱们重点讲讲rxjava2的实现原理。不过呢,由于rxjava2博大精深,这里篇幅有限,而且精力有限,所以这里只讲以下几个点:
    1.上游被观察者Observable的创建
    2.subscribeOn如何初始化调度线程者
    3.observeOn如何初始化调度线程者
    4.下游观察者的创建,上游被观察者的订阅事件,subscribeOn如何调度上游被观察者线程以及observeOn如何调度下游观察者线程

            //第一步
            Observable.create(new ObservableOnSubscribe<String>() {
                @Override
                public void subscribe(ObservableEmitter<String> e) throws Exception {
                    //log1
                    LogUtil.d("subscribe : " + Thread.currentThread().getName());
                    e.onNext("a");
                    e.onComplete();
                }
            }).subscribeOn(Schedulers.io())//第二步
            .observeOn(AndroidSchedulers.mainThread())//第三步
            .subscribe(new Observer<String>() {
    
                @Override
                public void onSubscribe(Disposable d) {
                     //log2
                    LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
                }
    
                @Override
                public void onNext(String str) {
                     //log3
                    LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
                }
    
                @Override
                public void onError(Throwable e) {
                     //log4
                    LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
                }
    
                @Override
                public void onComplete() {
                     //log5
                    LogUtil.d("onSubscribe : " + Thread.currentThread().getName());
                }
            });//第四步
    

    一、上游被观察者Observable的创建:

    首先进入Observable的create方法里面去:

        public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
            ObjectHelper.requireNonNull(source, "source is null");
            return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
        }
    
    public final class RxJavaPlugins {
        ...代码省略...
        /**
         * Calls the associated hook function.
         * @param <T> the value type
         * @param source the hook's input value
         * @return the value returned by the hook
         */
        @SuppressWarnings({ "rawtypes", "unchecked" })
        @NonNull
        public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
            Function<? super Observable, ? extends Observable> f = onObservableAssembly;
            if (f != null) {
                return apply(f, source);
            }
            return source;
        }
    
        ...代码省略...
    }
    

    RxJavaPlugins.onAssembly 是个hook方法,在这里不做详细介绍,我们可以

    RxJavaPlugins.onAssembly(new ObservableCreate<T>(source))
    

    直接看成

    new ObservableCreate<T>(source)
    

    就可以了。
    下面很多地方会有类似的调用,都可以以这种形式看待,就不在说明了。
    这里就返回了ObservableCreate的对象,然后把source传了进来,这里的source便是一开始new出来的ObservableOnSubscribe对象。

    二、subscribeOn如何初始化调度线程者

        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    1.我们可以看出此时返回的是一个ObservableSubscribeOn对象,参数一个是this说明是第一步new出来的ObservableCreate对象,这个就是当前对象里面的source变量,第二个参数是scheduler即Schedulers.io()。
    2.那么Schedulers.io()到底是什么呢,一起去看看吧:

        @NonNull
        public static Scheduler io() {
            return RxJavaPlugins.onIoScheduler(IO);
        }
    
    public final class RxJavaPlugins {
        ...代码省略...
    
         /**
         * Calls the associated hook function.
         * @param defaultScheduler the hook's input value
         * @return the value returned by the hook
         */
        @NonNull
        public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
            Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
            if (f == null) {
                return defaultScheduler;
            }
            return apply(f, defaultScheduler);
        }
    
        ...代码省略...
    }
    

    其实这也是个hook方法,所以返回的就是IO。再来看下IO是个什么东西

        static {
            SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    
            COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    
            IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
            TRAMPOLINE = TrampolineScheduler.instance();
    
            NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
        }
    

    上图所知,这是个静态块,在程序加载的时候就会初始化出来

        /**
         * Calls the associated hook function.
         * @param defaultScheduler a {@link Callable} which returns the hook's input value
         * @return the value returned by the hook, not null
         * @throws NullPointerException if the callable parameter or its result are null
         */
        @NonNull
        public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
            ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
            Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
            if (f == null) {
                return callRequireNonNull(defaultScheduler);
            }
            return applyRequireNonNull(f, defaultScheduler);
        }
    

    然后根据hook function可以得知返回的Scheduler就是
    callRequireNonNull(defaultScheduler)返回的对象。这个defaultScheduler是个IOTask对象,看下面代码

        static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        }
    
        static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
        }
    

    IOTask有个重写方法call,返回的是IoHolder.DEFAULT即IoScheduler。那么这个call方法是什么时候调用的呢,其实就是上面的callRequireNonNull方法里面调用。

    所以可以得出Schedulers.io()获取到的就是IoSchedulers对象。

    三、observeOn如何初始化调度线程者

        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, false, bufferSize());
        }
      
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    
    

    1.可以看出来这里返回的是一个ObservableObserveOn对象,这里的this参数就是第二步的ObservableSubscribeOn对象,这个就是当前对象里面的source变量。而这个scheduler参数就是传入的AndroidSchedulers.mainThread(),看名字大概能猜出来这是个主线程调度者
    2.那么就随我进入AndroidSchedulers的代码里一探究竟吧:

    public final class AndroidSchedulers {
    
        private static final class MainHolder {
    
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        /** A {@link Scheduler} which executes actions on the Android main thread. */
        public static Scheduler mainThread() {
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
      ...代码省略...
    }
    

    mainThread方法里面返回的是MAIN_THREAD,而这个MAIN_THREAD是通过RxAndroidPlugins.initMainThreadScheduler返回的MainHolder.DEFAULT对象,也就是HandlerScheduler对象。这个对象传入了主线程的handler,用来调度线程。

    四、下游观察者的创建,上游被观察者的订阅事件,subscribeOn如何调度上游被观察者线程以及observeOn如何调度下游观察者线程

    最关键的一步来了,先上代码:

        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    

    1.ObjectHelper.requireNonNull已经出现了很多次,其实就是为了判断是否为空用的
    2.这里的observer就是传入的观察者,即刚刚在最外层new出来的observer。
    3.这个时候进入subscribeActual方法,发现这是个抽象方法,那么是谁实现了这个方法呢。还记得我们现在的subscribe是谁调用的嘛,没错就是第三步返回的ObservableObserveOn对象,所以ObservableObserveOn实现了subscribeActual方法:

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    这里的source就是之前步骤3.1保存的ObservableSubscribeOn对象,而observer就是最外层传入的观察者对象,由此可以看出观察者被内部类ObserveOnObserver进行了装饰。
    w就是handler线程调度者创建的工作者主要用来将ObserveOnObserver(实现了runnable接口)通过handler发送到主线程上面去,从而实现线程调度
    4.这个时候要调用source的subscribe了,跟ObservableObserveOn类似,之后进入到ObservableSubscribeOn的subscribeActual方法中:

        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    这里的s就是步骤4.3new出来的ObserveOnObserver对象,这里将s装饰到SubscribeOnObserver对象中,然后调用ObserveOnObserver的onSubscribe方法 ↓↓↓

            @Override
            public void onSubscribe(Disposable s) {
                    ...代码省略...
                    actual.onSubscribe(this);
                }
            }
    

    这里的actual其实就是最外层的观察者,然后就会调用最外层观察者的onSubscribe方法,此时由于并没有做任何的线程调度,所以当前的操作处于主线程中。因此log5出打出来的线程应该是主线程

    好了,我们再回到subscribeActual方法中,此时调用scheduler.scheduleDirect方法,将SubscribeTask对象传入:

        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            DisposeTask task = new DisposeTask(decoratedRun, w);
    
            w.schedule(task, delay, unit);
    
            return task;
        }
    

    run就是传入SubscribeTask的对象,将SubscribeTask的对象装饰到DisposeTask中,
    w是IO线程调度者创建的切换线程的实际工作者,此时调用createWorker就会执行IoScheduler的createWork方法new 出EventLoopWorker对象:

        @NonNull
        @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    

    而w.schedule也就是执行了EventLoopWorker的schedule方法,将DisposeTask传入:

            @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
    
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
    

    接着调用threadWorker里面的scheduleActual方法:

       @NonNull
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    

    因为之前传入的delayTime是0,所以此时会走 f = executor.submit((Callable<Object>)sr);这条线。然后我们看下executor是什么时候初始化的:

        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
        ...代码省略...
          /**
         * Creates a ScheduledExecutorService with the given factory.
         * @param factory the thread factory
         * @return the ScheduledExecutorService
         */
        public static ScheduledExecutorService create(ThreadFactory factory) {
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
            if (exec instanceof ScheduledThreadPoolExecutor) {
                ScheduledThreadPoolExecutor e = (ScheduledThreadPoolExecutor) exec;
                POOLS.put(e, exec);
            }
            return exec;
        }
    

    由此可以得出这个executor是ScheduledExecutorService对象,然后调用ScheduledExecutorService的submit方法:

       public Future<?> submit(Runnable task) {
            return schedule(task, 0, NANOSECONDS);
        }
    ...代码省略...
        /**
         * @throws RejectedExecutionException {@inheritDoc}
         * @throws NullPointerException       {@inheritDoc}
         */
        public ScheduledFuture<?> schedule(Runnable command,
                                           long delay,
                                           TimeUnit unit) {
            if (command == null || unit == null)
                throw new NullPointerException();
            RunnableScheduledFuture<Void> t = decorateTask(command,
                new ScheduledFutureTask<Void>(command, null,
                                              triggerTime(delay, unit),
                                              sequencer.getAndIncrement()));
            delayedExecute(t);
            return t;
        }
    

    这里先是ScheduledFutureTask的初始化,将command传入,然后调用其父类的构造方法->调用Executors.callable方法返回一个RunnableAdapter对象(将command传入构造方法内,赋值给task),然后赋值给callable对象。
    然后调用delayedExecute以后就会执行ScheduledFutureTask的run方法,调用父类的run方法:

        public void run() {
            if (state != NEW ||
                !U.compareAndSwapObject(this, RUNNER, null, Thread.currentThread()))
                return;
            try {
                Callable<V> c = callable;
                if (c != null && state == NEW) {
                    V result;
                    boolean ran;
                    try {
                        result = c.call();
                        ran = true;
                    } catch (Throwable ex) {
                        result = null;
                        ran = false;
                        setException(ex);
                    }
                    if (ran)
                        set(result);
                }
            } finally {
                // runner must be non-null until state is settled to
                // prevent concurrent calls to run()
                runner = null;
                // state must be re-read after nulling runner to prevent
                // leaked interrupts
                int s = state;
                if (s >= INTERRUPTING)
                    handlePossibleCancellationInterrupt(s);
            }
        }
    

    接着就调用callable的call方法,这个callable就是上面提到的RunnableAdapter对象,调用RunnableAdapter的call方法->task.run,此处的task便是传入的command。
    那么往前追溯到NewThreadWorker的schedule中,发现传入到ScheduledThreadPoolExecutor中的command是ScheduledRunnable对象,也就是说调用了ScheduledRunnable.run->DisposeTask.run->SubscribeTask.run->SubscribeOnObserver.run->source.subscribe(parent);
    所以上面的一系列操作全部都在子线程中执行

    SubscribeOnObserver里面的source是之前保存的ObservableSubscribeOn的ObservableCreate对象。
    parent是装饰了下游ObservableObserveOn的内部类对象(装饰了外部观察者)的ObservableSubscribeOn的内部类,这句话比较拗口,多读几次理解一下。

    5.这时候就执行到ObservableCreate的subscribeActual方法了:

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            observer.onSubscribe(parent);
    
            try {
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    

    observer是装饰了下游ObservableSubscribeOn的内部类SubscribeOnObserver对象,该对象装饰了他下游ObservableObserveOn的内部类ObserveOnObserver对象(该对象装饰了外部观察者)如下图所示:


    observer.png

    source是一开始在创建ObservableCreate时就传入的外部被观察者对象
    因此,最终source.subscribe(parent)会调用最外层被观察者的subscribe方法,因此log1打印出来的线程应该处于子线程中,

                    //log1
                    LogUtil.d("subscribe : " + Thread.currentThread().getName());
                    e.onNext("a");
                    e.onComplete();
    

    e就是传入的CreateEmitter对象,调用createEmitter的onNext方法:

            @Override
            public void onNext(T t) {
                if (t == null) {
                    onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                    return;
                }
                if (!isDisposed()) {
                    observer.onNext(t);
                }
            }
    

    会依次调用observer.png所展示的对象的onNext方法,重点看下ObservableObserveOn.ObserveOnObserver的onNext方法:

            @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            }
    
            void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    

    worker就是上面步骤4.3里面调用HandlerScheduler.createWorker方法new出来的HandlerWorker对象,调用HandlerWorker的schedule方法:

            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
    
                if (disposed) {
                    return Disposables.disposed();
                }
    
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
                handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    

    将runnable封装到消息对象里面发送给主线程,主线程执行run方法,从而完成了线程调度,这时操作处理已经进入到了主线程当中执行ScheduledRunnable的run方法:

            @Override
            public void run() {
                try {
                    delegate.run();
                } catch (Throwable t) {
                    IllegalStateException ie =
                        new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                    RxJavaPlugins.onError(ie);
                    Thread thread = Thread.currentThread();
                    thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
                }
            }
    

    这里的delegate就是ObservableObserveOn.ObserveOnObserver对象,所以重新进入ObservableObserveOn.ObserveOnObserver代码的run方法中:

            @Override
            public void run() {
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    
            void drainNormal() {
                int missed = 1;
    
                final SimpleQueue<T> q = queue;
                final Observer<? super T> a = actual;
    
                for (;;) {
                    if (checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
    
                    for (;;) {
                        boolean d = done;
                        T v;
    
                        try {
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            s.dispose();
                            q.clear();
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
                        boolean empty = v == null;
    
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
    
                        if (empty) {
                            break;
                        }
    
                        a.onNext(v);
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    

    run-> drainNormal->actual.onNext(v);
    这里的actual便是最外层的观察者,所以观察者里面的log3,log4,log5都会是在主线程中打印出来。

    接下来我们打印出来看看:

    08-08 14:09:03.138 4928-4928/com.zkyl.zkyl D/(MainActivity.java:57): onSubscribe  onSubscribe : main
    08-08 14:09:03.142 4928-4974/com.zkyl.zkyl D/(MainActivity.java:47): subscribe  subscribe : RxCachedThreadScheduler-1
    08-08 14:09:03.162 4928-4928/com.zkyl.zkyl D/(MainActivity.java:62): onNext  onSubscribe : main
    08-08 14:09:03.162 4928-4928/com.zkyl.zkyl D/(MainActivity.java:72): onComplete  onSubscribe : main
    

    确实正如代码分析一般只有log1是处于子线程中,其他都处于主线程当中。

    感想

    rxjava2的设计实在是过于庞大,精髓远远不止这些。研究到这里已经非常累了,所以rxjava2的研究道路还是非常遥远漫长的

    相关文章

      网友评论

          本文标题:Retrofit2+rxjava2源码解析(二):rxjava2

          本文链接:https://www.haomeiwen.com/subject/xwztbftx.html