美文网首页
RxJava2.0源码详解

RxJava2.0源码详解

作者: 星云春风 | 来源:发表于2020-05-11 17:27 被阅读0次

    RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,而且是链式调用、逻辑简洁 。

    这里是基于RxJava2.0源码,以这个调用顺序来进行讲解,其他的操作符也大体基本差不多。根据源码可以多看看。

        Observable.just(1)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<Integer>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(Integer value) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    
    1. Observable是被观察者,调用just()方法,会返回一个经过钩子函数(Hook)调用的被观察者ObservableJust,
     public static <T> Observable<T> just(T item) {
            ObjectHelper.requireNonNull(item, "The item is null");
            // 钩子函数调用
            return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
        }
    
    1. ObservableJust 会调用父类的subscribeOn(Schedulers.io())方法指定被观察者线程,同样经过钩子函数返回一个被观测者ObservableSubscribeOn,
     public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
          // 这个this 就是ObservableJust,把这个被观察者赋值给父类,并传入线程类型
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    
        static final class IoHolder {
            // 这个就是Schedulers类中的线程类型Schedulers.io()
            static final Scheduler DEFAULT = new IoScheduler();
        }
    
    1. ObservableSubscribeOn调用observeOn(AndroidSchedulers.mainThread())指定观察者线程类型。返回经过钩子函数调用的ObservableObserveOn,传入的参数是,ObservableSubscribeOn这个被观察者,观察者的线程类型AndroidSchedulers.mainThread(),是否延时错误delayError,缓冲区大小bufferSize
      public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
          /**
          *  this 是ObservableSubscribeOn
          *  scheduler是AndroidSchedulers.mainThread()
          * delayError是延时错误(onError延时调用)
          * bufferSize是缓冲区大小
          */
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    
    • AndroidSchedulers.mainThread()其实就是一个handler
      private static final class MainHolder {
            //这个就是AndroidSchedulers类中的AndroidSchedulers.mainThread()
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
    
    
    1. ObservableObserveOn调用subscribe(Observer)方法,传入ObservableObserveOn被观察者和Observer观察者对象调用钩子函数处理得到一个observer ,Observer是一个接口,通过subscribeActual(observer)订阅观察者
    public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
              // 这里的this 就是ObservableObserveOn,调用钩子函数重新得到一个observer 
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
                // 这个方法就是订阅观察者的方法
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
    1. ObservableObserveOn中调用subscribeActual(observer)方法,这里对观察者进行了封装,参数的观察者是内部类ObserveOnObserver。(其实这里不同的被观察者都会有这样的一个内部类)
     @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
              //创建一个线程,这里是传入的AndroidSchedulers.mainThread()
                Scheduler.Worker w = scheduler.createWorker();
                //调用这个subscribe方法,传入一个观察者(observer),delayError是延时错误(onError延时调用)
                 // bufferSize是缓冲区大小
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    
    1. 实现的Observer接口的内容在这个内部类ObserveOnObserver中,
    • onSubscribe方法
       @Override
            public void onSubscribe(Disposable s) {
                if (DisposableHelper.validate(this.s, s)) {
                    this.s = s;
                    if (s instanceof QueueDisposable) {
                        @SuppressWarnings("unchecked")
                        QueueDisposable<T> qd = (QueueDisposable<T>) s;
    
                        int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                        if (m == QueueDisposable.SYNC) {
                            sourceMode = m;
                            queue = qd;
                            done = true;
                            actual.onSubscribe(this);
                            schedule();
                            return;
                        }
                        if (m == QueueDisposable.ASYNC) {
                            sourceMode = m;
                            queue = qd;
                            actual.onSubscribe(this);
                            return;
                        }
                    }
    
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
                    actual.onSubscribe(this);
                }
            }
    
    • onNext
     @Override
            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != QueueDisposable.ASYNC) {
                    queue.offer(t);
                }
                schedule();
            } 
    
    • onError
      @Override
            public void onError(Throwable t) {
                if (done) {
                    RxJavaPlugins.onError(t);
                    return;
                }
                error = t;
                done = true;
                schedule();
            }
    
    • onComplete
       @Override
            public void onComplete() {
                if (done) {
                    return;
                }
                done = true;
                schedule();
            }
    
    • dispose
     @Override
            public void dispose() {
                if (!cancelled) {
                    cancelled = true;
                    s.dispose();
                    worker.dispose();
                    if (getAndIncrement() == 0) {
                        queue.clear();
                    }
                }
            }
    
    
    • 这些方法都会调用schedule()方法,worker 就是传入的创建线程的对象Scheduler
      void schedule() {
                if (getAndIncrement() == 0) {
                    worker.schedule(this);
                }
            }
    

    subscribeOn和observeOn的线程切换

    先看subscribeOn的流程

    1. 在subscribeOn(Schedulers.io())方法中传入的Schedulers这个类中有很多不同的线程对象,比如IO,SINGLE,COMPUTATION,TRAMPOLINE,NEW_THREAD。这些都是抽象类Scheduler。我们在代码中使用的时候调用的subscribe()方法中会调用subscribeActual(final Observer<? super T> s)方法,我们以ObservableSubscribeOn中的这个方法来举例,
    @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
          // 这里就是调用的Scheduler中的scheduleDirect方法,这个parent会在后面使用,可以注意下
        //parent作为一个dispose
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    

    2.在上面中的scheduler.scheduleDirect最终会调用前面提到的创建线程对象的createWorker()方法,但是这个方法是在Scheduler的子类中实现的,比如IoScheduler

     public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        w.dispose();
                    }
                }
            }, delay, unit);
    
            return w;
        }
    
    

    3.IoScheduler中的createWorker 方法返回的是一个实现Disposable的内部类EventLoopWorker,传入的参数是内部类CachedWorkerPool,这个CachedWorkerPool是一个实现Runnable的类,线程是从这个里面获取的ThreadWorker,

     @Override
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    
    
    • EventLoopWorker 类
     static final class EventLoopWorker extends Scheduler.Worker {
            private final CompositeDisposable tasks;
            private final CachedWorkerPool pool;
        
            private final ThreadWorker threadWorker;
    
            final AtomicBoolean once = new AtomicBoolean();
    
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                this.threadWorker = pool.get();
            }
    
            @Override
            public void dispose() {
                if (once.compareAndSet(false, true)) {
                    tasks.dispose();
    
                    // releasing the pool should be the last action
                    // should prevent pool reuse in case there is a blocking
                    // action not responding to cancellation
    //                threadWorker.scheduleDirect(() -> {
    //                    pool.release(threadWorker);
    //                }, 0, TimeUnit.MILLISECONDS);
    
                    pool.release(threadWorker);
                }
            }
    
            @Override
            public boolean isDisposed() {
                return once.get();
            }
    
            @Override
            public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
              // 主要是这个里面
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        }
    
    
    • ThreadWorker 类
    static final class ThreadWorker extends NewThreadWorker {
            private long expirationTime;
    
            ThreadWorker(ThreadFactory threadFactory) {
                super(threadFactory);
                this.expirationTime = 0L;
            }
    
            public long getExpirationTime() {
                return expirationTime;
            }
    
            public void setExpirationTime(long expirationTime) {
                this.expirationTime = expirationTime;
            }
        }
    
    1. 看看上面的threadWorker.scheduleActual(action, delayTime, unit, tasks)方法
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
          // 这个run 就是第一步中的parent  ,是一个观察者SubscribeOnObserver
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //进一步的封装
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                  // 执行线程池ScheduledExecutorService的submit方法 ,然后parent也就是SubscribeOnObserver的        方法会调用
                    f = executor.submit((Callable<Object>)sr);
                } else {  
              
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                parent.remove(sr);
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    
    • ObservableSubscribeOn中的subscribeActual会调用,也就是Observable对象的的订阅会执行,任务也就在子线程中执行了,线程就切换到了工作线程,这就是subscribeOn(Schedulers.io())的基本流程
      @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                 // Observable对象的的订阅会执行
                    source.subscribe(parent);
                }
            }));
        }
    

    再来看看observeOn(AndroidSchedulers.mainThread())的基本流程

    1. 主要是在ObservableObserveOn中的subscribeActual方法,这里面也会调用createWorker方法,然后执行schedule方法,但是这里涉及到了RxAndroid,在之前提及到AndroidSchedulers.mainThread()是一个Handler
     @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    这里再看下AndroidSchedulers.mainThread()的值

    
        private static final class MainHolder {
            // 这里的是HandlerScheduler ,是RxAndroid中封装的,继承自RxJava中的Scheduler
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
    1. HandlerScheduler中调用的createWorker返回的是一个HandlerWorker(handler),参数传入的是一个Handler
     @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    
    1. 再看看HandlerWorker 中的schedule方法,这里最终也是调用的传入的线程的run方法,也就是MainHandler的run方法,
     private static final class HandlerWorker extends Worker {
            private final Handler handler;
    
            private volatile boolean disposed;
    
            HandlerWorker(Handler handler) {
                this.handler = handler;
            }
    
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (delay < 0) throw new IllegalArgumentException("delay < 0: " + delay);
                if (unit == null) throw new NullPointerException("unit == null");
    
                if (disposed) {
                    return Disposables.disposed();
                }
    
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
              // 通过handler发送过去的消息
                handler.sendMessageDelayed(message, unit.toMillis(delay));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    
            @Override
            public void dispose() {
                disposed = true;
                handler.removeCallbacksAndMessages(this /* token */);
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
        }
    
    1. 这里基本切换到主线程的流程也完了,大体都是通过不同的Scheduler(IoScheduler,HandlerScheduler等)来处理不同的schedule()方法进行线程间的切换。

    相关文章

      网友评论

          本文标题:RxJava2.0源码详解

          本文链接:https://www.haomeiwen.com/subject/ksykwhtx.html