一直以来,想写点东西,但又不知道写什么。想到之前公司同事在使用使用RxJava的时候,总是抱怨对于RxJava的工作流程很难理解,今天就来说说RxJava吧,本文不讨论RxJava的各种操作符,只是从RxJava的源码分析一下它的执行流程。这也是自己第一次在网络写文章,培养一下自己的写作能力,写得不好,各位看官见笑了。
话不多说,直接开整,先来看一下RxJava的简单用法,例子如下:
Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
String s = "10086";
emitter.onNext(s);
}
})
.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
})
.subscribeOn(Schedulers.io())
.observeOn(AndroidSchedulers.mainThread())
.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
}
功能很简单,就是使用RxJava从上游向下游发送数据:
1)使用Observable静态方法create创建ObServable对象
2)map将String数据转化成Integer
3)subscribeOn转成IO线程
4)observeOn转成主线成
5)subscribe传入Consumer(消费者)用于消费从上游传下来的数据。
为了方便分析,我们先将代码做一层转换(实际开发中不建议这么写,否则就不能体现链式调度这个优点)。
//第一步
ObservableCreate observableCreate =
(ObservableCreate) Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("123");
}
});
//第二步
ObservableMap observableMap =
(ObservableMap) observableCreate.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
//第三步
ObservableSubscribeOn observableSubscribeOn =
(ObservableSubscribeOn) observableMap.subscribeOn(Schedulers.io());
//第四步
ObservableObserveOn observableObserveOn =
(ObservableObserveOn) observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());
//第五步
LambdaObserver observer =
(LambdaObserver) observableObserveOn.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
接下来,一步一步分析:
Observable调用静态方法create方法,传进去一个叫ObservableOnSubscribe的参数,我们来按着执行顺序看看源代码:
//第一步
ObservableCreate observableCreate =
(ObservableCreate) Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
emitter.onNext("123");
}
});
//Observable类
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
ObjectHelper.requireNonNull(source, "source is null");
return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
}
//Observable类
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
Function<? super Observable, ? extends Observable> f = onObservableAssembly;
if (f != null) {
return apply(f, source);
}
return source;
}
我们重点关注ObservableCreate的实例化,onAssembly方法正常情况下就是返回实例化的ObservableCreate,之后会经常看到onAssembly这个方法,就自行忽略了。接下来跟进去,看看实例化都做了什么。
//ObservableCreate类
final ObservableOnSubscribe<T> source;
public ObservableCreate(ObservableOnSubscribe<T> source) {
this.source = source;
}
可以看到,ObservableCreate对象持有外面传进来的ObservableOnSubscribe实例。回到例子,可以看出来,静态方法create方法返回的是ObservableCreate对象。
接下来看第二步:
ObservableMap observableMap =
(ObservableMap) observableCreate.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
ObjectHelper.requireNonNull(mapper, "mapper is null");
return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}
可以看到,返回的ObservableCreate对象调用map方法,传进去一个Function对象。按前面第一步分析onAssembly方法,可以很容易知道,map方法返回的应该就是实例化的ObservableMap对象。继续往下看,ObservableMap实例化传了个this和mapper,跟进去看下:
//ObservableMap类
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
super(source);
this.function = function;
}
看下构造方法super(source),调用父类的构造方法,跟进去看看:
//ObservableMap类的父类
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
/** The source consumable Observable. */
protected final ObservableSource<T> source;
/**
* Constructs the ObservableSource with the given consumable.
* @param source the consumable Observable
*/
AbstractObservableWithUpstream(ObservableSource<T> source) {
this.source = source;
}
@Override
public final ObservableSource<T> source() {
return source;
}
}
可以知道,ObservableMap实例持有了this和mapper引用,这个this就是map调用者ObservableCreate实例,mapper就是Function实例。有没有看出什么特点?我们将持有引用链列出来看看:
1)ObservableMap ——> ObservableCreate和Function
2)ObservableCreate ——> ObservableOnSubscribe
想想看这个像什么?对,聪明的你可能想到了,俄罗斯套娃!一层套住一层。我们可以想象,之后的链每执行一个方法,也是在外面这么加一层套子。不信?我们继续往下看:
//第三步
ObservableSubscribeOn observableSubscribeOn =
(ObservableSubscribeOn) observableMap.subscribeOn(Schedulers.io());
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}
这里我们先记录一下,第三步subscribeOn方法传进去参数的是Schedulers.io(),一会儿用到的时候再分析,我们先知道它的作用是切换线程。
public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
super(source);
this.scheduler = scheduler;
}
看到了吧,一样的套路,一样的配方。这回加了一层ObservableSubscribeOn。接下来看第四步:
//第四步
ObservableObserveOn observableObserveOn =
(ObservableObserveOn) observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
return observeOn(scheduler, false, bufferSize());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
ObjectHelper.requireNonNull(scheduler, "scheduler is null");
ObjectHelper.verifyPositive(bufferSize, "bufferSize");
return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}
这里我们先记录一下,第四步subscribeOn方法传进去参数的是AndroidSchedulers.mainThread(),一会儿用到的时候再分析,我们先知道它的作用是切换线程。至此,两个切换线程方式都出现了,不过到目前为止,还没触发切换线程的动作。
public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
super(source);
this.scheduler = scheduler;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
所以执行了observeOn方法,就是又加了一层ObservableObserveOn。
如图1,每一圈代表一个实例,从外向里,除了最内一层之外,每一层都有个一source指向里面一层,代表持有引用。
从这些实例的命名,可以知道,这些类都是Oservabale。
俄罗斯套娃的玩法是怎么样,一层一层地套,套完了之后,就是解套。所以接下来是解套了。看第五步:
//第五步
LambdaObserver observer =
(LambdaObserver) observableObserveOn.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
}
});
从前面例子配合前面的分析,可以看出,最外层ObservableObserveOn实例调用subscribe方法,并且传入一个Consumer实例,如下:
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
Action onComplete, Consumer<? super Disposable> onSubscribe) {
ObjectHelper.requireNonNull(onNext, "onNext is null");
ObjectHelper.requireNonNull(onError, "onError is null");
ObjectHelper.requireNonNull(onComplete, "onComplete is null");
ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
subscribe(ls);
return ls;
}
这里先记录一下,执行subscribe会实例化一个LambdaObserver对象,并且当作参数往下执行。继续往下看:
@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
ObjectHelper.requireNonNull(observer, "observer is null");
try {
observer = RxJavaPlugins.onSubscribe(this, observer);
ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
subscribeActual(observer);
} catch (NullPointerException e) { // NOPMD
throw e;
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
// can't call onError because no way to know if a Disposable has been set or not
// can't call onSubscribe because the call might have set a Subscription already
RxJavaPlugins.onError(e);
NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
npe.initCause(e);
throw npe;
}
}
protected abstract void subscribeActual(Observer<? super T> observer);
我擦,抽象方法?那当然去调用者ObservableObserveOn里面找它的实现。
@Override
protected void subscribeActual(Observer<? super T> observer) {
if (scheduler instanceof TrampolineScheduler) {
source.subscribe(observer);
} else {
Scheduler.Worker w = scheduler.createWorker();
//解ObservableSubscribeOn这一层
source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
}
}
source指向的是ObservableSubscribeOn的实例,实例化ObserveOnObserver传进去的observer是LambdaObserver对象。我们看下ObserveOnObserver的构造函数:
ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
this.actual = actual;
this.worker = worker;
this.delayError = delayError;
this.bufferSize = bufferSize;
}
ObserveOnObserver将传入的LambdaObserver存起来,所以持有引用关系就是:
ObserveOnObserver ——> LambdaObserver
这里扯一下,ObserveOnObserver持有从外面传进来的Worker的引用,其实这个Worker就是之前AndroidSchedulers.mainThread()创建的,它的作用是用来做线程切换的,先记录一下,现在还没用到。
我们再去看ObservableSubscribeOn类里面找subscribe方法,找不到,去父类找,最后在Observable类找到了,其实也就是之前分析过的方法,所以回到ObservableSubscribeOn的subscribeActual,如下:
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
s.onSubscribe(parent);
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
又是实例化一个SubscribeOnObserver,传的参数是ObserveOnObserver,如下:
SubscribeOnObserver(Observer<? super T> actual) {
this.actual = actual;
this.s = new AtomicReference<Disposable>();
}
由此,可以知道,SubscribeOnObserver持有ObserveOnObserver实例的引用。所以又有以下持有引用关系:
1)ObserveOnObserver ——> LambdaObserver
2)SubscribeOnObserver ——> ObserveOnObserver
从继承关系可知,每个都是实现了Observer接口。又来????又是一次俄罗斯套娃,第一次是套Observable,这次是这边解Observable的,那边又套Observer。嘿嘿~
继续分析ObservableSubscribeOn的subscribeActual方法,实例化SubscribeTask,如下,可以看到这个SubscribeTask是实现了Runnable接口:
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;
}
@Override
public void run() {
source.subscribe(parent);
}
}
继续分析,执行scheduler(这个scheduler就是例子里面的Schedulers.io())的scheduleDirect方法,把SubscribeTask实例传进去。我们先看一下Schedulers.io()是个什么东西,如下:
//第1步,执行subscribeOn传入Schedulers.io(),去看第2步
ObservableSubscribeOn observableSubscribeOn =
(ObservableSubscribeOn) observableMap.subscribeOn(Schedulers.io());
@NonNull
public static Scheduler io() {
//第2步调用onIoScheduler(在第6步)方法,传入IO
//去第3步看IO是什么
return RxJavaPlugins.onIoScheduler(IO);
}
//第3步
//静态代码块,类加载的时候执行
static {
SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
//可以看到IO就是这里初始化
//new IOTask() 前往第4步
//initIoScheduler前往第7步
IO = RxJavaPlugins.initIoScheduler(new IOTask());
TRAMPOLINE = TrampolineScheduler.instance();
NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
// 第4步
static final class IOTask implements Callable<Scheduler> {
@Override
public Scheduler call() throws Exception {
return IoHolder.DEFAULT;
}
}
//第5步,实例化IoScheduler,所以IO就是IoScheduler
static final class IoHolder {
static final Scheduler DEFAULT = new IoScheduler();
}
//第6步
//是不是跟之前那个RxJavaPlugins.onAssembly很像,没错的,正常情况下就是返回传进来的参数
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
if (f == null) {
return defaultScheduler;
}
return apply(f, defaultScheduler);
}
//第7步
//onInitIoHandler值是为空的,走f==null分支
@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
if (f == null) {
//前往第8步
return callRequireNonNull(defaultScheduler);
}
return applyRequireNonNull(f, defaultScheduler);
}
//第8步
@NonNull
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
try {
//执行第4步的s.call(),返回IoScheduler对象
//requireNonNull前往第9步
return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
} catch (Throwable ex) {
throw ExceptionHelper.wrapOrThrow(ex);
}
}
//第9步
//返回的是s.call()的值,也就是IoScheduler对象。
public static <T> T requireNonNull(T object, String message) {
if (object == null) {
throw new NullPointerException(message);
}
return object;
}
所以最终可以看到,Schedulers.io()是IoScheduler对象。回到ObservableSubscribeOn的subscribeActual方法:
//第0步
@Override
public void subscribeActual(final Observer<? super T> s) {
final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
//与本文分析关系不重要,先忽略,有时间再分析
s.onSubscribe(parent);
//scheduler就是IoScheduler对象,scheduleDirect方法去看第1步
//SubscribeTask前往第12步
//setDisposable这个操作是跟拉断流dispose()有关,与本文关注点关系不大
parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
//第1步
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
//第2步
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
final Worker w = createWorker();//createWorker是抽象方法,具体实现在IoScheduler类,前往第3步
final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//返回的还是传进去的Runnable
//这个操作是跟拉断流dispose()有关,与本文关注点关系不大
//就是多加一层Runnable最后还是执行decoratedRun的run方法
//前往第9步
DisposeTask task = new DisposeTask(decoratedRun, w);
w.schedule(task, delay, unit);//前往第4步,在EventLoopWorker类
return task;
}
//第3步
@NonNull
@Override
public Worker createWorker() {
return new EventLoopWorker(pool.get());
}
//第4步
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
if (tasks.isDisposed()) {
// don't schedule, we are unsubscribed
return EmptyDisposable.INSTANCE;
}
//前往第5步
return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
//第5步
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
//前往第6步看ScheduledRunnable
ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
if (parent != null) {
if (!parent.add(sr)) {
return sr;
}
}
Future<?> f;
try {
//传进来的delayTime为0
if (delayTime <= 0) {
//提交给executor线程池执行
//最后调用sr的call方法,前往第7步
f = executor.submit((Callable<Object>)sr);
} else {
f = executor.schedule((Callable<Object>)sr, delayTime, unit);
}
sr.setFuture(f);
} catch (RejectedExecutionException ex) {
if (parent != null) {
parent.remove(sr);
}
RxJavaPlugins.onError(ex);
}
return sr;
}
//第6步
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
super(3);
this.actual = actual;//就是之前的SubscribeTask
this.lazySet(0, parent);
}
//第7步
@Override
public Object call() {
// Being Callable saves an allocation in ThreadPoolExecutor
run();//前往第8步
return null;
}
//第8步
@Override
public void run() {
lazySet(THREAD_INDEX, Thread.currentThread());
try {
try {
//actual在第6步可知,就是传进来的,也就是第2步的DisposeTask,前往第10步
actual.run();
} catch (Throwable e) {
// Exceptions.throwIfFatal(e); nowhere to go
RxJavaPlugins.onError(e);
}
} finally {
、、、省略部分不关键代码、、、
}
}
//第9步
DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
this.decoratedRun = decoratedRun;//由第0步可知,是SubscribeTask
this.w = w;
}
//第10步
@Override
public void run() {
runner = Thread.currentThread();
try {
decoratedRun.run();//前往第13步
} finally {
dispose();
runner = null;
}
}
//第11步
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
//第12步
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;//parent是SubscribeOnObserver
}
第13步
@Override
public void run() {
//由图1的套娃关系图可知,source是ObservableMap对象
source.subscribe(parent);
}
}
至此,subscribeOn(Schedulers.io())已经执行完成,subscribeOn完成了线程切换,也就是说每执行一次,就切换一次线程。注意此时只是解套过程,并没有触发上游发送数据,而且这个切换线程只是发生在subscribe的方法, 这也是为什么多次执行subscribeOn方法会只有第一次subscribeOn有效的原因,来张图吧,容易理解一点:
图2
很多人会问,都已经有observeOn,为什么还要subscribeOn?因为observeOn(还没分析)是没办法控制数据源Observable.create的线程,所以得使用subscribeOn来控制。就像图2中,最后Observable.create是处于io线程的。
接下来继续之前的解俄罗斯套娃,先把之前解的步骤列出来:
1)解observableObserveOn,生成LambdaObserver和ObserveOnObserver对象,并且存在引用关系:ObserveOnObserver——>LambdaObserver
2)ObservableSubscribeOn,生成SubscribeOnObserver对象,并存在引用关系:SubscribeOnObserver——>ObserveOnObserver
所以接下来是解ObservableMap这一层:
//第1步
final class SubscribeTask implements Runnable {
private final SubscribeOnObserver<T> parent;
SubscribeTask(SubscribeOnObserver<T> parent) {
this.parent = parent;//之前分析可知,parent是SubscribeOnObserver
}
@Override
public void run() {
//之前分析可知,source是ObservableMap对象,
//subscribe最后还是调用ObservableMap的subscribeActual方法,
//就是解ObservableMap这一层,前往第2步
source.subscribe(parent);
}
}
//第2步
@Override
public void subscribeActual(Observer<? super U> t) {
//由第1步可知,t是SubscribeOnObserver对象
//实例化MapObserver前往第3步
//由例子可以知道function就是map方法传入的参数,如第5步所示
//source就是之前ObservableCreate,这里就是解ObservableCreate这里一层
source.subscribe(new MapObserver<T, U>(t, function));
}
//第3步
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
super(actual);//actual就是第1步的SubscribeOnObserver
this.mapper = mapper;
}
//第5步
ObservableMap observableMap =
(ObservableMap) observableCreate.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
所以解ObservableMap这一层,在subscribeActual会生成一个MapObserver对象,并且存在引用关系:
MapObserver——>SubscribeOnObserver
继续解ObservableCreate这一层:
//第1步
@Override
protected void subscribeActual(Observer<? super T> observer) {
//observer就是上面传过来的MapObserver
//创建CreateEmitter发射器
//所以持有引用关系CreateEmitter——>MapObserver
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//这个操作是跟拉断流dispose()有关,与本文关注点关系不大
observer.onSubscribe(parent);
try {
//source例子里面创建数据源的步骤,把CreateEmitter实例parent传进去
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
所以第一个套娃解开了,生成第二个套娃,引用链如下:
接下来开始解第二个套娃,也就是数据从上游向下游真正的发送过程:
//第1步
@Override
protected void subscribeActual(Observer<? super T> observer) {
//observer就是上面传过来的MapObserver
//创建CreateEmitter发射器,前往第2步
//所以持有引用关系CreateEmitter——>MapObserver
CreateEmitter<T> parent = new CreateEmitter<T>(observer);
//这个操作是跟拉断流dispose()有关,与本文关注点关系不大
observer.onSubscribe(parent);
try {
//source例子里面创建数据源的步骤,把CreateEmitter实例parent传进去,前往第3步
source.subscribe(parent);
} catch (Throwable ex) {
Exceptions.throwIfFatal(ex);
parent.onError(ex);
}
}
//第2步
CreateEmitter(Observer<? super T> observer) {
this.observer = observer;//observer就是MapObserver
}
//第3步
ObservableCreate observableCreate =
(ObservableCreate) Observable.create(new ObservableOnSubscribe<String>() {
@Override
public void subscribe(ObservableEmitter<String> emitter) throws Exception {
//emitter是第1步传过来的CreateEmitter,前往第4步
emitter.onNext("123");
}
});
//第4步
@Override
public void onNext(T t) {
if (t == null) {
onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
return;
}
if (!isDisposed()) {//数据流没被拉断
//从第2步看出,observer就是MapObserver
//所以这是去解MapObserver这一层,前往第5步
observer.onNext(t);
}
}
//第5步
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != NONE) {
actual.onNext(null);
return;
}
U v;
try {
//事件变换:mapper.apply(t),mapper是例子执行map方法传进来的,就是执行Integer.parseInt(s),如第6步
v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
} catch (Throwable ex) {
fail(ex);
return;
}
//由图3可知,actual指向ObservableSubscribeOn,
//也就是去解ObservableSubscribeOn这一层,如第7步
actual.onNext(v);
}
//第6步
ObservableMap observableMap =
(ObservableMap) observableCreate.map(new Function<String, Integer>() {
@Override
public Integer apply(String s) throws Exception {
return Integer.parseInt(s);
}
});
//第7步
@Override
public void onNext(T t) {
//由图3可知,actual指向ObserveOnObserver
//也就是解ObserveOnObserver这一层,前往第8步
actual.onNext(t);
}
//第8步
@Override
public void onNext(T t) {
if (done) {
return;
}
if (sourceMode != QueueDisposable.ASYNC) {
queue.offer(t);//把数据存起来,切换线程后使用
}
//前往第9步
schedule();
}
//第9步
void schedule() {
if (getAndIncrement() == 0) {
//切换线程,并执行本实例的run方法
worker.schedule(this);
}
}
其实worker就是之前传进来的AndroidSchedulers.mainThread(),就跟之前分析Schedulers.io()一样,最终切换线程,并执行本实例的run方法,这个切换是在数据发送过程的,所以每执行一个,RxJava的下一个操作就切换一次线程。
接着往下分析,从ObserveOnObserver的run方法开始:
//第1步
@Override
public void run() {
if (outputFused) {
drainFused();
} else {
//走这里
drainNormal();
}
}
//第2步
void drainNormal() {
int missed = 1;
final SimpleQueue<T> q = queue;//前面保存数据的队列
final Observer<? super T> a = actual;//指向LambdaObserver实例
for (;;) {
if (checkTerminated(done, q.isEmpty(), a)) {
return;
}
for (;;) {
boolean d = done;
T v;
v = q.poll();//从队列里面取出数据
//、、、省略一堆判断、、、
//往下发送
//去LambdaObserver这一层,前往第3步
a.onNext(v);
}
//、、、省略一堆判断、、、
}
}
//第3步
@Override
public void onNext(T t) {
if (!isDisposed()) {
try {
//onNext就是例子里面传进来的Function,如第4步
onNext.accept(t);
} catch (Throwable e) {
Exceptions.throwIfFatal(e);
get().dispose();
onError(e);
}
}
}
//第4步
LambdaObserver observer =
(LambdaObserver) observableObserveOn.subscribe(new Consumer<Integer>() {
@Override
public void accept(Integer integer) throws Exception {
System.out.println(integer);
}
});
至此,整个流程就解析完了。
总结一下:
1)套娃:ObservableObserveOn——>ObservableSubscribeOn——>ObservableMap——>ObservableCreate——>ObservableOnSubscribe,遇到observeOn,就把需要切换线程信息存起来,并不执行,发送数据的时候才使用。
2)之后调用subscribe,一层一层地解,每解一层就生成对应的Observer对象。最后解完第一个套娃,又生成新的套娃:CreateEmitter——>MapObserver——>SubscribeOnObserver——>ObserveOnObserver——>LambdaObserver,遇到subscribeOn,就直接执行切换线程,这是一个从下游往上游执行的过程,所以流下游的subscribeOn会被上游的覆盖,最终影响数据发送的就只有一个subscribeOn就是离源头最近的那个。
3)最后解第二个套娃,从源头一层一层地调用Observer的onNext()方法向下游发送数据
网友评论