美文网首页
RxJava2.+创建流程源码分析

RxJava2.+创建流程源码分析

作者: jtsky | 来源:发表于2017-07-25 14:38 被阅读63次

本片文章适用于有一定Android开发经验并且对于响应式编程有一定了解的程序猿阅读。

简介

RxJava按照官方的定义为:一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库。在Android上使用的比较广泛,因为在移动开发中由于UI线程不能阻塞,否则会出现卡顿,所以异步操作对于移动端编程尤其重要。而RxJava就是这样一个基于事件流并且便于异步操作的程序库。下面我们从源码角度分析一下事件的创建流程。

Demo

Observable observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> emitter) throws Exception {
               emitter.onNext("hello");
                emitter.onNext("world");
                emitter.onComplete();
            }
        });

Observer observer = new Observer<String>() {
           @Override
            public void onSubscribe(Disposable d) {
                Log.d(TAG, "onSubscribe");
            }

            @Override
            public void onNext(String value) {
                Log.d(TAG, "onNext data is :" + value);
            }


            @Override
            public void onError(Throwable e) {
                Log.d(TAG, "onError data is :" + e.toString());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete");
            }
        };

observable.subscribe(observer);

输出:

onSubscribe
onNext data is :hello
onNext data is :world
onComplete

从日志可以看出onSubscribe方法先被调用然后依次输出hello world 最后输出 onComplete 符合Observable 中subscribe方法的调用顺序。

Observable#create

 @SchedulerSupport(SchedulerSupport.NONE)
    public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        ObjectHelper.requireNonNull(source, "source is null");
        return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
    }

Observable.create()的入参为ObservableOnSubscribe,那ObservableOnSubscribe是什么呢,它是一个接口

public interface ObservableOnSubscribe<T> {

    /**
     * Called for each Observer that subscribes.
     * @param e the safe emitter instance, never null
     * @throws Exception on error
     */
    void subscribe(ObservableEmitter<T> e) throws Exception;
}

ObjectHelper.requireNonNull()对入参进行了空指针判断。
RxJavaPlugins根据注释来看其实就是一个对Rxjava标准操作进行处理的插件类。
RxJavaPlugins.onAssembly返回的其实就是new 出来的ObservableCreate对象,同时持有ObservableOnSubscribe对象引用。
那ObservableCreate又是什么呢?其实ObservableCreate就是一个Observable被观察者对象并重写了subscribeActual()方法。而subscribeActual()真正调用的地方发生在订阅发生的地方,下文会分析到。
所以Observable#create最终创建的是ObservableCreate对象。

Observable#subscribe

    @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

可以看到RxJavaPlugins.onSubscribe(this,observer)其实只是简单的返回了observer而已,关键的是subscribeActual(observer)。通过上文可知,subscribeActual真正发生的地方其实是在ObservableCreate中。

@Override
    protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

subscribeActual这段代码并不长但很重要,咱们一句一句来分析下。
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
采用的是组合的方式,parent 内部持有了observer对象仅此而已,可以理解为parent 为observer的代理。
然后observer调用了自己的onSubscribe方法,这也是为什么我们一开始看到onSubscribe日志最先输出的原因。同时它也脱离了被订阅者的管理,因为订阅者自己调用了自己。
接下来可以看到订阅真正发生的地方source.subscribe(parent);
source就是一开始我们new出来的ObservableOnSubscribe对象,而parent是订阅者的代理对象,所以当订阅发生的时候,就会输出上面的日子。

单个操作符订阅总结

1、传入的ObservableOnSubscribe最终被用来创建成ObservableCreate
2、ObservableCreate持有我们的被观察者对象以及重新了订阅触发时的回调函数subscribeActual
3、在subscribeActual实现了我们的主要逻辑,包括observer.onSubscribe(parent);
source.subscribe(parent);
parent.onError(ex)的调用
4、在Observable的subscribe被调用时开始执行事件分发流程
5、最后放一张对象间的关系图(此图来自于网络)

52eb2279jw1f2rx489robj20lk0a8my2.jpg

组合操作符总结

1、首先我们得明确一点每个操作符最后都会通过RxJavaPlugins.onAssembly(Observable<T> source)返回一个新的Observable。如下:


image.png

2、onAssembly的入参针对每一个操作符都会实现一个继承自AbstractObservableWithUpstream:Observable的对象,它的作用就是用于包装上级的AbstractObservableWithUpstream:Observable。比如:


image.png
而每一个新的Observable对象中都会有一个继承自Observer的对象,它的作用就是用于包装下级Observer和当前的Function
image.png

3、当我们调用最底层subscribe方法的时候其实真正调用的是上一级Observable的subscribeActual方法,然后subscribeActual方法中会构造一个内部Observer子类的对象,然后通过调用上级Observable的subscribe方法将新生成的包装Observer对象传入到上一级中。
4、通过一层层的包装上传,当调用链来到最顶层的ObservableCreate时,由于不能再往上一层进行封装了,就会执行ObservableOnSubscribe的subscribe方法,如下:


image.png

而subscribe方法的参数就是下级经过层层包装传递上来的Observer,所以当我们调用emitter的相关方法时,内部都会执行如下操作:


image.png
其中actual就是下级的Observer。
5、当往下的调用链来到最底层时自然调用的就是最底层Observer的相关方法了。

至此,组合操作符的调用链就分析完了。

相关文章

网友评论

      本文标题:RxJava2.+创建流程源码分析

      本文链接:https://www.haomeiwen.com/subject/radwkxtx.html