美文网首页
RxJava2.0源码详解

RxJava2.0源码详解

作者: 星云春风 | 来源:发表于2020-05-11 17:27 被阅读0次

RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,而且是链式调用、逻辑简洁 。

这里是基于RxJava2.0源码,以这个调用顺序来进行讲解,其他的操作符也大体基本差不多。根据源码可以多看看。

    Observable.just(1)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new Observer<Integer>() {
                    @Override
                    public void onSubscribe(Disposable d) {

                    }

                    @Override
                    public void onNext(Integer value) {

                    }

                    @Override
                    public void onError(Throwable e) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });
  1. Observable是被观察者,调用just()方法,会返回一个经过钩子函数(Hook)调用的被观察者ObservableJust,
 public static <T> Observable<T> just(T item) {
        ObjectHelper.requireNonNull(item, "The item is null");
        // 钩子函数调用
        return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
    }
  1. ObservableJust 会调用父类的subscribeOn(Schedulers.io())方法指定被观察者线程,同样经过钩子函数返回一个被观测者ObservableSubscribeOn,
 public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      // 这个this 就是ObservableJust,把这个被观察者赋值给父类,并传入线程类型
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    static final class IoHolder {
        // 这个就是Schedulers类中的线程类型Schedulers.io()
        static final Scheduler DEFAULT = new IoScheduler();
    }
  1. ObservableSubscribeOn调用observeOn(AndroidSchedulers.mainThread())指定观察者线程类型。返回经过钩子函数调用的ObservableObserveOn,传入的参数是,ObservableSubscribeOn这个被观察者,观察者的线程类型AndroidSchedulers.mainThread(),是否延时错误delayError,缓冲区大小bufferSize
  public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
      /**
      *  this 是ObservableSubscribeOn
      *  scheduler是AndroidSchedulers.mainThread()
      * delayError是延时错误(onError延时调用)
      * bufferSize是缓冲区大小
      */
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
  • AndroidSchedulers.mainThread()其实就是一个handler
  private static final class MainHolder {
        //这个就是AndroidSchedulers类中的AndroidSchedulers.mainThread()
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }

    private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
            new Callable<Scheduler>() {
                @Override public Scheduler call() throws Exception {
                    return MainHolder.DEFAULT;
                }
            });


  1. ObservableObserveOn调用subscribe(Observer)方法,传入ObservableObserveOn被观察者和Observer观察者对象调用钩子函数处理得到一个observer ,Observer是一个接口,通过subscribeActual(observer)订阅观察者
public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
          // 这里的this 就是ObservableObserveOn,调用钩子函数重新得到一个observer 
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            // 这个方法就是订阅观察者的方法
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
  1. ObservableObserveOn中调用subscribeActual(observer)方法,这里对观察者进行了封装,参数的观察者是内部类ObserveOnObserver。(其实这里不同的被观察者都会有这样的一个内部类)
 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
          //创建一个线程,这里是传入的AndroidSchedulers.mainThread()
            Scheduler.Worker w = scheduler.createWorker();
            //调用这个subscribe方法,传入一个观察者(observer),delayError是延时错误(onError延时调用)
             // bufferSize是缓冲区大小
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
  1. 实现的Observer接口的内容在这个内部类ObserveOnObserver中,
  • onSubscribe方法
   @Override
        public void onSubscribe(Disposable s) {
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;

                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);

                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        done = true;
                        actual.onSubscribe(this);
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        return;
                    }
                }

                queue = new SpscLinkedArrayQueue<T>(bufferSize);

                actual.onSubscribe(this);
            }
        }
  • onNext
 @Override
        public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        } 
  • onError
  @Override
        public void onError(Throwable t) {
            if (done) {
                RxJavaPlugins.onError(t);
                return;
            }
            error = t;
            done = true;
            schedule();
        }
  • onComplete
   @Override
        public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            schedule();
        }
  • dispose
 @Override
        public void dispose() {
            if (!cancelled) {
                cancelled = true;
                s.dispose();
                worker.dispose();
                if (getAndIncrement() == 0) {
                    queue.clear();
                }
            }
        }

  • 这些方法都会调用schedule()方法,worker 就是传入的创建线程的对象Scheduler
  void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

subscribeOn和observeOn的线程切换

