part06_Rxjava背压原理

作者: IT魔幻师 | 来源:发表于2018-08-30 20:09 被阅读20次

    作者:IT魔幻师
    博客:www.huyingzi.top
    转载请注明出处:https://www.jianshu.com/p/23f74055e999


    一、RxJava1与RxJava2 对比

    • RxJava 2x 不再支持 null 值,如果传入一个null会抛出 NullPointerException

    • Observable.just(null)(不支持)

    • RxJava2 所有的函数接口(Function/Action/Consumer)均设计为可抛出Exception,自己去解决编译异常需要转换问题。

    • RxJava1 中Observable不能很好支持背压,在RxJava2 中将Oberservable实现成不支持背压,而新增Flowable 来支持背压

    二、背压

    事件上游产生的事件高于事件下游消费的事件导致内存不断扩大
    rxjava1并没有对这个的解决方案
    rxjava2
    添加了一个新的被观察者角色操作符Flowable所有的Observable操作都可以用Flowable替换

    什么时候用 Observable:
    一般处理最大不超过1000条数据,并且几乎不会出现内存溢出
    如果式 鼠标事件,频率不超过1000 Hz,基本上不会背压;

    什么时候用 Flowable:
    处理以某种方式产生超过10K的元素;
    文件读取与分析,例如 读取指定行数的请求;网络IO流;
    有很多的阻塞和/或 基于拉取的数据源,但是又想得到一个响应式非阻塞接口的。

    三、背压策略

    1.BackpressureStrategy.ERROR:若上游发送事件速度超出下游处理事件能力,且事件缓存池已满,则抛出异常
    //阻塞时队列
    2.BackpressureStrategy.BUFFER:若上游发送事件速度超出下游处理能力,则把事件存储起来等待下游处理
    3.BackpressureStrategy.DROP:若上游发送事件速度超出下游处理能力,事件缓存池满了后将之后发送的事件丢弃
    4.BackpressureStrategy.LATEST:若上有发送时间速度超出下游处理能力,则只存储最新的128个事件

    四、Flowable的使用

       @Test
        public void testFlowable() {
            Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 1000000; i++) {
                        emitter.onNext(i);
                    }
                }
            }, BackpressureStrategy.ERROR).subscribe(new FlowableSubscriber<Integer>() {
                @Override
                public void onSubscribe(Subscription s) {
                    //使用Flowable需要在此处给其一个最大的事件处理能力
                    //设置为最大的处理能力
                    s.request(500);
    //                s.request(Integer.MAX_VALUE);
                }
    
                @Override
                public void onNext(Integer integer) {
                    //模拟处理
                    try {
                        Thread.currentThread().sleep(5);
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    System.out.println("处理事件:"+integer);
    
                }
    
                @Override
                public void onError(Throwable t) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
        }
    

    相关文章

      网友评论

        本文标题:part06_Rxjava背压原理

        本文链接:https://www.haomeiwen.com/subject/cmtaiftx.html