美文网首页
2019-11-08 Rxjava 源码解析<1>

2019-11-08 Rxjava 源码解析<1>

作者: 猫KK | 来源:发表于2019-11-08 17:24 被阅读0次

先来看看rxjava一般用法

      Observable.create<String> {
            it.onNext("next")
            it.onComplete()
        }
            .subscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe({

            }, {

            }, {
                
            })

我这里是用kotlin写的,这样写可能不太好懂,可以按下面这样来解读

       var sources = object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("下一步")
                emitter.onComplete()
            }
        }
        var observable = Observable.create(sources)
        var observable1 = observable.subscribeOn(Schedulers.io())
        var observable2 = observable1.observeOn(AndroidSchedulers.mainThread())
        var observer = object :Observer<String>{
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
            }

            override fun onError(e: Throwable) {
            }
        }
        observable2.subscribe(observer)

这样子,可能会比较容易理解,现在我我们先忽略里面的线程调度方法,就如下面这样

        var sources = object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("下一步")
                emitter.onComplete()
            }
        }
        var observable = Observable.create(sources)
        var observer = object :Observer<String>{
            override fun onComplete() {
            }

            override fun onSubscribe(d: Disposable) {
            }

            override fun onNext(t: String) {
            }

            override fun onError(e: Throwable) {
            }
        }
        observable.subscribe(observer)

先来看observable是如何创建的

    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        //这个是用来判断是否为null,如果source为null,则会抛出异常
        ObjectHelper.requireNonNull(source, "source is null");
        //先判断onObservableAssembly是否为null,为null直接返回ObservableCreate
        //第一次创建,onObservableAssembly为null,所以返回的是括号中的内容
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

直接返回ObservableCreate对象,看这个对象的构造方法

public final class ObservableCreate<T> extends Observable<T> {
    final ObservableOnSubscribe<T> source;

    //传进来一个source,并赋值
    public ObservableCreate(ObservableOnSubscribe<T> source) {
        this.source = source;
    }
    //.....
}

所以Observable.create就是创建一个ObservableCreate对象,并把source保存到ObservableCreate中,其中source就是前面创建的,继续看subscribe方法,知道observable为ObservableCreate对象,所以在ObservableCreate中找,但是ObservableCreate并没有这个方法,去看ObservableCreate的父类

    //Observable类,ObservableCreate的父类

    public final void subscribe(Observer<? super T> observer) {
        //判断observer是否为null,为null抛出异常
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            //判断onObservableSubscribe是否为null,为null直接返回observer自身
            //到目前为止都没有看见onObservableSubscribe的赋值,先默认为null
            observer = RxJavaPlugins.onSubscribe(this, observer);
            //判断处理后的observer是否为null
            ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
            //调用子类的subscribeActual方法,即ObservableCreate.subscribeActual
            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

subscribeActual是一个抽象方法,所以又回到ObservableCreate中

    @Override
    protected void subscribeActual(Observer<? super T> observer) {
        //创建CreateEmitter类,并将observer传过去
        //CreateEmitter是ObservableCreate的内部类,用来处理onNext等方法的
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //调用observer的onSubscribe方法
        //也就是传过来的observer的onSubscribe方法,说明已经绑定了
        observer.onSubscribe(parent);
        try {
            //调用source的subscribe方法,并将CreateEmitter过去
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

从上面可以知道source就是前面创建的

        //这里就是真实的sources
        var sources = object : ObservableOnSubscribe<String> {
            override fun subscribe(emitter: ObservableEmitter<String>) {
                emitter.onNext("下一步")
                emitter.onComplete()
            }
        }
        var observable = Observable.create(sources)
       //....

所以就回调到这里的sources的subscribe方法,其中emitter就是ObservableCreate的内部类CreateEmitter,当我们调用emitter.onNext()方法,就是调用CreateEmitter.onNext()方法,来看做了什么

    static final class CreateEmitter<T>
    extends AtomicReference<Disposable>
    implements ObservableEmitter<T>, Disposable {

        private static final long serialVersionUID = -3434801548987643227L;

        final Observer<? super T> observer;
        //构造方法传过来的observer,
        //就是我们调用 observable.subscribe(observer)传过来的observer
        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            //调用onNext方法
            //判断onNext传过来的参数是否为null,为null,调用自身onError
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            //判断是否已经解除绑定,
           //所以如果调用了onComplete再调用onNext,observer是不会收到消息的
            if (!isDisposed()) {
                 //调用observer.onNext(t),并将参数传过去
                //所以,到这就能回到我们的observer的onNext方法中,并获取到参数
                observer.onNext(t);
            }
        }

        @Override
        public void onError(Throwable t) {
            //判断是否解绑,没有就调用observer.onError
            //再调用dispose()方法解绑
            if (!tryOnError(t)) {
                RxJavaPlugins.onError(t);
            }
        }

        @Override
        public boolean tryOnError(Throwable t) {
            if (t == null) {
                t = new NullPointerException("onError called with null. Null values are generally not allowed in 2.x operators and sources.");
            }
            if (!isDisposed()) {
                try {
                    observer.onError(t);
                } finally {
                    dispose();
                }
                return true;
            }
            return false;
        }

        @Override
        public void onComplete() {
            //判断是否解绑,没有就调用onComplete()然后再调用 dispose()解绑
            if (!isDisposed()) {
                try {
                    observer.onComplete();
                } finally {
                    dispose();
                }
            }
        }
    //......

从上面可以知道,如何从sources调用onNext方法能传到observer的onNext方法,并接收到参数。

相关文章

网友评论

      本文标题:2019-11-08 Rxjava 源码解析<1>

      本文链接:https://www.haomeiwen.com/subject/qxlwbctx.html