响应式编程是一种关注数据流和变化传递的异步编程方式
什么是数据流?
数据流被称为流处理
传统上的程序指的是按照特定的顺序执行的一系列操作,通常称为控制流
数据流编程强调数据的传递,并将程序构建为一系列的连接。数据库,用户输入,数组,网络请求都属于数据流
什么是变化传播?
一种数据流输入,经过一系列的操作之后转换为另外一个数据流,然后再分发给各个订阅者
什么是异步编程?
传统的编程方式是顺序执行的,必须在完成了上一个任务之后才能执行下一个任务。
比如我们有五个接口,个别接口有相互依赖关系,我们以前的做法就是顺序执行,一个一个执行完毕之后最 后才进行UI界面刷新,这样导致了程序依赖性大大增强并且程序可读性很差“,我们想让其响应速度升级应该 怎么做?我们可以把同步执行换成异步执行,这样的优点是,当你的任务不依赖彼此时,我们可以同时启动多 个任务,而不是等到执行完一个任务再执行下一个
什么是RxJava?
- Reactive Extensions
- 观察者模式
- 迭代器模式(聚合对象,不触碰对象本身,游标(Cursor)模式)
- 函数式编程
为什么要用RxJava?
- 线程方便切换
- 链式调用更易于代码阅读
- 使用Lambda表达式简化代码量
- 解耦单一,不嵌套
- 强大的操作符
我们看一个经典的例子
在子线程中把某个文件夹里面的以 png 结尾的图片文件解析出来,交给 UI 线程进行渲染
new Thread() {
@Override
public void run() {
super.run();
for (File folder : folders) {
File[] files = folder.listFiles();
for (File file : files) {
if (file.getName().endsWith(".png")) {
final Bitmap bitmap = getBitmapFromFile(file);
getActivity().runOnUiThread(new Runnable() {
@Override
public void run() {
imageCollectorView.addImage(bitmap);
}
});
}
}
}
}
}.start();
我们用RxJava修改一下上述代码,如下
Observable.from(folders)
.flatMap((Func1) (folder) -> { Observable.from(file.listFiles()) })
.filter((Func1) (file) -> { file.getName().endsWith(".png") })
.map((Func1) (file) -> { getBitmapFromFile(file) })
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe((Action1) (bitmap) -> { imageCollectorView.addImage(bitmap) });
我们从最基本开始看看
val observable = Observable.create(object :ObservableOnSubscribe<Int> {
override fun subscribe(it: ObservableEmitter<Int>?) {
it?.onNext(1)
it?.onNext(2)
it?.onComplete()
}
})
val observer = object: Observer<Int> {
override fun onComplete() {
debugMsg("onComplete")
}
override fun onSubscribe(d: Disposable?) {
debugMsg("onSubscribe")
}
override fun onNext(value: Int?) {
debugMsg("==",value)
}
override fun onError(e: Throwable?) {
debugMsg("onError")
}
}
observable.subscribe(observer)
RxJava创建操作符
create 从头创建一个Observable
from 将一个Iterable, 一个Future, 或者一个数组,内部通过代理的方式转换成一个Observable
just 将一个或多个对象转换成发射这个或这些对象的一个Observable
timer 创建一个在给定的延时之后发射数据项为0的Observable<Long>
empty 创建一个什么都不做直接通知完成的Observable
error 创建一个什么都不做直接通知错误的Observable
never 创建一个什么都不做的Observable
interval 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
range 创建一个发射指定范围的整数序列的Observable<Integer>
defer 只有当订阅者订阅才创建Observable,为每个订阅创建一个新的Observable
create操作符
Observable.create(ObservableOnSubscribe<Int?> { emitter ->
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
}).map(Function<Int?, String?> { integer -> "This is new result $integer" })
.subscribe { t -> debugMsg("msg--->$t") }
empty, error, never操作符
Observable observable1=Observable.empty();//直接调用onCompleted。
Observable observable2=Observable.error(new RuntimeException());//直接调用onError。这里可以自定义异常
Observable observable3=Observable.never();//啥都不做
timer操作符 实现一个延时操作,类似Handler postDelay
Observable.timer(3000, TimeUnit.MILLISECONDS)
.subscribe(object : Consumer<Long?> {
override fun accept(t: Long?) {
debugMsg("msg--->$t")
}
})
interval操作符 创建一个按照给定的时间间隔发射从0开始的整数序列的Observable<Long>
Observable.interval(1, TimeUnit.SECONDS)
.subscribe(object : Consumer<Long?> {
override fun accept(t: Long?) {
debugMsg("msg--->$t")
}
})
//打印
.MainActivity$rxjavaInterval$1;msg--->0
.MainActivity$rxjavaInterval$1;msg--->1
.MainActivity$rxjavaInterval$1;msg--->2
.MainActivity$rxjavaInterval$1;msg--->3
.MainActivity$rxjavaInterval$1;msg--->4
.MainActivity$rxjavaInterval$1;msg--->5
过滤操作符
skip 跳过操作
filter 过滤数据
ofType 过滤指定类型的数据
take 只发射开始的N项数据或者一定时间内的数据
takeFirst 提取满足条件的第一项
takeLast 只发射最后的N项数据或者一定时间内的数据
throttleFirst 定期发射Observable发射的第一项数据
debounce 去抖动
distinct 过滤重复数据
distinctUntilChanged 过滤掉连续重复的数据
timeout 如果原始Observable过了指定的一段时长没有发射任何数据,就发射一个异常或者使用备用的Observable
debounce 去抖动过滤操作符 可作用于Flowable,Observable。在Android开发,通常为了防止用户重复点击而设置标记位,而通过RxJava的debounce操作符可以有效达到该效果。在规定时间内,用户重复点击只有最后一次有效
Observable<String> source = Observable.create(emitter -> {
emitter.onNext("A");
Thread.sleep(1_500);
emitter.onNext("B");
Thread.sleep(500);
emitter.onNext("C");
Thread.sleep(250);
emitter.onNext("D");
Thread.sleep(2_000);
emitter.onNext("E");
emitter.onComplete();
});
source.subscribeOn(Schedulers.io())
.debounce(1, TimeUnit.SECONDS)
.blockingSubscribe(
item -> System.out.print(item+" "),
Throwable::printStackTrace,
() -> System.out.println("onComplete"));
打印:A D E onComplete
上文代码中,数据源以一定的时间间隔发送A,B,C,D,E。操作符debounce的时间设为1秒,发送A后1.5秒并没有发射其他数据,所以A能成功发射。发射B后,在1秒之内,又发射了C和D,在D之后的2秒才发射E,所有B、C都失效,只有D有效;而E之后已经没有其他数据流了,所有E有效。
blockingSubscribe
可以阻塞当前线程,等待执行完成(https://stackoom.com/question/3f7BX/Rxjava-blockingSubscribe-vs-subscribe
)
take过滤操作符
Observable.just(2, 3, 4, "5", 6)
.take(4)
.subscribe{
debugMsg("msg--->$it")
}
//打印
MainActivity;msg--->2
MainActivity;msg--->3
MainActivity;msg--->4
MainActivity;msg--->5
distinct过滤操作符
Observable.just(2, 3, 4, 5, 4, 3, 6)
.distinct ()
.subscribe {
debugMsg("msg--->$it")
}
//打印
MainActivity;msg--->2
MainActivity;msg--->3
MainActivity;msg--->4
MainActivity;msg--->5
MainActivity;msg--->6
变换操作符
map 上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化
cast 在发射之前强制将Observable发射的所有数据转换为指定类型
flatMap 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并
flatMapIterable 和flatMap的作用一样,只不过生成的是Iterable而不是Observable
concatMap 类似于flatMap,由于内部使用concat合并,所以是按照顺序连接发射
scan 对Observable发射的每一项数据应用一个函数,然后按顺序依次发射每一个值。
groupBy 将Observable分拆为Observable集合,将原始Observable发射的数据按Key分组,每一个Observable发射一组不同的数据。
buffer 它定期从Observable收集数据到一个集合,然后把这些数据集合打包发射,而不是一次发射一个
window 定期将来自Observable的数据分拆成一些Observable窗口,然后发射这些窗口,而不是每次发射一项。
map变换操作符 上游发送的每一个事件应用一个函数, 使得每一个事件都按照指定的函数去变化
image.png
Observable.create(ObservableOnSubscribe<Int?> { emitter ->
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
}).map(Function<Int?, String?> { integer -> "This is new result $integer" })
.subscribe { t -> debugMsg("msg--->$t") }
//打印
MainActivity;msg--->This is new result 1
MainActivity;msg--->This is new result 2
MainActivity;msg--->This is new result 3
flatMap变换操作符 将Observable发射的数据变换为Observables集合,然后将这些Observable发射的数据平坦化的放进一个单独的Observable,内部采用merge合并
image.png
Observable.create(ObservableOnSubscribe<Int?> { emitter ->
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
}).flatMap(object : Function<Int?, ObservableSource<String?>> {
override fun apply(t: Int): ObservableSource<String?> {
val list: MutableList<String> = ArrayList()
for (i in 0..2) {
list.add("I am flatMap value $t")
}
return Observable.fromIterable(list).delay(10, TimeUnit.MILLISECONDS)
}
}).subscribe { t -> debugMsg("msg--->$t") }
//打印
MainActivity;msg--->I am flatMap value 2
MainActivity;msg--->I am flatMap value 2
MainActivity;msg--->I am flatMap value 3
MainActivity;msg--->I am flatMap value 3
MainActivity;msg--->I am flatMap value 1
MainActivity;msg--->I am flatMap value 1
错误 / 重复操作符
onErrorResumeNext 当原始Observable在遇到错误时,使用备用Observable
onExceptionResumeNext 当原始Observable在遇到异常时,使用备用的Observable。与onErrorResumeNext类似,区别在于onErrorResumeNext可以处理所有的错误,onExceptionResumeNext只能处理异常
onErrorReturn 当原始Observable在遇到错误时发射一个特定的数据
onErrorReturnItem 当原始Observable在遇到错误时发射一个特定的数据
retry 当原始Observable在遇到错误时进行重试。
retryWhen 当原始Observable在遇到错误,将错误传递给另一个Observable来决定是否要重新订阅这个Observable
onErrorResumeNext操作符 当原始Observable在遇到错误时,使用备用Observable
image.png
val list = arrayOf(1, 2, 3, 4, 5)
Observable.create(ObservableOnSubscribe<Int> { emitter ->
list.forEach {
if (it == 3) emitter.onError(Throwable())
else emitter.onNext(it)
}
}).onErrorResumeNext(Function<Throwable, ObservableSource<Int?>> { throwable ->
//拦截到错误之后,重新定义了被观察者
Observable.just(100)
})
.subscribe{
debugMsg("msg--->$it")
}
//打印
MainActivity;msg--->1
MainActivity;msg--->2
MainActivity;msg--->100
onErrorReturnItem操作符 当原始Observable在遇到错误时发射一个特定的数据
val list = arrayOf(1, 2, 3, 4, 5)
Observable.create(ObservableOnSubscribe<Int> { emitter ->
list.forEach {
if (it == 3) emitter.onError(Throwable())
else emitter.onNext(it)
}
}).onErrorReturnItem(100)
.subscribe{
debugMsg("msg--->$it")
}
//打印
MainActivity;msg--->1
MainActivity;msg--->2
MainActivity;msg--->100
合并操作符 按顺序连接多个Observables
concat 按顺序连接多个Observables
startWith 在被观察发送元素之前追加数据或者追加新的被观察者
merge 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接
zip 将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数
concat合并操作符 按顺序连接多个Observables
image.png
Observable<String> names = Observable.just("1", "2");
Observable<String> otherNames = Observable.just("3", "4","5");
names.concat(otherNames).subscribe(item -> Log.d(TAG,item));
//打印:
RxJava: 1
RxJava: 2
RxJava: 3
RxJava: 4
RxJava: 5
startWith合并操作符 在被观察发送元素之前追加数据或者追加新的被观察者
image.png
Observable<String> names = Observable.just("Spock", "McCoy");
Observable<String> otherNames = Observable.just("Git", "Code","8");
names.startWith(otherNames).subscribe(item -> Log.d(TAG,item));
//打印:
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Spock
RxJava: McCo
merge合并操作符 将多个Observable合并为一个。不同于concat,merge不是按照添加顺序连接,而是按照时间线来连接
image.png
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable.merge(names,otherNames).subscribe(name -> Log.d(TAG,name));
//也可以是
//names.mergeWith(otherNames).subscribe(name -> Log.d(TAG,name));
//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
merge在合并数据源时,如果一个合并发生异常后会立即调用观察者的onError方法,并停止合并。可通过mergeDelayError操作符,将发生的异常留到最后处理。
Observable<String> names = Observable.just("Hello", "world");
Observable<String> otherNames = Observable.just("Git", "Code","8");
Observable<String> error = Observable.error(
new NullPointerException("Error!"));
Observable.mergeDelayError(names,error,otherNames).subscribe(
name -> Log.d(TAG,name), e->Log.d(TAG,e.getMessage()));
//打印:
RxJava: Hello
RxJava: world
RxJava: Git
RxJava: Code
RxJava: 8
RxJava: Error!
zip合并操作符 将多个Observable发送的事件结合到一起,然后发送这些组合到一起的事件. 它按照严格的顺序应用这个函数
image.png
val observable1: Observable<Int?>? = Observable.create(ObservableOnSubscribe<Int?> { emitter ->
emitter.onNext(1)
emitter.onNext(2)
emitter.onNext(3)
emitter.onNext(4)
emitter.onComplete()
}).subscribeOn(Schedulers.io())
val observable2: Observable<String?>? = Observable.create(ObservableOnSubscribe<String?> { emitter ->
emitter.onNext("A")
emitter.onNext("B")
emitter.onNext("C")
emitter.onComplete()
}).subscribeOn(Schedulers.io())
Observable.zip(
observable1,
observable2,
object : BiFunction<Int?, String?, String?> {
override fun apply(t1: Int, t2: String): String {
return t1.toString() + t2
}
}).subscribe(object : Consumer<String?> {
override fun accept(value: String?) {
debugMsg("msg--->$value")
}
})
//打印
.MainActivity$rxjavaZip$2;msg--->1A
.MainActivity$rxjavaZip$2;msg--->2B
.MainActivity$rxjavaZip$2;msg--->3C
如果想要合并更多的T,我们可以看如下Function函数,最多包含九个
T
网友评论