美文网首页
RxJava2.x-Flowable创建

RxJava2.x-Flowable创建

作者: 河马过河 | 来源:发表于2018-08-23 17:58 被阅读298次

    一、Flowable创建

     public Flowable<Integer> getRxJavaFlowableData() {
            return Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                    emitter.onNext(1);
    
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                    emitter.onNext(2);
    //                Thread.sleep(5000);
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                    emitter.onNext(3);
                    emitter.onComplete();
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                    emitter.onNext(4);
                }
            }, BackpressureStrategy.ERROR);
        }
     public void rxJavaFlowableCreateExample() {
            model.getRxJavaFlowableData()
                    .subscribe(new FlowableSubscriber<Integer>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                        }
                    });
        }
    
    

    日志

    08-23 10:47:55.032 4992-4992/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
    08-23 10:47:55.032 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
    08-23 10:47:55.034 4992-4992/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    08-23 10:47:55.034 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
        getRxJavaFlowableData---:main--:3
    08-23 10:47:55.035 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4
    

    总结

    1、当下游没有通知上游,自己有处理数据的能力时,上游发送数据会直接报错,也就是著名的背压错误MissingBackpressureException
    2、被观察者则继续发送数据到发送完毕

    二、Subscriber发送能处理数据的能力时

    
        public void rxJavaFlowableCreateExample() {
    
            model.getRxJavaFlowableData()
                    .subscribe(new FlowableSubscriber<Integer>() {
                        Subscription  subscription;
                        @Override
                        public void onSubscribe(Subscription s) {
                            subscription= s;
                            subscription.request(Integer.MAX_VALUE);
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                        }
                    });
        }
    
    

    日志

    08-23 11:33:56.631 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
    08-23 11:33:56.632 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
    08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:1
    08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
    08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:2
    08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:3
    08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:3
        rxJavaFlowableCreateExample--:main-onComplete-:onComplete
    08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4
    

    总结

    1、当下游通知上游,自己有处理数据的能力时,只要上游发送的数据不超过下游能力,就不会报错

    日志(当request的数据是2时)

    08-23 11:36:58.318 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
    08-23 11:36:58.318 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
    08-23 11:36:58.320 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:1
    08-23 11:36:58.320 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
    08-23 11:36:58.320 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:2
    08-23 11:36:58.320 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:3
    08-23 11:36:58.321 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    08-23 11:36:58.321 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4
    
    

    2、 emitter.onComplete(); 发送的complete事件不算request数据中

    三、异步创建Flowable

      public void rxJavaFlowableCreateExample() {
            model.getRxJavaFlowable128Data()
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new FlowableSubscriber<Integer>() {
                        Subscription subscription;
    
                        @Override
                        public void onSubscribe(Subscription s) {
    //                        subscription= s;
    //                        subscription.request(3);
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                        }
                    });
        }
    

    日志

    08-23 16:42:44.141 21570-21570/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
    08-23 16:42:44.145 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:0
    08-23 16:42:44.146 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:1
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:2
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:3
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:4
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:5
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:6
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:7
    08-23 16:42:44.147 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:8
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:9
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:10
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:11
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:12
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:13
    ................................................................................................................
    08-23 16:42:44.154 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:123
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:124
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:125
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:126
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:127
    
    

    总结

    1、异步的情况下,上游被观察者和下游观察者之前有个水缸,大小128
    2、所以,当上游发送数据,下游不接受的情况下,能发送128条数据不报MissingBackpressureException

    日志(当发送129条数据)

    ................................................................................................................
      getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:125
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:126
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:127
        getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:128
    08-23 16:49:16.869 24202-24202/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    
    

    四、验证requested大小

    1、当同一个线程下时

     public Flowable<Integer> getRxJavaFlowableData() {
            return Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                    emitter.onNext(1);
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                    emitter.onNext(2);
    //                Thread.sleep(5000);
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                    emitter.onNext(3);
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:onComplete");
                    emitter.onComplete();
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                    emitter.onNext(4);
                }
            }, BackpressureStrategy.ERROR);
        }
    public void rxJavaFlowableSizeExample() {
            model.getRxJavaFlowableData()
                    .subscribe(new FlowableSubscriber<Integer>() {
                        @Override
                        public void onSubscribe(Subscription s) {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                        }
                    });
        }
    
    

    日志

    08-24 14:29:06.026 22382-22382/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
    08-24 14:29:06.026 22382-22382/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:0
        getRxJavaFlowableData---:main--:1
    08-24 14:29:06.028 22382-22382/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onError-:io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
    08-24 14:29:06.028 22382-22382/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:0
        getRxJavaFlowableData---:main--:2
        getRxJavaFlowableData---:main-request-:0
        getRxJavaFlowableData---:main--:3
        getRxJavaFlowableData---:main-request-:0
        getRxJavaFlowableData---:main--:onComplete
        getRxJavaFlowableData---:main-request-:0
        getRxJavaFlowableData---:main--:4
    

    总结

    1、当在同一个线程时,当下游观察者未请求数据时,requested为0

      public void rxJavaFlowableSizeExample() {
            model.getRxJavaFlowableData()
                    .subscribe(new FlowableSubscriber<Integer>() {
                        Subscription s;
    
                        @Override
                        public void onSubscribe(Subscription s) {
                            this.s = s;
                            s.request(100);
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
    
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                        }
                    });
        }
    
    

    日志

    08-24 14:32:15.405 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
    08-24 14:32:15.405 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:100
        getRxJavaFlowableData---:main--:1
    08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:1
    08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:99
        getRxJavaFlowableData---:main--:2
    08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:2
    08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:98
        getRxJavaFlowableData---:main--:3
    08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:3
    08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:97
        getRxJavaFlowableData---:main--:onComplete
    08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onComplete-:
    08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:97
        getRxJavaFlowableData---:main--:4
    
    

    总结

    1、下游观察者 s.request(100),改变requested值

    2、当在异步线程时

       public Flowable<Integer> getRxJavaFlowableData() {
            return Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                    emitter.onNext(1);
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                    emitter.onNext(2);
    //                Thread.sleep(5000);
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                    emitter.onNext(3);
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:onComplete");
                    emitter.onComplete();
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                    emitter.onNext(4);
                }
            }, BackpressureStrategy.ERROR);
        }
    
        public Flowable<Integer> getRxJavaFlowable128Data() {
            return Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; i < 129; i++) {
                        LogUtils.debug(TAG, "getRxJavaFlowable128Data---:" + Thread.currentThread().getName() + "--:" + i);
                        emitter.onNext(i);
                    }
                }
            }, BackpressureStrategy.ERROR);
        }
      public void rxJavaFlowableSizeExample() {
            model.getRxJavaFlowableData()
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new FlowableSubscriber<Integer>() {
                        Subscription s;
    
                        @Override
                        public void onSubscribe(Subscription s) {
                            this.s = s;
                            s.request(100);
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
    
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                        }
                    });
        }
    

    日志

    08-24 14:48:05.367 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
    08-24 14:48:05.374 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:128
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
    08-24 14:48:05.376 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:127
    08-24 14:48:05.376 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:1
    08-24 14:48:05.376 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
    08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:126
    08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:2
    08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:3
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:125
    08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:3
    08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:onComplete
    08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onComplete-:
    08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:125
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:4
    
    

    总结

    1、requested 是128 而不是100,这是被观察者里面有个''水缸'' ,大小是128
    2、被观察者每发送一条数据,requested减少1
    3、onComplete(),onError()不减少requested值

    3、当在异步线程时,什么情况下触发被观察者继续发送数据

     public Flowable<Integer> getRxJavaFlowableRealExample() {
            return Flowable.create(new FlowableOnSubscribe<Integer>() {
                @Override
                public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                    for (int i = 0; ; i++) {
                             if (emitter.isCancelled()) {
                                break;
                            }
                        while (emitter.requested() == 0) {
                         
                        }
                        LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
                        LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + i);
                        emitter.onNext(i);
                    }
    
                }
            }, BackpressureStrategy.ERROR);
        }
     public void rxJavaFlowableRealExample() {
            model.getRxJavaFlowableRealExample()
                    .subscribeOn(Schedulers.io())
                    .unsubscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new FlowableSubscriber<Integer>() {
                        @Override
                        public void onSubscribe(Subscription s) {
    //                        s.request(96);
                            LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                        }
    
                        @Override
                        public void onError(Throwable t) {
                            LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                        }
    
                        @Override
                        public void onComplete() {
                            LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                        }
                    });
        }
    

    日志

    08-24 16:36:38.599 6546-6546/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
    08-24 16:36:38.606 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
    08-24 16:36:38.607 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:126
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
    。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
     getRxJavaFlowableData---:RxCachedThreadScheduler-1--:122
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:5
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:123
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:4
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:124
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:125
    08-24 16:36:38.621 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:126
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:127
    

    总结

    1、当观察者没有request时,只是被观察者把128条数据发送完毕,观察者并未接受到数据
    2、当观察者request(95)时,观察者接受到95条数据,但并没有触发被观察者继续发送数据,也就是并未改变水缸的size值即requested

    日志

    08-24 16:20:38.610 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
    08-24 16:20:38.617 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
    08-24 16:20:38.619 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
    。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
    08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:0
    08-24 16:20:38.624 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:29
    08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:1
    08-24 16:20:38.624 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:98
    08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:2
    08-24 16:20:38.625 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:30
    08-24 16:20:38.625 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:3
        rxJavaFlowableRealExample--:main-onNext-:4
    。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
    08-24 16:20:38.638 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:93
    08-24 16:20:38.638 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:100
    08-24 16:20:38.638 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:94
    08-24 16:20:38.638 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:27
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:101
    08-24 16:20:38.639 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:26
    。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
       getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:125
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:126
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:127
    

    3、当观察者request(96)时,观察者接受到96条数据,同时触发被观察者继续发送数据,改变水缸的size值

    日志

    08-24 16:23:23.690 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
    08-24 16:23:23.696 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
    08-24 16:23:23.697 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:126
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
    08-24 16:23:23.697 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:0
    08-24 16:23:23.697 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
    。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
    08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:97
    08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:29
    08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:31
    08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:30
    08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:96
    08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:31
    08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:32
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:95
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:33
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:94
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:34
    08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:32
    08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:93
    。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
    
    08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:94
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:33
    08-24 16:23:23.710 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:94
    08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:95
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:32//①此时requested值为32
    08-24 16:23:23.710 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:95//②此时观察者又处理了一条数据,到达处理总值96
    08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:96//③此时被观察者又发送了一条数据
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127//④此时取得的requested值为32+96-1 为何需要减去1:(刚才又发送了一条数据)
    。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:221
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:222
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:223
    
    08-24 15:38:47.430 31071-31193/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:30
    08-24 15:38:47.430 31071-31071/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:95
    08-24 15:38:47.430 31071-31193/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:98
        getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
        getRxJavaFlowableData---:RxCachedThreadScheduler-1--:99
    
    河马过河微信公众号.jpg

    4、异步线程-consumer

    ···
    public Flowable<Integer> getRxJavaFlowableRealExample() {
    return Flowable.create(new FlowableOnSubscribe<Integer>() {
    @Override
    public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
    for (int i = 0; ; i++) {
    if (emitter.isCancelled()) {
    break;
    }
    while (emitter.requested() == 0) {
    }
    LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
    LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "--:" + i);
    emitter.onNext(i);
    }

            }
        }, BackpressureStrategy.ERROR);
    }
    public void rxJavaFlowableConsumeExample() {
        model.getRxJavaFlowableRealExample()
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .compose(RxLifeCycleUtils.<Integer>bindUntilEvent(view, ActivityEvent.DESTROY))
                .subscribe(new Consumer<Integer>() {
                    @Override
                    public void accept(Integer integer) throws Exception {
                        LogUtils.error(TAG, "rxJavaFlowableConsumeExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
                    }
                });
    
    }
    

    ···

    日志

    ....................................................
    2019-02-12 17:53:21.381 29563-29563/com.example.zhang E/RxJavaPresenter: rxJavaFlowableConsumeExample--:main-Consumer-:67323
    2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67324
    2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3-request-emit:99
    2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67331
    2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67333
    2019-02-12 17:53:21.381 29563-29563/com.example.zhang E/RxJavaPresenter: rxJavaFlowableConsumeExample--:main-Consumer-:67339
    2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67342
    2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67344
    2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3-request-emit:78
    2019-02-12 17:53:21.382 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67346
    

    总结

    当观察者是consume时,会持续请求request

    相关文章

      网友评论

          本文标题:RxJava2.x-Flowable创建

          本文链接:https://www.haomeiwen.com/subject/aalpiftx.html