美文网首页
Rxjava 2.0

Rxjava 2.0

作者: 小浩_w | 来源:发表于2019-01-11 15:18 被阅读0次

添加依赖

    //RxJava的依赖包(我使用的最新版本)
    implementation 'io.reactivex.rxjava2:rxjava:2.0.1'
    //RxAndroid的依赖包
    implementation 'io.reactivex.rxjava2:rxandroid:2.0.1'
  • Observable(被观察者)/Observer(观察者)
  • Flowable(被观察者)/Subscriber(观察者)

RxJava2.X中,Observeable用于订阅Observer,是不支持背压的,而Flowable用于订阅Subscriber,是支持背压(Backpressure)

  • 背压是指在异步场景中,被观察者发送事件速度远快于观察者的处理速度的情况下,一种告诉上游的被观察者降低发送速度的策略

Observable/Observer

Observable正常用法:

  Observable mObservable=Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> e) throws Exception {
                e.onNext(1);
                e.onNext(2);
                e.onComplete();
            }
        });
        
Observer mObserver=new Observer<Integer>() {
            //这是新加入的方法,在订阅后发送数据之前,
            //回首先调用这个方法,而Disposable可用于取消订阅
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onNext(Integer value) {

            }

            @Override
            public void onError(Throwable e) {

            }

            @Override
            public void onComplete() {

            }
        };
        
mObservable.subscribe(mObserver);

这种观察者模型是不支持背压的。
当被观察者快速发送大量数据时,下游不会做其他处理,即使数据大量堆积,调用链也不会报MissingBackpressureException,消耗内存过大只会OOM
我在测试的时候,快速发送了50000个整形数据,下游延迟接收,结果被观察者的数据全部发送出去了,内存确实明显增加了,但是没有OOM。
使用Observable/Observer的时候,需要考虑的是,数据量是不是很大(官方给出以1000个事件为分界线,仅供参考)

Flowable/Subscriber

Flowable.range(0,10)
        .subscribe(new Subscriber<Integer>() {
            Subscription sub;
            //当订阅后,会首先调用这个方法,其实就相当于onStart(),
            //传入的Subscription s参数可以用于请求数据或者取消订阅
            @Override
            public void onSubscribe(Subscription s) {
                Log.w("TAG","onsubscribe start");
                sub=s;
                sub.request(1);
                Log.w("TAG","onsubscribe end");
            }

            @Override
            public void onNext(Integer o) {
                Log.w("TAG","onNext--->"+o);
                sub.request(1);
            }
            @Override
            public void onError(Throwable t) {
                t.printStackTrace();
            }
            @Override
            public void onComplete() {
                Log.w("TAG","onComplete");
            }
        });

Flowable是支持背压的

       Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> emitter) throws Exception {
                for (int i = 0; i < 10000; i++) {
                    emitter.onNext(i+"for");
                }
            }
        }, BackpressureStrategy.BUFFER).observeOn(AndroidSchedulers.mainThread())
                .observeOn(Schedulers.io())
                .subscribe(new FlowableSubscriber<String>() {
                    @Override
                    public void onSubscribe(Subscription s) {

                    }

                    @Override
                    public void onNext(String s) {

                    }

                    @Override
                    public void onError(Throwable t) {

                    }

                    @Override
                    public void onComplete() {

                    }
                });

Flowable虽然可以通过create()来创建,但是你必须指定背压的策略,以保证你创建的Flowable是支持背压的(这个在1.0的时候就很难保证,可以说RxJava2.0收紧了create()的权限)。
根据上面的代码的结果输出中可以看到,当我们调用subscription.request(n)方法的时候,不等onSubscribe()中后面的代码执行,就会立刻执行到onNext方法,因此,如果你在onNext方法中使用到需要初始化的类时,应当尽量在subscription.request(n)这个方法调用之前做好初始化的工作;
当然,这也不是绝对的,我在测试的时候发现,通过create()自定义Flowable的时候,即使调用了subscription.request(n)方法,也会等onSubscribe()方法中后面的代码都执行完之后,才开始调用onNext。

尽可能确保在request()之前已经完成了所有的初始化工作,否则就有空指针的风险。

其他观察者模式

  • Single/SingleObserver
  • Completable/CompletableObserver
  • Maybe/MaybeObserver

其实这三者都差不多,Maybe/MaybeObserver可以说是前两者的复合体,因此以Maybe/MaybeObserver为例简单介绍一下这种观察者模式的用法

//判断是否登陆
Maybe.just(isLogin())
    //可能涉及到IO操作,放在子线程
    .subscribeOn(Schedulers.newThread())
    //取回结果传到主线程
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new MaybeObserver<Boolean>() {
            @Override
            public void onSubscribe(Disposable d) {

            }

            @Override
            public void onSuccess(Boolean value) {
                if(value){
                    ...
                }else{
                    ...
                }
            }

            @Override
            public void onError(Throwable e) {
                
            }

            @Override
            public void onComplete() {

            }
        });

上面就是Maybe/MaybeObserver的普通用法,你可以看到,实际上,这种观察者模式并不用于发送大量数据,而是发送单个数据,也就是说,当你只想要某个事件的结果(true or false)的时候,你可以用这种观察者模式

相关文章

网友评论

      本文标题:Rxjava 2.0

      本文链接:https://www.haomeiwen.com/subject/xjotdqtx.html