简单示例
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
Log.d("XXW", "subscribe");
emitter.onNext(1);
emitter.onComplete();
}
}).subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer s) {
LogUtil.d("onNext " + s);
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
打印结果
D/XXW: onSubscribe Thread Name : main
D/XXW: subscribe
D/XXW: subscribe Thread Name : RxCachedThreadScheduler-1
D/XXW: onNext 1
D/XXW: onNext 2
D/XXW: onNext 3
D/XXW: onNext 4
切换线程
subscribeOn
上一篇文章我们分析了RxJava的订阅流程,接下来我们来看RxJava如何通过一句话来切换线程的实现. 首先我们先Schedulers.io()方法, 这个只会对被观察者的线程进行影响. 直接看源码
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
上篇文章分析过, onAssmbly会直接返回一个Observable类, 所以我们直接看ObservableSubscribeOn这个Observable的子类.我看的时候就猜会不会和ObservableCreate一样,走Observable的抽象方法 subscribeActual方法
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
果然这里和之前分析的一样, 还是会调用subscribeActual这个方法, 和之前一样 我们还是分析他做了几件事情
- 包装observer
- 调用下游Observer的onSubscribe方法, 所以这里是没有切换线程的 还是当前线程(UI线程) ,从打印结果也可以验证.
- 调用scheduler的schedulerDirrect方法
- 创立SubscribeTask类
我们从下往上分析,先看SubscribeTask类
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
这里我们会发现, 我们这个只是一个实现了Runnable的类, 我们之后再看这里,先看scheduleDirect方法
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
- 通过createWorker创建一个Worker类,
- 创建一个DisposeTask的线程
- 调用schedule方法
第一步,会发现这个createWorker是一个抽象方法, 所以我们又要去找他的子类, 因为我们SubscribeOn 是传入的一个Schedulers.io()方法.,所以我们先看看这个实现.
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
IO = RxJavaPlugins.initIoScheduler(new IOTask());
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
通过源码 知道这个子类其实就是IoScheduler类, 所以我们IoScheduler的createWorker方法.
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
发现创建了一个EventLoopWorker的类, 先不管 继续看schedule方法,
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
//初始化
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
//创建ThreadWorker
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
调用ThreadWorker的scheduleActural方法.
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
从上面代码注释可以发现,. 我们在之前createWorker的时候 已经创建好了EventLoopWorker对象, 而且初始化的时候也创建一个叫ThreadWorker的对象, 我们的schedule方法其实最后就是调用的它的schedualActual方法. 从上面代码ThreadWoeker是没有schedualActual方法的. 所以我们猜测只有NewThreadWorker有
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
当我们看到这个方法, 我就立马看到executor, 这不就是线程池吗, 所以我立马看了一下成员变量发现如下代码
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
我们接着看scheduleActual方法, 我们之前创建的线程SubscribeTask 这个时候被当作一个run传进来, 然后封装两层后被添加到一个队列中,然后通过强转后就调用了 我们的run方法, 去执行上一个Observable的subscribe方法, 即subscribeActual方法,
observerOn
observerOn 我们一般都是传入AndroidSchedulers.mainThread() ,我们先从这个Scheduler这个子类看
private static final class MainHolder {
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
/** A {@link Scheduler} which executes actions on the Android main thread. */
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
显而易见,我们创建了一个叫HandlerScheduler的子类, 比较有趣的是 这里创建了一个Handler,而且还是主线程的(Looper.getMainLooper). 我们在看observerOn方法.
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
我们会发现 这个和之前的subscribeOn方法很类似, 所以我们猜测到最后还是到MainHolder 里面进行切换. 这里就不进行深入研究了, 有兴趣的同学可以自己研究研究.
网友评论