先看subscribeOn的流程

  1. 在subscribeOn(Schedulers.io())方法中传入的Schedulers这个类中有很多不同的线程对象,比如IO,SINGLE,COMPUTATION,TRAMPOLINE,NEW_THREAD。这些都是抽象类Scheduler。我们在代码中使用的时候调用的subscribe()方法中会调用subscribeActual(final Observer<? super T> s)方法,我们以ObservableSubscribeOn中的这个方法来举例,
@Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);
      // 这里就是调用的Scheduler中的scheduleDirect方法,这个parent会在后面使用,可以注意下
    //parent作为一个dispose
        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }));
    }

2.在上面中的scheduler.scheduleDirect最终会调用前面提到的创建线程对象的createWorker()方法,但是这个方法是在Scheduler的子类中实现的,比如IoScheduler

 public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    decoratedRun.run();
                } finally {
                    w.dispose();
                }
            }
        }, delay, unit);

        return w;
    }

3.IoScheduler中的createWorker 方法返回的是一个实现Disposable的内部类EventLoopWorker,传入的参数是内部类CachedWorkerPool,这个CachedWorkerPool是一个实现Runnable的类,线程是从这个里面获取的ThreadWorker,

 @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }

  • EventLoopWorker 类
 static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
    
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @Override
        public void dispose() {
            if (once.compareAndSet(false, true)) {
                tasks.dispose();

                // releasing the pool should be the last action
                // should prevent pool reuse in case there is a blocking
                // action not responding to cancellation
//                threadWorker.scheduleDirect(() -> {
//                    pool.release(threadWorker);
//                }, 0, TimeUnit.MILLISECONDS);

                pool.release(threadWorker);
            }
        }

        @Override
        public boolean isDisposed() {
            return once.get();
        }

        @Override
        public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
          // 主要是这个里面
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

  • ThreadWorker 类
static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }

        public long getExpirationTime() {
            return expirationTime;
        }

        public void setExpirationTime(long expirationTime) {
            this.expirationTime = expirationTime;
        }
    }
  1. 看看上面的threadWorker.scheduleActual(action, delayTime, unit, tasks)方法
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
      // 这个run 就是第一步中的parent  ,是一个观察者SubscribeOnObserver
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        //进一步的封装
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
              // 执行线程池ScheduledExecutorService的submit方法 ,然后parent也就是SubscribeOnObserver的        方法会调用
                f = executor.submit((Callable<Object>)sr);
            } else {  
          
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            parent.remove(sr);
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
  • ObservableSubscribeOn中的subscribeActual会调用,也就是Observable对象的的订阅会执行,任务也就在子线程中执行了,线程就切换到了工作线程,这就是subscribeOn(Schedulers.io())的基本流程
  @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);

        s.onSubscribe(parent);

        parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
            @Override
            public void run() {
             // Observable对象的的订阅会执行
                source.subscribe(parent);
            }
        }));
    }

再来看看observeOn(AndroidSchedulers.mainThread())的基本流程

  1. 主要是在ObservableObserveOn中的subscribeActual方法,这里面也会调用createWorker方法,然后执行schedule方法,但是这里涉及到了RxAndroid,在之前提及到AndroidSchedulers.mainThread()是一个Handler
 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

这里再看下AndroidSchedulers.mainThread()的值


    private static final class MainHolder {
        // 这里的是HandlerScheduler ,是RxAndroid中封装的,继承自RxJava中的Scheduler
        static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    }
  1. HandlerScheduler中调用的createWorker返回的是一个HandlerWorker(handler),参数传入的是一个Handler
 @Override
    public Worker createWorker() {
        return new HandlerWorker(handler);
    }
  1. 再看看HandlerWorker 中的schedule方法,这里最终也是调用的传入的线程的run方法,也就是MainHandler的run方法,
 private static final class HandlerWorker extends Worker {
        private final Handler handler;

        private volatile boolean disposed;

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (delay < 0) throw new IllegalArgumentException("delay < 0: " + delay);
            if (unit == null) throw new NullPointerException("unit == null");

            if (disposed) {
                return Disposables.disposed();
            }

            run = RxJavaPlugins.onSchedule(run);

            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);

            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
          // 通过handler发送过去的消息
            handler.sendMessageDelayed(message, unit.toMillis(delay));

            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }

            return scheduled;
        }

        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }

        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
  1. 这里基本切换到主线程的流程也完了,大体都是通过不同的Scheduler(IoScheduler,HandlerScheduler等)来处理不同的schedule()方法进行线程间的切换。

相关文章

网友评论

      本文标题:RxJava2.0源码详解

      本文链接:https://www.haomeiwen.com/subject/ksykwhtx.html