package com.github.cai.greendaotaste.rxjava;
import android.os.Bundle;
import android.support.annotation.Nullable;
import android.support.v7.app.AppCompatActivity;
import android.util.Log;
import android.view.View;
import android.widget.Button;
import android.widget.TextView;
import android.widget.Toast;
import com.github.cai.greendaotaste.R;
import org.reactivestreams.Subscriber;
import org.reactivestreams.Subscription;
import java.util.ArrayList;
import java.util.concurrent.TimeUnit;
import io.reactivex.BackpressureStrategy;
import io.reactivex.Flowable;
import io.reactivex.FlowableEmitter;
import io.reactivex.FlowableOnSubscribe;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.ObservableSource;
import io.reactivex.Observer;
import io.reactivex.android.schedulers.AndroidSchedulers;
import io.reactivex.annotations.NonNull;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.BiFunction;
import io.reactivex.functions.Consumer;
import io.reactivex.functions.Function;
import io.reactivex.functions.Predicate;
import io.reactivex.schedulers.Schedulers;
/**
* Created by admin on 2017/6/19.
* 参考自 码个蛋 http://chuansong.me/n/1875207353935
* 上游和下游就分别对应着RxJava中的Observable和Observer,它们之间的连接就对应着subscribe()
* <p>
* 知识点一:
* 一,上游可以发送无限个onNext, 下游也可以接收无限个onNext.
* 二,当上游发送了一个onComplete后, 上游onComplete之后的事件将会继续发送, 而下游收到onComplete事件之后将不再继续接收事件.
* 三,当上游发送了一个onError后, 上游onError之后的事件将继续发送, 而下游收到onError事件之后将不再继续接收事件.
* 四,上游可以不发送onComplete或onError.
* 五,最为关键的是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然
* 六,调用顺序:先调用onSubscribe -> subscribe(上游方法) -> onNext -> onComplete或onError
* <p>
* 知识点二: 调用dispose()并不会导致上游不再继续发送事件, 上游会继续发送剩余的事件.
* <p>
* 知识点三:
* 带有Observer参数的我们已经使用过了,这里对其他几个方法进行说明.
* 一,不带任何参数的subscribe() 表示下游不关心任何事件,你上游尽管发你的数据去吧, 老子可不管你发什么.
* 二,带有一个Consumer参数的方法表示下游只关心onNext事件, 其他的事件我假装没看见, 因此我们如果只需要onNext事件可以这么写:
* <p>
* 知识点四:
* 上游和下游是工作在同一个线程中的, 也就是说上游在哪个线程发事件, 下游就在哪个线程接收事件.
* <p>
* 知识点五:
* 一,多次指定上游的线程只有第一次指定的有效, 也就是说多次调用subscribeOn() 只有第一次的有效, 其余的会被忽略.
* 二,多次指定下游的线程是可以的, 也就是说*每调用一次observeOn()* , 下游的线程就会切换一次.
* <p>
* 知识点六:
* Schedulers.io()
* 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
* <p>
* Schedulers.computation()
* 代表CPU计算密集型的操作, 例如需要大量计算的操作
* <p>
* Schedulers.newThread()
* 代表一个常规的新线程
* <p>
* AndroidSchedulers.mainThread()
* 代表Android的主线程
* <p>
* 知识点七:
* 那如果有多个Disposable 该怎么办呢, RxJava中已经内置了一个容器CompositeDisposable,
* 每当我们得到一个Disposable时就调用CompositeDisposable.add()将它添加到容器中, 在退出的时候,
* 调用CompositeDisposable.clear() 即可切断所有的水管.
* <p>
* 知识点八:
* 上游每发送一个事件, flatMap都将创建一个新的水管, 然后发送转换之后的新的事件, 下游接收到的就是这些新的水管发送的数据.
* 这里需要注意的是, flatMap并不保证事件的顺序, 也就是图中所看到的, 并不是事件1就在事件2的前面.
* 如果需要保证顺序则需要使用concatMap.
* <p>
* 最终下游收到的事件数量是和上游中发送事件最少的那一根水管的事件数量相同,这个也很好理解,
* 因为是从每一根水管里取一个事件来进行合并,
* 最少的那个肯定就最先取完, 这个时候其他的水管尽管还有事件,
* 但是已经没有足够的事件来组合了, 因此下游就不会收到剩余的事件
* <p>
*/
public class RxJavaActivity extends AppCompatActivity implements View.OnClickListener {
public static final String TAG = RxJavaActivity.class.getSimpleName();
private TextView showRxMessage;
private Button subscriber, observer,requestBtn;
private Subscription mSubscription;
@Override
protected void onCreate(@Nullable Bundle savedInstanceState) {
super.onCreate(savedInstanceState);
setContentView(R.layout.activity_rxjava);
showRxMessage = (TextView) findViewById(R.id.show_rx_message);
subscriber = (Button) findViewById(R.id.subscriber);
observer = (Button) findViewById(R.id.observer);
requestBtn = (Button) findViewById(R.id.request);
subscriber.setOnClickListener(this);
observer.setOnClickListener(this);
requestBtn.setOnClickListener(this);
Log.d(TAG, "onCreate: current thread is " + Thread.currentThread().getName());
}
@Override
public void onClick(View v) {
switch (v.getId()) {
case R.id.subscriber:
//simpleUseRxJava();
//onlyCareAboutOnNext();
//rxJavaMapTaste();
//flatMapTaste();
//rxJavaZipTaste();
//dealWithBackpressureWithFilter();
//dealWithBackPressWithSample();
rxJavaFlowableResponseFetchAync();
break;
case R.id.observer:
//flowableTaste();
//changeFlowableCacheSize();
//rxJavaIntervalTaste();
//rxJavaFlowableResponseFetch();
lookWhatTimeHitRequestValue();
break;
case R.id.request:
mSubscription.request(96);
break;
}
}
public void simpleUseRxJava() {
//可观察者 上游 可观察者所以用订阅subscribeOn
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//ObservableEmitter: Emitter是发射器的意思 可以联想为送报员
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onComplete();
e.onComplete();//发完一个后还可以发第二个,只是观察者只收一个
//e.onError(new Throwable());崩溃 onComplete和onError事件是互斥的,只能发一个
Log.d(TAG, "subscribe: current thread is " + Thread.currentThread().getName());
}
});
//观察者 所有用观察observeOn
Observer<Integer> observer = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(@NonNull Disposable d) {
// Disposable, 这个单词的字面意思是一次性用品,用完即可丢弃的.
// 那么在RxJava中怎么去理解它呢, 对应于上面的水管的例子,
// 我们可以把它理解成两根管道之间的一个机关, 当调用它的dispose()方法时,
// 它就会将两根管道切断, 从而导致下游收不到事件.
// 因此我们可以在Activity中将这个Disposable 保存起来, 当Activity退出时, 切断它即可.
showMessageInText("onSubscribe: ");
disposable = d;
Log.d(TAG, "onSubscribe: current thread is " + Thread.currentThread().getName());
}
@Override
public void onNext(@NonNull Integer integer) {
//showMessageInText("onNext: " + integer.toString());
if (integer == 2) {
disposable.dispose();
}
//showMessageInText("onNext: " + disposable.isDisposed());//切断水管,之后的时间不在接收
//但是不妨碍上游时间的传递
Log.d(TAG, "onNext: current thread is " + Thread.currentThread().getName());
}
@Override
public void onError(@NonNull Throwable e) {
//showMessageInText("onError: ");
Log.d(TAG, "onError: current thread is " + Thread.currentThread().getName());
}
@Override
public void onComplete() {
//showMessageInText("onComplete: ");
Log.d(TAG, "onComplete: current thread is " + Thread.currentThread().getName());
}
};
//邮局派送报纸(联想订报纸,虽然是人去订报社报纸,但是报社是实际递送报纸给人的),上游向下游发送事件
observable.subscribeOn(Schedulers.newThread())//指定发送事件的线程 上游发送事件的线程,
.subscribeOn(Schedulers.newThread())
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())//指定接收事件的线程 下游接收事件的线程.
.observeOn(Schedulers.io())
.observeOn(Schedulers.computation())
.subscribe(observer);
}
public void onlyCareAboutOnNext() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
//ObservableEmitter: Emitter是发射器的意思 可以联想为送报员
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onError(new Throwable("test error"));
//e.onComplete();
Log.d(TAG, "subscribe: current thread is " + Thread.currentThread().getName());
}
});
Consumer<Integer> careAboutNext = new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
showMessageInText("accept: " + integer.toString() + " login is success");
}
};
Consumer<Throwable> failureNext = new Consumer<Throwable>() {//注意:用于错误的Consumer只能使用泛型Throwable
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
showMessageInText("accept: login is failure");
}
};
observable.subscribe(careAboutNext, failureNext);
}
//map是RxJava中最简单的一个变换操作符了, 它的作用就是对上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化
public void rxJavaMapTaste() {
//要传递的是Integer类型
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(10);
e.onNext(11);
e.onNext(12);
e.onComplete();
}
});
//要接收的是String类型
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
showMessageInText("accept: I receive value is" + s);
}
};
//提供的转换函数
observable.map(new Function<Integer, String>() {
@Override
public String apply(@NonNull Integer integer) throws Exception {
return String.valueOf(integer);
}
}).subscribe(consumer);
}
//FlatMap将一个发送事件的上游Observable变换为多个发送事件的Observables,然后将它们发射的事件合并后放进一个单独的Observable里.
//自己的理解:
public void flatMapTaste() {
//要传递的是Integer类型
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(100);
e.onNext(110);
e.onNext(120);
e.onComplete();
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
showMessageInText("accept: After flat map I receive value is" + s);
}
};
observable.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(@NonNull Integer integer) throws Exception {
ArrayList<String> list = new ArrayList<>(3);
for (int i = 0; i < 3; i++) {
list.add(String.valueOf(integer));
}
return Observable.fromIterable(list).delay(15, TimeUnit.MILLISECONDS);//不保证发送顺序
}
}).observeOn(AndroidSchedulers.mainThread()).subscribe(consumer);
}
// Zip通过一个函数将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件.
// 它按照*严格的顺序*应用这个函数。它只发射与 发射数据项最少的那个Observable 一样多的数据
// (最终下游收到的事件数量是和上游中发送事件最少的那一根水管的事件数量相同)
public void rxJavaZipTaste() {
//要传递的是Integer类型
Observable<Integer> observableInt = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
e.onNext(1000);
e.onNext(1100);
e.onNext(1200);
e.onNext(1300);
e.onComplete();
}
});
//要传递的是Integer类型
Observable<String> observableStr = Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> e) throws Exception {
e.onNext("thousand");
e.onNext("thousand one hundred");
e.onNext("thousand two hundred");
e.onComplete();
}
});
Consumer<String> consumer = new Consumer<String>() {
@Override
public void accept(@NonNull String s) throws Exception {
showMessageInText("accept: After zip I receive value is" + s);
}
};
//泛型参数表示 第一个表示一个上游的类型,第二个参数表示第二个上游的类型,第三个参数表示组合后返回的类型
Observable.zip(observableInt, observableStr, new BiFunction<Integer, String, String>() {
@Override
public String apply(@NonNull Integer integer, @NonNull String s) throws Exception {
return integer + s;
}
}).subscribe(consumer);//如果没有指定线程,默认全部都是在同一个线程中执行
}
public void backpressureTaste() {
/**
* Zip可以将多个上游发送的事件组合起来发送给下游, 那大家有没有想过一个问题,
* 如果其中一个水管A发送事件特别快, 而另一个水管B 发送事件特别慢, 那就可能出现这种情况,
* 发得快的水管A 已经发送了1000个事件了, 而发的慢的水管B 才发一个出来, 组合了一个之后水管A 还剩999个事件,
* 这些事件需要继续等待水管B 发送事件出来组合, 那么这么多的事件是放在哪里的呢? 总有一个地方保存吧?
* 没错, Zip给我们的每一根水管都弄了一个水缸 , 用来保存这些事件
*
* 所谓的Backpressure其实就是为了控制流量
*/
/**
* 知识点九:
* 一,当上下游工作在同一个线程中时, 这时候是一个同步的订阅关系, 也就是说上游每发送一个事件必须等到下游接收处理完了以后才能接着发送下一个事件
* 相当于直接调用方法的过程
* 二,当上下游工作在不同的线程中时, 这时候是一个异步的订阅关系, 这个时候上游发送数据不需要等待下游接收,
* 为什么呢, 因为两个线程并不能直接进行通信, 因此上游发送的事件并不能直接到下游里去,
* 这个时候就需要一个田螺姑娘来帮助它们俩, 这个田螺姑娘就是我们刚才说的水缸 ! 上游把事件发送到水缸里去,
* 下游从水缸里取出事件来处理, 因此, 当上游发事件的速度太快, 下游取事件的速度太慢, 水缸就会迅速装满, 然后溢出来, 最后就OOM了.
*/
}
public void dealWithBackpressureWithFilter() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++)
e.onNext(i);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
showMessageInText("the receive value is " + integer);
}
};
Consumer<Throwable> error = new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Toast.makeText(RxJavaActivity.this, "Oop,occur error", Toast.LENGTH_SHORT).show();
}
};
observable.observeOn(AndroidSchedulers.mainThread())
.subscribeOn(Schedulers.io())
.filter(new Predicate<Integer>() {//过滤,只有符合条件的才放入水缸
@Override
public boolean test(@NonNull Integer integer) throws Exception {
if (integer % 10000 == 0)
return true;
return false;
}
})
.subscribe(consumer, error);
}
public void dealWithBackPressWithSample() {
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> e) throws Exception {
for (int i = 0; ; i++)
e.onNext(i);
}
});
Consumer<Integer> consumer = new Consumer<Integer>() {
@Override
public void accept(@NonNull Integer integer) throws Exception {
Log.e(TAG, "accept: the receive value is " + integer);
}
};
Consumer<Throwable> error = new Consumer<Throwable>() {
@Override
public void accept(@NonNull Throwable throwable) throws Exception {
Toast.makeText(RxJavaActivity.this, "Oop,occur error", Toast.LENGTH_SHORT).show();
}
};
observable.subscribeOn(Schedulers.computation())
.observeOn(Schedulers.io())
.sample(2,TimeUnit.SECONDS)//这个操作符每隔指定的时间就从上游中取出一个事件发送给下游,这里是每隔2秒钟取样一次
.subscribe(consumer,error);
}
/**
* 知识点一:
* 我们注意到这次和Observable有些不同. 首先是创建Flowable的时候增加了一个参数,
* 这个参数是用来选择背压,也就是出现上下游流速不均衡的时候应该怎么处理的办法,
* 这里我们直接用BackpressureStrategy.ERROR这种方式, 这种方式会在出现上下游流速不均衡的时候直接抛出一个异常,
* 这个异常就是著名的MissingBackpressureException. 其余的策略后面再来讲解
*
* 知识点二:
* 另外的一个区别是在下游的onSubscribe方法中传给我们的不再是Disposable了, 而是Subscription,
* 它俩有什么区别呢, 首先它们都是上下游中间的一个开关, 之前我们说调用Disposable.dispose()方法可以切断水管,
* 同样的调用Subscription.cancel()也可以切断水管, 不同的地方在于Subscription增加了一个void request(long n)方法,
* 这个方法有什么用呢, 在上面的代码中也有这么一句代码:
*
* 知识点三:
* 关于Subscription的request(int)方法
* 这是因为Flowable在设计的时候采用了一种新的思路也就是响应式拉取的方式来更好的解决上下游流速不均衡的问题,
* 与我们之前所讲的控制数量和控制速度不太一样, 这种方式用通俗易懂的话来说就好比是叶问打鬼子, 我们把上游看成小日本,
* 把下游当作叶问, 当调用Subscription.request(1)时, 叶问就说我要打一个! 然后小日本就拿出一个鬼子给叶问,
* 让他打, 等叶问打死这个鬼子之后, 再次调用request(10), 叶问就又说我要打十个! 然后小日本又派出十个鬼子给叶问,
* 然后就在边上看热闹, 看叶问能不能打死十个鬼子, 等叶问打死十个鬼子后再继续要鬼子接着打...
* 所以我们把request当做是一种能力, 当成下游处理事件的能力, 下游能处理几个就告诉上游我要几个,
* 这样只要上游根据下游的处理能力来决定发送多少事件, 就不会造成一窝蜂的发出一堆事件来, 从而导致OOM.
* 这也就完美的解决之前我们所学到的两种方式的缺陷, 过滤事件会导致事件丢失, 减速又可能导致性能损失.
* 而这种方式既解决了事件丢失的问题, 又解决了速度的问题, 完美 !
*
* 知识点四:
* 首先第一个同步的代码, 为什么上游发送第一个事件后下游就抛出了MissingBackpressureException异常(最新版的只是调用onError方法),
* 这是因为下游没有调用request, 上游就认为下游没有处理事件的能力, 而这又是一个同步的订阅, 既然下游处理不了,
* 那上游不可能一直等待吧, 如果是这样, 万一这两根水管工作在主线程里, 界面不就卡死了吗, 因此只能抛个异常来提醒我们.
* 那如何解决这种情况呢, 很简单啦, 下游直接调用request(Long.MAX_VALUE)就行了, 或者根据上游发送事件的数量来request就行了,
* 比如这里request(3)就可以了.
* 然后我们再来看看第二段代码, 为什么上下游没有工作在同一个线程时, 上游却正确的发送了所有的事件呢?
* 这是因为在Flowable里默认有一个大小为128的水缸, 当上下游工作在不同的线程中时, 上游就会先把事件发送到这个水缸中,
* 因此, 下游虽然没有调用request, 但是上游在水缸中保存着这些事件, 只有当下游调用request时, 才从水缸里取出事件发给下游.
*/
public void flowableTaste(){
//上游变成了Flowable
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
e.onNext(10);
e.onNext(11);
e.onNext(12);
e.onNext(13);
e.onComplete();
}
}, BackpressureStrategy.ERROR);
//下游变成了Subscriber
Subscriber<Integer> subcriber = new Subscriber<Integer>() {
Subscription s;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
//未指定线程注释这段话,不会发送onNext事件,发完onSubscribe,直接发onError
//如果指定线程注释这段话,那么只会执行onSubscribe
s.request(2);//可看做是下游处理事件的能力,但是需要上游配合,如果上游不配合应该没有用
try {
Thread.sleep(2000);
} catch (InterruptedException e) {
e.printStackTrace();
}
s.request(2);
this.s = s;
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
//if (integer == 12)
//s.cancel();//切断水管
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subcriber);
}
/**
* 知识点一:
* 想想看我们之前学习Observable的时候说到的如何解决上游发送事件太快的, 有一招叫从数量上取胜,
* 同样的Flowable中也有这种方法, 对应的就是BackpressureStrategy.DROP和BackpressureStrategy.LATEST这两种策略.
* 从名字上就能猜到它俩是干啥的, Drop就是直接把存不下的事件丢弃,Latest就是只保留最新的事件, 来看看它们的实际效果吧.
*/
public void changeFlowableCacheSize(){
//上游变成了Flowable
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
for (int i = 0;i < 1000; i++){
e.onNext(i);
}
e.onComplete();
}
}, BackpressureStrategy.BUFFER);//使用ERROR的时候overflow了,这次换成BUFFER,它没有大小限制, 因此可以存放许许多多的事件.
// 但是使用BUFFER无限循环的发送事件,会和Observable一样发生OOM
// 所以我们得从数量上取胜, 同样的Flowable中也有这种方法, 对应的就是BackpressureStrategy.DROP和BackpressureStrategy.LATEST这两种策略.
//下游变成了Subscriber
Subscriber<Integer> subcriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(subcriber);
}
public void rxJavaIntervalTaste(){
//每隔一秒进行加1
Flowable.interval(1,TimeUnit.SECONDS)
.onBackpressureBuffer()//指定缓存的大小
.subscribe(new Subscriber<Long>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
s.request(Integer.MAX_VALUE);
}
@Override
public void onNext(Long aLong) {
Log.e(TAG, "onNext: " + aLong);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
});
}
/**
* 知识点一:
* 在下游中调用Subscription.request(n)就可以告诉上游,下游能够处理多少个事件,
* 那么上游要根据下游的处理能力正确的去发送事件,那么上游是不是应该知道下游的处理能力是多少啊,
* 对吧,不然,一个巴掌拍不响啊,这种事情得你情我愿才行。
* 那么上游从哪里得知下游的处理能力呢?我们来看看上游最重要的部分,肯定就是FlowableEmitter了啊,
* 我们就是通过它来发送事件的啊,来看看它的源码吧
*
* 知识点二:
* 下游调用request(n) 告诉上游它的处理能力,上游每发送一个next事件之后,requested就减一,注意是next事件,
* complete和error事件不会消耗requested,当减到0时,则代表下游没有处理能力了,这个时候你如果继续发送事件,就会调用onError事件了
*/
public void rxJavaFlowableResponseFetch(){//同步的情况
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
e.onNext(15);
e.onNext(17);
e.onNext(19);
e.onNext(21);
e.onComplete();
}
},BackpressureStrategy.ERROR);
Subscriber<Integer> subcriber = new Subscriber<Integer>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
sub = s;
s.request(1);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
sub.request(1);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribe(subcriber);
}
/**
* 可以看到,当上下游工作在不同的线程里时,每一个线程里都有一个requested,而我们调用request(1000)时,
* 实际上改变的是下游主线程中的requested,而上游中的requested的值是由RxJava内部调用request(n)去设置的,
* 这个调用会在合适的时候自动触发。
*
* question:什么时候是何时的时候?
* 刚才同步的时候我们说了,上游每发送一个事件,requested的值便会减一,对于异步来说同样如此,
* 那有人肯定有疑问了,一开始上游的requested的值是128,那这128个事件发送完了不就不能继续发送了吗?
*/
public void rxJavaFlowableResponseFetchAync(){//异步的情况
//上下游都不做操作的情况
Flowable<Integer> flowableNothingDoing = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe: request count is " + e.requested());//128
}
},BackpressureStrategy.ERROR);
Subscriber<Integer> subcriberNothingDoing = new Subscriber<Integer>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
sub = s;
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowableNothingDoing.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriberNothingDoing);
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe: request count is " + e.requested());//128
e.onNext(15);
e.onNext(17);
e.onNext(19);
e.onNext(21);
e.onComplete();
}
},BackpressureStrategy.ERROR);
Subscriber<Integer> subcriber = new Subscriber<Integer>() {
Subscription sub;
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
sub = s;
s.request(1000);
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
sub.request(1);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subcriber);
}
/**
* 可以看到,当下游消费掉第96个事件之后,上游又开始发事件了,
* 而且可以看到当前上游的requested的值是96(打印出来的95是已经发送了一个事件减一之后的值),
* 最终发出了第223个事件之后又进入了等待区,而223-127 正好等于 96。
* 这是不是说明当下游每消费96个事件便会自动触发内部的request()去设置上游的requested的值啊!没错,就是这样,而这个新的值就是96。
*/
public void lookWhatTimeHitRequestValue(){
Flowable<Integer> flowable = Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull FlowableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe: request count is " + e.requested());//128
boolean flag;
for (int i = 0; ; i++){
flag = false;
while(e.requested() == 0){
if (!flag) {
Log.e(TAG, "subscribe: Oop I can't emitter event");
flag =true;
}
}
e.onNext(i);
Log.e(TAG, "subscribe: emit " + i + " request is " + e.requested());
}
}
},BackpressureStrategy.ERROR);
Subscriber<Integer> subscriber = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
Log.e(TAG, "onSubscribe: ");
mSubscription = s;
}
@Override
public void onNext(Integer integer) {
Log.e(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.e(TAG, "onError: " + t.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
flowable.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(subscriber);
}
public void showMessageInText(String appendStr) {
showRxMessage.append(appendStr + "\n");
}
}
网友评论