一、Flowable创建
public Flowable<Integer> getRxJavaFlowableData() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
emitter.onNext(1);
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
emitter.onNext(2);
// Thread.sleep(5000);
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
emitter.onNext(3);
emitter.onComplete();
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
emitter.onNext(4);
}
}, BackpressureStrategy.ERROR);
}
public void rxJavaFlowableCreateExample() {
model.getRxJavaFlowableData()
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
}
@Override
public void onNext(Integer integer) {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
}
@Override
public void onError(Throwable t) {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
}
@Override
public void onComplete() {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
}
});
}
日志
08-23 10:47:55.032 4992-4992/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 10:47:55.032 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 10:47:55.034 4992-4992/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-23 10:47:55.034 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
getRxJavaFlowableData---:main--:3
08-23 10:47:55.035 4992-4992/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4
总结
1、当下游没有通知上游,自己有处理数据的能力时,上游发送数据会直接报错,也就是著名的背压错误MissingBackpressureException
2、被观察者则继续发送数据到发送完毕
二、Subscriber发送能处理数据的能力时
public void rxJavaFlowableCreateExample() {
model.getRxJavaFlowableData()
.subscribe(new FlowableSubscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
subscription= s;
subscription.request(Integer.MAX_VALUE);
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
}
@Override
public void onNext(Integer integer) {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
}
@Override
public void onError(Throwable t) {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
}
@Override
public void onComplete() {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
}
});
}
日志
08-23 11:33:56.631 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 11:33:56.632 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:1
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:2
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:3
08-23 11:33:56.633 11312-11312/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:3
rxJavaFlowableCreateExample--:main-onComplete-:onComplete
08-23 11:33:56.633 11312-11312/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4
总结
1、当下游通知上游,自己有处理数据的能力时,只要上游发送的数据不超过下游能力,就不会报错
日志(当request的数据是2时)
08-23 11:36:58.318 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 11:36:58.318 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:1
08-23 11:36:58.320 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:1
08-23 11:36:58.320 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:2
08-23 11:36:58.320 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onNext-:2
08-23 11:36:58.320 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:3
08-23 11:36:58.321 12001-12001/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-23 11:36:58.321 12001-12001/com.example.zhang D/MainModel: getRxJavaFlowableData---:main--:4
2、 emitter.onComplete(); 发送的complete事件不算request数据中
三、异步创建Flowable
public void rxJavaFlowableCreateExample() {
model.getRxJavaFlowable128Data()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
Subscription subscription;
@Override
public void onSubscribe(Subscription s) {
// subscription= s;
// subscription.request(3);
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onSubscribe-:" + "onSubscribe");
}
@Override
public void onNext(Integer integer) {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
}
@Override
public void onError(Throwable t) {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onError-:" + "onError--:0" + t.toString());
}
@Override
public void onComplete() {
LogUtils.error(TAG, "rxJavaFlowableCreateExample--:" + Thread.currentThread().getName() + "-onComplete-:" + "onComplete");
}
});
}
日志
08-23 16:42:44.141 21570-21570/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onSubscribe-:onSubscribe
08-23 16:42:44.145 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:0
08-23 16:42:44.146 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:1
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:2
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:3
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:4
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:5
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:6
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:7
08-23 16:42:44.147 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:8
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:9
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:10
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:11
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:12
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:13
................................................................................................................
08-23 16:42:44.154 21570-23129/com.example.zhang D/MainModel: getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:123
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:124
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:125
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:126
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:127
总结
1、异步的情况下,上游被观察者和下游观察者之前有个水缸,大小128
2、所以,当上游发送数据,下游不接受的情况下,能发送128条数据不报MissingBackpressureException
日志(当发送129条数据)
................................................................................................................
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:125
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:126
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:127
getRxJavaFlowable128Data---:RxCachedThreadScheduler-1--:128
08-23 16:49:16.869 24202-24202/com.example.zhang E/MainPresenter: rxJavaFlowableCreateExample--:main-onError-:onError--:0io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
四、验证requested大小
1、当同一个线程下时
public Flowable<Integer> getRxJavaFlowableData() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
emitter.onNext(1);
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
emitter.onNext(2);
// Thread.sleep(5000);
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
emitter.onNext(3);
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:onComplete");
emitter.onComplete();
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
emitter.onNext(4);
}
}, BackpressureStrategy.ERROR);
}
public void rxJavaFlowableSizeExample() {
model.getRxJavaFlowableData()
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
}
@Override
public void onNext(Integer integer) {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
}
@Override
public void onError(Throwable t) {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
}
@Override
public void onComplete() {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
}
});
}
日志
08-24 14:29:06.026 22382-22382/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:29:06.026 22382-22382/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:0
getRxJavaFlowableData---:main--:1
08-24 14:29:06.028 22382-22382/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onError-:io.reactivex.exceptions.MissingBackpressureException: create: could not emit value due to lack of requests
08-24 14:29:06.028 22382-22382/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:0
getRxJavaFlowableData---:main--:2
getRxJavaFlowableData---:main-request-:0
getRxJavaFlowableData---:main--:3
getRxJavaFlowableData---:main-request-:0
getRxJavaFlowableData---:main--:onComplete
getRxJavaFlowableData---:main-request-:0
getRxJavaFlowableData---:main--:4
总结
1、当在同一个线程时,当下游观察者未请求数据时,requested为0
public void rxJavaFlowableSizeExample() {
model.getRxJavaFlowableData()
.subscribe(new FlowableSubscriber<Integer>() {
Subscription s;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(100);
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
}
@Override
public void onNext(Integer integer) {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
}
@Override
public void onError(Throwable t) {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
}
@Override
public void onComplete() {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
}
});
}
日志
08-24 14:32:15.405 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:32:15.405 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:100
getRxJavaFlowableData---:main--:1
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:1
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:99
getRxJavaFlowableData---:main--:2
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:2
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:98
getRxJavaFlowableData---:main--:3
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:3
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:97
getRxJavaFlowableData---:main--:onComplete
08-24 14:32:15.406 23251-23251/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onComplete-:
08-24 14:32:15.406 23251-23251/com.example.zhang D/MainModel: getRxJavaFlowableData---:main-request-:97
getRxJavaFlowableData---:main--:4
总结
1、下游观察者 s.request(100),改变requested值
2、当在异步线程时
public Flowable<Integer> getRxJavaFlowableData() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 1);
emitter.onNext(1);
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 2);
emitter.onNext(2);
// Thread.sleep(5000);
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 3);
emitter.onNext(3);
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:onComplete");
emitter.onComplete();
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + 4);
emitter.onNext(4);
}
}, BackpressureStrategy.ERROR);
}
public Flowable<Integer> getRxJavaFlowable128Data() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; i < 129; i++) {
LogUtils.debug(TAG, "getRxJavaFlowable128Data---:" + Thread.currentThread().getName() + "--:" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR);
}
public void rxJavaFlowableSizeExample() {
model.getRxJavaFlowableData()
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
Subscription s;
@Override
public void onSubscribe(Subscription s) {
this.s = s;
s.request(100);
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
}
@Override
public void onNext(Integer integer) {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
}
@Override
public void onError(Throwable t) {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
}
@Override
public void onComplete() {
LogUtils.error(TAG, "rxJavaFlowableSizeExample--:" + Thread.currentThread().getName() + "-onComplete-:");
}
});
}
日志
08-24 14:48:05.367 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onSubscribe-:
08-24 14:48:05.374 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:128
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
08-24 14:48:05.376 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:127
08-24 14:48:05.376 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:1
08-24 14:48:05.376 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:126
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:2
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:3
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:125
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:3
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:onComplete
08-24 14:48:05.377 24916-24916/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onComplete-:
08-24 14:48:05.377 24916-25268/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-:125
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:4
总结
1、requested 是128 而不是100,这是被观察者里面有个''水缸'' ,大小是128
2、被观察者每发送一条数据,requested减少1
3、onComplete(),onError()不减少requested值
3、当在异步线程时,什么情况下触发被观察者继续发送数据
public Flowable<Integer> getRxJavaFlowableRealExample() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
if (emitter.isCancelled()) {
break;
}
while (emitter.requested() == 0) {
}
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableData---:" + Thread.currentThread().getName() + "--:" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR);
}
public void rxJavaFlowableRealExample() {
model.getRxJavaFlowableRealExample()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new FlowableSubscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
// s.request(96);
LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onSubscribe-:");
}
@Override
public void onNext(Integer integer) {
LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onNext-:" + integer);
}
@Override
public void onError(Throwable t) {
LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onError-:" + t.toString());
}
@Override
public void onComplete() {
LogUtils.error(TAG, "rxJavaFlowableRealExample--:" + Thread.currentThread().getName() + "-onComplete-:");
}
});
}
日志
08-24 16:36:38.599 6546-6546/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:36:38.606 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:36:38.607 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:126
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:122
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:5
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:123
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:4
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:124
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:125
08-24 16:36:38.621 6546-8058/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:126
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:127
总结
1、当观察者没有request时,只是被观察者把128条数据发送完毕,观察者并未接受到数据
2、当观察者request(95)时,观察者接受到95条数据,但并没有触发被观察者继续发送数据,也就是并未改变水缸的size值即requested
日志
08-24 16:20:38.610 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:20:38.617 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:20:38.619 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:0
08-24 16:20:38.624 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:29
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:1
08-24 16:20:38.624 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:98
08-24 16:20:38.624 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:2
08-24 16:20:38.625 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:30
08-24 16:20:38.625 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:3
rxJavaFlowableRealExample--:main-onNext-:4
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
08-24 16:20:38.638 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:93
08-24 16:20:38.638 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:100
08-24 16:20:38.638 5075-5075/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:94
08-24 16:20:38.638 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:27
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:101
08-24 16:20:38.639 5075-5222/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:26
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:125
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:126
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:127
3、当观察者request(96)时,观察者接受到96条数据,同时触发被观察者继续发送数据,改变水缸的size值
日志
08-24 16:23:23.690 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onSubscribe-:
08-24 16:23:23.696 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:128
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:0
08-24 16:23:23.697 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:1
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:126
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:2
08-24 16:23:23.697 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:0
08-24 16:23:23.697 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:97
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:29
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:31
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:30
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:96
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:31
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:32
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:95
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:33
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:94
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:34
08-24 16:23:23.701 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:32
08-24 16:23:23.701 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:93
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:94
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:33
08-24 16:23:23.710 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:94
08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:95
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:32//①此时requested值为32
08-24 16:23:23.710 5714-5714/com.example.zhang E/MainPresenter: rxJavaFlowableRealExample--:main-onNext-:95//②此时观察者又处理了一条数据,到达处理总值96
08-24 16:23:23.710 5714-6020/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:96//③此时被观察者又发送了一条数据
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:127//④此时取得的requested值为32+96-1 为何需要减去1:(刚才又发送了一条数据)
。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。。
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:3
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:221
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:2
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:222
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:1
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:223
08-24 15:38:47.430 31071-31193/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:30
08-24 15:38:47.430 31071-31071/com.example.zhang E/MainPresenter: rxJavaFlowableSizeExample--:main-onNext-:95
08-24 15:38:47.430 31071-31193/com.example.zhang D/MainModel: getRxJavaFlowableData---:RxCachedThreadScheduler-1--:98
getRxJavaFlowableData---:RxCachedThreadScheduler-1-request-emit:125
getRxJavaFlowableData---:RxCachedThreadScheduler-1--:99
河马过河微信公众号.jpg
4、异步线程-consumer
···
public Flowable<Integer> getRxJavaFlowableRealExample() {
return Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
for (int i = 0; ; i++) {
if (emitter.isCancelled()) {
break;
}
while (emitter.requested() == 0) {
}
LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "-request-emit:" + emitter.requested());
LogUtils.debug(TAG, "getRxJavaFlowableRealExample---:" + Thread.currentThread().getName() + "--:" + i);
emitter.onNext(i);
}
}
}, BackpressureStrategy.ERROR);
}
public void rxJavaFlowableConsumeExample() {
model.getRxJavaFlowableRealExample()
.subscribeOn(Schedulers.io())
.unsubscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.compose(RxLifeCycleUtils.<Integer>bindUntilEvent(view, ActivityEvent.DESTROY))
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
LogUtils.error(TAG, "rxJavaFlowableConsumeExample--:" + Thread.currentThread().getName() + "-Consumer-:" + integer);
}
});
}
···
日志
....................................................
2019-02-12 17:53:21.381 29563-29563/com.example.zhang E/RxJavaPresenter: rxJavaFlowableConsumeExample--:main-Consumer-:67323
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67324
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3-request-emit:99
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67331
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67333
2019-02-12 17:53:21.381 29563-29563/com.example.zhang E/RxJavaPresenter: rxJavaFlowableConsumeExample--:main-Consumer-:67339
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67342
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67344
2019-02-12 17:53:21.381 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3-request-emit:78
2019-02-12 17:53:21.382 29563-29876/com.example.zhang D/RxJavaModel: getRxJavaFlowableRealExample---:RxCachedThreadScheduler-3--:67346
总结
当观察者是consume时,会持续请求request
网友评论