导包
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
被观察者
Observable<Integer> integerObservable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
//ObservableEmitter发射器
e.onNext(1);
e.onNext(2);
e.onNext(3);
e.onNext(4);
e.onError(new RuntimeException("test"));
e.onComplete();
}
});
观察者
Observer<Integer> integerObserver = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
//Disposable 一次性用品 管道连接的开关
disposable = d;
System.out.println("onSubscribe");
}
@Override
public void onNext(Integer value) {
System.out.println("onNext" + value);
if (value % 3 == 0) {
disposable.dispose();//将开关关闭
System.out.println("disposable . dispose");
}
}
@Override
public void onError(Throwable e) {
System.out.println("onError");
}
@Override
public void onComplete() {
System.out.println("onComplete");
}
};
integerObservable.subscribe(integerObserver);//订阅
观察者简写
//onNext简写
Consumer consumer = new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println("next--"+integer);
}
};
//onError简写
Consumer<Throwable> throwableConsumer = new Consumer<Throwable>() {
@Override
public void accept(Throwable throwable) throws Exception {
System.out.println("异常");
}
};
//onComplete简写
Action action = new Action() {
@Override
public void run() throws Exception {
System.out.println("complete");
}
};
integerObservable.subscribe(consumer, throwableConsumer, action);
切换线程
.subscribeOn(Schedulers.newThread()) //被观察者线程 被观察者线程只有第一次有效,其余无效
.observeOn(AndroidSchedulers.mainThread()) //观察者线程 观察者可多次切换线程
所有线程选项
Schedulers.io() 代表io操作的线程, 通常用于网络,读写文件等io密集型的操作
Schedulers.computation() 代表CPU计算密集型的操作, 例如需要大量计算的操作
Schedulers.newThread() 代表一个常规的新线程
AndroidSchedulers.mainThread() 代表Android的主线程
在onNext之前执行 是属于Observar的线程
.doOnNext(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.d(TAG, "After observeOn(mainThread), current thread is: " + Thread.currentThread().getName());
}
})
map操作符,
位于被观察者与观察者中间,将被观察者转换为另外一种类型
.map(new Function<Integer, String>() {
@Override
public String apply(Integer integer) throws Exception {
return "This is result " + integer;
}
})
FlatMap
位于被观察者与观察者中间
将被观察者挨个拆分成Observables,并发送每一个Observable
.flatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);//fromIterable将list转换为Observables delay为延时
}
}
问题
FlatMap调用子Observable是无序的
解决方案
使用concatMap 顺序执行
.concatMap(new Function<Integer, ObservableSource<String>>() {
@Override
public ObservableSource<String> apply(Integer integer) throws Exception {
final List<String> list = new ArrayList<>();
for (int i = 0; i < 3; i++) {
list.add("I am value " + integer);
}
return Observable.fromIterable(list).delay(10,TimeUnit.MILLISECONDS);
}
})
zip操作符
参数1,2都是observable 参数3为 压缩算法
发送的事件数量和两个observable中最短事件数量的哪一个有关
如果observable和observer在同一个线程 执行顺序会 observable1先发送,observable2在发送,当observable2每发送一次都会进行zip
如果observable和observer在不同线程,则observer会同时发送
Observable.zip(observable1, observable2, new BiFunction<Integer, String, String>() {
@Override
public String apply(Integer integer, String s) throws Exception {
return integer + s;
}
})
问题
Observable发送事件过快Observer处理事件过慢,Observable发送的事件去哪里了?
解决方案
使用一个容器将Observable发送的事件储存起来,等待Observer挨个读取
问题
储存Observable发送事件的容器容量有限,如何解决Observable发送过快,Observer处理过慢的问题
解决方案
使用Backpressure背压,及
1.加入延时是Observable发送慢点(从速度上进行治理, 减缓事件发送进水缸的速度)
Observable发送事件时延时一段时间
2.容器不保存全部,定时保存,或某些特定的条件才保存(从数量上进行治理, 减少发送进水缸里的事件)
.sample(2, TimeUnit.SECONDS) //sample取样 两秒获取一次事件放入容器
.filter(new Predicate<Integer>() {//过滤器返回true才会发送到容器
@Override
public boolean test(Integer integer) throws Exception {
return integer % 10 == 0;
}
})
Flowable对上一个问题的解决方案做了一个封装 自带了一个128大小的容器
参数2为容器满了的时候的解决方案
取值
BackpressureStrategy.DROP 超过满的都删除
BackpressureStrategy.LATEST 一直保存最新的,将老的删除
BackpressureStrategy.BUFFER 无限存放,不删除
Flowable.create(new FlowableOnSubscribe<Integer>() {
@Override
public void subscribe(FlowableEmitter<Integer> emitter) throws Exception {
Log.d(TAG, "emit 1");
emitter.onNext(1);
Log.d(TAG, "emit 2");
emitter.onNext(2);
Log.d(TAG, "emit 3");
emitter.onNext(3);
Log.d(TAG, "emit complete");
emitter.onComplete();
}
}, BackpressureStrategy.ERROR); //增加了一个参数
Subscriber
Subscriber<Integer> downstream = new Subscriber<Integer>() {
@Override
public void onSubscribe(Subscription s) {
//Subscription使用Subscription.cancel() 关闭管道连接 Subscription.request(long)来请求容器获取多少个事件
Log.d(TAG, "onSubscribe");
s.request(Long.MAX_VALUE); //注意这句代码
}
@Override
public void onNext(Integer integer) {
Log.d(TAG, "onNext: " + integer);
}
@Override
public void onError(Throwable t) {
Log.w(TAG, "onError: ", t);
}
@Override
public void onComplete() {
Log.d(TAG, "onComplete");
}
};
问题
同步线程中 下游不调用Subscription.request(long)会抛出MissingBackpressureException,而异步线程不会
解决方案
同步线程中,下游不调用Subscription,则上游会认为下游无处理数据能力,因为在同一线程中,下游不处理上游不能干等,所以抛出异常提醒用户
网友评论