美文网首页
RxJava2.x-Flowable创建

RxJava2.x-Flowable创建

作者: 河马过河 | 来源:发表于2018-08-23 17:58 被阅读298次

一、Flowable创建

 public Flowable<Integer> getRxJavaFlowableData() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);

                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);
//                Thread.sleep(5000);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);
            }
        }, BackpressureStrategy.ERROR);
    }
 public void rxJavaFlowableCreateExample() {
        model.getRxJavaFlowableData()
                .subscribe(new FlowableSubscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                    }
                });
    }

日志

08-23 10:47:55.032 4992-4992/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 10:47:55.032 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 10:47:55.034 4992-4992/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-23 10:47:55.034 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
    getRxJavaFlowableData---:main--:3
08-23 10:47:55.035 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4

总结

1、当下游没有通知上游,自己有处理数据的能力时,上游发送数据会直接报错,也就是著名的背压错误MissingBackpressureException
2、被观察者则继续发送数据到发送完毕

二、Subscriber发送能处理数据的能力时


    public void rxJavaFlowableCreateExample() {

        model.getRxJavaFlowableData()
                .subscribe(new FlowableSubscriber<Integer>() {
                    Subscription  subscription;
                    @Override
                    public void onSubscribe(Subscription s) {
                        subscription= s;
                        subscription.request(Integer.MAX_VALUE);
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                    }
                });
    }

日志

08-23 11:33:56.631 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 11:33:56.632 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:1
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:2
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:3
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:3
    rxJavaFlowableCreateExample--:main-onComplete-:onComplete
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4

总结

1、当下游通知上游,自己有处理数据的能力时,只要上游发送的数据不超过下游能力,就不会报错

日志(当request的数据是2时)

08-23 11:36:58.318 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 11:36:58.318 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 11:36:58.320 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:1
08-23 11:36:58.320 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
08-23 11:36:58.320 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:2
08-23 11:36:58.320 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:3
08-23 11:36:58.321 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-23 11:36:58.321 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4

2、 emitter.onComplete(); 发送的complete事件不算request数据中

三、异步创建Flowable

  public void rxJavaFlowableCreateExample() {
        model.getRxJavaFlowable128Data()
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new FlowableSubscriber<Integer>() {
                    Subscription subscription;

                    @Override
                    public void onSubscribe(Subscription s) {
//                        subscription= s;
//                        subscription.request(3);
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
                    }
                });
    }

日志

08-23 16:42:44.141 21570-21570/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 16:42:44.145 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:0
08-23 16:42:44.146 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:1
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:2
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:3
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:4
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:5
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:6
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:7
08-23 16:42:44.147 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:8
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:9
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:10
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:11
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:12
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:13
................................................................................................................
08-23 16:42:44.154 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:123
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:124
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:125
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:126
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:127

总结

1、异步的情况下,上游被观察者和下游观察者之前有个水缸,大小128
2、所以,当上游发送数据,下游不接受的情况下,能发送128条数据不报MissingBackpressureException

日志(当发送129条数据)

................................................................................................................
  getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:125
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:126
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:127
    getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:128
08-23 16:49:16.869 24202-24202/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests

四、验证requested大小

1、当同一个线程下时

 public Flowable<Integer> getRxJavaFlowableData() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);
//                Thread.sleep(5000);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:onComplete");
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);
            }
        }, BackpressureStrategy.ERROR);
    }
public void rxJavaFlowableSizeExample() {
        model.getRxJavaFlowableData()
                .subscribe(new FlowableSubscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                    }
                });
    }

日志

08-24 14:29:06.026 22382-22382/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:29:06.026 22382-22382/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:1
08-24 14:29:06.028 22382-22382/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onError-:io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-24 14:29:06.028 22382-22382/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:2
    getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:3
    getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:onComplete
    getRxJavaFlowableData---:main-request-:0
    getRxJavaFlowableData---:main--:4

总结

1、当在同一个线程时,当下游观察者未请求数据时,requested为0

  public void rxJavaFlowableSizeExample() {
        model.getRxJavaFlowableData()
                .subscribe(new FlowableSubscriber<Integer>() {
                    Subscription s;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(100);
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");

                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);

                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                    }
                });
    }

