美文网首页
Rxjava的线程调度源码解析

Rxjava的线程调度源码解析

作者: nmssdmf | 来源:发表于2019-03-26 10:57 被阅读0次

    代码调用

            // Consumer that receives each emitted Integer. Its body is intentionally
            // empty: this example only exists to trace which threads are involved.
            Consumer<Integer> onNext = new Consumer<Integer>() {
                @Override
                public void accept(Integer value) throws Exception {
                    // no-op
                }
            };

            // Emit a single item, subscribe via the Schedulers.io() scheduler, and
            // observe the result via the AndroidSchedulers.mainThread() scheduler.
            Observable.just(1)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(onNext);
    

    直接进入主题,先看subscribe中调用了哪些方法

        //Observable.java
        /**
         * Subscribes with only an {@code onNext} consumer.
         *
         * <p>Delegates to the four-argument overload, passing
         * {@code Functions.ON_ERROR_MISSING}, {@code Functions.EMPTY_ACTION} and
         * {@code Functions.emptyConsumer()} for the remaining callbacks.
         *
         * @param onNext consumer invoked for each emitted item
         * @return the value returned by the four-argument overload
         */
        public final Disposable subscribe(Consumer<? super T> onNext) {
            return subscribe(
                    onNext,
                    Functions.ON_ERROR_MISSING,
                    Functions.EMPTY_ACTION,
                    Functions.emptyConsumer());
        }
    
        /**
         * Subscribes with individual callbacks by wrapping them in a
         * {@code LambdaObserver} and handing that observer to
         * {@code subscribe(Observer)}.
         *
         * @param onNext      consumer for emitted items; must not be null
         * @param onError     consumer for a terminal error; must not be null
         * @param onComplete  action run on completion; must not be null
         * @param onSubscribe consumer that receives the Disposable; must not be null
         * @return the created {@code LambdaObserver}, returned as the Disposable
         */
        public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
                Action onComplete, Consumer<? super Disposable> onSubscribe) {
            // Validate every callback up front, in declaration order.
            ObjectHelper.requireNonNull(onNext, "onNext is null");
            ObjectHelper.requireNonNull(onError, "onError is null");
            ObjectHelper.requireNonNull(onComplete, "onComplete is null");
            ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

            // Bundle the four callbacks into a single Observer.
            LambdaObserver<T> lambdaObserver =
                    new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
            subscribe(lambdaObserver);
            return lambdaObserver;
        }
    
        /**
         * Subscribes the given observer: passes it through the
         * {@code RxJavaPlugins.onSubscribe} hook and then calls
         * {@link #subscribeActual(Observer)} with the hooked observer.
         *
         * @param observer the observer to subscribe; must not be null
         * @throws NullPointerException if {@code observer} is null, if the hook
         *         returns null, or as a wrapper around any non-fatal Throwable
         *         raised while subscribing (the original is attached as the cause)
         */
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                // Give the plugin hook a chance to wrap or replace the observer.
                Observer<? super T> hooked = RxJavaPlugins.onSubscribe(this, observer);

                ObjectHelper.requireNonNull(hooked, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");

                // Operator-specific subscription logic lives in subscribeActual.
                subscribeActual(hooked);
            } catch (NullPointerException npe) { // NOPMD
                // NullPointerExceptions propagate unchanged.
                throw npe;
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(ex);

                NullPointerException wrapper = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                wrapper.initCause(ex);
                throw wrapper;
            }
        }
        // subscribe(Observer) above ultimately calls this method.
        // It is abstract, so each concrete Observable supplies its own subscription logic.
        protected abstract void subscribeActual(Observer<? super T> observer);
    

    接下来我们看下subscribeOn方法中进行了什么操作

        //Observable.java
        /**
         * Wraps this Observable in an {@code ObservableSubscribeOn} bound to the
         * given scheduler.
         *
         * @param scheduler the scheduler to hand to ObservableSubscribeOn; must not be null
         * @return the result of passing the new ObservableSubscribeOn through
         *         {@code RxJavaPlugins.onAssembly}
         */
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            // Per the article's walkthrough, RxJavaPlugins.onAssembly returns the value
            // it was given, so in a fluent chain the subscribeActual reached from
            // subscribe(...) is ObservableSubscribeOn's subscribeActual.
            ObservableSubscribeOn<T> subscribeOnStage = new ObservableSubscribeOn<T>(this, scheduler);
            return RxJavaPlugins.onAssembly(subscribeOnStage);
        }
    

    接下来我们看ObservableSubscribeOn的subscribeActual方法

    //ObservableSubscribeOn.java
        /**
         * Wraps the downstream observer in a {@code SubscribeOnObserver}, signals
         * {@code onSubscribe} downstream, and then schedules a {@code SubscribeTask}
         * on the configured scheduler.
         *
         * @param observer the downstream observer
         */
        public void subscribeActual(final Observer<? super T> observer) {
            final SubscribeOnObserver<T> subscribeOnObserver = new SubscribeOnObserver<T>(observer);

            observer.onSubscribe(subscribeOnObserver);

            // Per the article, SubscribeTask implements Runnable; it is handed to
            // scheduler.scheduleDirect, and the Disposable that call returns is
            // stored on the wrapper observer.
            final SubscribeTask subscribeTask = new SubscribeTask(subscribeOnObserver);
            subscribeOnObserver.setDisposable(scheduler.scheduleDirect(subscribeTask));
        }
    

    接下来看scheduler.scheduleDirect。在此之前,我们先看下传入的Schedulers.io(),
    确认传入的Scheduler到底是什么

        /**
         * Returns the IO scheduler.
         *
         * @return the static {@code IO} field after it has been passed through
         *         {@code RxJavaPlugins.onIoScheduler}
         */
        public static Scheduler io() {
            // The IO field's initialization is shown in the next snippet.
            final Scheduler defaultIo = IO;
            return RxJavaPlugins.onIoScheduler(defaultIo);
        }
     // IO is assigned the result of RxJavaPlugins.initIoScheduler, which is given a new IOTask.
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
        static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        //由此可见,最后Schedulers.io就是IoScheduler
        static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
        }
    
        //Scheduler.java
        /**
         * Schedules {@code run} on a freshly created worker after the given delay.
         *
         * <p>NOTE(review): the call site shown earlier uses a single-argument
         * {@code scheduleDirect(Runnable)}; presumably that overload ends up here
         * with no delay — confirm against the RxJava source.
         *
         * @param run   the task to execute
         * @param delay how long to wait before running the task
         * @param unit  the unit of {@code delay}
         * @return the {@code DisposeTask} wrapping the task and its worker
         */
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            // createWorker() is supplied by the concrete Scheduler. As established
            // above, Schedulers.io() is an IoScheduler, so per the article this
            // calls IoScheduler.createWorker and yields an EventLoopWorker.
            final Worker worker = createWorker();

            // Let the plugin hook decorate the task before it is scheduled.
            final Runnable hookedRun = RxJavaPlugins.onSchedule(run);

            DisposeTask disposeTask = new DisposeTask(hookedRun, worker);
            // For the IO scheduler this is EventLoopWorker.schedule.
            worker.schedule(disposeTask, delay, unit);

            return disposeTask;
        }
    

    接下来看EventLoopWorker的schedule方法

            /**
             * Schedules {@code action} unless this worker's task container has
             * already been disposed.
             *
             * @param action    the task to run
             * @param delayTime delay before the task runs
             * @param unit      unit of {@code delayTime}
             * @return the result of {@code threadWorker.scheduleActual}, or
             *         {@code EmptyDisposable.INSTANCE} when already disposed
             */
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (!tasks.isDisposed()) {
                    // Delegates to NewThreadWorker.scheduleActual, tracking the task in `tasks`.
                    return threadWorker.scheduleActual(action, delayTime, unit, tasks);
                }
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
    

    真正进入线程调度的代码,在NewThreadWorker中

        /**
         * Wraps {@code run} in a {@code ScheduledRunnable}, registers it with the
         * optional parent container, and hands it to {@code executor}.
         *
         * @param run       the task to execute
         * @param delayTime delay before execution; values {@code <= 0} are submitted immediately
         * @param unit      unit of {@code delayTime}
         * @param parent    optional container tracking the task; may be null
         * @return the {@code ScheduledRunnable} wrapper; it is returned even when the
         *         parent refuses it or the executor rejects it
         */
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            final Runnable hookedRun = RxJavaPlugins.onSchedule(run);
            final ScheduledRunnable scheduled = new ScheduledRunnable(hookedRun, parent);

            // If a parent container is present but refuses the task, do not submit it.
            if (parent != null && !parent.add(scheduled)) {
                return scheduled;
            }

            try {
                // Per the article, executor is a thread pool.
                final Future<?> future;
                if (delayTime > 0) {
                    // Delayed execution.
                    future = executor.schedule((Callable<Object>)scheduled, delayTime, unit);
                } else {
                    // Immediate execution.
                    future = executor.submit((Callable<Object>)scheduled);
                }
                scheduled.setFuture(future);
            } catch (RejectedExecutionException rejected) {
                // Undo the parent registration and report through the plugin error handler.
                if (parent != null) {
                    parent.remove(scheduled);
                }
                RxJavaPlugins.onError(rejected);
            }

            return scheduled;
        }
    

    所以到最后,真正进行线程调度的,其实是一个线程池。看完了subscribeOn,我们再来看看observeOn,首先看下AndroidSchedulers.mainThread()到底是哪个线程

        /**
         * Returns the main-thread scheduler.
         *
         * @return the static {@code MAIN_THREAD} field after it has been passed
         *         through {@code RxAndroidPlugins.onMainThreadScheduler}
         */
        public static Scheduler mainThread() {
            final Scheduler defaultMain = MAIN_THREAD;
            return RxAndroidPlugins.onMainThreadScheduler(defaultMain);
        }
    
        // Static main-thread scheduler. It is the result of RxAndroidPlugins.initMainThreadScheduler,
        // which is given a Callable whose call() returns MainHolder.DEFAULT.
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        /** Holder class for the default main-thread scheduler instance. */
        private static final class MainHolder {
            static final Scheduler DEFAULT;

            static {
                // Looper.getMainLooper() shows that this Handler is bound to the
                // main thread's Looper.
                Handler mainHandler = new Handler(Looper.getMainLooper());
                DEFAULT = new HandlerScheduler(mainHandler, false);
            }
        }
    

    好,确定了这个问题,我们再继续往下看

        /**
         * Convenience overload: delegates to the three-argument overload with
         * {@code delayError = false} and the value of {@code bufferSize()}.
         *
         * @param scheduler the scheduler to observe on
         * @return the Observable produced by the three-argument overload
         */
        public final Observable<T> observeOn(Scheduler scheduler) {
            final boolean noDelayError = false;
            final int defaultBufferSize = bufferSize();
            return observeOn(scheduler, noDelayError, defaultBufferSize);
        }
    
        /**
         * Wraps this Observable in an {@code ObservableObserveOn} configured with
         * the given scheduler, error-delay flag and buffer size.
         *
         * @param scheduler  the scheduler to hand to ObservableObserveOn; must not be null
         * @param delayError forwarded unchanged to ObservableObserveOn
         * @param bufferSize forwarded to ObservableObserveOn; checked with verifyPositive
         * @return the result of passing the new ObservableObserveOn through
         *         {@code RxJavaPlugins.onAssembly}
         */
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            // Create a new ObservableObserveOn wrapping this Observable.
            ObservableObserveOn<T> observeOnStage =
                    new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
            return RxJavaPlugins.onAssembly(observeOnStage);
        }
    

    接下来看ObservableObserveOn的subscribeActual方法

        /**
         * Subscribes to the source. With a {@code TrampolineScheduler} the observer
         * is subscribed directly; otherwise it is wrapped in an
         * {@code ObserveOnObserver} together with a worker from the scheduler.
         *
         * @param observer the downstream observer
         */
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
                return;
            }

            // As before, a worker is obtained via createWorker(). Per the article, for
            // AndroidSchedulers.mainThread() this is HandlerScheduler.createWorker,
            // which returns a HandlerWorker.
            Scheduler.Worker schedulerWorker = scheduler.createWorker();

            // ObserveOnObserver is an inner class; its callbacks are shown in the next snippet.
            ObserveOnObserver<T> observeOnObserver =
                    new ObserveOnObserver<T>(observer, schedulerWorker, delayError, bufferSize);
            source.subscribe(observeOnObserver);
        }
    

    // Inner class ObserveOnObserver: the methods below are its callback methods.

            // Receives the upstream Disposable, picks the queue this observer will
            // use, and forwards onSubscribe to the downstream observer.
            public void onSubscribe(Disposable d) {
                // Proceed only if DisposableHelper.validate accepts the new upstream.
                if (DisposableHelper.validate(this.upstream, d)) {
                    this.upstream = d;
                    if (d instanceof QueueDisposable) {
                        @SuppressWarnings("unchecked")
                        QueueDisposable<T> qd = (QueueDisposable<T>) d;
    
                        // Ask the upstream which fusion mode it grants.
                        int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                        if (m == QueueDisposable.SYNC) {
                            // SYNC: use the upstream itself as the queue, mark the
                            // stream done, and schedule right away.
                            sourceMode = m;
                            queue = qd;
                            done = true;
                            downstream.onSubscribe(this);
                            // Calls schedule().
                            schedule();
                            return;
                        }
                        if (m == QueueDisposable.ASYNC) {
                            // ASYNC: use the upstream itself as the queue; nothing
                            // is scheduled here.
                            sourceMode = m;
                            queue = qd;
                            downstream.onSubscribe(this);
                            return;
                        }
                    }
    
                    // Neither SYNC nor ASYNC was returned: use a fresh queue created with bufferSize.
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
                    downstream.onSubscribe(this);
                }
            }
    
            /**
             * Handles an upstream item: ignored once {@code done} is set; otherwise
             * the item is offered to the queue (unless the source mode is ASYNC)
             * and {@code schedule()} is called.
             */
            @Override
            public void onNext(T t) {
                if (!done) {
                    // In ASYNC mode the item is not offered to the queue here.
                    if (sourceMode != QueueDisposable.ASYNC) {
                        queue.offer(t);
                    }
                    // Calls schedule().
                    schedule();
                }
            }
    
            /**
             * Handles an upstream error: records it, marks the stream done and calls
             * {@code schedule()}. If the stream is already done, the error is routed
             * to {@code RxJavaPlugins.onError} instead.
             */
            @Override
            public void onError(Throwable t) {
                if (!done) {
                    error = t;
                    done = true;
                    // Calls schedule().
                    schedule();
                } else {
                    RxJavaPlugins.onError(t);
                }
            }
    
            /**
             * Handles upstream completion: marks the stream done and calls
             * {@code schedule()}; a no-op if the stream is already done.
             */
            @Override
            public void onComplete() {
                if (!done) {
                    done = true;
                    // Calls schedule().
                    schedule();
                }
            }
    
            /**
             * Increments this observer's counter and, only when the previous value
             * was zero, hands this observer to {@code worker.schedule}.
             */
            void schedule() {
                if (getAndIncrement() != 0) {
                    return;
                }
                // So every callback above ultimately ends up in worker.schedule.
                worker.schedule(this);
            }
    
    // Finally, HandlerWorker's schedule method: it is built on the familiar Android Handler, so no further explanation is given.
            // Posts `run` to this worker's Handler as a delayed Message and returns
            // the ScheduledRunnable wrapper, or Disposables.disposed() when the worker
            // is disposed. Throws NullPointerException when `run` or `unit` is null.
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
    
                // Already disposed: do not post anything.
                if (disposed) {
                    return Disposables.disposed();
                }
    
                // Let the plugin hook decorate the task.
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
                // Build a Message for the handler with `scheduled` as its callback.
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
                if (async) {
                    message.setAsynchronous(true);
                }
    
                // Convert the delay to milliseconds and enqueue the message on the handler.
                handler.sendMessageDelayed(message, unit.toMillis(delay));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    

    相关文章

      网友评论

          本文标题:Rxjava的线程调度源码解析

          本文链接:https://www.haomeiwen.com/subject/ksmwvqtx.html