代码调用
// Sample call chain traced by the rest of this walkthrough:
// a source created with just(1), subscribeOn(Schedulers.io()), observeOn(AndroidSchedulers.mainThread()).
Observable.just(1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
// Empty consumer body: the sample only exists to show the call chain.
}
});
直接进入主题,先看subscribe中调用了哪些方法
//Observable.java
/**
 * Subscribes with only an {@code onNext} callback.
 * <p>
 * Delegates to the four-argument overload, supplying
 * {@code Functions.ON_ERROR_MISSING}, {@code Functions.EMPTY_ACTION} and
 * {@code Functions.emptyConsumer()} for the remaining callbacks.
 *
 * @param onNext consumer handed to the four-argument overload as the item callback
 * @return whatever the four-argument overload returns
 */
public final Disposable subscribe(Consumer<? super T> onNext) {
    final Consumer<? super Throwable> defaultOnError = Functions.ON_ERROR_MISSING;
    final Action defaultOnComplete = Functions.EMPTY_ACTION;
    final Consumer<? super Disposable> defaultOnSubscribe = Functions.emptyConsumer();
    return subscribe(onNext, defaultOnError, defaultOnComplete, defaultOnSubscribe);
}
/**
 * Subscribes with a full set of callbacks.
 * <p>
 * Each callback is checked with {@code ObjectHelper.requireNonNull}, then all four are
 * wrapped in a single {@code LambdaObserver} which is subscribed to this Observable.
 *
 * @param onNext      item callback
 * @param onError     error callback
 * @param onComplete  completion callback
 * @param onSubscribe callback receiving the {@code Disposable}
 * @return the {@code LambdaObserver} that was subscribed, exposed as a {@code Disposable}
 */
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    // Checks run in parameter order so the first null argument determines the message.
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    final LambdaObserver<T> lambdaObserver =
            new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);

    // Hand the wrapper to the Observer-based overload, then return it to the caller.
    subscribe(lambdaObserver);
    return lambdaObserver;
}
/**
 * Subscribes the given observer to this Observable.
 * <p>
 * The observer is first passed through the {@code RxJavaPlugins.onSubscribe} hook; the
 * observer returned by that hook is then handed to {@code subscribeActual}.
 *
 * @param observer the observer to subscribe; checked with {@code ObjectHelper.requireNonNull}
 * @throws NullPointerException either rethrown unchanged from inside the try block, or created
 *         here to wrap any other {@code Throwable} (attached as its cause) after that
 *         throwable has been passed to {@code Exceptions.throwIfFatal} and
 *         {@code RxJavaPlugins.onError}
 */
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// The plugin hook may substitute a different observer; the result is re-checked for null.
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
// Dispatch to the concrete operator's implementation.
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
// NullPointerExceptions propagate to the caller untouched.
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
// The original failure is preserved as the cause of the thrown NullPointerException.
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
//最终调用了Observable的subscribeActual方法
/**
 * Subscription logic supplied by each concrete Observable subclass.
 * <p>
 * Called by {@code subscribe(Observer)} with the observer returned from the
 * {@code RxJavaPlugins.onSubscribe} hook.
 *
 * @param observer the observer to connect to this source
 */
protected abstract void subscribeActual(Observer<? super T> observer);
接下来我们看下subscribeOn方法中进行了什么操作
//Observable.java
/**
 * Wraps this Observable in an {@code ObservableSubscribeOn} bound to the given scheduler.
 *
 * @param scheduler the scheduler handed to the new {@code ObservableSubscribeOn};
 *                  checked with {@code ObjectHelper.requireNonNull}
 * @return the result of passing the new operator through {@code RxJavaPlugins.onAssembly}
 */
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");

    // A new ObservableSubscribeOn wraps this source. Per the walkthrough, onAssembly hands
    // back the instance it was given (NOTE(review): presumably only when no assembly hook is
    // installed - confirm), so in a fluent chain the subscribeActual reached from subscribe()
    // is ObservableSubscribeOn.subscribeActual.
    final Observable<T> subscribeOnOperator = new ObservableSubscribeOn<T>(this, scheduler);
    return RxJavaPlugins.onAssembly(subscribeOnOperator);
}
接下来我们看ObservableSubscribeOn的subscribeActual方法
//ObservableSubscribeOn.java
/**
 * Connects the downstream observer and schedules the upstream subscription on the scheduler.
 * <p>
 * The downstream observer receives its {@code onSubscribe} call before the task is
 * handed to {@code scheduler.scheduleDirect}.
 *
 * @param observer the downstream observer to wrap in a {@code SubscribeOnObserver}
 */