日志

08-24 14:32:15.405 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:32:15.405 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:100
    getRxJavaFlowableData---:main--:1
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:1
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:99
    getRxJavaFlowableData---:main--:2
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:2
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:98
    getRxJavaFlowableData---:main--:3
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:3
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:97
    getRxJavaFlowableData---:main--:onComplete
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onComplete-:
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:97
    getRxJavaFlowableData---:main--:4

总结

1、下游观察者 s.request(100),改变requested值

2、当在异步线程时

   public Flowable<Integer> getRxJavaFlowableData() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
                emitter.onNext(1);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
                emitter.onNext(2);
//                Thread.sleep(5000);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
                emitter.onNext(3);
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:onComplete");
                emitter.onComplete();
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
                LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
                emitter.onNext(4);
            }
        }, BackpressureStrategy.ERROR);
    }

    public Flowable<Integer> getRxJavaFlowable128Data() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; i < 129; i++) {
                    LogUtils.debug(TAG, "getRxJavaFlowable128Data---:" + Thread.currentThread().getName() + "--:" + i);
                    emitter.onNext(i);
                }
            }
        }, BackpressureStrategy.ERROR);
    }
  public void rxJavaFlowableSizeExample() {
        model.getRxJavaFlowableData()
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new FlowableSubscriber<Integer>() {
                    Subscription s;

                    @Override
                    public void onSubscribe(Subscription s) {
                        this.s = s;
                        s.request(100);
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");

                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);

                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                    }
                });
    }

日志

08-24 14:48:05.367 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:48:05.374 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:128
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
08-24 14:48:05.376 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:127
08-24 14:48:05.376 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:1
08-24 14:48:05.376 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:126
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:2
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:3
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:125
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:3
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:onComplete
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onComplete-:
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:125
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:4

总结

1、requested 是128 而不是100,这是被观察者里面有个''水缸'' ,大小是128
2、被观察者每发送一条数据,requested减少1
3、onComplete(),onError()不减少requested值

3、当在异步线程时,什么情况下触发被观察者继续发送数据

 public Flowable<Integer> getRxJavaFlowableRealExample() {
        return Flowable.create(new FlowableOnSubscribe<Integer>() {
            @Override
            public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
                for (int i = 0; ; i++) {
                         if (emitter.isCancelled()) {
                            break;
                        }
                    while (emitter.requested() == 0) {
                     
                    }
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
                    LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + i);
                    emitter.onNext(i);
                }

            }
        }, BackpressureStrategy.ERROR);
    }
 public void rxJavaFlowableRealExample() {
        model.getRxJavaFlowableRealExample()
                .subscribeOn(Schedulers.io())
                .unsubscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(new FlowableSubscriber<Integer>() {
                    @Override
                    public void onSubscribe(Subscription s) {
//                        s.request(96);
                        LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
                    }

                    @Override
                    public void onNext(Integer integer) {
                        LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
                    }

                    @Override
                    public void onError(Throwable t) {
                        LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
                    }

                    @Override
                    public void onComplete() {
                        LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onComplete-:");
                    }
                });
    }

日志

08-24 16:36:38.599 6546-6546/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:36:38.606 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:36:38.607 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:126
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
 getRxJavaFlowableData---:RxCachedThreadScheduler-1--:122
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:5
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:123
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:4
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:124
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:125
08-24 16:36:38.621 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:126
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:127

总结

1、当观察者没有request时,只是被观察者把128条数据发送完毕,观察者并未接受到数据
2、当观察者request(95)时,观察者接受到95条数据,但并没有触发被观察者继续发送数据,也就是并未改变水缸的size值即requested

日志

08-24 16:20:38.610 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:20:38.617 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:20:38.619 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:0
08-24 16:20:38.624 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:29
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:1
08-24 16:20:38.624 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:98
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:2
08-24 16:20:38.625 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:30
08-24 16:20:38.625 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:3
    rxJavaFlowableRealExample--:main-onNext-:4
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
08-24 16:20:38.638 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:93
08-24 16:20:38.638 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:100
08-24 16:20:38.638 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:94
08-24 16:20:38.638 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:27
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:101
08-24 16:20:38.639 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:26
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
   getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:125
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:126
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:127

