美文网首页
Rxjava源码解析

Rxjava源码解析

作者: 张三疯啊啊啊 | 来源:发表于2018-08-27 16:54 被阅读0次

    先上代码:

    ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> e) throws Exception {
             final int max = 100;
             for (int i = 1; i <= max; i++) {
                 e.onNext(max);
             }
             e.onComplete();
         }
     };
     Observer<Integer> observer = new Observer<Integer>() {
         @Override
         public void onSubscribe(Disposable d) {
         }
         @Override
         public void onNext(Integer integer) {
         }
         @Override
         public void onError(Throwable e) {
         }
         @Override
         public void onComplete() {
         }
     };
    
     Observable.create(oos)
               .observeOn(AndroidSchedulers.mainThread())
               .subscribeOn(Schedulers.computation())
               .subscribe(observer);
    

    上面是Rxjava最简单的实现模型。
    从链式调用的返回值来看:

      Observable.create()------》ObservableCreate extends Observable
      ObservableCreate.observerOn()------->ObservableObserveOn extends  AbstractObservableWithUpstream  extends  Observable
      ObservableObserveOn.subscribeOn()------->ObservableSubscribeOn extends  AbstractObservableWithUpstream  extends  Observable
    

    所以最后的调用对象是

      ObservableSubscribeOn.subscribe(observer)
    

    从上面的返回值可以看出中间任一一个的返回值返回的都是observable的子对象。

    为什么要强调中间几个的返回值都是observable的返回值,这里要先明确一下,待会会大量用到subscribe()方法,在Observable(子类)中的subscribe()方法:

    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //钩子,如果未设置的话,返回值还是observer
            observer = RxJavaPlugins.onSubscribe(this, observer);
            //空检验
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            //核心
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            ...... //handle exception 
        }
    }
    

    因此下面的分析代码中,如果是调用上面4个对象的subscribe()方法的时候,直接看subscribeActual()方法即可。

    那就从最后一层 ObservableSubscribeOn 的 subscribeActual() 方法开始分析。

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
        s.onSubscribe(parent);
    
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    这里的这个s就是我们缩写的observer
    第一行:先把我们的observer封装成了SubscribeOnObserver
    第二行:调用了observer.onSubscribe()方法
                  也就是observer订阅Observable时候的方法,一般这个时候可以做一些操作
    第三行:
    parent.setDisposable() 以及scheduler调度器先不论,待会再分析,这里先看SubscribeTask这个类:

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
    
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
    
        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
    

    看到runnable,熟悉线程的同学已经可以猜到source.subscribe(parent) 这句代码很可能在子线程中执行,这里先mark一下,待会回到这个地方再具体看。

    这里要先插入一下source和Observer的问题:

    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    
    public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
        super(source);
        this.scheduler = scheduler;
    }
    
    public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
        super(source);
        this.scheduler = scheduler;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    
    //后面两个类继承抽象类,调用super(source)方法。
    AbstractObservableWithUpstream(ObservableSource<T> source) {
        this.source = source;
    }
    

    通过上面的代码可以看到所有的这三个关键类,source都是通过构造传入进来的,
    而后两个类都还有schedule参数,这个涉及线程调度,待会也会说,也mark一下。
    通过以上代码可以分析:

    ObservableCreate 的 source 是 oos
    ObservableObserveOn 的 source 是 ObservableCreate
    ObservableSubscribeOn 的 source 是 ObservableObserveOn
    

    至于Observer,通过代码可以分析:

     (ObservableObserveOn)source
        .subscribe(parent(SubscribeOnObserver));
    
     (ObservableCreate)source
        .subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    
    
     CreateEmitter<T> parent = new CreateEmitter<T>(observer);
     (oos)source.subscribe(parent);
    

    也即是:

    ObservableObserveOn 的 observer 是 SubscribeOnObserver
    ObservableCreate 的 observer 是 ObserveOnObserver
    oos 的 observer 是 CreateEmitter
    

    这里有点一级一级调用的意味了,而这个意味就是Rxjava的一个很重要的点。

    插入结束,继续回到刚才的 SubscribeTask
    结合上面的分析:

     source.subscribe(parent)
    

    也就意味着

    ObservableObserveOn.subscribeActual()
    

    这里转了两个弯,各位可以稍微思考一下
    而在ObservableObserveOn中:

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            //这个暂时忽略,未设置的时候不走这里
            source.subscribe(observer);
        } else {
            //线程调度,待会再分析
            Scheduler.Worker w = scheduler.createWorker();
            //最终会调用这个,又是很熟悉的source subscribe()方法
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    继续往上走,走到 ObservableCreate 中,这里省略了重复流程。

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);
        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }
    

    走到最上层,这个最上层的source就是我们前面写的oos。
    这个observer是在 ObservableObserverOn 中的 ObserveOnObserver。这个名字有点像,汗

    第一行:先把 ObserveOnObserver 封装成 CreateEmitter
    第二行:调用 ObserveOnObserver.onSubscribe()方法。

    @Override
    public void onSubscribe(Disposable s) {
        if (DisposableHelper.validate(this.s, s)) {
            this.s = s;
            if (s instanceof QueueDisposable) {
                @SuppressWarnings("unchecked")
                QueueDisposable<T> qd = (QueueDisposable<T>) s;
    
                int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
    
                if (m == QueueDisposable.SYNC) {
                    sourceMode = m;
                    queue = qd;
                    done = true;
                    actual.onSubscribe(this);
                    schedule();
                    return;
                }
                if (m == QueueDisposable.ASYNC) {
                    sourceMode = m;
                    queue = qd;
                    actual.onSubscribe(this);
                    return;
                }
            }
    
            queue = new SpscLinkedArrayQueue<T>(bufferSize);
    
            actual.onSubscribe(this);
        }
    }
    

    这个方法比较长,但是对我们的流程分析关键的代码其实就一句

    actual.onSubscribe(this);
    

    根据前面的observer的分析,这个observer其实就是 ObservableSubscribeOn 的 SubscribeOnObserver
    最后找到源码,调用了 SubscribeOnObserver 的 onSubscribe()方法。

        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }
    
        public static boolean setOnce(AtomicReference<Disposable> field, Disposable d) {
            ObjectHelper.requireNonNull(d, "d is null");
            if (!field.compareAndSet(null, d)) {
                d.dispose();
                if (field.get() != DISPOSED) {
                    reportDisposableSet();
                }
                return false;
            }
            return true;
        }
    

    涉及到CAS的操作,感兴趣的同学可以研究一下,这里对我们的流程没有太大影响。

    第三行:至此,整个的流程终于回到了我们的oos。

    从ObservableSubscribeOn的subscribe()方法历尽千辛万苦终于调用了oos的subscribe()方法。

        ObservableOnSubscribe<Integer> oos = new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                final int max = 100;
                for (int i = 1; i <= max; i++) {
                    e.onNext(max);
                }
                e.onComplete();
            }
        };
    

    首先创建了 ObservableEmitter ,然后调用emmiter.onNext()方法。

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t);
            }
        }
    

    异常先不考虑,最终是调用 observer.onNext()方法。
    根据上面的分析,这个Observer是ObserveOnObserver
    第一行:先把 ObserveOnObserver 封装成 CreateEmitter,而CreateEmmiter的构造:

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }
    

    所以,可以知道这个observer就是最前面的 ObserveOnObserver

    也就是e.Next(n)------>最终会调用ObserveOnObserver.onNext(n)

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
    

    最终调用了 schedule() 方法。

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    

    将当前对象添加到worker中,这个是线程调度的问题了,待会分析。

    再看一下ObserveOnObserver 类的声明:

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable{}
    

    实现了Runnable接口,所以关键代码就在run()方法之中。

       @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }
    
        //我们看drainNormal()的方法
        void drainNormal() {
            int missed = 1;
    
            final SimpleQueue<T> q = queue;
            final Observer<? super T> a = actual;
    
            for (;;) {
                if (checkTerminated(done, q.isEmpty(), a)) {
                    return;
                }
    
                for (;;) {
                    boolean d = done;
                    T v;
    
                    try {
                        v = q.poll();
                    } catch (Throwable ex) {
                        Exceptions.throwIfFatal(ex);
                        s.dispose();
                        q.clear();
                        a.onError(ex);
                        worker.dispose();
                        return;
                    }
                    boolean empty = v == null;
    
                    if (checkTerminated(d, empty, a)) {
                        return;
                    }
    
                    if (empty) {
                        break;
                    }
    
                    a.onNext(v);
                }
    
                missed = addAndGet(-missed);
                if (missed == 0) {
                    break;
                }
            }
        }
    

    终于看到了a.onNext()方法,也就是actual.onNext()方法。
    通过 ObserveOnObserver 的构造:

    //构造方法
    ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
        this.actual = actual;
        this.worker = worker;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }
    
    //创建对象的时候
    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
            //这里new了ObserveOnObserver对象。
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    可以看出,这个actual对象,其实就是传入进来的observer。
    而这个observer结合SubscribeTask代码,可以知道:
    这个observer其实就是讲我们的observer封装起来的SubscribeOnObserver对象。

    而SubscribeOnObserver的onNext()方法:

        @Override
        public void onNext(T t) {
            actual.onNext(t);
        }
    

    其实就是我们的o.next()方法。

    七转八弯,经历这个这么多,也是本文最核心的:

    subscribe()方法,先一层一层往上回调,调用了我们的oos的onNext()方法,
    而onNext()里面又一层一层往下回调,调用了我们的obsrever的onNext()方法,实现了数据的传递。

    然后是线程切换问题:

    还记得我们之前说ObservableSubscribeOn, ObservableObserveOn这两个对象的构造都会传入一个 schedule 的调度器吗?

    先看 ObservableSubscribeOn

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    

    再结合前面的代码,我们知道这个 scheduler 是Schedulers.computation()

    最后走到了:

    public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
        ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
        try {
            Future<?> f;
            if (delayTime <= 0L) {
                f = executor.submit(task);
            } else {
                f = executor.schedule(task, delayTime, unit);
            }
            task.setFuture(f);
            return task;
        } catch (RejectedExecutionException ex) {
            RxJavaPlugins.onError(ex);
            return EmptyDisposable.INSTANCE;
        }
    }
    

    executor,我们非常熟悉的线程池。看到这,也就大概明白了我们的 source.subscribe(parent)
    以及其对应的一层层往上回调都是在subscribeOn(线程) 所调用的线程之中

    然后线程什么时候会再度切换呢?
    是在ObservableObserveOn中的 schedule() 方法中:

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    

    这个worker一层层追踪溯源,找到了其初始化的地方,是在ObservableObserveOn的subscribeActual()方法之中:

    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();
    
            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }
    

    这个schedule就是observerOn所对应的线程。
    AndroidSchedulers.mainThread() 的实现是 HandlerScheduler

        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
    
            if (disposed) {
                return Disposables.disposed();
            }
    
            run = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
    
            if (async) {
                message.setAsynchronous(true);
            }
    
            handler.sendMessageDelayed(message, unit.toMillis(delay));
    
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
    
            return scheduled;
        }
    

    最终通过hanler进行了线程的切换。
    也就是最后我们的observer.onNext()方法执行的线程是由observeOn()所对应的线程

    相关文章

      网友评论

          本文标题:Rxjava源码解析

          本文链接:https://www.haomeiwen.com/subject/nqrtwftx.html