美文网首页工作生活
从源码分析RxJava订阅过程

从源码分析RxJava订阅过程

作者: 有没有口罩给我一个 | 来源:发表于2019-06-30 23:38 被阅读0次
wdroid.jpg

都知道观察模式吧?

在开始之前让我们简单了解一下观察模式,就是某对象A的变化引起其他多个对象B变化,但是前提是你需要去订阅我,打个比方:就是我的状态发生了改变,那我怎么通知你呢?所以我需要知道的如何去通知其他对象说我这里已经改变了,你看看那需不需要做出改变。就比如微信的订阅号,如果你不订阅,那该订阅号在发布内容也不会通知,这里的订阅号就是被观察者,而用户就是观察者。那怎么说让这两者关联来呢?前面说的订阅号是要提供一个接口,允许用户去订阅的,所以最后就是被观察者和观察者两个都得提供接口,订阅号提供的接口让用户去订阅类比微信号,当订阅号发布内容,就通过这个微信号通知观察者,所以订阅就是这两者的关联点。

开始之前的两个重要的类或接口:ObservableObserver

  • Observable 它实现ObservableSource接口,通俗来讲Observable就是一个被观察者也有人叫可观察的资源,这里就叫被观察者;
  • Observer 观察者;
    涉及的类:


    RxJava2.png

订阅流程分析

开始RxJava的订阅流程分析之前,来个简单的栗子,代码如下:

Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("发射 subscribe");
            emitter.onComplete();
        }
    });//ObservableCreate

    Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io());//1

    observableSubscribeOn.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            showLog("onSubscribe");
        }

        @Override
        public void onNext(String s) {
            showLog("onNext");
        }

        @Override
        public void onError(Throwable e) {
            showLog("onError");
        }

        @Override
        public void onComplete() {
            showLog("onComplete");
        }
    });

日志结果:

onSubscribe ,Thread: main
onNext ,Thread: RxNewThreadScheduler-3
onComplete ,Thread: RxNewThreadScheduler-3

如上代码,之所以分开来写是为了更清晰的去理解每一步RxJava生成的相关类。

如果你认真看前面的内容,你一下就明白Observable.subscribe()方法也就是订阅的意思,是 ObservableObserver 的关联点,也就是被观察者和观察者的关联点,所以我们的分析就从Observable.subscribe(Observer observer)方法开始代码如下:

 public final void subscribe(Observer<? super T> observer) {
    try {

        // .....此处省略几亿代码....

        //此方法在Observable类是中是抽象的,注定是子类实现
        subscribeActual(observer);

        // .....此处省略几亿代码....
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
       // .....此处省略几亿代码....
    }
}
  • 上面代码不难理解在subscribe方法中直接就调用了subscribeActual(observer)方法,我可以翻译为 实际订阅;
  • subscribe方法是Observable类的方法,他是抽象类,传入了一个 Observer 对象,开始的时候栗子我们可以知道Observable是通过我们调用Observable.create(ObservableOnSubscribe) 所创建出来的;
  • 那subscribeActual在Observable中是抽象方法,肯定是子类去实现了该方法,从第二点知道子类肯定是在Observable.create(ObservableOnSubscribe)中给new出来的,那么接下我们看看Observable.create(ObservableOnSubscribe)方法的实现;

// Observable.create(ObservableOnSubscribe)

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    // .....此处省略几亿代码....

    // 直接就创建了ObservableCreate,并把source作为参数传进去
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}

// onAssembly

public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

我们从上面代码我们知道在Observable.create(ObservableOnSubscribe)中直接就创建了ObservableCreate,而ObservableCreate是Observable的子类,并把source作为参数传进去,最后调用RxJavaPlugins.onAssembly方法,我们默认返回ObservableCreate实例,所以Observable.create方法最后返回的是ObservableCreate实例,所以就验证了上面的第三点实际调用的是ObservableCreate.subscribeActual(observer)方法,这是在不考虑其他变换和线程切换的情况,那我们就来看看ObservableCreate.subscribeActual(observer)方法的实现,代码如下:

 @Override