public void subscribeActual(final Observer<? super T> observer) {
    final SubscribeOnObserver<T> subscribeOnObserver = new SubscribeOnObserver<T>(observer);
    observer.onSubscribe(subscribeOnObserver);

    // SubscribeTask implements Runnable, so it can be passed straight to scheduleDirect.
    final Runnable subscribeTask = new SubscribeTask(subscribeOnObserver);

    // The Disposable returned by the scheduler is stored on the wrapper observer.
    subscribeOnObserver.setDisposable(scheduler.scheduleDirect(subscribeTask));
}
看下scheduler.scheduleDirect,在此之前,我们先看下传入的Schedulers.io()
查看传入的Scheduler
/**
 * Returns the IO scheduler.
 *
 * @return the {@code IO} field after it has been passed through
 *         {@code RxJavaPlugins.onIoScheduler}
 */
public static Scheduler io() {
    // IO is the field assigned from RxJavaPlugins.initIoScheduler(new IOTask()).
    final Scheduler ioScheduler = IO;
    return RxJavaPlugins.onIoScheduler(ioScheduler);
}
// IO is assigned the result of RxJavaPlugins.initIoScheduler, which is given a new IOTask.
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
//由此可见,最后Schedulers.io就是IoScheduler
/**
 * Holder class whose static {@code DEFAULT} field is a new {@code IoScheduler};
 * this is the value returned by {@code IOTask.call()}.
 */
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//scheduler
/**
 * Schedules a task on a freshly created worker.
 *
 * @param run   the task to execute; passed through {@code RxJavaPlugins.onSchedule} first
 * @param delay delay forwarded to the worker's {@code schedule} call
 * @param unit  time unit of {@code delay}
 * @return the {@code DisposeTask} wrapping the task and its worker
 */
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    // createWorker() is supplied by the concrete scheduler. In the Schedulers.io() walkthrough
    // this resolves to IoScheduler.createWorker, which yields an EventLoopWorker.
    final Worker worker = createWorker();

    final Runnable hookedRun = RxJavaPlugins.onSchedule(run);
    final DisposeTask disposeTask = new DisposeTask(hookedRun, worker);

    // For the IO scheduler this lands in EventLoopWorker.schedule.
    worker.schedule(disposeTask, delay, unit);
    return disposeTask;
}
接下来看EventLoopWorker
/**
 * Schedules an action on the underlying thread worker unless this worker's task
 * container has already been disposed.
 *
 * @param action    the task to run
 * @param delayTime delay forwarded to {@code scheduleActual}
 * @param unit      time unit of {@code delayTime}
 * @return the result of {@code threadWorker.scheduleActual}, or
 *         {@code EmptyDisposable.INSTANCE} when {@code tasks} is disposed
 */
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
    if (!tasks.isDisposed()) {
        // Delegates to NewThreadWorker.scheduleActual, registering the task in 'tasks'.
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
    // don't schedule, we are unsubscribed
    return EmptyDisposable.INSTANCE;
}
真正进入线程调度的代码,在NewThreadWorker中
/**
 * Wraps the task in a {@code ScheduledRunnable} and submits it to this worker's executor.
 *
 * @param run       the task to run; passed through {@code RxJavaPlugins.onSchedule} first
 * @param delayTime when {@code <= 0} the task is submitted immediately, otherwise it is
 *                  scheduled with this delay
 * @param unit      time unit of {@code delayTime}
 * @param parent    optional container the {@code ScheduledRunnable} is added to; may be null
 * @return the {@code ScheduledRunnable}; it is also returned, without being submitted, when
 *         {@code parent.add} reports failure
 */
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    final ScheduledRunnable scheduled =
            new ScheduledRunnable(RxJavaPlugins.onSchedule(run), parent);

    // If the container refuses the task, hand it back without touching the executor.
    if (parent != null && !parent.add(scheduled)) {
        return scheduled;
    }

    try {
        final Callable<Object> callable = (Callable<Object>) scheduled;
        final Future<?> future;
        if (delayTime > 0) {
            // Delayed execution.
            future = executor.schedule(callable, delayTime, unit);
        } else {
            // Immediate execution on the executor (described as a thread pool in the walkthrough).
            future = executor.submit(callable);
        }
        scheduled.setFuture(future);
    } catch (RejectedExecutionException ex) {
        // Undo the registration and report the rejection through the plugin error handler.
        if (parent != null) {
            parent.remove(scheduled);
        }
        RxJavaPlugins.onError(ex);
    }

    return scheduled;
}
所以到最后,真正进行线程调度的,其实是一个线程池。看完了subscribeOn,我们再来看看observeOn。首先看下AndroidSchedulers.mainThread()到底是哪个线程
/**
 * Returns the main-thread scheduler.
 *
 * @return {@code MAIN_THREAD} after it has been passed through
 *         {@code RxAndroidPlugins.onMainThreadScheduler}
 */
