0x01 RxJava 简介
github 地址:https://github.com/ReactiveX/RxJava
a library for composing asynchronous and event-based programs by using observable sequences for the Java VM.
一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库
关键字:异步,事件,可观测,序列
RxJava1原理读自 扔物线 于2015年10月写的 给 Android 开发者的 RxJava 详解
功能:异步,解决异步处理问题
好处:简洁,链式调用,逻辑顺序执行
0x02 RxJava1 原理
鉴于大多数同学已经相对熟悉RxJava,不再逐一深入,只进行核心说明。
本节依据以下版本进行探讨
implementation 'io.reactivex:rxjava:1.0.14'
implementation 'io.reactivex:rxandroid:1.0.1'
抛出一个示例
简化代码如下
Observable
.create( new OnSubscribe() {
void call(Subscriber subscriber) {}
})
.subscribeOn()
.flatMap()
.observeOn()
.map()
.subscribeOn()
.take()
.observeOn()
.subscribe(new Subscriber())
具体代码如下,可暂时忽略
Observable
.create(new Observable.OnSubscribe<Integer>() {
@Override
public void call(Subscriber<? super Integer> subscriber) {
subscriber.onNext(1);
subscriber.onNext(2);
subscriber.onNext(3);
subscriber.onCompleted();
}
})
.subscribeOn(Schedulers.io())
.flatMap(new Func1<Integer, Observable<String>>() {
@Override
public Observable<String> call(Integer integer) {
return Observable.just("flatMap-" + integer);
}
})
.observeOn(AndroidSchedulers.mainThread())
.map(new Func1<String, String>() {
@Override
public String call(String s) {
return "map-" + s;
}
})
.doOnSubscribe(new Action0() {
@Override
public void call() {
Toast.makeText(context, "doOnSubscribe call", Toast.LENGTH_SHORT).show();
}
})
.subscribeOn(AndroidSchedulers.mainThread())
.take(2)
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Subscriber<String>() {
@Override
public void onStart() {
Log.i(TAG, "Subscriber onStart");
}
@Override
public void onNext(String s) {
Log.i(TAG, "Subscriber onNext " + s);
}
@Override
public void onCompleted() {
Log.i(TAG, "Subscriber onStart");
}
@Override
public void onError(Throwable e) {
Log.i(TAG, "Subscriber onStart");
}
});
Point 核心知识点
- 关键词:Observable、onSubscribe、subscribe()、Subscriber
- Subscriber 可看做等价 Observer。Subscriber 实现 Observer;subscribe() 注册时总会将 Observer 转换成 Subscriber 再使用;有区别是 onStart() 和 unsubscribe();
- 创建 Observable 需传入OnSubscribe对象,该对象即为 Observable 类中的 onSubscribe 属性
- subscribe() 返回 Subscription,其他系列操作符如map()、 subscribeOn()、observeOn()等都返回 Observable
- map()、flatMap()、subscribeOn() 等所有的操作符基于 lift()
// 这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码
public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
return Observable.create(new OnSubscribe<R>() {
@Override
public void call(Subscriber subscriber) {
Subscriber newSubscriber = operator.call(subscriber);
newSubscriber.onStart();
onSubscribe.call(newSubscriber);
}
});
}
// operator.call() 的核心代码
public final class OperatorXXX<T> implements Observable.Operator<T, T> {
@Override
public Subscriber<? super T> call(final Subscriber<? super T> child) {
final Subscriber<T> parent = new Subscriber<T>() {
@Override
public void onNext(T i) {
child.onNext(i);
}
};
return parent;
}
}
- Observable.subscribe(Subscriber)
// subscribe()的核心代码
public Subscription subscribe(Subscriber subscriber) {
subscriber.onStart();
onSubscribe.call(subscriber);
return subscriber;
}
- 通过 lift() 代码可知,下一级 (新) Observable
包含
上一级 (旧) Observable, 最后 subscribe() 时进行向上打call 依次调用 Subscriber - 通过operator.call() 的核心代码,下一级(新) Subscriber
包含
上一级 (旧) Subscriber, 最后得到最终的 Subscriber - 最终 subscribe() 启动全过程,然后向上依次回退
// 整体链式调用理解
Observable
.create( new OnSubscribe() {
void call(Subscriber subscriber) {}
})
.lift()
.lift()
.subscribe(new Subscriber())
- subscribeOn() 指定 subscribe()所发生的线程,即 Observable.OnSubscribe 被激活时所在的线程
- observeOn() 指定 Subscriber 所运行线程,是当前Observable 所对应的 Subscriber,即它的直接下级 Subscriber。
- subscribeOn() 只有第一个有效,subscribeOn() 的线程切换发生在 OnSubscribe 中,即在它通知上一级 OnSubscribe 时,这时事件还没有开始发送,因此 subscribeOn() 的线程控制可以从事件发出的开端就造成影响;而 observeOn() 的线程切换则发生在它内建的 Subscriber 中,即发生在它即将给下一级 Subscriber 发送事件时,因此 observeOn() 控制的是它后面的线程
-
与 Subscriber.onStart() 相对应的,有一个方法 Observable.doOnSubscribe() 。它和 Subscriber.onStart() 同样是在 subscribe() 调用后而且在事件发送前执行,但区别在于它可以指定线程。默认情况下, doOnSubscribe() 执行在 subscribe() 发生的线程;而如果在 doOnSubscribe() 之后有 subscribeOn() 的话,它将执行在离它最近的 subscribeOn() 所指定的线程
两个 lift() 原理
多次 lift() 原理
网友评论