我是最简单的操作符-----Create

作者: javalong | 来源:发表于2018-04-15 17:01 被阅读36次

    严格来说,create应不算操作符,但是第一篇文章我还是希望能先以最简单的入门。这样再去学习接下来的操作符的话会更加简单,由浅入深。

    来段小代码
    class ObservaleActivity : AppCompatActivity() {
        var sub: Subscription? = null
        var observable: Observable<String>? = null
        override fun onCreate(savedInstanceState: Bundle?) {
            super.onCreate(savedInstanceState)
            setContentView(R.layout.activity_common)
            btSub.setOnClickListener({
                /**
                 * 暂时都不考虑 onComplete onError 只做onNext的处理,由浅入深
                 */
                observable?.subscribe({ msg ->
                    tvContent.text = tvContent.text.toString() + "\n" + msg
                })
            })
            /**
             * 最简单的使用方式
             */
            observable = Observable.create(object : Observable.OnSubscribe<String> {
                override fun call(t: Subscriber<in String>) {
                    t.onNext("Test1")
                }
            })
        }
    
      //这里需要去解除注册,防止内存泄漏
        override fun onDestroy() {
            super.onDestroy()
            sub?.unsubscribe()
        }
    }
    

    这里直接先贴出整个Activity的代码,给大家一个印象,在以后的话就不再直接贴全代码,而是贴关键代码。

    注意:一般来说,是需要在onDestroy的时候去unsubscribe,防止内存泄漏


    先了解重要的接口和类

    在阅读源码的时候,我get到了几个重要的方法

    1. 不需要每行都去搞懂
    2. 抓住最重要的类和接口去搞懂

    这里比较重要的接口和类分别是:

    接口或类名
    OnSubscribe 实现了Action接口,只有一个call方法,在subscribe时候才会去调用
    Subscriber 调用subscribe所传的参数,里面是onNext,onComplete,onError等方法
    Observable 所有的操作符都在这个类里面,我认为最重要的其实是OnSubscribe和Subscriber,而Observable其实是作为一个桥梁来链接这2者
    Subscription 这个是调用subscribe方法的返回值,用来取消这次订阅

    我阅读RxJavau 源码的时候,最让我头疼的一个地方其实就是它的命名,很多名字起的都起的差不多,所以搞得很乱。

    这里我专门提出这4个接口或类,只要能把这几个理清楚,其实就差不多。下面我们深入源码去理解。

    看看源代码

    Observable.create方法

    Observable

        public static <T> Observable<T> create(OnSubscribe<T> f) {
            return new Observable<T>(RxJavaHooks.onCreate(f));
        }
    

    RxJavaHooks

    public static <T> Observable.OnSubscribe<T> onCreate(Observable.OnSubscribe<T> onSubscribe) {
            Func1<Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableCreate;
            if (f != null) {
                return f.call(onSubscribe);
            }
            return onSubscribe;
        }
    

    这2段代码涉及到了刚才提到的Observable,OnSubscribe
    Observable.create(OnSubscribe)其实只是new了一个Observable然后把OnSubscribe作为参数传入,这里的RxJavaHooks大家可以先忽略,这个主要是做全局处理使用的,如果没有全局设置的话,onObservableCreate = null,会直接在第二个方法直接返回你传入的OnSubscribe参数。

    Observable.subscribe方法

    Observable

     public final Subscription subscribe(final Action1<? super T> onNext) {
            if (onNext == null) {
                throw new IllegalArgumentException("onNext can not be null");
            }
    
            Action1<Throwable> onError = InternalObservableUtils.ERROR_NOT_IMPLEMENTED;
            Action0 onCompleted = Actions.empty();
            return subscribe(new ActionSubscriber<T>(onNext, onError, onCompleted));
        }
    
     public final Subscription subscribe(Subscriber<? super T> subscriber) {
            return Observable.subscribe(subscriber, this);
        }
    
    
    static <T> Subscription subscribe(Subscriber<? super T> subscriber, Observable<T> observable) {
            ...
            subscriber.onStart();
            if (!(subscriber instanceof SafeSubscriber)) {
                subscriber = new SafeSubscriber<T>(subscriber);
            }
            try {
                RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
                return RxJavaHooks.onObservableReturn(subscriber);
            } catch (Throwable e) {
                ...
                return Subscriptions.unsubscribed();
            }
        }
    

    subscribe重载方法比较多,我把我们demo中涉及到的3个重载列出。
    在demo中我们使用的是直接传入了一个Action对象处理了onNext的事件,在源码中,Observable会把我们的Action包装成一个Subscriber对象,添加上onError,onComplete等方法的默认实现。

    最终调用的其实是subscribe的第三个重载方法。
    首先判断下参数Subscriber对象是否是SafeSubscriber对象的子类,如果不是,那么就重新new一个SafeSubscriber替换掉当前Observable对象的subscriber变量。

    注意:这里的SafeSubscriber大家可以先忽略,我们先看完完整的流程,在最后再为大家来看看SafeSubscriber

    最重要的代码

    ...
    RxJavaHooks.onObservableStart(observable, observable.onSubscribe).call(subscriber);
    ...
    

    RxJavaHooks

     public static <T> Observable.OnSubscribe<T> onObservableStart(Observable<T> instance, Observable.OnSubscribe<T> onSubscribe) {
            Func2<Observable, Observable.OnSubscribe, Observable.OnSubscribe> f = onObservableStart;
            if (f != null) {
                return f.call(instance, onSubscribe);
            }
            return onSubscribe;
        }
    

    这里又出现了RxJavaHooks,大家可以直接忽略看最后返回的参数是什么,直接返回的是传入的参数OnSubscribe对象。这里的RxJavaHooks也是用来做全局处理的作用,所以我们可以先忽略,简单的来看整个流程。

    所以上面所提到的最终的代码,其实可以简化为

    observable.onSubscribe.call(subscriber)
    

    大家可以看到,其实这就是最关键的代码,也可以看出 Observable这个对象的作用,其实就是用来链接OnSubscribeSubscriber对象。

    那么我们重新来看看Activity中的关键的代码

    ...
    observable?.subscribe({ msg ->
                    tvContent.text = tvContent.text.toString() + "\n" + msg
                })
    ...
    observable = Observable.create(object : Observable.OnSubscribe<String> {
                override fun call(t: Subscriber<in String>) {
                    t.onNext("Test1")
                }
            })
    ...
    

    应该对整个流程有一个简单的认识了。


    附加知识点

    SafeSubscriber

     public void onCompleted() {
            if (!done) {
                done = true;
                try {
                    actual.onCompleted();
                } catch (Throwable e) {
               ...
              }
            }
        }
    
        @Override
        public void onError(Throwable e) {
            if (!done) {
                done = true;
                _onError(e);
            }
        }
    
        @Override
        public void onNext(T t) {
            try {
                if (!done) {
                     actual.onNext(t);
                  }
            ...
        }
    

    主要作用: 对原有的Subscriber对象包装一下。前面刚学RxJava的时候,看到有些文章说,onErroronComplete只有一个会运行,其实从这里可以看出。


    总结

    RxJava中核心的设计模式我认为是装饰者模式,所以在阅读源码的时候也非常的累,因为RxJava的链式结构,大家可以想象下如果4,5个操作符链式书写,然后一步步跟入源码会很难受的,完全不知道当前是哪个OnSubscribeSubscriber

    当然只要我们能搞清楚1层结构的流程,那么后面的3,4层结构也能理解,只是稍微麻烦点而已。

    相关文章

      网友评论

      • wsxiaoluob:厉害啊,之前不懂这一块的东西,看了这篇文章简直茅塞顿开,作者写的真是太好了,很详细解释的很到位。并且还有源码的思考,简直太完美了!!
      • ZZombiee:哇,回神

      本文标题:我是最简单的操作符-----Create

      本文链接:https://www.haomeiwen.com/subject/eqwdkftx.html