美文网首页
RxJava2.0背压与不背压

RxJava2.0背压与不背压

作者: CHSmile | 来源:发表于2017-05-12 10:14 被阅读2442次
    RxJava是什么?

    一个基于观察者模式(事件流)的异步任务库。可以很简洁地完成一个异步任务,当任务复杂时也能清晰地表达逻辑。GitHub地址。,具体的一些理论可以查看抛物线
    这边文章《给 Android 开发者的 RxJava 详解》,很好的入门教程。

    基本使用

    在RxJava2.0中,把背压和非背压分两种观察者模式。
    背压:事件产生的速度远远快于事件消费的速度,最终导致数据积累越来越多,从而导致OOM等异常。

    1、非背压

     /**
     * 非背压
     * Observable对应Observer
     */
    private void createObservable() {
        //被观察者
        Observable<String> observable = Observable.create(new ObservableOnSubscribe<String>() {
            @Override
            public void subscribe(ObservableEmitter<String> e) throws Exception {
                e.onNext("This");
                e.onNext("is");
                e.onNext("RxJava");
                e.onComplete();
            }
        });
        //观察者
        Observer<String> observer = new Observer<String>() {
            Disposable disposable;
    
            @Override
            public void onSubscribe(Disposable d) {
                disposable = d;
            }
    
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: " + s);
            }
    
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: " + e.getLocalizedMessage());
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
                //取消订阅
                if (!disposable.isDisposed()) {
                    disposable.dispose();
                }
            }
        };
        observable.subscribe(observer);
    }
    

    2、背压

    /**
     * 背压(在异步过程中,由于被观察者发射数据过快,而观察者处理数据不及时,
     * 导致内存里堆积了太多数据,从而OOM,可以选择不同的策略处理该问题)
     * Flowable对应subscriber
     */
    private void createFlowable() {
        Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() {
            @Override
            public void subscribe(FlowableEmitter<String> e) throws Exception {
                if (!e.isCancelled()) {
                    e.onNext("This");
                    e.onNext("is");
                    e.onNext("RxJava");
                    e.onComplete();
                }
            }
            //抛弃策略
        }, BackpressureStrategy.DROP);
    
        Subscriber<String> subscriber = new Subscriber<String>() {
            Subscription subscription;
            @Override
            public void onSubscribe(Subscription s) {
                subscription = s;
                //请求一个数据
                subscription.request(1);
            }
    
            @Override
            public void onNext(String s) {
                Log.i(TAG, "onNext: " + s);
                //处理完后,再请求一个数据
                subscription.request(1);
            }
    
            @Override
            public void onError(Throwable e) {
                Log.i(TAG, "onError: " + e.getLocalizedMessage());
            }
    
            @Override
            public void onComplete() {
                Log.i(TAG, "onComplete");
                //取消订阅
                subscription.cancel();
            }
        };
        flowable.subscribe(subscriber);
    }
    

    相关文章

      网友评论

          本文标题:RxJava2.0背压与不背压

          本文链接:https://www.haomeiwen.com/subject/pmrztxtx.html