美文网首页
RxJava执行流程分析

RxJava执行流程分析

作者: Smali_ling | 来源:发表于2020-04-11 13:58 被阅读0次

一直以来,想写点东西,但又不知道写什么。想到之前公司同事在使用使用RxJava的时候,总是抱怨对于RxJava的工作流程很难理解,今天就来说说RxJava吧,本文不讨论RxJava的各种操作符,只是从RxJava的源码分析一下它的执行流程。这也是自己第一次在网络写文章,培养一下自己的写作能力,写得不好,各位看官见笑了。

话不多说,直接开整,先来看一下RxJava的简单用法,例子如下:

  Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            String s = "10086";
            emitter.onNext(s);
        }
    })
    .map(new Function<String, Integer>() {
        @Override
        public Integer apply(String s) throws Exception {
            return Integer.parseInt(s);
        }
    })
    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    .subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });
}

功能很简单,就是使用RxJava从上游向下游发送数据:
1)使用Observable静态方法create创建ObServable对象
2)map将String数据转化成Integer
3)subscribeOn转成IO线程
4)observeOn转成主线成
5)subscribe传入Consumer(消费者)用于消费从上游传下来的数据。
为了方便分析,我们先将代码做一层转换(实际开发中不建议这么写,否则就不能体现链式调度这个优点)。

  //第一步
  ObservableCreate observableCreate =
            (ObservableCreate) Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("123");
        }
    });
    //第二步
    ObservableMap observableMap =
            (ObservableMap) observableCreate.map(new Function<String, Integer>() {
        @Override
        public Integer apply(String s) throws Exception {
            return Integer.parseInt(s);
        }
    });
    //第三步
    ObservableSubscribeOn observableSubscribeOn =
            (ObservableSubscribeOn) observableMap.subscribeOn(Schedulers.io());
    //第四步
    ObservableObserveOn observableObserveOn =
            (ObservableObserveOn) observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());
    //第五步
    LambdaObserver observer =
            (LambdaObserver) observableObserveOn.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
            System.out.println(integer);
        }
    });

接下来,一步一步分析:
Observable调用静态方法create方法,传进去一个叫ObservableOnSubscribe的参数,我们来按着执行顺序看看源代码:

//第一步
ObservableCreate observableCreate =
            (ObservableCreate) Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            emitter.onNext("123");
        }
    });

//Observable类
@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
    ObjectHelper.requireNonNull(source, "source is null");
    return RxJavaPlugins.onAssembly(new ObservableCreate<T>(source));
} 
//Observable类
public static <T> Observable<T> onAssembly(@NonNull Observable<T> source) {
    Function<? super Observable, ? extends Observable> f = onObservableAssembly;
    if (f != null) {
        return apply(f, source);
    }
    return source;
}

我们重点关注ObservableCreate的实例化,onAssembly方法正常情况下就是返回实例化的ObservableCreate,之后会经常看到onAssembly这个方法,就自行忽略了。接下来跟进去,看看实例化都做了什么。

//ObservableCreate类
final ObservableOnSubscribe<T> source;

public ObservableCreate(ObservableOnSubscribe<T> source) {
    this.source = source;
}

可以看到,ObservableCreate对象持有外面传进来的ObservableOnSubscribe实例。回到例子,可以看出来,静态方法create方法返回的是ObservableCreate对象。
接下来看第二步:

 ObservableMap observableMap =
            (ObservableMap) observableCreate.map(new Function<String, Integer>() {
        @Override
        public Integer apply(String s) throws Exception {
            return Integer.parseInt(s);
        }
 });

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
    ObjectHelper.requireNonNull(mapper, "mapper is null");
    return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
}

可以看到,返回的ObservableCreate对象调用map方法,传进去一个Function对象。按前面第一步分析onAssembly方法,可以很容易知道,map方法返回的应该就是实例化的ObservableMap对象。继续往下看,ObservableMap实例化传了个this和mapper,跟进去看下:

//ObservableMap类
final Function<? super T, ? extends U> function;
public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
    super(source);
    this.function = function;
}

看下构造方法super(source),调用父类的构造方法,跟进去看看:

 //ObservableMap类的父类
abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    /** The source consumable Observable. */
    protected final ObservableSource<T> source;
    /**
      * Constructs the ObservableSource with the given consumable.
      * @param source the consumable Observable
      */
    AbstractObservableWithUpstream(ObservableSource<T> source) {
         this.source = source;
    }
    @Override
    public final ObservableSource<T> source() {
         return source;
    }
}

可以知道,ObservableMap实例持有了this和mapper引用,这个this就是map调用者ObservableCreate实例,mapper就是Function实例。有没有看出什么特点?我们将持有引用链列出来看看:
1)ObservableMap ——> ObservableCreate和Function
2)ObservableCreate ——> ObservableOnSubscribe
想想看这个像什么?对,聪明的你可能想到了,俄罗斯套娃!一层套住一层。我们可以想象,之后的链每执行一个方法,也是在外面这么加一层套子。不信?我们继续往下看:

//第三步
ObservableSubscribeOn observableSubscribeOn =
            (ObservableSubscribeOn) observableMap.subscribeOn(Schedulers.io());


@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> subscribeOn(Scheduler scheduler) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
}

这里我们先记录一下,第三步subscribeOn方法传进去参数的是Schedulers.io(),一会儿用到的时候再分析,我们先知道它的作用是切换线程。

public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;

public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
    super(source);
    this.scheduler = scheduler;
}

看到了吧,一样的套路,一样的配方。这回加了一层ObservableSubscribeOn。接下来看第四步:

 //第四步
ObservableObserveOn observableObserveOn =
            (ObservableObserveOn) observableSubscribeOn.observeOn(AndroidSchedulers.mainThread());

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler) {
    return observeOn(scheduler, false, bufferSize());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.CUSTOM)
public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
    ObjectHelper.requireNonNull(scheduler, "scheduler is null");
    ObjectHelper.verifyPositive(bufferSize, "bufferSize");
    return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
}

这里我们先记录一下,第四步subscribeOn方法传进去参数的是AndroidSchedulers.mainThread(),一会儿用到的时候再分析,我们先知道它的作用是切换线程。至此,两个切换线程方式都出现了,不过到目前为止,还没触发切换线程的动作。

public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
final Scheduler scheduler;
final boolean delayError;
final int bufferSize;
public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
    super(source);
    this.scheduler = scheduler;
    this.delayError = delayError;
    this.bufferSize = bufferSize;
}

所以执行了observeOn方法,就是又加了一层ObservableObserveOn。
如图1,每一圈代表一个实例,从外向里,除了最内一层之外,每一层都有个一source指向里面一层,代表持有引用。

图1
从这些实例的命名,可以知道,这些类都是Oservabale。
俄罗斯套娃的玩法是怎么样,一层一层地套,套完了之后,就是解套。所以接下来是解套了。看第五步:
//第五步
LambdaObserver observer =
            (LambdaObserver) observableObserveOn.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {

        }
    });

从前面例子配合前面的分析,可以看出,最外层ObservableObserveOn实例调用subscribe方法,并且传入一个Consumer实例,如下:

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext) {
    return subscribe(onNext, Functions.ON_ERROR_MISSING, Functions.EMPTY_ACTION, Functions.emptyConsumer());
}

@CheckReturnValue
@SchedulerSupport(SchedulerSupport.NONE)
public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
        Action onComplete, Consumer<? super Disposable> onSubscribe) {
    ObjectHelper.requireNonNull(onNext, "onNext is null");
    ObjectHelper.requireNonNull(onError, "onError is null");
    ObjectHelper.requireNonNull(onComplete, "onComplete is null");
    ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");

    LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
    subscribe(ls);
    return ls;
}

这里先记录一下,执行subscribe会实例化一个LambdaObserver对象,并且当作参数往下执行。继续往下看:

@SchedulerSupport(SchedulerSupport.NONE)
@Override
public final void subscribe(Observer<? super T> observer) {
    ObjectHelper.requireNonNull(observer, "observer is null");
    try {
        observer = RxJavaPlugins.onSubscribe(this, observer);
        ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

        subscribeActual(observer);
    } catch (NullPointerException e) { // NOPMD
        throw e;
    } catch (Throwable e) {
        Exceptions.throwIfFatal(e);
        // can't call onError because no way to know if a Disposable has been set or not
        // can't call onSubscribe because the call might have set a Subscription already
        RxJavaPlugins.onError(e);

        NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
        npe.initCause(e);
        throw npe;
    }
}

