rxjava代码
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
Log.e("qwer", Thread.currentThread().getName());
emitter.onNext("有情况");
}
}).subscribeOn(Schedulers.newThread()).
subscribe(new Observer<String>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(String s) {
Log.e("qwer", s);
Log.e("qwer", Thread.currentThread().getName());
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
log--
E/qwer: main
E/qwer: 有情况
E/qwer: main
由前面两篇文章我们得知,如果不指定线程切换,那么我们在哪个线程操作,事件的发送和接收就发生在哪个线程。然后create和subscribe也不讲了(可以看前两篇文章)
1、直接看subscribeOn
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
2、进入ObservableSubscribeOn
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
看过前两篇文章的都知道,自此,赋值准备工作结束,然后被观察者开始发送事件的时候,会调用ObservableSubscribeOn的subscribeActual方法
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
这个方法的参数observer就是我们自己new的观察者,SubscribeOnObserver持有observer的引用,这样SubscribeOnObserver在调用onNext的时候,就会再调用observer的onNext方法。这些看过前两篇的都会知道
直接第三行代码
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
3、我们首先看最里面的参数new SubscribeTask(parent),进入SubscribeTask
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
其实就是一个线程,然后在run()方法里调用了
source.subscribe(parent);
这个source就是我们自己new的观察者(其实这个rxjava是链式调用,如果有好几层的话也可以理解为source就是上一层的Observable,比如说上面还有一个map转换,那么这个地方的source就是ObservableMap了)
好了,那么接下来就开始看这个线程在哪里启动的了
4、我们继续看这一行代码
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
进入scheduleDirect
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
看注释得知,这里其实就是该线程会被无延迟执行,再次进入该重载的方法
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
在这里我们发现RxJavaPlugins.onSchedule(run)返回的还是run本身
然后decoratedRun 又传给了DisposeTask
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;
this.w = w;
}
我们发现执行线程赋值给了decoratedRun ,而且DisposeTask实现了Runnable接口,然后在它的run方法里我们看到decoratedRun.run();
再回到前门的代码
w.schedule(task, delay, unit);
w是createWorker()获取的,而createWorker();是个抽象方法,那么只能找Schedule的实现类了
5、那么我们当前执行的这个scheduler又是谁呢?它其实就是我们.subscribeOn(Schedulers.newThread())时传进来的Schedulers.newThread(),好,继续看看Schedulers.newThread()是何许人也,进入newThread()方法
public static Scheduler newThread() {
return RxJavaPlugins.onNewThreadScheduler(NEW_THREAD);
}
再进入onNewThreadScheduler方法
public static Scheduler onNewThreadScheduler(@NonNull Scheduler defaultScheduler) {
Function<? super Scheduler, ? extends Scheduler> f = onNewThreadHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
这里其实就是返回本身
6、那就再看看NEW_THREAD的本身
static final Scheduler NEW_THREAD;
我们一看是final,就接着找到了
static {
......
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
其实initNewThreadScheduler方法还是返回本身,再进入NewThreadTask
static final class NewThreadTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return NewThreadHolder.DEFAULT;
}
}
再继续看DEFAULT
static final class NewThreadHolder {
static final Scheduler DEFAULT = new NewThreadScheduler();
}
好,到此为止,我们发现了步骤4最后要找的Schedule的实现类了
我们进入NewThreadScheduler,找到createWorker方法
public Worker createWorker() {
return new NewThreadWorker(threadFactory);
}
还记得之前步骤4中的 w.schedule(task, delay, unit)么?不错,这里的w就是这里返回的对象,我们直接进入NewThreadWorker的schedule方法
public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (disposed) {
return EmptyDisposable.INSTANCE;
}
return scheduleActual(action, delayTime, unit, null);
}
再进入scheduleActual方法
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
啊!!到这里可算结束啦
在代码中,delayTime == 0
然后执行了f = executor.submit((Callable<Object>)sr);
其实也就是立即执行了我们的操作线程,是不是已经忘记了哪个操作线程了,就是步骤3里的那个SubscribeTask ,而SubscribeTask 的run()方法里的source.subscribe(parent),就这一句代码,就串起了整个观察者模式(其实就是连接了观察者和被观察者而已),如果不能够理解这里,可以回头看看前两篇文章
网友评论