美文网首页
RxJava1 原理

RxJava1 原理

作者: simplehych | 来源:发表于2019-01-25 09:22 被阅读0次

    0x01 RxJava 简介

    github 地址:https://github.com/ReactiveX/RxJava

    a library for composing asynchronous and event-based programs by using observable sequences for the Java VM.

    一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库

    关键字:异步事件可观测序列

    RxJava1原理读自 扔物线 于2015年10月写的 给 Android 开发者的 RxJava 详解

    功能:异步,解决异步处理问题
    好处:简洁,链式调用,逻辑顺序执行

    0x02 RxJava1 原理

    鉴于大多数同学已经相对熟悉RxJava,不再逐一深入,只进行核心说明。

    本节依据以下版本进行探讨

    implementation 'io.reactivex:rxjava:1.0.14'
    implementation 'io.reactivex:rxandroid:1.0.1'
    

    抛出一个示例

    简化代码如下

    Observable
            .create( new OnSubscribe() {
                  void call(Subscriber subscriber) {}
            })
            .subscribeOn()
            .flatMap()
            .observeOn()
            .map()
            .subscribeOn()
            .take()
            .observeOn()
            .subscribe(new Subscriber())
    
    

    具体代码如下,可暂时忽略

    Observable
            .create(new Observable.OnSubscribe<Integer>() {
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(1);
                    subscriber.onNext(2);
                    subscriber.onNext(3);
                    subscriber.onCompleted();
                }
            })
            .subscribeOn(Schedulers.io())
            .flatMap(new Func1<Integer, Observable<String>>() {
                @Override
                public Observable<String> call(Integer integer) {
                    return Observable.just("flatMap-" + integer);
                }
            })
            .observeOn(AndroidSchedulers.mainThread())
            .map(new Func1<String, String>() {
                @Override
                public String call(String s) {
                    return "map-" + s;
                }
            })
            .doOnSubscribe(new Action0() {
                @Override
                public void call() {
                    Toast.makeText(context, "doOnSubscribe call", Toast.LENGTH_SHORT).show();
                }
            })
            .subscribeOn(AndroidSchedulers.mainThread())
            .take(2)
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Subscriber<String>() {
    
                @Override
                public void onStart() {
                    Log.i(TAG, "Subscriber onStart");
    
                }
    
                @Override
                public void onNext(String s) {
                    Log.i(TAG, "Subscriber onNext " + s);
                }
    
                @Override
                public void onCompleted() {
                    Log.i(TAG, "Subscriber onStart");
                }
    
                @Override
                public void onError(Throwable e) {
                    Log.i(TAG, "Subscriber onStart");
                }
            });
    

    Point 核心知识点

    • 关键词:ObservableonSubscribesubscribe()Subscriber
    • Subscriber 可看做等价 Observer。Subscriber 实现 Observer;subscribe() 注册时总会将 Observer 转换成 Subscriber 再使用;有区别是 onStart() 和 unsubscribe();
    • 创建 Observable 需传入OnSubscribe对象,该对象即为 Observable 类中的 onSubscribe 属性
    • subscribe() 返回 Subscription,其他系列操作符如map()、 subscribeOn()、observeOn()等都返回 Observable
    • map()、flatMap()、subscribeOn() 等所有的操作符基于 lift()
    // 这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码
    public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber subscriber) {
                Subscriber newSubscriber = operator.call(subscriber);
                newSubscriber.onStart();
                onSubscribe.call(newSubscriber);
            }
        });
    }
    
    // operator.call() 的核心代码
    public final class OperatorXXX<T> implements Observable.Operator<T, T> {
        @Override
        public Subscriber<? super T> call(final Subscriber<? super T> child) {
            final Subscriber<T> parent = new Subscriber<T>() {
                @Override
                public void onNext(T i) {
                    child.onNext(i);
                }
            };
            return parent;
        }
    }
    
    • Observable.subscribe(Subscriber)
    // subscribe()的核心代码
    public Subscription subscribe(Subscriber subscriber) {
        subscriber.onStart();
        onSubscribe.call(subscriber);
        return subscriber;
    }
    
    • 通过 lift() 代码可知,下一级 (新) Observable 包含上一级 (旧) Observable, 最后 subscribe() 时进行向上打call 依次调用 Subscriber
    • 通过operator.call() 的核心代码,下一级(新) Subscriber 包含上一级 (旧) Subscriber, 最后得到最终的 Subscriber
    • 最终 subscribe() 启动全过程,然后向上依次回退
    // 整体链式调用理解
    Observable
            .create( new OnSubscribe() {
                void call(Subscriber subscriber) {}
            })
            .lift()
            .lift()
            .subscribe(new Subscriber())
    
    • subscribeOn() 指定 subscribe()所发生的线程,即 Observable.OnSubscribe 被激活时所在的线程
    • observeOn() 指定 Subscriber 所运行线程,是当前Observable 所对应的 Subscriber,即它的直接下级 Subscriber。
    • subscribeOn() 只有第一个有效,subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程
    • 与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程


      两个 lift() 原理
      多次 lift() 原理

    相关文章

      网友评论

          本文标题:RxJava1 原理

          本文链接:https://www.haomeiwen.com/subject/cbcejqtx.html