part06_Rxjava背压原理

作者: IT魔幻师 | 来源:发表于2018-08-30 20:09 被阅读20次

作者:IT魔幻师
博客:www.huyingzi.top
转载请注明出处:https://www.jianshu.com/p/23f74055e999


一、RxJava1与RxJava2 对比

  • RxJava 2x 不再支持 null 值,如果传入一个null会抛出 NullPointerException

  • Observable.just(null)(不支持)

  • RxJava2 所有的函数接口(Function/Action/Consumer)均设计为可抛出Exception,自己去解决编译异常需要转换问题。

  • RxJava1 中Observable不能很好支持背压,在RxJava2 中将Oberservable实现成不支持背压,而新增Flowable 来支持背压

二、背压

事件上游产生的事件高于事件下游消费的事件导致内存不断扩大
rxjava1并没有对这个的解决方案
rxjava2
添加了一个新的被观察者角色操作符Flowable所有的Observable操作都可以用Flowable替换

什么时候用 Observable:
一般处理最大不超过1000条数据,并且几乎不会出现内存溢出
如果式 鼠标事件,频率不超过1000 Hz,基本上不会背压;

什么时候用 Flowable:
处理以某种方式产生超过10K的元素;
文件读取与分析,例如 读取指定行数的请求;网络IO流;
有很多的阻塞和/或 基于拉取的数据源,但是又想得到一个响应式非阻塞接口的。

三、背压策略

1.BackpressureStrategy.ERROR:若上游发送事件速度超出下游处理事件能力,且事件缓存池已满,则抛出异常
//阻塞时队列
2.BackpressureStrategy.BUFFER:若上游发送事件速度超出下游处理能力,则把事件存储起来等待下游处理
3.BackpressureStrategy.DROP:若上游发送事件速度超出下游处理能力,事件缓存池满了后将之后发送的事件丢弃
4.BackpressureStrategy.LATEST:若上有发送时间速度超出下游处理能力,则只存储最新的128个事件

四、Flowable的使用

   @Test
    public void testFlowable() {
        Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 1000000; i++) {
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR).subscribe(new FlowableSubscriber<Integer>() {
            @Override
            public void onSubscribe(Subscription s) {
                //使用Flowable需要在此处给其一个最大的事件处理能力
                //设置为最大的处理能力
                s.request(500);
//                s.request(Integer.MAX_VALUE);
            }

            @Override
            public void onNext(Integer integer) {
                //模拟处理
                try {
                    Thread.currentThread().sleep(5);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
                System.out.println("处理事件:"+integer);

            }

            @Override
            public void onError(Throwable t) {

            }

            @Override
            public void onComplete() {

            }
        });
    }

相关文章

  • part06_Rxjava背压原理

    作者:IT魔幻师博客:www.huyingzi.top转载请注明出处:https://www.jianshu.co...

  • RxJava2

    Map和FlatMap的区别 如何实现线程并发 subscribeOn和observeOn 背压的原理

  • 背压

    背压的含义就是指在一个典型的生产者消费者模型下,生产者生产数据的速度超过了消费者消费的速度导致的问题。 RxJS ...

  • 背压

    Back Pressure是流处理系统中,非常经典常见的问题,它是让流系统能对压力变化能够呈现良好抗压性的关键所在...

  • RxJava源码分析(二)基本的数据流分析(有背压)

    引言 上篇文章中,我们了解了RxJava基本的无背压数据流实现原理,本篇我们依然从案例着手,学习有背压下数据流响应...

  • RxJava背压

    订阅分为:同步订阅 异步订阅 同步订阅Rxjava1与Rxjava2中 同步订阅没有用到缓冲区,只要上游事件数量不...

  • RxJava背压

    RxJava 当我们在对RxJava从1.0版本升级到2.0版本的时候,我们发现RxJava2.0增加了一个被观察...

  • 背压-BackPressure

    2016-09-23 14:00 最近更新了RxJava2,看到了新增的Flowable支持背压。什么是背压呢? ...

  • JAVA背压

    Reactive Streams:一种支持背压的异步数据流处理标准,主流实现有RxJava和Reactor,Spr...

  • RxJava<第七篇>:用RxJava实现Event

    (1)没有压背处理的Rxbus 注册: 发送数据: (2)有压背处理的Rxbus

网友评论

    本文标题:part06_Rxjava背压原理

    本文链接:https://www.haomeiwen.com/subject/cmtaiftx.html