1. How subscribeOn(Schedulers.io()) works
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
        }
    })
    .subscribeOn(Schedulers.io())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String s) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
Source code analysis
- Schedulers.io() supplies the scheduler. Passing it to subscribeOn() makes the upstream code (the subscribe() callback handed to create()) run on a worker thread. Schedulers.io() resolves to a shared IoScheduler instance:
    IO = RxJavaPlugins.initIoScheduler(new IOTask());

    static final class IOTask implements Callable<Scheduler> {
        @Override
        public Scheduler call() throws Exception {
            return IoHolder.DEFAULT;
        }
    }

    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }

    public IoScheduler() {
        this(WORKER_THREAD_FACTORY);
    }

    public IoScheduler(ThreadFactory threadFactory) {
        this.threadFactory = threadFactory;
        this.pool = new AtomicReference<CachedWorkerPool>(NONE);
        start();
    }
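The IOTask / IoHolder pair above follows the initialization-on-demand holder idiom. The sketch below is a simplified model of that idiom only, not RxJava code: FakeScheduler, HolderDemo, CREATED and io() are hypothetical names introduced for illustration. It shows that the instance is built the first time the holder's field is read and is shared afterwards.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of the holder idiom; none of these types are RxJava classes. */
final class HolderDemo {
    /** Counts how many FakeScheduler instances have been constructed. */
    static final AtomicInteger CREATED = new AtomicInteger();

    /** Hypothetical stand-in for IoScheduler. */
    static final class FakeScheduler {
        FakeScheduler() {
            CREATED.incrementAndGet();
        }
    }

    /** The JVM initializes this class only when DEFAULT is first read. */
    static final class IoHolder {
        static final FakeScheduler DEFAULT = new FakeScheduler();
    }

    /** Mirrors IOTask: a Callable that hands out the held instance. */
    static final class IOTask implements Callable<FakeScheduler> {
        @Override
        public FakeScheduler call() {
            return IoHolder.DEFAULT;
        }
    }

    /** Hypothetical accessor playing the role of Schedulers.io(). */
    static FakeScheduler io() {
        return new IOTask().call();
    }

    public static void main(String[] args) {
        System.out.println(CREATED.get()); // prints 0: the holder is not initialized yet
        FakeScheduler a = io();
        FakeScheduler b = io();
        System.out.println(a == b);        // prints true: one shared instance
        System.out.println(CREATED.get()); // prints 1
    }
}
```

One plausible reason for routing the default through a Callable is that a plugin hook can then supply a different scheduler without the default one ever being constructed; treat that as a reading of the code rather than a documented guarantee.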
- .subscribeOn(Schedulers.io()) creates and returns a new ObservableSubscribeOn, passing in the ObservableCreate (this) and the scheduler returned by Schedulers.io():
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
  Inside ObservableSubscribeOn, source is the ObservableCreate:
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;

        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    }
- .subscribe(new Observer<String>())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe: terminal observer received onSubscribe");
        }

        @Override
        public void onNext(String s) {
            Log.d(TAG, "onNext: terminal observer received onNext s=" + s);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: terminal observer received onComplete");
        }
    });
  The terminal observer is passed in as the argument. subscribe() is defined in Observable, the base class of ObservableSubscribeOn, and it delegates to ObservableSubscribeOn.subscribeActual(observer):
    @Override
    public void subscribeActual(final Observer<? super T> s) {
        // Wrap the terminal observer in a SubscribeOnObserver
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        // Call the terminal observer's onSubscribe() first, still on the calling thread
        s.onSubscribe(parent);
        // The key call is scheduler.scheduleDirect(new SubscribeTask(parent))
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
- scheduler.scheduleDirect(new SubscribeTask(parent))
- First, a SubscribeTask is created. It implements Runnable, and its run() method calls source.subscribe(parent). Since source is the ObservableCreate, run() effectively calls ObservableCreate.subscribe(parent).
    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;

        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }

        @Override
        public void run() {
            source.subscribe(parent);
        }
    }
- scheduler is the IoScheduler. scheduleDirect() is defined in its base class Scheduler; it obtains a Worker through createWorker(), which IoScheduler implements by creating and returning an EventLoopWorker.
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        // createWorker() is implemented by IoScheduler and returns an EventLoopWorker
        final Worker w = createWorker();
        // Wrap the runnable again
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        // Execution actually starts here: EventLoopWorker.schedule(task, delay, unit)
        w.schedule(task, delay, unit);
        return task;
    }
  IoScheduler's EventLoopWorker.schedule() forwards the task to its threadWorker:
    @NonNull
    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            return EmptyDisposable.INSTANCE;
        }
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    }
  scheduleActual() (inherited by ThreadWorker from NewThreadWorker) wraps the runnable once more and hands it to the executor:
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        // Wrap once more
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        // Hand the task to the thread pool; from here on ObservableCreate.subscribe(parent) runs on a worker thread
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
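To make the worker / wrapped task / thread pool relationship concrete, here is a minimal sketch in plain Java. It is a simplified model, not RxJava's implementation: MiniWorker, MiniDisposeTask and runAndReportThread() are hypothetical names, and a single-thread executor stands in for the cached worker pool. It mirrors one detail of the real DisposeTask that is easy to miss: the worker is released once the task has run.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/** Simplified model of scheduleDirect(): worker + wrapped task + executor. */
final class ScheduleDirectDemo {

    /** Hypothetical stand-in for a Worker: owns an executor and submits tasks to it. */
    static final class MiniWorker {
        final ExecutorService executor = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "mini-io-1");
            t.setDaemon(true);
            return t;
        });
        volatile boolean disposed;

        Future<?> schedule(Runnable task) {
            return executor.submit(task);
        }

        void dispose() {
            disposed = true;
            executor.shutdown(); // lets the running task finish, rejects new ones
        }
    }

    /** Hypothetical stand-in for DisposeTask: run the work, then release the worker. */
    static final class MiniDisposeTask implements Runnable {
        final Runnable decorated;
        final MiniWorker worker;

        MiniDisposeTask(Runnable decorated, MiniWorker worker) {
            this.decorated = decorated;
            this.worker = worker;
        }

        @Override
        public void run() {
            try {
                decorated.run();
            } finally {
                worker.dispose();
            }
        }
    }

    /** Schedules one task and returns the name of the thread it ran on. */
    static String runAndReportThread() {
        MiniWorker worker = new MiniWorker();
        String[] seen = new String[1];
        Runnable work = () -> seen[0] = Thread.currentThread().getName();
        Future<?> f = worker.schedule(new MiniDisposeTask(work, worker));
        try {
            f.get(5, TimeUnit.SECONDS); // wait until the task (and dispose) finished
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        return seen[0];
    }

    public static void main(String[] args) {
        System.out.println(runAndReportThread()); // prints mini-io-1
    }
}
```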
Execution flow:
- Observable.create() receives the custom source and returns an ObservableCreate.
- ObservableCreate.subscribeOn(Schedulers.io()): Schedulers.io() returns an IoScheduler, which is passed as the argument. subscribeOn() returns an ObservableSubscribeOn that stores the ObservableCreate (as source) and the IoScheduler (as scheduler).
- ObservableSubscribeOn.subscribe(observer) is called with the custom terminal observer.
- ObservableSubscribeOn.subscribeActual(observer) wraps the terminal observer in a SubscribeOnObserver (parent in the code above), then calls the terminal observer's onSubscribe(parent) with that wrapper as the argument.
- Next it calls IoScheduler.scheduleDirect(new SubscribeTask(parent)), which is the important step. SubscribeTask implements Runnable, and its run() method calls source.subscribe(parent); since source is the ObservableCreate, run() effectively calls ObservableCreate.subscribe(parent).
- scheduleDirect() is defined in Scheduler, the base class of IoScheduler. It obtains a Worker through createWorker(), which IoScheduler implements by creating and returning an EventLoopWorker.
- EventLoopWorker.schedule() calls threadWorker.scheduleActual(), which wraps the SubscribeTask once more and hands it to the thread pool. As a result, ObservableCreate.subscribe(parent) runs on a worker thread.
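The flow above boils down to "wrap the upstream subscribe() call in a Runnable and run it on another thread". The following sketch models just that idea in plain Java. It is an illustration under stated assumptions, not RxJava's code: MiniObserver, MiniSource, subscribeOn() and the thread name "mini-io" are all hypothetical, and an ExecutorService replaces the scheduler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

/** Simplified model of subscribeOn: the subscribe call itself is moved to a worker thread. */
final class SubscribeOnDemo {
    interface MiniObserver<T> {
        void onNext(T t);
        void onComplete();
    }

    interface MiniSource<T> {
        void subscribe(MiniObserver<? super T> observer);
    }

    /** Plays the role of ObservableSubscribeOn: subscribing schedules source.subscribe(). */
    static <T> MiniSource<T> subscribeOn(MiniSource<T> source, ExecutorService executor) {
        // The lambda passed to execute() corresponds to SubscribeTask.run()
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    static List<String> run() {
        ExecutorService pool = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-io"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(1);

        // Plays the role of the custom ObservableOnSubscribe
        MiniSource<String> source = o -> {
            log.add("subscribe on " + Thread.currentThread().getName());
            o.onNext("hello");
            o.onComplete();
        };

        subscribeOn(source, pool).subscribe(new MiniObserver<String>() {
            @Override
            public void onNext(String s) {
                log.add("onNext " + s + " on " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            pool.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        // prints [subscribe on mini-io, onNext hello on mini-io]
        System.out.println(run());
    }
}
```

Note that onNext also arrives on the worker thread here: nothing in this chain switches threads again after the subscription has been moved.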
2. How observeOn(AndroidSchedulers.mainThread()) works
Observable
    .create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> e) throws Exception {
        }
    })
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
        }

        @Override
        public void onNext(String s) {
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
        }
    });
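Source code analysis
observeOn(AndroidSchedulers.mainThread()) makes the downstream code (the observer callbacks that follow it in the chain) run on the main thread.
Subscription phase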
- AndroidSchedulers.mainThread() returns a HandlerScheduler, which holds a Handler bound to the main thread's Looper:
    public final class AndroidSchedulers {
        private static final class MainHolder {
            // A HandlerScheduler that holds a main-thread Handler
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }

        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override
                    public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });

        public static Scheduler mainThread() {
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
    }

    final class HandlerScheduler extends Scheduler {
        private final Handler handler;

        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    }
- .observeOn(AndroidSchedulers.mainThread()) takes the HandlerScheduler as its argument and returns an ObservableObserveOn:
    public abstract class Observable<T> implements ObservableSource<T> {
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    }
  Inside ObservableObserveOn, scheduler is the HandlerScheduler:
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler; // HandlerScheduler, which holds a main-thread Handler
        final boolean delayError;
        final int bufferSize;

        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    }
- .subscribe(new Observer<String>())
    .subscribe(new Observer<String>() {
        @Override
        public void onSubscribe(Disposable d) {
            Log.d(TAG, "onSubscribe: terminal observer received onSubscribe");
        }

        @Override
        public void onNext(String s) {
            Log.d(TAG, "onNext: terminal observer received onNext s=" + s);
        }

        @Override
        public void onError(Throwable e) {
        }

        @Override
        public void onComplete() {
            Log.d(TAG, "onComplete: terminal observer received onComplete");
        }
    });
  The terminal observer is passed in as the argument. subscribe() is defined in Observable, the base class of ObservableObserveOn, and ObservableObserveOn does not override it. subscribe() delegates to subscribeActual(), which ObservableObserveOn does override, so the call ends up in ObservableObserveOn.subscribeActual(observer):
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);
            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
            subscribeActual(observer); // this is the key line
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            RxJavaPlugins.onError(e);
            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }
  Now look at subscribeActual() in ObservableObserveOn. It calls source.subscribe(), where source is the ObservableCreate stored earlier, so the call that actually runs is ObservableCreate.subscribe().
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                // This branch is taken. HandlerScheduler.createWorker() returns a HandlerWorker
                Scheduler.Worker w = scheduler.createWorker();
                // source is the ObservableCreate; the terminal observer and the HandlerWorker are wrapped in an ObserveOnObserver
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    }
  HandlerScheduler.createWorker() returns a HandlerWorker. The handler field of HandlerScheduler was set earlier and is the main-thread Handler.
    final class HandlerScheduler extends Scheduler {
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
    }
  source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)): source is the ObservableCreate, and the terminal observer and the HandlerWorker are wrapped in a new ObserveOnObserver. The call reaches ObservableCreate.subscribeActual(), where the source field is the custom ObservableOnSubscribe:
    public final class ObservableCreate<T> extends Observable<T> {
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            // observer is the ObserveOnObserver; it is wrapped once more in a CreateEmitter
            CreateEmitter<T> parent = new CreateEmitter<T>(observer);
            // ObserveOnObserver.onSubscribe(), which in turn calls the terminal observer's onSubscribe()
            observer.onSubscribe(parent);
            try {
                // Calls subscribe() on our custom source
                source.subscribe(parent);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                parent.onError(ex);
            }
        }
    }
Event delivery phase
- In the custom source, simulate an onNext() call:
    new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
            // observableEmitter is the CreateEmitter
            observableEmitter.onNext("test onNext()");
        }
    }
- CreateEmitter.onNext(T t) forwards the value to ObserveOnObserver.onNext(t):
    static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
        final Observer<? super T> observer; // the ObserveOnObserver

        CreateEmitter(Observer<? super T> observer) {
            this.observer = observer;
        }

        @Override
        public void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                return;
            }
            if (!isDisposed()) {
                observer.onNext(t); // ObserveOnObserver.onNext(t)
            }
        }
        // other members omitted
    }
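The two guards in CreateEmitter.onNext() (reject null, drop values once disposed) can be tried out in isolation. The sketch below is a simplified model rather than RxJava's class: MiniEmitter, MiniObserver and demo() are hypothetical names, and an AtomicBoolean replaces the AtomicReference<Disposable> state. It assumes, as the real emitter does, that signalling an error also disposes the emitter.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

/** Simplified model of the guards in an emitter's onNext(). */
final class EmitterDemo {
    interface MiniObserver<T> {
        void onNext(T t);
        void onError(Throwable e);
    }

    static final class MiniEmitter<T> {
        final MiniObserver<? super T> observer;
        final AtomicBoolean disposed = new AtomicBoolean();

        MiniEmitter(MiniObserver<? super T> observer) {
            this.observer = observer;
        }

        void onNext(T t) {
            if (t == null) {
                // A null value is turned into an error signal instead of being forwarded
                onError(new NullPointerException("onNext called with null"));
                return;
            }
            if (!disposed.get()) {
                observer.onNext(t);
            }
        }

        void onError(Throwable e) {
            // Deliver the error at most once, and mark the emitter as disposed
            if (disposed.compareAndSet(false, true)) {
                observer.onError(e);
            }
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniEmitter<String> emitter = new MiniEmitter<String>(new MiniObserver<String>() {
            @Override
            public void onNext(String s) {
                log.add("next " + s);
            }

            @Override
            public void onError(Throwable e) {
                log.add("error " + e.getClass().getSimpleName());
            }
        });
        emitter.onNext("a");
        emitter.onNext(null); // becomes an error and disposes the emitter
        emitter.onNext("b");  // dropped: already disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [next a, error NullPointerException]
    }
}
```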
- ObserveOnObserver.onNext(T t) puts the value into a queue and schedules itself on the worker:
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        final Observer<? super T> actual; // the terminal observer
        final Scheduler.Worker worker;    // the HandlerWorker
        // other fields omitted

        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this); // HandlerWorker.schedule(this)
            }
        }
    }
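The getAndIncrement() == 0 check in schedule() means that only the caller who moves the counter from 0 posts a drain task; later callers just enqueue and bump the counter. A minimal sketch of that queue-drain pattern follows. It is a simplified model, not RxJava's code: Drainer and demo() are hypothetical names, the counter is a separate AtomicInteger field rather than the inherited one, and the "worker" is a plain list that records posted tasks so the behaviour can be observed deterministically.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

/** Simplified model of the work-in-progress counter used to serialize draining. */
final class DrainDemo {
    static final class Drainer<T> implements Runnable {
        final Queue<T> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger wip = new AtomicInteger();
        final Executor worker;
        final Consumer<T> actual;

        Drainer(Executor worker, Consumer<T> actual) {
            this.worker = worker;
            this.actual = actual;
        }

        void onNext(T t) {
            queue.offer(t);
            // Only the 0 -> 1 transition posts a drain task
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    actual.accept(v);
                }
                // Subtract the signals handled so far; loop again if more arrived meanwhile
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    /** Returns {tasks posted after 3 values, values delivered by one drain, tasks posted after a 4th value}. */
    static int[] demo() {
        List<Runnable> posted = new ArrayList<>();
        List<Integer> delivered = new ArrayList<>();
        Drainer<Integer> d = new Drainer<Integer>(posted::add, delivered::add);

        d.onNext(1);
        d.onNext(2);
        d.onNext(3);
        int postsBeforeDrain = posted.size();
        posted.get(0).run(); // one drain pass delivers everything queued so far
        int deliveredAfterDrain = delivered.size();
        d.onNext(4);         // counter is back at 0, so a new task is posted
        return new int[] {postsBeforeDrain, deliveredAfterDrain, posted.size()};
    }

    public static void main(String[] args) {
        int[] r = demo();
        System.out.println(r[0] + " " + r[1] + " " + r[2]); // prints 1 3 2
    }
}
```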
- HandlerWorker.schedule(this) posts a message to the main thread, where ScheduledRunnable.run() is then executed. The single-argument worker.schedule(this) is the base Worker method, which delegates to the three-argument overload below with a delay of 0.
    private static final class HandlerWorker extends Worker {
        private final Handler handler; // the main-thread Handler

        HandlerWorker(Handler handler) {
            this.handler = handler;
        }

        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
            if (disposed) {
                return Disposables.disposed();
            }
            run = RxJavaPlugins.onSchedule(run);
            // handler: the main-thread Handler. run: the ObserveOnObserver
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            // Build the message
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
            // Re-check disposed state for removing in case we were racing a call to dispose().
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
            return scheduled;
        }
    }
- ScheduledRunnable.run() in turn calls ObserveOnObserver.run():
    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;

        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }

        @Override
        public void run() {
            try {
                delegate.run(); // ObserveOnObserver.run()
            } catch (Throwable t) {
                IllegalStateException ie = new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }
    }
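- ObserveOnObserver.run() drains the queue on the main thread. Both drain methods end up calling the terminal observer's onNext(). With an ObservableCreate upstream, as in this example, no queue fusion takes place, so outputFused is false and drainNormal() is the branch taken: it polls each value from the queue and passes it to actual.onNext(v).
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
        @Override
        public void run() {
            if (outputFused) {
                drainFused();
            } else {
                drainNormal();
            }
        }

        void drainFused() {
            ...
            actual.onNext(null); // calls the terminal observer's onNext, on the main thread
            ...
        }
    }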
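Putting the delivery phase together: values are produced on one thread, queued, and handed to the terminal observer on another. The sketch below models that hop in plain Java. It is an illustration under stated assumptions, not RxJava or Android code: MiniObserveOnObserver and MiniObserver are hypothetical, and a single-thread executor named "mini-main" stands in for the main-thread Handler.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/** Simplified model of observeOn: enqueue on the caller's thread, deliver on another. */
final class ObserveOnDemo {
    interface MiniObserver<T> {
        void onNext(T t);
    }

    static final class MiniObserveOnObserver<T> implements MiniObserver<T>, Runnable {
        final MiniObserver<? super T> actual; // the terminal observer
        final Executor worker;                // stands in for the HandlerWorker
        final Queue<T> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger wip = new AtomicInteger();

        MiniObserveOnObserver(MiniObserver<? super T> actual, Executor worker) {
            this.actual = actual;
            this.worker = worker;
        }

        @Override
        public void onNext(T t) {
            queue.offer(t);
            if (wip.getAndIncrement() == 0) {
                worker.execute(this); // corresponds to worker.schedule(this)
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T v;
                while ((v = queue.poll()) != null) {
                    actual.onNext(v); // runs on the worker's thread
                }
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    static List<String> run() {
        ExecutorService mainLike = Executors.newSingleThreadExecutor(r -> new Thread(r, "mini-main"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch done = new CountDownLatch(2);
        MiniObserver<String> terminal = s -> {
            log.add(s + " on " + Thread.currentThread().getName());
            done.countDown();
        };
        MiniObserveOnObserver<String> wrapped = new MiniObserveOnObserver<>(terminal, mainLike);

        // Emitted from the calling thread, as the custom source would do
        wrapped.onNext("a");
        wrapped.onNext("b");

        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("timed out");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            mainLike.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run()); // prints [a on mini-main, b on mini-main]
    }
}
```

The emission order is preserved because the queue is FIFO and at most one drain pass runs at a time.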