public static Scheduler mainThread() {
    final Scheduler mainThreadScheduler = MAIN_THREAD;
    return RxAndroidPlugins.onMainThreadScheduler(mainThreadScheduler);
}
// Scheduler returned (via the RxAndroidPlugins hook) by mainThread(). It is initialised by
// handing RxAndroidPlugins.initMainThreadScheduler a Callable that yields MainHolder.DEFAULT.
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/**
 * Holder class whose static {@code DEFAULT} field is a {@code HandlerScheduler} built
 * around a {@code Handler} for {@code Looper.getMainLooper()}.
 */
private static final class MainHolder {
static final Scheduler DEFAULT
// Looper.getMainLooper() shows that the Handler is created on the main thread's Looper.
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
好,确定了这个问题,我们再继续往下看
/**
 * Convenience overload of {@code observeOn} that passes {@code false} for
 * {@code delayError} and {@code bufferSize()} for the buffer size.
 *
 * @param scheduler the scheduler forwarded to the three-argument overload
 * @return the Observable produced by the three-argument overload
 */
public final Observable<T> observeOn(Scheduler scheduler) {
    final boolean delayError = false;
    final int defaultBufferSize = bufferSize();
    return observeOn(scheduler, delayError, defaultBufferSize);
}
/**
 * Wraps this Observable in an {@code ObservableObserveOn} operator.
 *
 * @param scheduler  scheduler handed to the operator; checked with
 *                   {@code ObjectHelper.requireNonNull}
 * @param delayError flag forwarded unchanged to the operator
 * @param bufferSize buffer size forwarded to the operator; checked with
 *                   {@code ObjectHelper.verifyPositive}
 * @return the result of passing the new operator through {@code RxJavaPlugins.onAssembly}
 */
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");

    // Create a new ObservableObserveOn wrapping this source.
    final Observable<T> observeOnOperator =
            new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize);
    return RxJavaPlugins.onAssembly(observeOnOperator);
}
接下去看ObservableObserveOn对象
/**
 * Subscribes the downstream observer to the source, routing signals through a
 * scheduler worker unless the scheduler is a {@code TrampolineScheduler}.
 *
 * @param observer the downstream observer
 */
