零 书籍信息
中信出版社出版,沈哲著。ISBN:9787121337222。
当当电子书5块钱,不过当当电子书感觉用起来不怎么爽。
第一章 RxJava简介
1.1 你需要了解的函数响应式编程
1. 响应式编程
- 响应式编程的特点:
1)异步编程:提供了合适的异步编程模型,能够挖掘多核CPU的能力,提高效率,降低延迟和阻塞等。
2)数据流:基于数据流模型,响应式编程提供了一套统一的Stream风格的数据处理接口。与Java 8中的Stream相比,响应式编程除了支持静态数据流,还支持动态数据流,并且支持复用和同时接入多个订阅者。
3)变化传播:以一个输入流为输入,经过一连串操作转化为另一个数据流,然后分发给各个订阅者的过程。 - 响应式编程在用户界面及基于实时系统的动画方面有广泛的应用;也在处理嵌套回调的异步事件、负责的列表过滤和变换的时候也有良好的表现。
- 和前端的响应式设计是不同的概念。前端的响应式设计指的是自适应。
2.函数式编程
- 函数式编程中,由于数据是不可变的,因此没有并发编程的问题,是线程安全的。它将计算机运算看做数学中的的函数计算,主要特点是将计算过程分解为多个可复用的函数,并且避免了状态及变量的概念。
- 函数式编程具有以下特点:
1) 函数是“第一等公民” :所谓第一等公民是指函数与其他数据类型一样,处于平等地位,可以赋值给其他变量,也可以作为参数,传入另外一个函数,或者作为别的函数的返回值。
2)闭包和高阶函数:闭包是起函数的作用并可以像对象一样操作的对象。
3)递归:把递归作为控制流程的机制。
4)惰性求值。
5)没有“副作用”:指的是函数内部与外部互动,产生运算以为的其他结果。函数式编程强调没有“副作用”,意味着函数要保持独立,所有功能就是返回一个新值,没有其他行为,尤其是不能修改外部变量的值。
1.2 RxJava简介
1.RxJava的由来
RxJava是rx的Java实现。
2. Rx是什么
RX是reactive Extension的缩写,是由微软架构师所开发的一种编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流。
3.ReactiveX的历史
微软的定义是,rx是一个函数库。
reactiveX.io给的定义是:Rx是一个使用可观察数据流进行异步编程的编程接口,ReactiveX结合了观察者模式、迭代器模式和函数式编程的精华。
4.RX模式
- 使用观察者模式
- 创建:Rx可以方便的创建事件流和数据流。
- 组合:Rx使用查询式的操作符组合和变换数据流。
- 监听:Rx可以订阅任何可观察的数据流并执行操作。
- 简化代码
- 函数式风格:对可观察的数据流使用无副作用的输入/输出函数,避免程序里错综复杂的状态。
- 简化代码:Rx操作符通常可以将复杂的难题简化为很少的几行代码。
- 异步错误处理:传统的try/catch没办法处理异步寄宿,Rx提供了合适的错误处理机制。
- 轻松使用并发:Rx的Observables和Schedulers可以让开发者摆脱底层的线程同步和各种并发问题。
总之,RxJava是Rx在JVM平台上的一个实现,通过使用观察者序列来构建异步、基于事件的程序。
1.3 为何选择RxJava
- 可组合
对于单层的异步操作来说,Java中的Future对象处理非常简单有效,但是一旦涉及嵌套,它们就开始变得非常繁琐和复杂。 - 更灵活
RxJava的Observable不仅支持单独的标量值(就想Future可以做的),还支持数据序列,甚至是无穷的数据流。 - 无偏见
Rx对于并发性或异步性没有任何特殊的偏好,Observable可以用任何方式(如线程池、事件循环、非阻塞I/O)来满足我们的需求。无论我们怎样实现它,无论底层实现时阻塞还是非阻塞的,客户端代码都将与Observable的全部交互当成是异步的。
1.4 RxJava能做什么
能做的很多,但是最合适的就是多线程处理,超方便。Android和服务端都可以。
1.6 小结
RxJava是结合了多种设计模式并优化而产生的结晶。
RxJava的思维导图。
思维导图
第2章 RxJava基础知识
2.1 Observable
RxJava的使用通常需要三步。
1)创建Observable
Observable的字面意思就是被观察者,使用RxJava时需要创建一个被观察者,它会决定什么时候触发怎样的事件。可以看做是上游发送命令。
2)创建Observer
Observer即观察者,它可以在不同的线程中执行任务。这种模式极大的简化了并发操作,因为它创建了饿一个处于待命状态的观察者哨兵,可以在未来某个时刻响应Observable的通知,而不需要阻塞等待Observable发射数据。
3)使用subscribe()进行订阅
创建了Observable和Observer之后,需要使用subscribe()方法把它们连接起来。
可以简化为:
创建Observable对象,创建Observer对象,然后用subscribe()方法进行关联。
可以看一段代码示例:
Obserbale.just("hell world").subscribe(new Consume<String>(){
@Override
public void accept(@NonNull String s) throws Exception{
System.out.println(s);
}
}
subscribe()方法有多个重载的方法。
在RxJava中,被观察者(Observable)、观察者(Observer)、subscribe()方法三者缺一不可,只有使用了subscribe()方法,被观察者才会开始发送数据。
5中被观察者模式
5种被观察者模式描述
只有Flowable支持背压,所以在需要背压的情况下,则必须使用Flowable。
do操作符
do操作可以给Observable的生命周期的各个阶段加上一系列的回调监听,当Observable执行到这个阶段时,这些回调就会被触发。在RxJava中包含了很多doXXX操作符。各个do操作符的用途如下:
操作符 | 用途 |
---|---|
doOnSubscribe | 一旦观察者订阅了Observable,它就会被调用 |
doOnLifecycle | 可以在观察者订阅之后,设置是否取消订阅 |
doOnNext | 它产生的Observable每发射一项数据就会调用它一次,它的Consumer接受发射的数据项。一般用于在subscribe之前对数据进行处理 |
doOnEach | 它产生的Observable每发射一项数据就会调用它一次,不仅包括onNext,还包括onError和onCompleted |
doAfterNext | 在onNext之后执行,而doOnNext()是在onNext之前执行 |
doOnCompolete | 当它产生Observable在正常终止调用onComplete时会被调用 |
doFinally | 在当产生的Observable终止之后被调用,无论是否正常终止还是异常终止。doFinally优先于doAfterTerminate的调用 |
doAfterTerminate | 注册一个Action,当Observable调用onComplete或onError时触发 |
2.2 Hot Observable和Cold Observable
1. Observable的分类
Observable有Hot和Cold之分。Hot Observable无论有没有订阅者订阅都会发送事件。当Hot Observable有多个订阅者时,Hot Observable与订阅者的关系时一对多的关系,可以与多个订阅者共享信息。
Cold Observalbe是只有观察者订阅了,才开始进行发射数据流代码,并且是一对一的关系。当有多个不同的订阅者时,消息是重新完整发送的。也就是说对于多个Observer,他们各自的事件是独立的。
Hot Obseravle是一个电台,大家听到的是同一首歌;Cold Observable是一张音乐CD,人们可以独立购买并听取它。
2.Cold Observable
Observable的just、create、range、fromXXX等操作符都能生成Cold Observable。尽管Cold Observable很好,但是对于某些事件不确定何时发生及不确定Observable发射的元素数量的情况,还需要Hot Observable。
3.Cold Observable如何转换成Hot Observable
1)使用publish,生成ConnectableObservable。
ConnectableObservable<Long> observable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
Observable.interval(10, TimeUnit.MILLISECONDS, Schedulers.computation()).take(Integer.MAX_VALUE).subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread()).publish();
//ConnectableObservable需要调用connect()方法才能真正执行
observable.connect();
Consumer<Long> subscribe1 = new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("subscriber1:" + aLong);
}
};
Consumer<Long> subscribe2 = new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("subscriber1:" + aLong);
}
};
observable.subscribe(subscribe1);
observable.subscribe(subscribe2);
try {
Thread.sleep(20L);
} catch (InterruptedException e) {
e.printStackTrace();
}
输出结果为:
subscriber1:0
subscriber2:0
subscriber1:1
subscriber2:1
subscriber1:2
subscriber2:2
ConnectableObservable是线程安全的,需要调用connect()方法才会执行。
2)使用Subject/Processor
Subject和Processor的作用相同。Processor是RxJava 2.x新增的类,继承自Flowable,支持背压控制,而Subject则不支持背压控制。
Subject既是Observable,又是Observer(Subscriber)。这一点可以从Subject的源码上看到,继承自Observable,实现Observer。
Subject作为观察者,可以订阅目标Cold Observable,使对方开始发送事件。同是它又作为Observable转发或者发送新的事件,让Cold Observable借助Subject转换为Hot Observable。
Subject并不是线程安全的,如果想要其线程安全,则需要调用toSerialized()方法。
4. Hot Observable如何转换成Cold Observable
1)ConnectableObservable的refCount操作符
RefCount操作符把一个可连接的Observable连接和断开的过程自动化了。它操作一个可连接的Observable,返回一个普通的Observable。当第一个订阅者/观察者订阅这个Observable时,RefCount连接到下层的可连接Observable。RefCount跟踪有多少个观察者订阅它,知道最后一个观察者完成,才断开与下层可连接Observable的连接。
示例代码:
package chapter2;
import io.reactivex.Observable;
import io.reactivex.ObservableEmitter;
import io.reactivex.ObservableOnSubscribe;
import io.reactivex.disposables.Disposable;
import io.reactivex.functions.Consumer;
import io.reactivex.observables.ConnectableObservable;
import io.reactivex.schedulers.Schedulers;
import java.util.concurrent.TimeUnit;
/**
* @author shaopeng.wei
* @since 2019-01-14 14:45
* Purpose
*/
public class ConnectObservable2ColdObservable {
public static void main(String[] args) throws InterruptedException {
Consumer<Long> subscriber1 = new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("subscriber1:" + aLong);
}
};
Consumer<Long> subscriber2 = new Consumer<Long>() {
@Override
public void accept(Long aLong) throws Exception {
System.out.println("subscriber2:" + aLong);
}
};
Observable<Long> longObservable = Observable.create(new ObservableOnSubscribe<Long>() {
@Override
public void subscribe(ObservableEmitter<Long> e) throws Exception {
Observable.interval(5, TimeUnit.MILLISECONDS, Schedulers.io()).take(Integer.MAX_VALUE).subscribe(e::onNext);
}
}).observeOn(Schedulers.newThread());
System.out.println("默认cold Observable,subscriber1先发送10ms,预期subscriber2不受干扰从0开始");
Disposable subscribe1 = longObservable.subscribe(subscriber1);
Thread.sleep(10L);
Disposable subscribe2 = longObservable.subscribe(subscriber2);
Thread.sleep(20L);
subscribe1.dispose();
subscribe2.dispose();
System.out.println("和预期一直,并没有因为subscriber1先执行而导致subscriber2跟着subscriber1开始");
System.out.println("————————————————————————————————————————————————————————————————————");
System.out.println("Cold Observable转化为Hot 不会重新开始,所有subscriber2会跟着subscriber1的数,不会从0开始。");
ConnectableObservable<Long> publish = longObservable.publish();
publish.connect();
Disposable disposable1 = publish.subscribe(subscriber1);
Thread.sleep(10L);
Disposable disposable2 = publish.subscribe(subscriber2);
Thread.sleep(20L);
disposable1.dispose();
disposable2.dispose();
System.out.println("发现和预期一致");
System.out.println("————————————————————————————————————————————————————————————————————");
System.out.println("预期和cold Observable一样");
Observable<Long> observable = publish.refCount();
disposable1 = observable.subscribe(subscriber1);
Thread.sleep(20L);
disposable2 = observable.subscribe(subscriber2);
Thread.sleep(20L);
disposable1.dispose();
disposable2.dispose();
System.out.println("发现预期并不一样。因为如果不是所有的订阅者/观察者取消了订阅,而只是部分取消,则部分的订阅者/观察者重新开始订阅时,不会从头开始数据流");
}
}
当不是所有的订阅者/观察者取消订阅,而只是部分取消时,部分订阅者/观察者重新订阅时,数据流并不会从头开始
2)Observable的share操作符
share操作符封装了publish的.refCount()调用。
2.3 Flowable
Flowable是RxJava 2.x引入的新的实现,支持非阻塞式的背压。与FLowable相比,使用Observable较好的场景如下:
- 一般处理不超过1000条数据,并且几乎不会出现内存溢出;
- GUI鼠标事件,基本不会背压
- 处理同步流
使用Flowable较好的场景如下: - 处理以某种方式产生超过10KB的元素;
- 文件读取与分析;
- 读取数据库记录,也是一个阻塞和基于拉取模式;
- 网络I/O流
- 创建一个响应式非阻塞接口
2.5 Single、Completable和Maybe
1.SIngle
从SingleEmitter源码可以看出,Single的只有onSuccess和onError事件。其中onSuccess()用于发射数据(在Observable/Flowable中使用onNext()来发射数据),而且只能发射一个数据,后面即使再发射数据也不会做任何处理。
Single的SingleObservable中只有onSuccess和onError,并没有onComplete,这也是Single与其他4中观察者之间最大的区别。
Single可以通过toXXX方法转化成Observable、Flowable、Completable以及Maybe。
2.Completable
Completable在创建后,不会发射任何数据。只有onComplete和onError事件,同事Completable并没有map、flatMap等操作符,它的操作符比Observable/Flowable要少很多。
Completable经常结合andThen操作符使用。
3.Maybe
Maybe可以看做Single和Completable的结合体。
2.5 Subject和Processor
2.5.1 Subject是一种特殊的存在
Subject及时Observable也是Observer。官网称可以看做一个桥梁或者代理。
1.Subject的分类
Subject包含4中类型,分别是AsyncSubject、BehaviorSubject、ReplaySubject和PublishSubject。
1) AsyncSubject
Observer会接受AsyncSubject的onComple()之前的最后一个数据。即在前面有多个onNext,只有在onComplete前的一个生效。
注意:可以看做必须调用subject.onComplete才会开始发送数据,否则观察者将不接受任何数据。
2). BehaviorSubject
Observer会先接受到BehaviorSubject被订阅之前的最后一个数据,再接受订阅之后发射过来的数据,如果BehaviorSubject被订阅之前没有发送任何数据,则会发送一个默认数据。
3)ReplaySubject
ReplaySubject会发射所有来自原始Observable的数据给观察者,无论他们是合适订阅的。
ReplaySubject除了可以通过createWithSize来限制缓存数据的数量外,还可以通过createWithTime()来限制缓存时间。
4)PublishSubject
Observer只接受PublishSubject被订阅之后发送的数据。
通过前面的介绍,总结4个Subject的特性为:
Subject | 发射行为 |
---|---|
AsnycSubject | 不论订阅在什么时候发生,只发射最后一个数据 |
BehaviorSubject | 发送订阅前的一个数据和订阅后的全部数据 |
ReplaySubject | 不论订阅在什么时候,都全部发射 |
PublishSubject | 发送订阅之后的全部数据 |
第3章 创建操作符
RxJava常用操作符- jush():将一个或多个对象转换成发射这个或这些对象的一个Observable。
- from():将一个Iterable、一个Future或者一个数组转换成一个Observable。
- create():使用一个函数从头创建一个Observable。
- defer():只有当订阅者订阅才创建Observable,为每个订阅者创建一个新的Observable。
- range():创建一个发射指定范围的整数序列的Observable。
- interval():创建一个按照给定的时间间隔发射整数序列的Observable。
- timer():创建一个在给定的延时之后发射耽搁数据的Observable。
- empty():创建一个什么都不做直接通知完成的Observable。
- error():创建一个什么都不做直接通知错误的Observable。
- never():创建一个不发射任何数据的Observable。
3.1 create、just和from
1. create
用函数从头创建一个Observable。RxJava建议我们在传递给create方法的函数时,先检查一下观察者的isDisposed状态,以便在没有观察者的时候,让我们的Observable停止发射数据,防止运行昂贵的运算。
2. just
just类似于from,但是from会将数组或Iterable数据取出然后逐个发射,而just只是简单的原样发射,将数组或iterable当做单个数据。
它可以接受一致十个参数,返回一个按参数列表顺序发射这些数据的Observable。
传入null会报npe
3.from
from有一个两个参数的方法,可以设置超时时间和超时单位。
Observable.fromFuture(future,3,TimeUnit.SECONDS)
3.2 repeat
创建一个发射特定数据重复多次的Observable。repeat会重复地发射数据,某些实现允许我们重复发射某个数据序列,还有一些允许我们限制重复的次数。
1. repeatWhen
repeatWhen不是缓存或重放原始Observable的数据序列,而是有条件的重新订阅和发射原来的Observable。
2.repeatUntil
表示直到某个条件就不再重复发射数据,即当条件为true时,表示中止重复发射上游的Observable。
3.3 defer、interval和timer
1. defer
直到有观察者订阅时才创建Observable,并且为每个观察者创建一个全新的Observable,如图:
defer操作符
2.interval
创建一个按照固定时间间隔发射整数序列的Observable。
3.timer
创建一个Observable,它在一个特定的延迟后发射一个特殊的值。timer操作符创建一个在特定的时间段之后返回一个特殊值的Observable。
第4章 RxJava的线程操作
4.1 调度器(Scheduler)种类
1. RxJava线程介绍
RxJava是为了异步编程实现的一个框架,异步是其重要的特性。
2.Scheduler
Scheduler是RxJava对线程控制器的一个抽象,RxJava内置了多个Scheduler的实现,他们基本满足绝大多数使用场景。
Scheduler | 作用 |
---|---|
single | 使用定长为1的线程池,重复利用这个线程 |
newThread | 每次启用新线程,并在新线程中执行操作 |
computation | 使用固定的线程池,大小为CPU核数,适用于CPU密集型计算 |
io | 适合I/O操作。行为模式和newTread()差不多,区别在于io()内部是一个无数量上限的线程池,可以重用空闲的线程。 |
trampoline | 直接在当前线程运行,如果当前线程中有其他任务正在之慈宁宫,则会先暂停其他任务 |
Scheduler.from | 将java.util.comcurrent.Executor转换成一个调度器实例,即可以自定义一个Executor来作为调度器 |
4.2 RxJava线程模型
2. 线程调度
默认情况下不做任何线程处理,Observable和Observer处于同一线程中。如果想要切换线程,则可以使用subscribeOn()和ObserverOn()
1)subscribeOn
subscribeOn()改变上游发射数据的线程池,只有第一次有效。
2)observeOn
observeOn用来指定下游操作运行在特定的线程调度器Scheduler上。
多次执行,每次执行都会切换。
4.3 Scheduler的测试
TestScheduler是专门用于测试的调度器,与其他调度器的区别是:TestScheduler只有被调用了时间才会继续,是一种特殊的、非线程安全的调度器,用于测试一些不引入真是并发性、允许手动推进虚拟时间的调度器。
TestScheduler用来测试一些需要精确时间的任务是非常合适的,减少了等待时间。
第5章 变换操作符和过滤操作符
RxJava的变换操作符主要包括以下几种。
- map:对序列的每一项都用一个函数来变换Observable发射的数据序列
- flapMap、concatMap、flatMapIterable:将Observable发射的数据集合变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable
- switchMap:将Observable发射的数据集合变换为Observables集合,然后值发射这些Observables
第6、7章
这两章都是一些操作符,没太多深入信息快速浏览。
第8章 RxJava的背压
8.1 背压
指在异步场景下,被观察者发送时间速度远快于观察者处理的速度,从而导致buffer溢出,这种现象叫做背压。
- 背压必须是在异步的场景下才会出现,即被观察者和观察者处于不同的线程中。
- RxJava是基于push模型的,生产者有数据就发送给消费者。
- 在RxJava 2.x中,只有新增的Flowable是支持背压的。
8.2 RxJava 2.x的背压策略
RxJava 2.x中,默认队列大小为128,并且要求所有操作符强制支持背压。一共有5种背压策略。
1. MISSING
此策略表示创建的Flowable没有指定背压策略,不会做缓存或丢弃处理,需要下游通过背压操作符指定背压策略
2. ERROR
表示如果放入Flowable的异步缓存池中的数据超限了,则会抛出MissingBackpressureException异常。
3. BUFFER
此策略表示异步患处吃同Observable的一样,没有固定大小,可以无限制添加数据,不会抛出MissingBackpressureException异常,但是会导致OOM。
4. DROP
此策略表示,如果FLowable的异步缓冲池满了,则会丢掉将要放入缓存池中的数据。
5.LATEST
此策略表示,如果缓存池满了,会丢掉将要放入缓存池中的数据,但是不管缓存池状态如何,LASTEST策略会将最后一条数据强行放入缓存池中。
第9章 Disposable和Transformer的使用
9.1 Disposable
Observable.subscribe()方法会返回一个Disposable的对象,可以通过调用dispose方法来切断数据流。
CompositeDisposable
可以使用CompositeDisposable.add()方法把多个Disposable对象添加到容器中,然后通过CompositeDisposable.clear()即可切断所有的事件。
第10章 RxJava的并行编程
10.1 RxJava并行操作
并发是指一个处理器同事处理多个任务。并行是多个处理器或者多核处理器同时处理多个不同的任务。
总结
这本书是一本流水账式的操作手册,感觉写的不是很好。对于初学者没有写如何上手,对于资深者又没有太深入的东西。全程快速的扫描了一遍,主要是弥补部分遗漏的知识点,看完收获不是很大。总体来说不值得卖纸质版,当当的5块钱电子版的定价就不错。
网友评论