RxJava 是一种响应式编程,来创建基于事件的异步操作库。基于事件流的链式调用、逻辑清晰简洁。
平时用的多但是没认真分析源码总感觉虚的很废话不多说直接上代码
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("567");
emitter.onNext("789");
emitter.onNext("112");
emitter.onNext("666");
emitter.onNext("666");
emitter.onNext("666");
emitter.onNext("666");
}
}).map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
}).subscribeOn(Schedulers.io()).observeOn(AndroidSchedulers.mainThread()).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer integer) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
看看 Observable.create做了什么就是创建了ObservableCreate对象其实也是个Observable接着链式调用map,RxJavaPlugins.onAssembly:默认情况下是返回source,如果需要监听Rx的操作符,可以调用setOnObservableAssembly方法,
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
map方法里面也是创建了ObservableMap对象也是Observable接着链式调用subscribeOn方法,这个map方法就是做数据类型转换的就不展开
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
subscribeOn方法同样也是创建一个ObservableSubscribeOn是个Observablej接着调用observeOn,这个方法不用多说就是线程切换的,一般用于把被观察者放到子线程里去执行一些耗时或者io操作
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
observeOn方法最终创建了一个ObservableObserveOn对象当然也是个Observable,一串链式调用下来创建了一堆Observable对象不过别急重点来了,接着调用了subscribe方法
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
subcribe方法最关键的方法是subscribeActual,这个方法是ObservableObserveOn里面方法创建了一个Worker ,这个是用来处理从子线程切换到主线程的对象,线程切换这里不展开了,接着调用了source.subscribe(observer)方法,source是上一个observable对象也就是ObservableSubscribeOn,也就是调用了上一个obserable的subcribe方法,并且把自定义的observer也传过去,这是要干啥我们接着进去看看, observer.onSubscribe(),observer不就是下一个observable传进来的吗就是ObserveOnObserver对象,我们进去看看里面的onSubscribe方法,这个方法关键代码是downstream.onSubscribe(this);downstream称为下游就是最终调用到我们自定义的observer里面了,回调到onSubscribe里了给我们自行处理;回到ObservableSubscribeOn的subscribeActual方法中接着调用了parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));这个方法就是让线程切换到io线程中并且执行一个Runnable任务,这个任务就是SubscribeTask里面具体做的事是source.subscribe(parent);又是一样的味道不就是调用上一个obervable的subscribe方法然后执行对应observable的subscribeActual方法,接着是不是一直调用上一个observable的直到第一个observable的subscribeActual这里,对的就是这样的,一直到ObservableCreate的subscribeActual方法,这个方法调用了 首先调用observer.onSubscribe(parent);又是一样的套路就是依次调用下一个observable的里面内部类observer的onSubscribe一直到ObservableSubscribeOn这里的onsubcribe,由于ObservableSubscribeOn在这里做了拦截往下一个abservable里面的observer调用onSubscribe已经调用过了,好的我们继续往下看调用了 source.subscribe(parent);source就是我们自定义的ObservableOnSubscribe,调用这个方法就是发射数据了接着进到CreateEmitter的onNext方法,里面关键方法是 observer.onNext(t);observer就是下一个observable里面的observer,也就是把数据传到下一个observable里面进行处理,那是不是依次调用把数据传到下个abservable去处理一直传到我们自定义的Observer的onnext里面,没错就是这样的,这样就完成了rxjava的整个数据处理流程了。
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
//位于ObservableObserveOn中
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
//位于ObservableSubscribeOn中
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//位于ObservableObserveOn中
@Override
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
//位于ObservableSubscribeOn中
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
}
//位于ObservableMap中
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
// ObservableCreate中
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
observer.onSubscribe(parent);
try {
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//CreateEmitter中
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
为了方便理解画了一份思维导图
网友评论