protected abstract void subscribeActual(Observer<? super T> observer);

我擦,抽象方法?那当然去调用者ObservableObserveOn里面找它的实现。

@Override
protected void subscribeActual(Observer<? super T> observer) {
    if (scheduler instanceof TrampolineScheduler) {
        source.subscribe(observer);
    } else {
        Scheduler.Worker w = scheduler.createWorker();
        //解ObservableSubscribeOn这一层
        source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
    }
}

source指向的是ObservableSubscribeOn的实例,实例化ObserveOnObserver传进去的observer是LambdaObserver对象。我们看下ObserveOnObserver的构造函数:

ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
        this.actual = actual;
        this.worker = worker;
        this.delayError = delayError;
        this.bufferSize = bufferSize;
    }

ObserveOnObserver将传入的LambdaObserver存起来,所以持有引用关系就是:
ObserveOnObserver ——> LambdaObserver
这里扯一下,ObserveOnObserver持有从外面传进来的Worker的引用,其实这个Worker就是之前AndroidSchedulers.mainThread()创建的,它的作用是用来做线程切换的,先记录一下,现在还没用到。
我们再去看ObservableSubscribeOn类里面找subscribe方法,找不到,去父类找,最后在Observable类找到了,其实也就是之前分析过的方法,所以回到ObservableSubscribeOn的subscribeActual,如下:

@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    s.onSubscribe(parent);
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}

又是实例化一个SubscribeOnObserver,传的参数是ObserveOnObserver,如下:

SubscribeOnObserver(Observer<? super T> actual) {
        this.actual = actual;
        this.s = new AtomicReference<Disposable>();
    }

由此,可以知道,SubscribeOnObserver持有ObserveOnObserver实例的引用。所以又有以下持有引用关系:
1)ObserveOnObserver ——> LambdaObserver
2)SubscribeOnObserver ——> ObserveOnObserver
从继承关系可知,每个都是实现了Observer接口。又来????又是一次俄罗斯套娃,第一次是套Observable,这次是这边解Observable的,那边又套Observer。嘿嘿~

继续分析ObservableSubscribeOn的subscribeActual方法,实例化SubscribeTask,如下,可以看到这个SubscribeTask是实现了Runnable接口:

final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;

    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;
    }

    @Override
    public void run() {
        source.subscribe(parent);
    }
}

继续分析,执行scheduler(这个scheduler就是例子里面的Schedulers.io())的scheduleDirect方法,把SubscribeTask实例传进去。我们先看一下Schedulers.io()是个什么东西,如下:

//第1步,执行subscribeOn传入Schedulers.io(),去看第2步
ObservableSubscribeOn observableSubscribeOn =
            (ObservableSubscribeOn) observableMap.subscribeOn(Schedulers.io());

@NonNull
public static Scheduler io() {
    //第2步调用onIoScheduler(在第6步)方法,传入IO
    //去第3步看IO是什么
    return RxJavaPlugins.onIoScheduler(IO);
}
  //第3步
 //静态代码块,类加载的时候执行
static {
    SINGLE = RxJavaPlugins.initSingleScheduler(new SingleTask());
    COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
    //可以看到IO就是这里初始化
    //new IOTask() 前往第4步
    //initIoScheduler前往第7步
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    TRAMPOLINE = TrampolineScheduler.instance();
    NEW_THREAD = RxJavaPlugins.initNewThreadScheduler(new NewThreadTask());
}
// 第4步
static final class IOTask implements Callable<Scheduler> {
    @Override
    public Scheduler call() throws Exception {
        return IoHolder.DEFAULT;
    }
}
//第5步,实例化IoScheduler,所以IO就是IoScheduler
static final class IoHolder {
    static final Scheduler DEFAULT = new IoScheduler();
}
//第6步
//是不是跟之前那个RxJavaPlugins.onAssembly很像,没错的,正常情况下就是返回传进来的参数
public static Scheduler onIoScheduler(@NonNull Scheduler defaultScheduler) {
    Function<? super Scheduler, ? extends Scheduler> f = onIoHandler;
    if (f == null) {
        return defaultScheduler;
    }
    return apply(f, defaultScheduler);
}
//第7步
//onInitIoHandler值是为空的,走f==null分支
@NonNull
public static Scheduler initIoScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
    ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
    Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitIoHandler;
    if (f == null) {
        //前往第8步
        return callRequireNonNull(defaultScheduler);
    }
    return applyRequireNonNull(f, defaultScheduler);
}
//第8步
@NonNull
static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
    try {
        //执行第4步的s.call(),返回IoScheduler对象
       //requireNonNull前往第9步
        return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
    } catch (Throwable ex) {
        throw ExceptionHelper.wrapOrThrow(ex);
    }
}
 //第9步