protected void subscribeActual(Observer<? super T> observer) {
    // With a TrampolineScheduler the observer is subscribed to the source directly.
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
        return;
    }

    // As with subscribeOn, a worker is created from the scheduler. For
    // AndroidSchedulers.mainThread() the walkthrough resolves this to
    // HandlerScheduler.createWorker, which returns a HandlerWorker.
    final Scheduler.Worker schedulerWorker = scheduler.createWorker();

    // The inner class ObserveOnObserver sits between the source and the downstream observer.
    source.subscribe(new ObserveOnObserver<T>(observer, schedulerWorker, delayError, bufferSize));
}
//内部类ObserveOnObserver,以下是回调方法
/**
 * Receives the upstream {@code Disposable} and sets up the queue used to pass items downstream.
 * <p>
 * Nothing happens unless {@code DisposableHelper.validate(this.upstream, d)} succeeds.
 * When {@code d} is a {@code QueueDisposable}, fusion is requested with
 * {@code ANY | BOUNDARY}; for {@code SYNC} and {@code ASYNC} results the upstream itself
 * becomes the queue. Otherwise a new {@code SpscLinkedArrayQueue} of {@code bufferSize}
 * is created.
 *
 * @param d the upstream disposable
 */
public void onSubscribe(Disposable d) {
if (DisposableHelper.validate(this.upstream, d)) {
this.upstream = d;
if (d instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) d;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
// SYNC mode: use the upstream as the queue and mark this observer as done right away.
sourceMode = m;
queue = qd;
done = true;
downstream.onSubscribe(this);
// Call schedule() immediately after notifying downstream.
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
// ASYNC mode: use the upstream as the queue; no schedule() call is made here.
sourceMode = m;
queue = qd;
downstream.onSubscribe(this);
return;
}
}
// No fusion took place: allocate a dedicated queue of bufferSize.
queue = new SpscLinkedArrayQueue<T>(bufferSize);
downstream.onSubscribe(this);
}
}
/**
 * Enqueues the item (unless the source mode is {@code ASYNC}) and calls {@code schedule()}.
 * Does nothing once {@code done} is set.
 *
 * @param t the item received from upstream
 */
@Override
public void onNext(T t) {
    if (!done) {
        // In ASYNC mode the item is not offered to the queue here.
        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);
        }
        // Calls schedule().
        schedule();
    }
}
/**
 * Records the error, marks this observer as done and calls {@code schedule()}.
 * If {@code done} is already set, the error goes to {@code RxJavaPlugins.onError} instead.
 *
 * @param t the error received from upstream
 */
@Override
public void onError(Throwable t) {
    if (!done) {
        // The error is stored before 'done' is set, matching the original order.
        error = t;
        done = true;
        // Calls schedule().
        schedule();
    } else {
        RxJavaPlugins.onError(t);
    }
}
/**
 * Marks this observer as done and calls {@code schedule()}.
 * Does nothing if {@code done} is already set.
 */
@Override
public void onComplete() {
    if (!done) {
        done = true;
        // Calls schedule().
        schedule();
    }
}
/**
 * Submits this observer to the worker, but only when {@code getAndIncrement()} returns 0.
 */
void schedule() {
    if (getAndIncrement() != 0) {
        return;
    }
    // So each callback ultimately ends up in worker.schedule.
    worker.schedule(this);
}
//最终看一下HandlerWorker的schedule方法,一看源码,老朋友了,Handler就不解释了
/**
 * Posts the task to this worker's {@code Handler} as a delayed {@code Message}.
 *
 * @param run   the task to run; passed through {@code RxJavaPlugins.onSchedule} before posting
 * @param delay delay before the message is delivered, converted with {@code unit.toMillis}
 * @param unit  time unit of {@code delay}
 * @return the {@code ScheduledRunnable} wrapping the task, or {@code Disposables.disposed()}
 *         when this worker is found to be disposed before or after posting
 * @throws NullPointerException if {@code run} or {@code unit} is null
 */
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
// Nothing is posted once the worker has been disposed.
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
// The ScheduledRunnable is the Message's callback.
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
网友评论