3、当观察者request(96)时,观察者接受到96条数据,同时触发被观察者继续发送数据,改变水缸的size值

日志

08-24 16:23:23.690 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:23:23.696 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:23:23.697 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:126
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
08-24 16:23:23.697 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:0
08-24 16:23:23.697 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:97
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:29
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:31
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:30
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:96
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:31
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:32
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:95
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:33
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:94
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:34
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:32
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:93
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。

08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:94
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:33
08-24 16:23:23.710 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:94
08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:95
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:32//①此时requested值为32
08-24 16:23:23.710 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:95//②此时观察者又处理了一条数据,到达处理总值96
08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:96//③此时被观察者又发送了一条数据
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127//④此时取得的requested值为32+96-1 为何需要减去1:(刚才又发送了一条数据)
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:221
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:222
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:223

08-24 15:38:47.430 31071-31193/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:30
08-24 15:38:47.430 31071-31071/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:95
08-24 15:38:47.430 31071-31193/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:98
    getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
    getRxJavaFlowableData---:RxCachedThreadScheduler-1--:99
河马过河微信公众号.jpg

4、异步线程-consumer

···
public Flowable<Integer> getRxJavaFlowableRealExample() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
if (emitter.isCancelled()) {
break;
}
while (emitter.requested() == 0) {
}
LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "--:" + i);
emitter.onNext(i);
}

        }
    }, BackpressureStrategy.ERROR);
}
public void rxJavaFlowableConsumeExample() {
    model.getRxJavaFlowableRealExample()
            .subscribeOn(Schedulers.io())
            .unsubscribeOn(Schedulers.io())
            .observeOn(AndroidSchedulers.mainThread())
            .compose(RxLifeCycleUtils.<Integer>bindUntilEvent(view, ActivityEvent.DESTROY))
            .subscribe(new Consumer<Integer>() {
                @Override
                public void accept(Integer integer) throws Exception {
                    LogUtils.error(TAG, "rxJavaFlowableConsumeExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
                }
            });

}

···

日志

....................................................
2019-02-12 17:53:21.381 29563-29563/com.example.zhang E/RxJavaPresenter: rxJavaFlowableConsumeExample--:main-Consumer-:67323
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67324
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3-request-emit:99
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67331
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67333
2019-02-12 17:53:21.381 29563-29563/com.example.zhang E/RxJavaPresenter: rxJavaFlowableConsumeExample--:main-Consumer-:67339
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67342
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67344
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3-request-emit:78
2019-02-12 17:53:21.382 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67346

总结

当观察者是consume时,会持续请求request

相关文章

  • RxJava2.x-Flowable创建

    一、Flowable创建 日志 总结 1、当下游没有通知上游,自己有处理数据的能力时,上游发送数据会直接报错,也就...

  • RxJava2.x-Flowable背压策略

    一、Flowable背压策略 BackpressureStrategy.MISSING 只接受一部分,其他的丢弃...

  • Java后台-2.创建数据库和表

    创建用户表 创建角色表 创建权限表 创建用户角色关系表 创建角色权限关系表 创建refreshToken表 创建商...

  • 创建

    http://blog.csdn.net/itpinpai/article/details/8151219 标记判...

  • 创建

    你好

  • 创建

    10月9日召开创建会,市委副书记马小秋

  • 创建

    自从去年开始咱们这个四线的小城市就迈入创建文明城市的行列,做为基层工作者肯定是参与的一员。在期间的工作中你发现虽...

  • 创建

    自从去年开始咱们这个四线的小城市就迈入创建文明城市的行列,做为基层工作者肯定是参与的一员。在期间的工作中你发现虽...

  • 创建

    早上到校,突然接到通知,明天教办要来验收“绿色学校”创建工作。 还是在二月份的时候,上交了一个“绿色学校创建方案”...

  • go workspace体验

    创建workspace 创建demowork 创建main.go,内容如下 创建demodep 创建demodep...

网友评论

      本文标题:RxJava2.x-Flowable创建

      本文链接:https://www.haomeiwen.com/subject/aalpiftx.html