//返回的是s.call()的值,也就是IoScheduler对象。
public static <T> T requireNonNull(T object, String message) {
    if (object == null) {
        throw new NullPointerException(message);
    }
    return object;
}

所以最终可以看到,Schedulers.io()是IoScheduler对象。回到ObservableSubscribeOn的subscribeActual方法:

//第0步
@Override
public void subscribeActual(final Observer<? super T> s) {
    final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    //与本文分析关系不重要,先忽略,有时间再分析
    s.onSubscribe(parent);
    //scheduler就是IoScheduler对象,scheduleDirect方法去看第1步
    //SubscribeTask前往第12步
   //setDisposable这个操作是跟拉断流dispose()有关,与本文关注点关系不大
    parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
}
 //第1步
@NonNull
public Disposable scheduleDirect(@NonNull Runnable run) {
    return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
}
//第2步
public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    final Worker w = createWorker();//createWorker是抽象方法,具体实现在IoScheduler类,前往第3步
    final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);//返回的还是传进去的Runnable
    //这个操作是跟拉断流dispose()有关,与本文关注点关系不大
   //就是多加一层Runnable最后还是执行decoratedRun的run方法
   //前往第9步
    DisposeTask task = new DisposeTask(decoratedRun, w);
    w.schedule(task, delay, unit);//前往第4步,在EventLoopWorker类
    return task;
}
//第3步
@NonNull
@Override
public Worker createWorker() {
    return new EventLoopWorker(pool.get());
}
 //第4步
@NonNull
@Override
public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
     if (tasks.isDisposed()) {
         // don't schedule, we are unsubscribed
         return EmptyDisposable.INSTANCE;
     }
     //前往第5步
     return threadWorker.scheduleActual(action, delayTime, unit, tasks);
}
//第5步
@NonNull
public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
    Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    //前往第6步看ScheduledRunnable
    ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

    if (parent != null) {
        if (!parent.add(sr)) {
            return sr;
        }
    }

    Future<?> f;
    try {
        //传进来的delayTime为0
        if (delayTime <= 0) {
            //提交给executor线程池执行
           //最后调用sr的call方法,前往第7步
            f = executor.submit((Callable<Object>)sr);
        } else {
            f = executor.schedule((Callable<Object>)sr, delayTime, unit);
        }
        sr.setFuture(f);
    } catch (RejectedExecutionException ex) {
        if (parent != null) {
            parent.remove(sr);
        }
        RxJavaPlugins.onError(ex);
    }

    return sr;
}
//第6步
public ScheduledRunnable(Runnable actual, DisposableContainer parent) {
    super(3);
    this.actual = actual;//就是之前的SubscribeTask
    this.lazySet(0, parent);
}
//第7步
@Override
public Object call() {
    // Being Callable saves an allocation in ThreadPoolExecutor
    run();//前往第8步
    return null;
}
//第8步
@Override
public void run() {
    lazySet(THREAD_INDEX, Thread.currentThread());
    try {
        try {
            //actual在第6步可知,就是传进来的,也就是第2步的DisposeTask,前往第10步
            actual.run();
        } catch (Throwable e) {
            // Exceptions.throwIfFatal(e); nowhere to go
            RxJavaPlugins.onError(e);
        }
    } finally {
      、、、省略部分不关键代码、、、
    }
}
 //第9步
 DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
        this.decoratedRun = decoratedRun;//由第0步可知,是SubscribeTask
        this.w = w;
    }
     //第10步
    @Override
    public void run() {
        runner = Thread.currentThread();
        try {
            decoratedRun.run();//前往第13步
        } finally {
            dispose();
            runner = null;
        }
    }
  //第11步
 final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    //第12步
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;//parent是SubscribeOnObserver
    }
    第13步
    @Override
    public void run() {
        //由图1的套娃关系图可知,source是ObservableMap对象
        source.subscribe(parent);
    }
}
至此,subscribeOn(Schedulers.io())已经执行完成,subscribeOn完成了线程切换,也就是说每执行一次,就切换一次线程。注意此时只是解套过程,并没有触发上游发送数据,而且这个切换线程只是发生在subscribe的方法, 这也是为什么多次执行subscribeOn方法会只有第一次subscribeOn有效的原因,来张图吧,容易理解一点: 图2

