美文网首页
Rxjava(五)之背压模式

Rxjava(五)之背压模式

作者: 梦星夜雨 | 来源:发表于2021-01-27 14:12 被阅读0次

前言

我们在上面的篇幅讲解了Rxjava的操作符的使用,那么这篇文章我们将讲解Rxjava的背压模式。
在Rxjava1.0的时候还没有背压模式,当我们被观察者大量发送事件,远远大于观察者处理事件的速度的时候,会造成内存溢出。这时候背压模式就产生了。

背压模式的代码实现

Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> e) throws Exception {
                for (int i = 0; i < 200; i++) {
                    e.onNext(i);
                }
                e.onComplete();
            }
        },
        BackpressureStrategy.ERROR
//        BackpressureStrategy.BUFFER
//        BackpressureStrategy.DROP
//        BackpressureStrategy.LATEST
        ).
        .subscribeOn(Schedulers.io())
        .observeOn(AndroidSchedulers.mainThread())
        subscribe(new Subscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                s.request(3);
            }

            @Override
            public void onNext(Integer integer) {
                try {
                    Thread.currentThread().sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                Log.d(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable t) {
                Log.d(TAG, "onError: " + t.getMessage());
            }

            @Override
            public void onComplete() {
                Log.d(TAG, "onComplete: ");
            }
        });

这里有一个BackpressureStrategy参数,一共有四种MODE。
BackpressureStrategy.ERROR
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会抛出异常。
BackpressureStrategy.BUFFER
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会等待下游处理。
BackpressureStrategy.DROP
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,如果缓存池满了,就会丢弃多余事件。
BackpressureStrategy.LATEST
被观察者发送大量事件,当观察者处理不过来时,就放入缓存池,只会存储128个事件。
这里需要注意的是,缓存池大小就是128。

我们运行上述代码,会发现程序直接报错了(onError: create: could not emit value due to lack of requests)。因为我们采用了BackpressureStrategy.ERROR模式,并且发送的事件大于128。
我们将发送的事件改为100,然后在被观察者的onSubscribe方法中请求了三次,这时就会打印三次。结果如下图:

onNext: 0
onNext: 1
onNext: 2

然后我们在请求数据的时候新建一个线程:

s.request(3);
new Thread(new Runnable() {
    @Override
    public void run() {
        try {
            Thread.sleep(5000);
            Log.d(TAG,"Thread request");
        } catch (InterruptedException e) {
            e.printStackTrace();
        }
        s.request(3);
    }
}).start();
onNext: 0
onNext: 1
onNext: 2
Thread request
onNext: 3
onNext: 4
onNext: 5

这时候从结果可以看出,当我们采用异步线程时,我们可以在线程中请求数据,并且可以多次请求。
注意:异步线程切换时,被观察者的线程只会切换一次,而观察者的线程每次都会切换。

总结:
Flowable就是Observable的升级版,用法大致相同,他的主要不同点有以下几点:
1.Flowable在create方法中多了一个BackpressureStrategy参数,一共有四种MODE。
2.Observable有一个切断被观察者和观察者dispose方法,而在Flowable方法中这是采用request方法请求被观察者的事件。

相关文章

  • Rxjava(五)之背压模式

    前言 我们在上面的篇幅讲解了Rxjava的操作符的使用,那么这篇文章我们将讲解Rxjava的背压模式。在Rxjav...

  • RxJava2背压管理策略实例解析

    1、前言: 在使用RxJava 的背压时,遇到了很多困扰。本文主要是针对RxJava背压策略的5种模式下,观察者和...

  • RxJava2--Flowable与BackPress

    转载自:Rxjava2入门教程五:Flowable背压支持——对Flowable最全面而详细的讲解 背压介绍 当上...

  • Rxjava(六)主线流程分析之create和map流程

    前言 我们在上面的篇幅讲了Rxjava的使用、操作符、背压模式,这篇文章,我们将对Rxjava主线流程进行分析。 ...

  • RxJava之背压策略

    转载请以链接形式标明出处:本文出自:103style的博客 本文基于 RxJava 2.x 版本 目录 RxJav...

  • RxJava背压

    订阅分为:同步订阅 异步订阅 同步订阅Rxjava1与Rxjava2中 同步订阅没有用到缓冲区,只要上游事件数量不...

  • RxJava背压

    RxJava 当我们在对RxJava从1.0版本升级到2.0版本的时候,我们发现RxJava2.0增加了一个被观察...

  • Rxjava的背压

    一、什么是背压? 被观察者与观察者通过subscribe()订阅并使用异步后,一般上游发射一条数据下游就会...

  • Rxjava系列(七) RxJava2.0背压原理解析

    RxJava2.0有一个很大的特色是背压的支持,如果要使用背压的话需要使用 Flowable。为什么需要背压这种机...

  • RxJava基础总结

    1.本文仅基于RxJava2.0、Retrofit2.0(引入背压) 当下Rxjava,Retrofit已成项目标...

网友评论

      本文标题:Rxjava(五)之背压模式

      本文链接:https://www.haomeiwen.com/subject/niefzktx.html