- Usage
Observable
.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(@NonNull ObservableEmitter<String> emitter) throws Throwable {
emitter.onNext("42"); // a numeric string, so Integer.valueOf in map does not throw
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Throwable {
return Integer.valueOf(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
}
@Override
public void onNext(@NonNull Integer integer) {
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
}
});
Start with create:
public static <T> Observable<T> create(@NonNull ObservableOnSubscribe<T> source) {
Objects.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<>(source));
}
create wraps the ObservableOnSubscribe we passed in inside an ObservableCreate object.
map then wraps the upstream ObservableCreate inside an ObservableMap.
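The wrapping that happens at assembly time can be pictured with a small standalone model. This is only a sketch: `AssemblySketch`, `Node`, `CreateNode` and `MapNode` are hypothetical stand-ins, not RxJava's real classes, and the only behavior assumed is what is described above, namely that each operator returns a new object holding a reference to its upstream.

```java
import java.util.function.Function;

// Hypothetical model of assembly-time wrapping; not RxJava's real types.
final class AssemblySketch {
    abstract static class Node {
        abstract String describe();

        // Like Observable.map: return a new node that wraps the current one.
        Node map(Function<Object, Object> fn) {
            return new MapNode(this, fn);
        }

        // Like Observable.create: wrap the user's source in the first node.
        static Node create(Runnable source) {
            return new CreateNode(source);
        }
    }

    static final class CreateNode extends Node {
        final Runnable source;
        CreateNode(Runnable source) { this.source = source; }
        @Override String describe() { return "Create"; }
    }

    static final class MapNode extends Node {
        final Node upstream;
        final Function<Object, Object> fn;
        MapNode(Node upstream, Function<Object, Object> fn) {
            this.upstream = upstream;
            this.fn = fn;
        }
        @Override String describe() { return "Map(" + upstream.describe() + ")"; }
    }

    // Builds create().map().map() and reports the resulting nesting.
    static String chain() {
        Node n = Node.create(() -> { }).map(x -> x).map(x -> x);
        return n.describe();
    }

    public static void main(String[] args) {
        System.out.println(chain()); // prints Map(Map(Create))
    }
}
```

The last operator written in the chain ends up as the outermost wrapper, which is why subscription later starts there and works its way up.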
Next comes Observable's subscribe() method:
public final void subscribe(@NonNull Observer<? super T> observer) {
Objects.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
Objects.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
subscribeActual(observer);
} // catch blocks omitted in this excerpt
}
This calls into ObservableMap's subscribeActual method:
@Override
public void subscribeActual(Observer<? super U> t) {
source.subscribe(new MapObserver<T, U>(t, function));
}
The observer is wrapped in a MapObserver, and the call continues up into ObservableCreate's subscribeActual method:
@Override
protected void subscribeActual(Observer<? super T> observer) {
CreateEmitter<T> parent = new CreateEmitter<>(observer);
observer.onSubscribe(parent);
source.subscribe(parent);
}
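At this point the observer is subscribed to the observable. It also shows that data can only flow after onSubscribe has been called.
Control then enters the subscribe method of the ObservableOnSubscribe we supplied.
Now look at onNext. The ObservableEmitter we call it on is actually a CreateEmitter, whose onNext looks like this: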
@Override
public void onNext(T t) {
if (t == null) {
onError(ExceptionHelper.createNullPointerException("onNext called with a null value."));
return;
}
if (!isDisposed()) {
observer.onNext(t);
}
}
That leads to MapObserver's onNext method (abridged):
public void onNext(T t) {
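// abridged: v is the non-null result of mapper.apply(t); the full method is shown later in this article
downstream.onNext(v);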
}
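downstream is the next observer down the chain. Ignoring the thread-switching operators for now, that is the outermost Observer we passed to subscribe, so its onNext is what finally runs.
That is one complete subscribe-and-receive cycle in RxJava.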
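The whole cycle described so far (subscribe travels up, onSubscribe and onNext travel down) can be reproduced in a few lines of plain Java. The sketch below is a simplified model under stated assumptions: `MiniObservable` and `MiniObserver` are hypothetical names, error and completion signals and disposal are left out, and nothing here is RxJava's actual implementation.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical miniature of the create -> map -> subscribe flow.
final class MiniRxFlow {
    interface MiniObserver<T> {
        void onSubscribe();
        void onNext(T value);
    }

    abstract static class MiniObservable<T> {
        abstract void subscribeActual(MiniObserver<? super T> observer);

        // Like Observable.subscribe: delegate to the operator-specific subscribeActual.
        final void subscribe(MiniObserver<? super T> observer) {
            subscribeActual(observer);
        }

        static <T> MiniObservable<T> create(Consumer<MiniObserver<? super T>> source) {
            return new MiniObservable<T>() {
                @Override void subscribeActual(MiniObserver<? super T> observer) {
                    observer.onSubscribe();  // downstream is notified first
                    source.accept(observer); // then the user's emitting code runs
                }
            };
        }

        <R> MiniObservable<R> map(Function<? super T, ? extends R> mapper) {
            MiniObservable<T> upstream = this;
            return new MiniObservable<R>() {
                @Override void subscribeActual(MiniObserver<? super R> downstream) {
                    // Wrap the downstream observer, then subscribe upward.
                    upstream.subscribe(new MiniObserver<T>() {
                        @Override public void onSubscribe() { downstream.onSubscribe(); }
                        @Override public void onNext(T value) {
                            downstream.onNext(mapper.apply(value));
                        }
                    });
                }
            };
        }
    }

    // Runs the chain and returns the callbacks in the order they happened.
    static List<String> run() {
        List<String> log = new ArrayList<>();
        MiniObservable.<String>create(emitter -> emitter.onNext("42"))
            .map(Integer::valueOf)
            .subscribe(new MiniObserver<Integer>() {
                @Override public void onSubscribe() { log.add("onSubscribe"); }
                @Override public void onNext(Integer value) { log.add("onNext " + value); }
            });
        return log;
    }
}
```

Calling `MiniRxFlow.run()` yields `onSubscribe` followed by `onNext 42`, matching the order in which ObservableCreate calls `observer.onSubscribe(parent)` before `source.subscribe(parent)`.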
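- Next, let's look at how thread switching works
Start with subscribeOn. As before, the upstream ObservableMap is wrapped, this time in an ObservableSubscribeOn, which stores the source and the scheduler (an IoScheduler in this example).
So during subscription we pass through ObservableSubscribeOn's subscribeActual method: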
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
The upstream subscribe call is wrapped in a Runnable (SubscribeTask) and handed to the scheduler:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker(); // get a Worker backed by the scheduler's cached thread pool
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
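The subscribe call runs on a thread from that pool, which is how the thread switch happens. Because subscription travels upward from the bottom of the chain, the subscribeOn closest to the source (the first one written in the chain) is the last to run, so it decides the thread on which the observable emits.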
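Why the subscribeOn nearest the source wins can be checked with a plain-Java model. This is a sketch, not RxJava code: `Source`, `subscribeOn` and `named` are hypothetical helpers, and an `ExecutorService` stands in for a Scheduler. The only behavior assumed is the one shown above, that the operator moves the upward `subscribe` call onto its own thread.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of stacked subscribeOn calls.
final class SubscribeOnSketch {
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    // Like ObservableSubscribeOn: the upward subscribe call runs on the executor.
    static <T> Source<T> subscribeOn(Source<T> upstream, ExecutorService executor) {
        return onNext -> executor.execute(() -> upstream.subscribe(onNext));
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    // Returns the name of the thread on which the source emitted its value.
    static String emittingThread() {
        ExecutorService first = named("first");
        ExecutorService second = named("second");
        try {
            // The source emits the name of whatever thread it is subscribed on.
            Source<String> source = onNext -> onNext.accept(Thread.currentThread().getName());
            // Models source.subscribeOn(first).subscribeOn(second).
            Source<String> chain = subscribeOn(subscribeOn(source, first), second);
            CompletableFuture<String> result = new CompletableFuture<>();
            chain.subscribe(result::complete);
            return result.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            first.shutdown();
            second.shutdown();
        }
    }
}
```

Here `emittingThread()` returns `first`: the outer wrapper hops to `second`, then the inner wrapper hops again to `first`, and the source is finally subscribed there.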
Now look at observeOn.
In the same way, the observable is wrapped in an ObservableObserveOn, whose subscribeActual looks like this:
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<>(observer, w, delayError, bufferSize));
}
}
Here the observer is wrapped in an ObserveOnObserver. Since we know onNext() will eventually be called, go straight to its onNext method (abridged):
@Override
public void onNext(T t) {
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
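The `getAndIncrement() == 0` check is a common "work in progress" counter idiom: only the caller that moves the counter from 0 to 1 submits a drain task, and later calls just bump the counter. The following is a minimal sketch of that idiom, assuming a queue of pending values and a drain loop. `DrainSketch` and its members are hypothetical and much simpler than the real ObserveOnObserver.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical queue-drain model of the schedule() idiom.
final class DrainSketch<T> implements Runnable {
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();
    private final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
    private final Executor worker;
    private final Consumer<T> downstream;

    DrainSketch(Executor worker, Consumer<T> downstream) {
        this.worker = worker;
        this.downstream = downstream;
    }

    void onNext(T value) {
        queue.offer(value);
        schedule();
    }

    void schedule() {
        // Only the 0 -> 1 transition submits a task; other calls are picked up by the loop.
        if (wip.getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    @Override public void run() {
        int missed = 1;
        for (;;) {
            T value;
            while ((value = queue.poll()) != null) {
                downstream.accept(value); // runs on the worker's thread
            }
            // Subtract the requests already handled; stop if no new ones arrived.
            missed = wip.addAndGet(-missed);
            if (missed == 0) {
                return;
            }
        }
    }

    // Uses a list as a fake executor so the number of submitted tasks is visible.
    static String demo() {
        List<Runnable> pending = new ArrayList<>();
        List<Integer> received = new ArrayList<>();
        DrainSketch<Integer> sketch = new DrainSketch<>(pending::add, received::add);
        sketch.onNext(1);
        sketch.onNext(2);
        sketch.onNext(3);
        int tasks = pending.size();
        pending.get(0).run();
        return "tasks=" + tasks + " values=" + received;
    }
}
```

`DrainSketch.demo()` returns `tasks=1 values=[1, 2, 3]`: three values were pushed, but only one task was submitted, and that single run delivered all of them in order.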
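If the scheduler is an IoScheduler, the worker's schedule call ends up in scheduleActual, the same method we saw for subscribeOn: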
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
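As with subscribeOn, we end up in this method; run() executes on the worker's thread, which completes the thread switch. For comparison, here is the full onNext of map's MapObserver: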
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
downstream.onNext(null);
return;
}
U v;
try {
v = Objects.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
downstream.onNext(v);
}
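This shows that each observeOn call changes the thread on which downstream events are delivered, so every observeOn in a chain takes effect, unlike subscribeOn, where only the first one does.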
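The contrast with subscribeOn can be shown with the same kind of plain-Java model. Again this is only a sketch: `Source`, `observeOn`, `peekThread` and `named` are hypothetical helpers, and an `ExecutorService` stands in for a Scheduler. The assumed behavior is that subscription passes straight through while each emitted value is re-dispatched onto the operator's thread.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical model of stacked observeOn calls.
final class ObserveOnSketch {
    interface Source<T> {
        void subscribe(Consumer<T> onNext);
    }

    // Like ObservableObserveOn: each value is handed to the executor before going downstream.
    static <T> Source<T> observeOn(Source<T> upstream, ExecutorService executor) {
        return onNext -> upstream.subscribe(value -> executor.execute(() -> onNext.accept(value)));
    }

    // Pass-through operator that records which thread delivers the value at this point.
    static <T> Source<T> peekThread(Source<T> upstream, List<String> log, String label) {
        return onNext -> upstream.subscribe(value -> {
            log.add(label + "@" + Thread.currentThread().getName());
            onNext.accept(value);
        });
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static List<String> run() {
        ExecutorService a = named("A");
        ExecutorService b = named("B");
        List<String> log = new CopyOnWriteArrayList<>();
        try {
            Source<String> source = onNext -> onNext.accept("x");
            // Models source.observeOn(A).peek(...).observeOn(B).peek(...).
            Source<String> chain = peekThread(
                observeOn(peekThread(observeOn(source, a), log, "afterA"), b), log, "afterB");
            CompletableFuture<String> done = new CompletableFuture<>();
            chain.subscribe(done::complete);
            done.get(5, TimeUnit.SECONDS);
            return new ArrayList<>(log);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            a.shutdown();
            b.shutdown();
        }
    }
}
```

`ObserveOnSketch.run()` returns `[afterA@A, afterB@B]`: the stage after the first observeOn sees thread `A`, and the stage after the second sees thread `B`.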
With observeOn(AndroidSchedulers.mainThread()):
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), true);
}
private static final Scheduler MAIN_THREAD =
RxAndroidPlugins.initMainThreadScheduler(() -> MainHolder.DEFAULT);
This leads into the schedule method of the HandlerScheduler worker:
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposable.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposable.disposed();
}
return scheduled;
}
From the steps above we can see that AndroidSchedulers.mainThread() creates a Handler bound to the main thread's Looper and uses it to post the downstream work to the main thread.
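Android's Handler and Looper are not available outside the platform, but the idea of posting work to one dedicated thread can be imitated in plain Java. The sketch below is an assumption-laden analogy rather than Android's implementation: `MiniLooper` is a hypothetical class in which a blocking queue plays the role of the message queue and a single thread plays the role of the main thread.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for a Looper thread plus a Handler that posts to it.
final class MiniLooper {
    private final BlockingQueue<Runnable> queue = new LinkedBlockingQueue<>();
    private final Thread thread;

    MiniLooper(String name) {
        thread = new Thread(() -> {
            try {
                for (;;) {
                    queue.take().run(); // the loop: take the next message and run it
                }
            } catch (InterruptedException e) {
                // quit() was called; leave the loop
            }
        }, name);
        thread.setDaemon(true);
        thread.start();
    }

    // Comparable in spirit to handler.sendMessageDelayed with zero delay.
    void post(Runnable task) {
        queue.add(task);
    }

    void quit() {
        thread.interrupt();
    }

    // Posts a task from the calling thread and reports where it actually ran.
    static String demo() {
        MiniLooper looper = new MiniLooper("sketch-main");
        CompletableFuture<String> ranOn = new CompletableFuture<>();
        looper.post(() -> ranOn.complete(Thread.currentThread().getName()));
        try {
            return ranOn.get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            looper.quit();
        }
    }
}
```

`MiniLooper.demo()` returns `sketch-main` regardless of which thread calls it, which is the same effect observeOn(AndroidSchedulers.mainThread()) has on the downstream callbacks.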