Rxjava优点
个人认为优点有下面几个:
1.使用结构、逻辑清晰;
2.线程切换非常方便;
3.只要流程发生错误,一定会执行onError(),不用到处处理;
4.操作符强大;
基础用法推荐
https://www.jianshu.com/p/cd3557b1a474
https://www.cnblogs.com/liushilin/p/7058302.html
以上两篇写的很全了,下面是 版本2和版本3的区别,有兴趣的去看下
https://blog.csdn.net/weixin_45258969/article/details/95386872?utm_medium=distribute.down_relevant_right.none-task-blog-BlogCommendFromBaidu-1.nonecase&depth_1-utm_source=distribute.down_relevant_right.none-task-blog-BlogCommendFromBaidu-1.nonecase
本文主要分析 subcribe、 线程切换、操作符map,相信看完你就明白Rxjava的玩法,其他的操作符也是基于这个思路。
源码分析
1、前提:看源码还是主要先看主线,不要高的无法自拔,尤其是Rxjava这种比较绕的,很多人对他源码不熟就是因为点进去一看,不知道在干什么,怎么实现的直接放弃。
2、Rxjava的观察者模型是基于Java的观察者模型的一个变种,最大区别就是标准观察者是一个被观察,多个观察者,比如微信公众号推文的模型。Rxjava则是只有一个观察者来消费,看源码重点关注,被观察调用 subscribe订阅方法,把二者绑在一起的。
subscribe 订阅源码分析
时序图如下:
image.png
我们看下调用方式:
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//....
}
})
.subscribe(new Consumer<Integer>() {
//...
});
省略部分代码,核心就是 Observable.create(自定义发送事件)调用订阅(目标对象)。
我们首先理清Observable.create是什么,点进去看下源码,用到的是 RxJavaPlugins.onAssembly方法,里面其实啥都没干,直接把传入的对象返回了,因为判断变量是空的,如果我们想监控整个Rxjava执行了哪些,我们可以自定义一个,就会先走自定义的逻辑。继续看,下面代码的注释1处,是把传入的事件自定义发生源头直接赋值给了 ObservableCreate的成员变量source。
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
我们在扫一眼, subscribeActual方法,是RxJava中比较重要的一个方法,定义的一个抽象方法,整个Rxjava做事就是靠这个方法实现的,
public final class ObservableCreate<T> extends Observable<T> {
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source; // 1.这个是把目标 赋值给了对象的成员变量
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
// 对传入的观察者进行包装
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
// 调用观察者的订阅回调方法
observer.onSubscribe(parent);
try {
// 真正执行订阅的地方
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
static final class CreateEmitter<T> extends AtomicReference<Disposable>
implements ObservableEmitter<T>, Disposable {
final Observer<? super T> observer;
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;
}
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext ..."));
return;
}
if (!isDisposed()) {
// 调用传入的观察者的 onNext() 方法
observer.onNext(t);
}
}
@Override
public void dispose() {
// 取消订阅
DisposableHelper.dispose(this);
}
// ...
}
// ...
}
再看下 subscribe(目标)方法 ,调用这个方法的是 ObservebleCreate对象。目标就是观察找对象。
public final void subscribe(Observer<? super T> observer) {
...
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer); //调用这个核心方法
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
// ...
}
}
主要调用了抽象方法:
protected abstract void subscribeActual(Observer<? super T> observer);
看到实现类很多,各种操作符的都有,我们直接看 ObservebleCreate这个类中,上面代码
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer); //1.对目标包装成发射器
observer.onSubscribe(parent); //2,调用方法
try {
source.subscribe(parent); //3, 自定义被观察着调用 被包装的目标观察者
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
如上面的注释1:对目标包装成发射器,Rxjava的链式调用 就是各种包装再拆包装,
注释2处,主要是调用目标观察者的方法,所以每次这个方法调用;
注释3处,ObservebleCreate调用 订阅方法,同样,这个调用也是一个抽象方法
void subscribe(ObservableEmitter<T> e) throws Exception;
实现的就是我们自定义的 方法如下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> e) throws Exception {
e.onNext("XXX");
}
})
这下就比较清晰了,当执行订阅方法后,回调我们自定义的订阅方法,
我们在看线程切换实现源码
同样的还是先看下我们调用
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
//....
}
})
.subscribeOn(Schedulers.io()) // 上面 异步
.observeOn(AndroidSchedulers.mainThread()) // 下面 主线程
.subscribe(
我们重点看下面两行代码:
被观察者(ObservableCreate)
.observeOn(Schedulers.io())
.subscribeOn()
.subscribe(观察者
明确两个东西,谁来调用的,是ObservableCreate,传入的是什么参数,观察者
1.我们先看Schedulers.io()是什么:
还是一样的套路,schedulers中定义变量
public final class Schedulers {
@NonNull
static final Scheduler SINGLE;
@NonNull
static final Scheduler COMPUTATION;
@NonNull
static final Scheduler IO;
@NonNull
static final Scheduler TRAMPOLINE;
@NonNull
static final Scheduler NEW_THREAD;
IO 在静态代码块中初始化了:
IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
});
在看下IoHolder.DEFAULT :是直接new IoScheduler() 里面是直接创建 线程工厂new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority); 着就到了他的 线程池
总结起来就是Scheduler
2.subscribeOn方法:
先看ObservableSubscribeOn类,当发生订阅会执行到方法
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s); //1
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() { //3
@Override
public void run() {
source.subscribe(parent);// 2
}
}));
}
我们可以看到上面的1处先把观察者包装为parent ,然后调用2处的方法传入包装后的观察者,很明显就是放到 子线程
3.再看observeOn()方法:
同样,传入的其实就是一个主线程 的handler ,
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
看到这个方法中还是同样的套路,直接把传入的 Scheduler包装成一个 ObservableObserveOn
忽略下 Rxjava 中的命名.之前的 ObservableCreate 和 ObservableObserveOn .都是Observable + 具体的方法
我们看下ObservableObserveOn 的方法:
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker(); //1.这里
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
...
省略了部分代码,首先是把刚才传入的schedule 线程池相关的赋值给自己成员变量.同样的,当调用订阅方法的时候,还是会调用到抽象方法 subscribeActual 就这样实现切换到主线程
操作符 map
.map(new Function<String, Bitmap>() {
@Override
public Bitmap apply(String s) throws Exception {
URL url = new URL(PATH);
HttpURLConnection httpURLConnection = (HttpURLConnection) url.openConnection();
httpURLConnection.setConnectTimeout(5000);
int responseCode = httpURLConnection.getResponseCode(); // 才开始 request
if (responseCode == HttpURLConnection.HTTP_OK) {
InputStream inputStream = httpURLConnection.getInputStream();
Bitmap bitmap = BitmapFactory.decodeStream(inputStream);
return bitmap;
}
return null;
}
})
操作符其实实现是一样的。
image.png
网友评论