很多人会问,都已经有observeOn,为什么还要subscribeOn?因为observeOn(还没分析)是没办法控制数据源Observable.create的线程,所以得使用subscribeOn来控制。就像图2中,最后Observable.create是处于io线程的。
接下来继续之前的解俄罗斯套娃,先把之前解的步骤列出来:
1)解observableObserveOn,生成LambdaObserver和ObserveOnObserver对象,并且存在引用关系:ObserveOnObserver——>LambdaObserver
2)ObservableSubscribeOn,生成SubscribeOnObserver对象,并存在引用关系:SubscribeOnObserver——>ObserveOnObserver
所以接下来是解ObservableMap这一层:

 //第1步
final class SubscribeTask implements Runnable {
    private final SubscribeOnObserver<T> parent;
    SubscribeTask(SubscribeOnObserver<T> parent) {
        this.parent = parent;//之前分析可知,parent是SubscribeOnObserver
    }
    @Override
    public void run() {
        //之前分析可知,source是ObservableMap对象,
       //subscribe最后还是调用ObservableMap的subscribeActual方法,
       //就是解ObservableMap这一层,前往第2步
        source.subscribe(parent);
    }
}
//第2步
@Override
public void subscribeActual(Observer<? super U> t) {
     //由第1步可知,t是SubscribeOnObserver对象
     //实例化MapObserver前往第3步
     //由例子可以知道function就是map方法传入的参数,如第5步所示
    //source就是之前ObservableCreate,这里就是解ObservableCreate这里一层
    source.subscribe(new MapObserver<T, U>(t, function));
}
//第3步 
MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
      super(actual);//actual就是第1步的SubscribeOnObserver
      this.mapper = mapper;
}   
//第5步
ObservableMap observableMap =
            (ObservableMap) observableCreate.map(new Function<String, Integer>() {
        @Override
        public Integer apply(String s) throws Exception {
            return Integer.parseInt(s);
        }
    });

所以解ObservableMap这一层,在subscribeActual会生成一个MapObserver对象,并且存在引用关系:
MapObserver——>SubscribeOnObserver
继续解ObservableCreate这一层:

//第1步
@Override
protected void subscribeActual(Observer<? super T> observer) {
    //observer就是上面传过来的MapObserver
   //创建CreateEmitter发射器
   //所以持有引用关系CreateEmitter——>MapObserver
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //这个操作是跟拉断流dispose()有关,与本文关注点关系不大
    observer.onSubscribe(parent);

    try {
        //source例子里面创建数据源的步骤,把CreateEmitter实例parent传进去
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}

所以第一个套娃解开了,生成第二个套娃,引用链如下:

CreateEmitter——>MapObserver——>SubscribeOnObserver——>ObserveOnObserver——>LambdaObserver,来张图吧(颜色跟图1无关,只为区分不同实例): 图3
接下来开始解第二个套娃,也就是数据从上游向下游真正的发送过程:
 //第1步
@Override
protected void subscribeActual(Observer<? super T> observer) {
    //observer就是上面传过来的MapObserver
   //创建CreateEmitter发射器,前往第2步
   //所以持有引用关系CreateEmitter——>MapObserver
    CreateEmitter<T> parent = new CreateEmitter<T>(observer);
    //这个操作是跟拉断流dispose()有关,与本文关注点关系不大
    observer.onSubscribe(parent);

    try {
        //source例子里面创建数据源的步骤,把CreateEmitter实例parent传进去,前往第3步
        source.subscribe(parent);
    } catch (Throwable ex) {
        Exceptions.throwIfFatal(ex);
        parent.onError(ex);
    }
}
//第2步
CreateEmitter(Observer<? super T> observer) {
     this.observer = observer;//observer就是MapObserver
}
//第3步
ObservableCreate observableCreate =
            (ObservableCreate) Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
            //emitter是第1步传过来的CreateEmitter,前往第4步
            emitter.onNext("123");
        }
    });
   //第4步
   @Override
    public void onNext(T t) {
        if (t == null) {
            onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
            return;
        }
        if (!isDisposed()) {//数据流没被拉断
            //从第2步看出,observer就是MapObserver
           //所以这是去解MapObserver这一层,前往第5步
            observer.onNext(t);
        }
    }
    //第5步
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != NONE) {
            actual.onNext(null);
            return;
        }

        U v;

        try {
            //事件变换:mapper.apply(t),mapper是例子执行map方法传进来的,就是执行Integer.parseInt(s),如第6步
            v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
        } catch (Throwable ex) {
            fail(ex);
            return;
        }
        //由图3可知,actual指向ObservableSubscribeOn,
       //也就是去解ObservableSubscribeOn这一层,如第7步
        actual.onNext(v);
    }
   //第6步
   ObservableMap observableMap =
            (ObservableMap) observableCreate.map(new Function<String, Integer>() {
        @Override
        public Integer apply(String s) throws Exception {
            return Integer.parseInt(s);
        }
    });
     //第7步
     @Override
    public void onNext(T t) {
        //由图3可知,actual指向ObserveOnObserver
       //也就是解ObserveOnObserver这一层,前往第8步
        actual.onNext(t);
    }
    //第8步
    @Override
    public void onNext(T t) {
        if (done) {
            return;
        }

        if (sourceMode != QueueDisposable.ASYNC) {
            queue.offer(t);//把数据存起来,切换线程后使用
        }
        //前往第9步
        schedule();
    }
   //第9步
   void schedule() {
        if (getAndIncrement() == 0) {
            //切换线程,并执行本实例的run方法
            worker.schedule(this);
        }
    }

其实worker就是之前传进来的AndroidSchedulers.mainThread(),就跟之前分析Schedulers.io()一样,最终切换线程,并执行本实例的run方法,这个切换是在数据发送过程的,所以每执行一个,RxJava的下一个操作就切换一次线程。
接着往下分析,从ObserveOnObserver的run方法开始:

    //第1步
   @Override
    public void run() {
        if (outputFused) {
            drainFused();
        } else {
            //走这里
            drainNormal();
        }
    }
  //第2步
  void drainNormal() {
        int missed = 1;

        final SimpleQueue<T> q = queue;//前面保存数据的队列
        final Observer<? super T> a = actual;//指向LambdaObserver实例

        for (;;) {
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
            for (;;) {
                boolean d = done;
                T v;
                 v = q.poll();//从队列里面取出数据
               //、、、省略一堆判断、、、
               //往下发送
               //去LambdaObserver这一层,前往第3步
                a.onNext(v);
            }
           //、、、省略一堆判断、、、
        }
    }
 //第3步
@Override
public void onNext(T t) {
    if (!isDisposed()) {
        try {
            //onNext就是例子里面传进来的Function,如第4步
            onNext.accept(t);
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            get().dispose();
            onError(e);
        }
    }
}
//第4步
LambdaObserver observer =
            (LambdaObserver) observableObserveOn.subscribe(new Consumer<Integer>() {
        @Override
        public void accept(Integer integer) throws Exception {
             System.out.println(integer);
        }
    });

至此,整个流程就解析完了。
总结一下:
1)套娃:ObservableObserveOn——>ObservableSubscribeOn——>ObservableMap——>ObservableCreate——>ObservableOnSubscribe,遇到observeOn,就把需要切换线程信息存起来,并不执行,发送数据的时候才使用。
2)之后调用subscribe,一层一层地解,每解一层就生成对应的Observer对象。最后解完第一个套娃,又生成新的套娃:CreateEmitter——>MapObserver——>SubscribeOnObserver——>ObserveOnObserver——>LambdaObserver,遇到subscribeOn,就直接执行切换线程,这是一个从下游往上游执行的过程,所以流下游的subscribeOn会被上游的覆盖,最终影响数据发送的就只有一个subscribeOn就是离源头最近的那个。
3)最后解第二个套娃,从源头一层一层地调用Observer的onNext()方法向下游发送数据

相关文章

网友评论

      本文标题:RxJava执行流程分析

      本文链接:https://www.haomeiwen.com/subject/hxfsmhtx.html