背景:
在实际开发中
- .subscribeOn(Schedulers.newThread())
- .subscribeOn(Schedulers.io())
这样的代码随处可见,因为默认不设置就会在主线程执行。
要弄明白两个问题
- 子线程是如何创建的,怎么切换过去的
- .subscribeOn为什么写多次只有第一次起作用
前提:
implementation 'io.reactivex.rxjava2:rxjava:2.2.21'
implementation 'io.reactivex.rxjava2:rxandroid:2.1.1'
源码分析
Observable.create(new ObservableOnSubscribe<Integer>() {
@Override
public void subscribe(@NonNull ObservableEmitter<Integer> emitter) throws Exception {
emitter.onNext(1);
emitter.onNext(2);
emitter.onComplete();
}
})
.subscribeOn(Schedulers.newThread())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(@NonNull Disposable d) {
Log.d(TAG, "开始采用subscribe连接");
}
@Override
public void onNext(@NonNull Integer integer) {
Log.d(TAG, "对Next事件" + integer + "作出响应");
}
@Override
public void onError(@NonNull Throwable e) {
}
@Override
public void onComplete() {
Log.d(TAG, "对Complete事件作出响应");
}
});
我们看下subscribeOn方法:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
关键点1:这里有两个参数一个是this,一个是scheduler。
这个this是什么呢,就是Observable,就是被观察者,对于上面例子来说就是ObservableCreate对象。
scheduler是Schedulers.newThread()
点击ObservableSubscribeOn类看看
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
关键点2:看到没有它实现了subscribeActual方法,可以看下它的父类,它也是Observable的子类。即到时.subscribe订阅后,执行的是这里的方法。不是ObservableCreate里面的subscribeActual方法了。
关键点3:可能你会疑问上面调了ObservableSubscribeOn这个子类的方法,是不是就没执行ObservableCreate的方法了。这个你放心,上面是不是传了source下来嘛?它就是ObservableCreate
分析最关键代码:
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
它就是使得被观察者发送的事件发生在子线程的代码
点击SubscribeTask看下:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
它实现了Runnable接口,在run()方法中调source.subscribe(parent)方法。
关键点4:从这就可以看出只要这个Runnable在某个子线程执行,那么所有事件只能在这个子线程中执行了
再往前看下scheduleDirect方法:
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
请盯着Runnable这个参数,因为它决定着被观察的事件在那发生。可以看出它最终是被Worker类执行。可惜它是个抽象类,到这就没法继续往下走了。
关键点5:被观察者事件在run方法中被Worker类执行,接下来找到Worker具体实现类即可
下面让目光回到最开始的
.subscribeOn(Schedulers.newThread())
看下Schedulers.newThread()是什么东西
@NonNull
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
点击NEW_THREAD,类中搜下它会发现
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
点击NewThreadTask
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
点击NewThreadHolder.DEFAULT
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
再点击NewThreadScheduler,点了这么是不是累了,很快就到真相地方了(一般人还不这样手把手教学的呢)
public final class NewThreadScheduler extends Scheduler {
final ThreadFactory threadFactory;
private static final String THREAD_NAME_PREFIX = "RxNewThreadScheduler";
private static final RxThreadFactory THREAD_FACTORY;
/** The name of the system property for setting the thread priority for this Scheduler. */
private static final String KEY_NEWTHREAD_PRIORITY = "rx2.newthread-priority";
static {
int priority = Math.max(Thread.MIN_PRIORITY, Math.min(Thread.MAX_PRIORITY,
Integer.getInteger(KEY_NEWTHREAD_PRIORITY, Thread.NORM_PRIORITY)));
THREAD_FACTORY = new RxThreadFactory(THREAD_NAME_PREFIX, priority);
}
public NewThreadScheduler() {
this(THREAD_FACTORY);
}
public NewThreadScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
}
@NonNull
@Override
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
}
看到没,它实现了createWorker方法。
触发时机就是之前代码的
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
点击NewThreadWorker类:
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
private final ScheduledExecutorService executor;
volatile boolean disposed;
public NewThreadWorker(ThreadFactory threadFactory) {
executor = SchedulerPoolFactory.create(threadFactory);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable run) {
return schedule(run, 0, null);
}
@NonNull
@Override
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
public Disposable scheduleDirect(final Runnable run, long delayTime, TimeUnit unit) {
ScheduledDirectTask task = new ScheduledDirectTask(RxJavaPlugins.onSchedule(run));
try {
Future<?> f;
if (delayTime <= 0L) {
f = executor.submit(task);
} else {
f = executor.schedule(task, delayTime, unit);
}
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
if (period <= 0L) {
InstantPeriodicTask periodicWrapper = new InstantPeriodicTask(decoratedRun, executor);
try {
Future<?> f;
if (initialDelay <= 0L) {
f = executor.submit(periodicWrapper);
} else {
f = executor.schedule(periodicWrapper, initialDelay, unit);
}
periodicWrapper.setFirst(f);
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
return periodicWrapper;
}
ScheduledDirectPeriodicTask task = new ScheduledDirectPeriodicTask(decoratedRun);
try {
Future<?> f = executor.scheduleAtFixedRate(task, initialDelay, period, unit);
task.setFuture(f);
return task;
} catch (RejectedExecutionException ex) {
RxJavaPlugins.onError(ex);
return EmptyDisposable.INSTANCE;
}
}
关键点6:构造方法中通过线程池生成一个新线程(不是主线程都是子线程),回答了问题1前半部分
其中代码:
executor.submit((Callable<Object>)sr);
之前叫你盯着的Runnable是不是在这执行了。回答了问题1的后半部分
到此源码分析完
至于问题2,就不需过多回答了吧。比如我碰到了第一个subscribeOn,我的被观察者事件已经在这线程中执行发送了,后面你再写一个还有什么意思呢。
网友评论