序言
由于我是先学习了1.0版本接着继续学习2.0,所以本文可能不太适合没有接触过RxJava的同学,所以可以先看一下,1.0的学习笔记,传送门 http://www.jianshu.com/p/a8cedc061ab1
首先要使用RxJava2,先要添加依赖:
compile 'io.reactivex.rxjava2:rxjava:2.0.1'
compile 'io.reactivex.rxjava2:rxandroid:2.0.1'
类介绍
与RxJava1相比:
-
相同点:
都有Observable,Observer,subscribe() - 不同点:
-
在RxJava2的Observable中重写的方法变为subscribe(ObservableEmitter<String> emitter),其中的ObservableEmitter: Emitter是发射器的意思,就是用来发出事件的。
-
在RxJava2的observer重写方法中新添加了一个方法onSubscribe(Disposable d),其中Disposable是一次性用品,用完就丢弃,对与这个参数可以理解为一个拦截器,将所有发送过来的数据拦截下俩,让observer不会收到。
-
在RxJava2中subscribe()重载方法的参数变了。
public final Disposable subscribe() {} public final Disposable subscribe(Consumer<? super T> onNext) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete) {} public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError, Action onComplete, Consumer<? super Disposable> onSubscribe) {} public final void subscribe(Observer<? super T> observer) {}
在实际项目中我们一般只关心onNext(),和onError(),所以我们一般只会重载两个参数的。
然后我们用代码来理解一下上面的相同点和不同点
Observable<Integer> observable = Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> e) throws Exception {
Log.e(TAG, "subscribe: 1" );
e.onNext(1);
Log.e(TAG, "subscribe: 2" );
e.onNext(2);
Log.e(TAG, "subscribe: 3" );
e.onNext(3);
e.onComplete();
}
});
Observer<Integer> observer = new Observer<Integer>() {
private Disposable disposable;
@Override
public void onSubscribe(Disposable d) {
Log.e(TAG, "onSubscribe: " + d.isDisposed());
disposable = d;
}
@Override
public void onNext(Integer value) {
Log.e(TAG, "onNext: " + value);
if (value == 2) {
disposable.dispose();
Log.e(TAG, "onNext: " + disposable.isDisposed());
}
}
@Override
public void onError(Throwable e) {
Log.e(TAG, "onError: " + e.getMessage());
}
@Override
public void onComplete() {
Log.e(TAG, "onComplete: ");
}
};
observable.subscribe(observer);
运行结果如下所示:
com.example.cosima.rxjava2learn E/MainActivity: onSubscribe: false
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 1
com.example.cosima.rxjava2learn E/MainActivity: onNext: 1
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: 2
com.example.cosima.rxjava2learn E/MainActivity: onNext: true
com.example.cosima.rxjava2learn E/MainActivity: subscribe: 3
可以看到在disposable为true的时候,Observable可以发送数据,但是在Observer没有接收到数据。
注:与RxJava1相同,当Observable发送onComplete之后,Observable在onComplete之后的数据可以发送,Observer在接收到onComplete之后不再继续接收事件。onError与onComplete的原理一样,但是onComplete和onError必须唯一并且互斥, 即不能发多个onComplete, 也不能发多个onError, 也不能先发一个onComplete, 然后再发一个onError, 反之亦然。
创建方式
想对于RxJava1来说,Observable的创建方式心添加了几种方式:
-
fromIterable((Iterable<? extends T> list)方式
遍历集合,发送每个item相当于多次调用onNext().
注Collection接口是Iterable接口的子接口,所以所有Collection接口的实现类都可以作为Iterable对象直接传入fromIterable()方法。
-
fromArray(T... items)方式
遍历集合,发送每个item相当于多次调用onNext(). -
interval(long period, TimeUnit unit)方式
创建一个按固定时间间隔发射整数序列的Observable,可用作定时器 period:时间间隔 -
interval(long initialDelay, long period, TimeUnit unit)方式
initialDelay:开始值,默认为0。 -
range(final int start, final int count)方式
创建一个发射特定整数序列的Observable,第一个参数为起始值,第二个为发送的个数,如果为0则不发送,负数则抛异常。 -
timer(long delay, TimeUnit unit)方式
一个给定的延迟后发射一个特殊的值,即表示延迟2秒后,调用onNext()方法。
用于实现观察者模式方式有很多种:
<center>
现在我们来实现一个简单的倒计时功能:
封装Observable
private Observable<Integer> initEvent2(final int time) {
return Observable.interval(0, 1, TimeUnit.SECONDS)
.subscribeOn(AndroidSchedulers.mainThread())
.observeOn(AndroidSchedulers.mainThread())
.map(new Function<Long, Integer>() {
@Override
public Integer apply(Long aLong) throws Exception {
return time - aLong.intValue();
}
})
.take(time + 1);//限制循环次数
}
使用如下:
initEvent2(5).doOnSubscribe(new Consumer<Disposable>() {
@Override
public void accept(Disposable disposable) throws Exception {
Log.e(TAG, "accept: 开始计时");
}
})
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
Log.e(TAG, "accept: 当前计时" + integer);
}
});
打印结果为
在上述代码中我们使用到了map()操作符,接下来我们就一起来看下RxJava中的操作符
RxJava操作符
1.map()操作符:
把原来的Observable对象转换成另一个Observable对象
2.flatMap()操作符:
flatMap和Map的相同点就是把一个对象转化为另一个对象返回,但是不同的是flatMap()返回的是个Observable对象,并且这个对象并不是直接发送到了回调方法中,而是把这个对象激活,之后将他发送到回调方法中。
3.filter()操作符:
根据自己的需求加入判断逻辑,他的返回值是true或者是false,用于表示是否需要被过滤。
4.take()操作符:
再上面的代码中已经用到过,具体含义就是限制输出次数
5.doOnNext()操作符:
在输出前可以做一个额外的操作
6.delay()操作符:
延迟Observer的输出
线程控制Scheduler
该部分与RxJava1相比没有变换,在平常使用中较多的都是以下几个:
- Schedulers.newThread(): 总是启用新线程,并在新线程执行操作。
- Schedulers.io(): I/O 操作(读写文件、读写数据库、网络信息交互等)所使用的 Scheduler。行为模式和 newThread() 差不多,区别在于 io() 的内部实现是用一个无数量上限的线程池,可以重用空闲的线程,因此多数情况下 io() 比 newThread() 更有效率。不要把计算工作放在 io() 中,可以避免创建不必要的线程。
- Schedulers.computation(): 计算所使用的 Scheduler。这个计算指的是 CPU 密集型计算,即不会被 I/O 等操作限制性能的操作,例如图形的计算。这个 Scheduler 使用的固定的线程池,大小为 CPU 核数。不要把 I/O 操作放在 computation() 中,否则 I/O 操作的等待时间会浪费 CPU。
- Android 还有一个专用的 AndroidSchedulers.mainThread(),它指定的操作将在 Android 主线程运行。
都是通过subscribeOn()和 observeOn() 两个方法来对线程进行控制。
具体可见RxJava1.0学习笔记
与Retrofit结合使用
与1.0版本总体相差不多,只需要修改相应的参数就可以了,比如subcribe()方法中的参数,请求如下所示:
主要内容为红框所示
接下来讲一下RxJava2特有的了。
Flowable及Backpressure
来由
Rxjava2中,还有一个很大的变化就是Backpressure(背压),何为背压,就是观察者来不及处理被观察者发出的事件(产生事件的速度大于处理事件的速度),导致事件被无限堆积,最后产生异常。Flowable就是由此产生,专门用来处理这类问题。将原来的Observable拆分成了新的Observable和Flowable,同时其他相关部分也同时进行了拆分。
注意:处理Backpressure的策略仅仅是处理Subscriber接收事件的方式,并不影响Flowable发送事件的方法。即使采用了处理Backpressure的策略,Flowable原来以什么样的速度产生事件,现在还是什么样的速度不会变化,主要处理的是Subscriber接收事件的方式。
处理Backpressure的策略
-
产生原因:
生产者和消费者不在同一线程下,生产者的速度大于消费者的速度,就会产生Backpressure问题。如果生产者和消费者在同一线程下,不会产生Backpressure问题,所以可以说成同步不会产生问题,异步可能产生问题。 -
处理策略:
1. ERROR策略
产生Backpressure问题的时候直接抛出异常(MissingBackpressureException)Flowable<String> flowable = Flowable.create(new FlowableOnSubscribe<String>() { @Override public void subscribe(FlowableEmitter<String> e) throws Exception { e.onNext("我"); e.onNext("爱"); e.onNext("你"); e.onComplete(); } }, BackpressureStrategy.ERROR); Subscriber<String> subscriber = new Subscriber<String>() { @Override public void onSubscribe(Subscription s) { Log.e(TAG, "onSubscribe: "); s.request(Long.MAX_VALUE); } @Override public void onNext(String s) { Log.e(TAG, "onNext: " + s); } @Override public void onError(Throwable t) { Log.e(TAG, "onError: " + t.getMessage()); } @Override public void onComplete() { Log.e(TAG, "onComplete: " ); } }; flowable.subscribeOn(Schedulers.io()) .observeOn(AndroidSchedulers.mainThread()) .subscribe(subscriber);
上述代码中在Flowable(被观察者)添加了一个参数,并且onSubscribe(Subscription s)中传给我们的不再是Disposable了, 而是Subscription。然而Subscription也可以用于切断观察者与被观察者之间的联系,调用Subscription.cancel()方法,并在此方法中添加了s.request(long count);这个方法就是用来向生产者申请可以消费的事件数量。这样就可以根据自身的消费能力进行消费。在异步调用时,RxJava中有个缓存池,用来缓存消费者处理不了暂时缓存下来的数据,缓存池的默认大小为128,即只能缓存128个事件。无论request()中传入的数字比128大或小,缓存池中在刚开始都会存入128个事件。当然如果本身并没有这么多事件需要发送,则不会存128个事件。
在ERROR策略下,如果缓存池溢出,就会立刻抛出MissingBackpressureException异常。
注:如果不调用request表示消费能力为0。如果不限制想request()中传入任意参数,超过消费能力,也会造成资源浪费,产生OOM。
2. BUFFER策略
BUFFER就是把RxJava中默认的只能存128个事件的缓存池换成一个大的缓存池,支持存很多很多的数据。
这样,消费者通过request()即使传入一个很大的数字,生产者也会生产事件,并将处理不了的事件缓存。
但是这种方式任然比较消耗内存,除非是我们比较了解消费者的消费能力,能够把握具体情况,不会产生OOM。BUFFER要慎用
3. DROP策略
当消费者处理不了事件,就丢弃。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
4. LATEST策略
LATEST与DROP功能基本一致。
消费者通过request()传入其需求n,然后生产者把n个事件传递给消费者供其消费。其他消费不掉的事件就丢掉。
唯一的区别就是LATEST总能使消费者能够接收到生产者产生的最后一个事件。
还是以上述例子展示,唯一的区别就是Flowable不再无限发事件,只发送1000000个。
参考:http://www.jianshu.com/p/d149043d103a
源码地址:https://github.com/MrMJL/RxJava2Demo
由于只是学习笔记,源码可能会有点乱,又不对或者不明白欢迎评论多多交流。推荐一个Android实习&&经验交流群:541144061
网友评论