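Here is my understanding.
The principle behind RxJava boils down to three flows:
- the API build flow
- the event subscription flow
- the event callback flow
The source analysis below uses a Retrofit API call as the example.
As noted in the brief Retrofit source analysis, configuring Retrofit with RxJava2CallAdapterFactory ultimately creates a CallObservable, and the network request is issued inside CallObservable.
So the call flow for Retrofit combined with RxJava looks like this:
CallObservable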
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe{bean->
}
1. Build flow
Because the calls are chained, every operator has to return an object of the same type, Observable.
- CallObservable--->Observable
- subscribeOn--->ObservableSubscribeOn--->AbstractObservableWithUpstream--->Observable
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
The abstract class AbstractObservableWithUpstream implements the HasUpstreamObservableSource interface and overrides its source() method:
protected final ObservableSource<T> source;
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
The subscribeOn operator--->new ObservableSubscribeOn<T>(this, scheduler)
Here this becomes source: the upstream Observable is stored as the source of the downstream one.
- observeOn--->ObservableObserveOn--->AbstractObservableWithUpstream--->Observable
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
Likewise, ObservableObserveOn extends AbstractObservableWithUpstream, which implements HasUpstreamObservableSource, so it also keeps a reference to the upstream Observable.
- subscribe starts the subscription
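The build flow can be pictured as each operator wrapping the object it was called on. The following is a minimal sketch of that idea only. All type names (BuildFlowSketch, MiniObservable, Head, WithUpstream) are hypothetical stand-ins and not RxJava classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of the build flow; none of these types are RxJava classes.
final class BuildFlowSketch {

    // Stand-in for Observable: every operator returns another MiniObservable.
    abstract static class MiniObservable {
        final String name;

        MiniObservable(String name) {
            this.name = name;
        }

        MiniObservable subscribeOn() {
            return new WithUpstream("SubscribeOn", this);
        }

        MiniObservable observeOn() {
            return new WithUpstream("ObserveOn", this);
        }
    }

    // Stand-in for CallObservable: the head of the chain, it has no upstream.
    static final class Head extends MiniObservable {
        Head() {
            super("Call");
        }
    }

    // Stand-in for AbstractObservableWithUpstream: stores the upstream as source.
    static final class WithUpstream extends MiniObservable {
        final MiniObservable source;

        WithUpstream(String name, MiniObservable source) {
            super(name);
            this.source = source;
        }
    }

    // Walks from the last operator back to the head through the source references.
    static List<String> chainFromTail(MiniObservable tail) {
        List<String> names = new ArrayList<>();
        MiniObservable current = tail;
        while (true) {
            names.add(current.name);
            if (!(current instanceof WithUpstream)) {
                break;
            }
            current = ((WithUpstream) current).source;
        }
        return names;
    }

    public static void main(String[] args) {
        MiniObservable tail = new Head().subscribeOn().observeOn();
        System.out.println(chainFromTail(tail)); // prints [ObserveOn, SubscribeOn, Call]
    }
}
```

The object returned by the last operator is the one that subscribe is eventually called on, and it can reach every earlier stage only through its source reference.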
2. Subscription flow
Observable implements the ObservableSource interface and overrides its subscribe method.
public interface ObservableSource<T> {
void subscribe(@NonNull Observer<? super T> observer);
}
The chain built above ends with a call to Observable.subscribe(Consumer<? super T> onNext):
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "The RxJavaPlugins.onSubscribe hook returned a null Observer. Please change the handler provided to RxJavaPlugins.setOnObservableSubscribe for invalid null returns. Further reading: https://github.com/ReactiveX/RxJava/wiki/Plugins");
//the key call
subscribeActual(observer);
} catch (NullPointerException e) {
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
Whichever subscribe() overload is called, it ends up in the subscribe(observer) method of the ObservableSource interface,
which in turn calls subscribeActual(observer):
protected abstract void subscribeActual(Observer<? super T> observer);
subscribeActual(observer) is abstract, and each Observable wrapper class overrides it.
So the subscription logic amounts to calling Observable.subscribeActual(observer).
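The relationship between subscribe and subscribeActual is a template method: a final entry point does the common checks and delegates to an abstract hook. Below is a minimal sketch under that reading. The names SubscribeTemplateSketch, MiniObservable, MiniObserver and JustTwo are hypothetical, and the plugin hooks and error handling of the real method are left out.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Objects;
import java.util.function.Consumer;

// Hypothetical miniature of subscribe(...) delegating to subscribeActual(...).
final class SubscribeTemplateSketch {

    interface MiniObserver<T> {
        void onNext(T value);
        void onComplete();
    }

    abstract static class MiniObservable<T> {

        // Convenience overload: wraps the callback in an observer, similar in spirit to LambdaObserver.
        final void subscribe(final Consumer<? super T> consumer) {
            Objects.requireNonNull(consumer, "consumer is null");
            subscribe(new MiniObserver<T>() {
                @Override public void onNext(T value) { consumer.accept(value); }
                @Override public void onComplete() { }
            });
        }

        // Every overload funnels into this method, which calls the abstract hook.
        final void subscribe(MiniObserver<? super T> observer) {
            Objects.requireNonNull(observer, "observer is null");
            subscribeActual(observer);
        }

        // Each concrete observable decides what subscribing actually does.
        protected abstract void subscribeActual(MiniObserver<? super T> observer);
    }

    // A source that emits two fixed values and completes.
    static final class JustTwo extends MiniObservable<String> {
        @Override
        protected void subscribeActual(MiniObserver<? super String> observer) {
            observer.onNext("a");
            observer.onNext("b");
            observer.onComplete();
        }
    }

    static List<String> demo() {
        List<String> received = new ArrayList<>();
        new JustTwo().subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints [a, b]
    }
}
```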
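Back to the subscription flow: the last object produced by the build flow above is the ObservableObserveOn created by the observeOn operator,
so the first call is ObservableObserveOn.subscribeActual(observer):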
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//wrap the observer
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
source.subscribe(observer)
As the build flow showed, source is the upstream Observable,
here the ObservableSubscribeOn created by the subscribeOn operator, so the next call is
ObservableSubscribeOn.subscribeActual(observer):
@Override
public void subscribeActual(final Observer<? super T> observer) {
//wrap the downstream observer
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
There is a small detour here through
new SubscribeTask(parent):
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
SubscribeTask implements Runnable,
and its run() method calls source.subscribe(parent),
which reaches the upstream object: CallObservable.subscribeActual(observer)
@Override protected void subscribeActual(Observer<? super Response<T>> observer) {
Call<T> call = originalCall.clone();
observer.onSubscribe(new CallDisposable(call));
boolean terminated = false;
try {
//synchronous OkHttp request
Response<T> response = call.execute();
if (!call.isCanceled()) {
observer.onNext(response);
}
if (!call.isCanceled()) {
terminated = true;
observer.onComplete();
}
} catch (Throwable t) {
Exceptions.throwIfFatal(t);
if (terminated) {
RxJavaPlugins.onError(t);
} else if (!call.isCanceled()) {
try {
observer.onError(t);
} catch (Throwable inner) {
Exceptions.throwIfFatal(inner);
RxJavaPlugins.onError(new CompositeException(t, inner));
}
}
}
}
The subscription flow has now travelled from the downstream end back up to the source.
The network Call is wrapped in a Disposable:
observer.onSubscribe(new CallDisposable(call));
private static final class CallDisposable implements Disposable {
private final Call<?> call;
CallDisposable(Call<?> call) {
this.call = call;
}
@Override public void dispose() {
call.cancel();
}
@Override public boolean isDisposed() {
return call.isCanceled();
}
}
CallObservable.subscribeActual() calls observer.onSubscribe(), issues the network request,
and then calls back observer.onNext()/onComplete(), or onError() on failure.
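The direction of the subscription flow can be made visible by logging each subscribeActual call. The sketch below is a simplified model with hypothetical names (SubscriptionFlowSketch, Operator, and so on). It performs no thread switching and no real request, and only records the order of calls.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature that logs the order in which stages are subscribed.
final class SubscriptionFlowSketch {

    interface MiniObserver {
        void onNext(String value);
    }

    abstract static class MiniObservable {
        final void subscribe(MiniObserver observer, List<String> log) {
            log.add(getClass().getSimpleName() + ".subscribeActual");
            subscribeActual(observer, log);
        }

        abstract void subscribeActual(MiniObserver observer, List<String> log);
    }

    // Head of the chain: stands in for CallObservable and emits a fake response.
    static final class Call extends MiniObservable {
        @Override
        void subscribeActual(MiniObserver observer, List<String> log) {
            observer.onNext("response");
        }
    }

    // An operator that only forwards the subscription to its upstream source.
    static class Operator extends MiniObservable {
        final MiniObservable source;

        Operator(MiniObservable source) {
            this.source = source;
        }

        @Override
        void subscribeActual(MiniObserver observer, List<String> log) {
            source.subscribe(observer, log);
        }
    }

    static final class SubscribeOn extends Operator {
        SubscribeOn(MiniObservable source) {
            super(source);
        }
    }

    static final class ObserveOn extends Operator {
        ObserveOn(MiniObservable source) {
            super(source);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObservable chain = new ObserveOn(new SubscribeOn(new Call()));
        chain.subscribe(value -> log.add("onNext(" + value + ")"), log);
        return log;
    }

    public static void main(String[] args) {
        // Expected order: ObserveOn, then SubscribeOn, then Call, then the onNext callback.
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

The last operator in the chain is subscribed first and the head last, which is why the request only starts once the subscription has reached the top.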
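3. Callback flow
In the subscription flow, each subscribeActual() calls subscribe(observer) on the upstream Observable,
wrapping the downstream observer and passing it upstream.
So observer.onNext() in CallObservable invokes onNext() on the downstream observer.
Look again at ObservableSubscribeOn.subscribeActual():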
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
parent--->SubscribeOnObserver
final Observer<? super T> downstream;
SubscribeOnObserver(Observer<? super T> downstream) {
this.downstream = downstream;
this.upstream = new AtomicReference<Disposable>();
}
@Override
public void onSubscribe(Disposable d) {
DisposableHelper.setOnce(this.upstream, d);
}
@Override
public void onNext(T t) {
downstream.onNext(t);
}
@Override
public void onError(Throwable t) {
downstream.onError(t);
}
@Override
public void onComplete() {
downstream.onComplete();
}
downstream.onNext(t) goes on to call onNext() on the downstream Observer.
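The forwarding done by SubscribeOnObserver is plain delegation through a chain of wrappers. As a rough sketch of that chain, with hypothetical names (CallbackFlowSketch, Forwarding) and with the wrappers built by hand instead of by operators:

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature of observers wrapping their downstream observer.
final class CallbackFlowSketch {

    interface MiniObserver {
        void onNext(String value);
    }

    // Records its own name, then forwards the value to the downstream observer.
    static final class Forwarding implements MiniObserver {
        private final String name;
        private final MiniObserver downstream;
        private final List<String> log;

        Forwarding(String name, MiniObserver downstream, List<String> log) {
            this.name = name;
            this.downstream = downstream;
            this.log = log;
        }

        @Override
        public void onNext(String value) {
            log.add(name + ".onNext");
            downstream.onNext(value);
        }
    }

    static List<String> demo() {
        List<String> log = new ArrayList<>();
        MiniObserver user = value -> log.add("user.onNext(" + value + ")");
        // Subscription walks upstream, so the wrapper closest to the user is created first.
        MiniObserver observeOnObserver = new Forwarding("ObserveOnObserver", user, log);
        MiniObserver subscribeOnObserver = new Forwarding("SubscribeOnObserver", observeOnObserver, log);
        // The head of the chain emits into the outermost wrapper.
        subscribeOnObserver.onNext("response");
        return log;
    }

    public static void main(String[] args) {
        for (String line : demo()) {
            System.out.println(line);
        }
    }
}
```

The event passes through the wrappers in the reverse of the order in which they were created, so it travels from upstream to downstream.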
Look again at ObservableObserveOn.subscribeActual().
The downstream Observer here is an ObserveOnObserver:
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
//scheduler is the AndroidSchedulers.mainThread() passed to the observeOn operator
Scheduler.Worker w = scheduler.createWorker();
//the ObserveOnObserver is passed upstream
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
While we are here, a look at the thread switch, starting with
AndroidSchedulers.mainThread()
private static final class MainHolder {
static final Scheduler DEFAULT
= new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
}
private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
new Callable<Scheduler>() {
@Override public Scheduler call() throws Exception {
return MainHolder.DEFAULT;
}
});
public static Scheduler mainThread() {
return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
}
MainHolder.DEFAULT is a HandlerScheduler created with new Handler(Looper.getMainLooper()),
so it wraps a main-thread Handler.
RxAndroidPlugins
public static Scheduler initMainThreadScheduler(Callable<Scheduler> scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Callable<Scheduler>, Scheduler> f = onInitMainThreadHandler;
if (f == null) {
return callRequireNonNull(scheduler);
}
return applyRequireNonNull(f, scheduler);
}
public static Scheduler onMainThreadScheduler(Scheduler scheduler) {
if (scheduler == null) {
throw new NullPointerException("scheduler == null");
}
Function<Scheduler, Scheduler> f = onMainThreadHandler;
if (f == null) {
return scheduler;
}
return apply(f, scheduler);
}
AndroidSchedulers.mainThread() returns a HandlerScheduler.
HandlerScheduler.createWorker()--->HandlerWorker
public Worker createWorker() {
return new HandlerWorker(handler, async);
}
private static final class HandlerWorker extends Worker {
private final Handler handler;
private final boolean async;
private volatile boolean disposed;
HandlerWorker(Handler handler, boolean async) {
this.handler = handler;
this.async = async;
}
@Override
public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
if (run == null) throw new NullPointerException("run == null");
if (unit == null) throw new NullPointerException("unit == null");
if (disposed) {
return Disposables.disposed();
}
run = RxJavaPlugins.onSchedule(run);
ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
Message message = Message.obtain(handler, scheduled);
message.obj = this;
if (async) {
message.setAsynchronous(true);
}
handler.sendMessageDelayed(message, unit.toMillis(delay));
if (disposed) {
handler.removeCallbacks(scheduled);
return Disposables.disposed();
}
return scheduled;
}
}
The Handler and Message move execution of the Runnable onto the main thread.
Back to ObserveOnObserver.onNext():
final Scheduler.Worker worker;
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.downstream = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);
}
schedule();
}
void schedule() {
if (getAndIncrement() == 0) {
worker.schedule(this);
}
}
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
ObserveOnObserver implements Runnable, so it can pass itself to
worker.schedule(this)
public Disposable schedule(@NonNull Runnable run) {
return schedule(run, 0L, TimeUnit.NANOSECONDS);
}
public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
This dispatches to the implementation shown earlier,
HandlerWorker.schedule(Runnable run, long delay, TimeUnit unit).
The Runnable is the ObserveOnObserver itself, so what runs on the main thread is
ObserveOnObserver.run()
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
drainNormal();
}
}
ObserveOnObserver.drainNormal()
void drainNormal() {
...
final Observer<? super T> a = downstream;
...
a.onNext(v);
}
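This goes on to call onNext() on the downstream Observer,
which is the Observer the user supplied to subscribe().
That ends the callback flow: the event has been fully consumed.
The observeOn operator switches threads and so affects the callback flow: it determines the thread on which the callback methods of the Observers downstream of the operator run.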
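The onNext/schedule/run trio shown above is a queue-drain pattern: values are queued, and a counter makes sure only one drain task is scheduled at a time. The sketch below imitates it with a single-thread ExecutorService standing in for the main-thread worker. The names ObserveOnSketch and QueueDrainObserver and the thread name "main-like" are hypothetical, and fusion, errors, completion and disposal are omitted.

```java
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical miniature of the queue-drain idea behind ObserveOnObserver.
final class ObserveOnSketch {

    static final class QueueDrainObserver<T> implements Runnable {
        private final Consumer<? super T> downstream;
        private final ExecutorService worker;
        private final Queue<T> queue = new ConcurrentLinkedQueue<>();
        private final AtomicInteger wip = new AtomicInteger(); // pending drain requests

        QueueDrainObserver(Consumer<? super T> downstream, ExecutorService worker) {
            this.downstream = downstream;
            this.worker = worker;
        }

        // Called on the emitting thread: queue the value, then request a drain.
        void onNext(T value) {
            queue.offer(value);
            // Only the call that moves the counter away from 0 schedules the drain task.
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        // Runs on the worker thread and delivers queued values to the downstream.
        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                T value;
                while ((value = queue.poll()) != null) {
                    downstream.accept(value);
                }
                // Subtract the requests handled so far; loop again if more arrived meanwhile.
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    static List<String> demo() {
        ExecutorService mainLike = Executors.newSingleThreadExecutor(r -> new Thread(r, "main-like"));
        List<String> seen = new CopyOnWriteArrayList<>();
        CountDownLatch done = new CountDownLatch(2);
        QueueDrainObserver<String> observer = new QueueDrainObserver<String>(value -> {
            seen.add(value + "@" + Thread.currentThread().getName());
            done.countDown();
        }, mainLike);
        try {
            observer.onNext("a"); // emitted on the calling thread
            observer.onNext("b");
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } finally {
            mainLike.shutdown();
        }
        return seen;
    }

    public static void main(String[] args) {
        System.out.println(demo());
    }
}
```

Both values are delivered on the worker thread even though onNext was called elsewhere, which is the effect observeOn has on everything downstream of it.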
Next, the subscribeOn operator.
The build flow used subscribeOn(Schedulers.io()).
Schedulers.io() returns a scheduler that is initialized in a static block:
@NonNull
static final Scheduler IO;
public static Scheduler io() {
return RxJavaPlugins.onIoScheduler(IO);
}
static {
...
IO = RxJavaPlugins.initIoScheduler(new IOTask());
...
}
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
IoScheduler
static final RxThreadFactory WORKER_THREAD_FACTORY;
public IoScheduler() {
this(WORKER_THREAD_FACTORY);
}
public IoScheduler(ThreadFactory threadFactory) {
this.threadFactory = threadFactory;
this.pool = new AtomicReference<CachedWorkerPool>(NONE);
start();
}
static {
...
WORKER_THREAD_FACTORY = new RxThreadFactory(WORKER_THREAD_NAME_PREFIX, priority);
NONE = new CachedWorkerPool(0, null, WORKER_THREAD_FACTORY);
...
}
RxJavaPlugins
@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
@NonNull
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
Back to ObservableSubscribeOn.subscribeActual():
@Override
public void subscribeActual(final Observer<? super T> observer) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
observer.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
scheduler.scheduleDirect()
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);
return task;
}
createWorker() is abstract; here the call goes to the implementation above, IoScheduler.createWorker():
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
static final class EventLoopWorker extends Scheduler.Worker {
private final CompositeDisposable tasks;
private final CachedWorkerPool pool;
private final ThreadWorker threadWorker;
EventLoopWorker(CachedWorkerPool pool) {
this.pool = pool;
this.tasks = new CompositeDisposable();
this.threadWorker = pool.get();
}
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
return EmptyDisposable.INSTANCE;
}
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
}
w.schedule(task, delay, unit) is therefore EventLoopWorker.schedule(),
which calls threadWorker.scheduleActual().
The threadWorker comes from a cached worker pool, which is not covered in detail here.
threadWorker--->NewThreadWorker
NewThreadWorker.scheduleActual()
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
if (delayTime <= 0) {
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
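The SubscribeTask Runnable from the subscription flow, already wrapped in a DisposeTask by scheduleDirect(), is wrapped again in a ScheduledRunnable,
and executor.submit() hands it to the thread pool for execution.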
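Stripped of pooling and disposal, subscribeOn comes down to running source.subscribe(...) inside a task on another thread. The sketch below shows that core idea with a plain ExecutorService. The names SubscribeOnSketch, MiniSource and MiniObserver and the thread name "io-like" are hypothetical, and the executor stands in for the IoScheduler worker.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

// Hypothetical miniature of subscribeOn: the upstream subscription runs on an executor thread.
final class SubscribeOnSketch {

    interface MiniObserver {
        void onNext(String value);
    }

    interface MiniSource {
        void subscribe(MiniObserver observer);
    }

    // Returns a source whose subscribe(...) defers the upstream subscription to the executor.
    static MiniSource subscribeOn(MiniSource source, ExecutorService executor) {
        return observer -> {
            // Plays the role of SubscribeTask: a Runnable that subscribes to the upstream.
            Runnable subscribeTask = () -> source.subscribe(observer);
            executor.submit(subscribeTask);
        };
    }

    static String demo() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "io-like"));
        // The fake request records the thread it was executed on.
        MiniSource call = observer ->
                observer.onNext("response@" + Thread.currentThread().getName());
        CompletableFuture<String> result = new CompletableFuture<>();
        try {
            subscribeOn(call, io).subscribe(result::complete);
            return result.get(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            io.shutdown();
        }
    }

    public static void main(String[] args) {
        System.out.println(demo()); // prints response@io-like
    }
}
```

Because the subscription itself runs on the executor thread, the head of the chain, and with it the blocking request, executes there as well.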