本文探索2个方向
- 发布/订阅机制的实现架构
- 如何实现的一行代码切换线程
此外对于操作符的使用,我建议结合其他库一起学习,比如Retrofit+RxJava使用flatMap解决嵌套请求,比如结合Room、LifeCycle,比如RxBus
发布/订阅
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Integer value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
当调用了Observable.create后,目前的Observable结构是这样的
image.png可以把它理解为发布者,它接受订阅者的参数
当我们调用了subscribe方法后,他会执行获取数据的逻辑,然后传递数据给订阅者
总结来说就是装饰者模式+观察者模式的灵活使用,每一级都返回Observable,并对之前的callback进行保存或执行
线程切换
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) {
emitter.onNext(1);
emitter.onNext(2);
emitter.onNext(3);
emitter.onComplete();
}
}).observeOn(Schedulers.io())
.subscribeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {}
@Override
public void onNext(Integer value) {}
@Override
public void onError(Throwable e) {}
@Override
public void onComplete() {}
});
observeOn后,现在Observable的封装如下
image.pngsubscribeOn同理,封装后如下
image.pngObservableObserveOn是Observable,他对于subscribe有自己的实现
它的实现就是根据具体传入的Scheduler在对应的线程执行逻辑
可以参考如下代码
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker(); // 创建对应的Worker
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
举个例子,newThread的对应实现
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
可以看到内置了一个线程池,在线程池里执行实际逻辑
总结
RxJava的函数式编程核心就是通过装饰者+观察者实现的,十分巧妙
这一点很像JS的Promise
后记
有什么写得错误、让人费解或遗漏的地方,希望可以不吝赐教,我会马上更改
学习自
https://www.jianshu.com/p/88aacbed8aa5
http://ju.outofmemory.cn/entry/358094
网友评论