RxJava 是一个在 Java VM 上使用可观测的序列来组成异步的、基于事件的程序的库,而且是链式调用、逻辑简洁 。
这里是基于RxJava2.0源码,以这个调用顺序来进行讲解,其他的操作符也大体基本差不多。根据源码可以多看看。
Observable.just(1)
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Observer<Integer>() {
@Override
public void onSubscribe(Disposable d) {
}
@Override
public void onNext(Integer value) {
}
@Override
public void onError(Throwable e) {
}
@Override
public void onComplete() {
}
});
- Observable是被观察者,调用just()方法,会返回一个经过钩子函数(Hook)调用的被观察者ObservableJust,
public static <T> Observable<T> just(T item) {
ObjectHelper.requireNonNull(item, "The item is null");
// 钩子函数调用
return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
}
- ObservableJust 会调用父类的subscribeOn(Schedulers.io())方法指定被观察者线程,同样经过钩子函数返回一个被观测者ObservableSubscribeOn,
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
// 这个this 就是ObservableJust,把这个被观察者赋值给父类,并传入线程类型
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
static final class IoHolder {
// 这个就是Schedulers类中的线程类型Schedulers.io()
static final Scheduler DEFAULT = new IoScheduler();
}
- ObservableSubscribeOn调用observeOn(AndroidSchedulers.mainThread())指定观察者线程类型。返回经过钩子函数调用的ObservableObserveOn,传入的参数是,ObservableSubscribeOn这个被观察者,观察者的线程类型AndroidSchedulers.mainThread(),是否延时错误delayError,缓冲区大小bufferSize
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
/**
* this 是ObservableSubscribeOn
* scheduler是AndroidSchedulers.mainThread()
* delayError是延时错误(onError延时调用)
* bufferSize是缓冲区大小
*/
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
- AndroidSchedulers.mainThread()其实就是一个handler
private static final class MainHolder {
//这个就是AndroidSchedulers类中的AndroidSchedulers.mainThread()
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
- ObservableObserveOn调用subscribe(Observer)方法,传入ObservableObserveOn被观察者和Observer观察者对象调用钩子函数处理得到一个observer ,Observer是一个接口,通过subscribeActual(observer)订阅观察者
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
// 这里的this 就是ObservableObserveOn,调用钩子函数重新得到一个observer
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
// 这个方法就是订阅观察者的方法
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
- ObservableObserveOn中调用subscribeActual(observer)方法,这里对观察者进行了封装,参数的观察者是内部类ObserveOnObserver。(其实这里不同的被观察者都会有这样的一个内部类)
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//创建一个线程,这里是传入的AndroidSchedulers.mainThread()
Scheduler.Worker w = scheduler.createWorker();
//调用这个subscribe方法,传入一个观察者(observer),delayError是延时错误(onError延时调用)
// bufferSize是缓冲区大小
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
- 实现的Observer接口的内容在这个内部类ObserveOnObserver中,
- onSubscribe方法
@Override
public void onSubscribe(Disposable s) {
if (DisposableHelper.validate(this.s, s)) {
this.s = s;
if (s instanceof QueueDisposable) {
@SuppressWarnings("unchecked")
QueueDisposable<T> qd = (QueueDisposable<T>) s;
int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
if (m == QueueDisposable.SYNC) {
sourceMode = m;
queue = qd;
done = true;
actual.onSubscribe(this);
schedule();
return;
}
if (m == QueueDisposable.ASYNC) {
sourceMode = m;
queue = qd;
actual.onSubscribe(this);
return;
}
}
queue = new SpscLinkedArrayQueue<T>(bufferSize);
actual.onSubscribe(this);
}
}
- onNext
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
- onError
@Override
public void onError(Throwable t) {
if (done) {
RxJavaPlugins.onError(t);
return;
}
error = t;
done = true;
schedule();
}
- onComplete
@Override
public void onComplete() {
if (done) {
return;
}
done = true;
schedule();
}
- dispose
@Override
public void dispose() {
if (!cancelled) {
cancelled = true;
s.dispose();
worker.dispose();
if (getAndIncrement() == 0) {
queue.clear();
}
}
}
- 这些方法都会调用schedule()方法,worker 就是传入的创建线程的对象Scheduler
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
subscribeOn和observeOn的线程切换
先看subscribeOn的流程
- 在subscribeOn(Schedulers.io())方法中传入的Schedulers这个类中有很多不同的线程对象,比如IO,SINGLE,COMPUTATION,TRAMPOLINE,NEW_THREAD。这些都是抽象类Scheduler。我们在代码中使用的时候调用的subscribe()方法中会调用subscribeActual(final Observer<? super T> s)方法,我们以ObservableSubscribeOn中的这个方法来举例,
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
// 这里就是调用的Scheduler中的scheduleDirect方法,这个parent会在后面使用,可以注意下
//parent作为一个dispose
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
source.subscribe(parent);
}
}));
}
2.在上面中的scheduler.scheduleDirect最终会调用前面提到的创建线程对象的createWorker()方法,但是这个方法是在Scheduler的子类中实现的,比如IoScheduler
public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
w.schedule(new Runnable() {
@Override
public void run() {
try {
decoratedRun.run();
} finally {
w.dispose();
}
}
}, delay, unit);
return w;
}
3.IoScheduler中的createWorker 方法返回的是一个实现Disposable的内部类EventLoopWorker,传入的参数是内部类CachedWorkerPool,这个CachedWorkerPool是一个实现Runnable的类,线程是从这个里面获取的ThreadWorker,
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
- EventLoopWorker 类
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
final AtomicBoolean once = new AtomicBoolean();
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public void dispose() {
if (once.compareAndSet(false, true)) {
tasks.dispose();
// releasing the pool should be the last action
// should prevent pool reuse in case there is a blocking
// action not responding to cancellation
// threadWorker.scheduleDirect(() -> {
// pool.release(threadWorker);
// }, 0, TimeUnit.MILLISECONDS);
pool.release(threadWorker);
}
}
@Override
public boolean isDisposed() {
return once.get();
}
@Override
public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
// 主要是这个里面
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
- ThreadWorker 类
static final class ThreadWorker extends NewThreadWorker {
private long expirationTime;
ThreadWorker(ThreadFactory threadFactory) {
super(threadFactory);
this.expirationTime = 0L;
}
public long getExpirationTime() {
return expirationTime;
}
public void setExpirationTime(long expirationTime) {
this.expirationTime = expirationTime;
}
}
- 看看上面的threadWorker.scheduleActual(action, delayTime, unit, tasks)方法
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
// 这个run 就是第一步中的parent ,是一个观察者SubscribeOnObserver
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//进一步的封装
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
// 执行线程池ScheduledExecutorService的submit方法 ,然后parent也就是SubscribeOnObserver的 方法会调用
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
parent.remove(sr);
RxJavaPlugins.onError(ex);
}
return sr;
}
- ObservableSubscribeOn中的subscribeActual会调用,也就是Observable对象的的订阅会执行,任务也就在子线程中执行了,线程就切换到了工作线程,这就是subscribeOn(Schedulers.io())的基本流程
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
@Override
public void run() {
// Observable对象的的订阅会执行
source.subscribe(parent);
}
}));
}
再来看看observeOn(AndroidSchedulers.mainThread())的基本流程
- 主要是在ObservableObserveOn中的subscribeActual方法,这里面也会调用createWorker方法,然后执行schedule方法,但是这里涉及到了RxAndroid,在之前提及到AndroidSchedulers.mainThread()是一个Handler
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
这里再看下AndroidSchedulers.mainThread()的值
private static final class MainHolder {
// 这里的是HandlerScheduler ,是RxAndroid中封装的,继承自RxJava中的Scheduler
static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
}
- HandlerScheduler中调用的createWorker返回的是一个HandlerWorker(handler),参数传入的是一个Handler
@Override
public Worker createWorker() {
return new HandlerWorker(handler);
}
- 再看看HandlerWorker 中的schedule方法,这里最终也是调用的传入的线程的run方法,也就是MainHandler的run方法,
private static final class HandlerWorker extends Worker {
private final Handler handler;
private volatile boolean disposed;
HandlerWorker(Handler handler) {
this.handler = handler;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (delay < 0) throw new IllegalArgumentException("delay < 0: " + delay);
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this; // Used as token for batch disposal of this worker's runnables.
// 通过handler发送过去的消息
handler.sendMessageDelayed(message, unit.toMillis(delay));
// Re-check disposed state for removing in case we were racing a call to dispose().
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
@Override
public void dispose() {
disposed = true;
handler.removeCallbacksAndMessages(this /* token */);
}
@Override
public boolean isDisposed() {
return disposed;
}
}
- 这里基本切换到主线程的流程也完了,大体都是通过不同的Scheduler(IoScheduler,HandlerScheduler等)来处理不同的schedule()方法进行线程间的切换。
网友评论