protected void subscribeActual(Observer<? super T> observer) {
    //事件发射器
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //直接回调Observer的onSubscribe方法,这个方法是和线程切换无关,只在当前的线程中执行
    observer.onSubscribe(parent);

    try {
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

代码不多,也很好理解:

  • 首先调用observer.onSubscribe(parent)方法通知Observer已经订阅成功了。
  • 最后调用source.subscribe(parent)方法完成订阅,source又是什么呢?我们知道在ObservableCreate是在Observable.create方法时创建的,并把ObservableOnSubscribe传进来,所以source就是ObservableOnSubscribe,直接回调ObservableOnSubscribe.subscribe方法并把CreateEmitter作为参数传递进去,之后再我们是栗子中通过这个对象调研onNext方法或者onComplete方法发射事件;

看一下CreateEmitter的实现,代码如下:

static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {

    //通过构造方法注入 观察者实例
    final Observer<? super T> observer;

    CreateEmitter(Observer<? super T> observer) {
        this.observer = observer;
    }

    @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {
            observer.onNext(t);
        }
    }

    @Override
    public void onError(Throwable t) {
        if (!tryOnError(t)) {
            RxJavaPlugins.onError(t);
        }
    }

   // .....此处省略几亿代码....

    @Override
    public void onComplete() {
        if (!isDisposed()) {
            try {
                observer.onComplete();
            } finally {
                dispose();
            }
        }
    }

   // .....此处省略几亿代码....
}

为了简介清晰我删掉很多无关代码,只保留onNext等这些相关的方法。

  • 其实CreateEmitter是Observable的静态内部类,
  • 在上面我们知道Observable.subscribeActual方法中创建了CreateEmitter实例并将Observer作为参数通过构造方法注入Observer实例,作为CreateEmitter的成员变量;
  • 之后在subscribeActual方法中调用ObservableOnSubscribe.subscribe的方法并把CreateEmitter实例作为方法参数传递进去;
  • 简单来说CreateEmitter的作用就是发射事件,里面分装了Observer实例,发射事件就回调到Obsever中的方法,如onNext等方法;

有没有发现从一开始我们就仅仅讲了从Obsevable的创建到订阅,这是比较汉理解的,如果我增加一个map或线程切换呢?这里暂时不展开讲线程切换。

重新把栗子的代码在贴一遍:

Observable<String> observableCreate = Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("发射 subscribe");
            emitter.onComplete();
        }
    });//ObservableCreate

    Observable observableSubscribeOn = observableSubscribeOn.map(new Function<String, Object>() {
        @Override
        public Object apply(String s) throws Exception {
            Log.e("tag", "map");
            return "aa";
        }
    })

    observableSubscribeOn.subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            showLog("onSubscribe");
        }

        @Override
        public void onNext(String s) {
            showLog("onNext");
        }

        @Override
        public void onError(Throwable e) {
            showLog("onError");
        }

        @Override
        public void onComplete() {
            showLog("onComplete");
        }
    });

如上代码,订阅流程会和之前的有什么不一样呢?那么我们看个究竟,就从 Observable observableSubscribeOn = observableSubscribeOn.subscribeOn(Schedulers.io())开始,代码如下:

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper){
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    //这里把上游this传进去也就是source,以便调用上游的subscribe方法
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

从上面代码看,我们知道在map方法中创建了ObservableMap并把上游的Observable参进去了,而我们知道从Observable.subscribe方法开始订阅就会调用 subscribeActual(observer)方法,所以在Observable.subscribe之后就会调用ObservableMap的subscribbeActual方法,代码如下:

 public void subscribeActual(Observer<? super U> t) {
    source.subscribe(new MapObserver<T, U>(t, function));
}

在ObservableMap的subscribbeActual方法中,直接调用传进来的Observable的subscribe方法又间接调用subscribbeActual方法没所以,订阅的过程实际上是一样的。

总结

  • Observable是由上游往下游传递的,并且每个操作符都会创建新的Observable对象包裹上游的实例;
  • Observer是由下游往上游传递的,也就是从Observable.subscribe方法开始。

相关文章

网友评论

    本文标题:从源码分析RxJava订阅过程

    本文链接:https://www.haomeiwen.com/subject/thiicctx.html