美文网首页RxJava2学习记录
RxJava2笔记(四、观察者线程切换)

RxJava2笔记(四、观察者线程切换)

作者: WonderSky_HY | 来源:发表于2019-01-09 16:31 被阅读10次

    在上一篇文章RxJava2笔记(三、订阅线程切换)中,我们分析了订阅线程是如何切换的,即调用subscribeOn()来切换订阅线程时都执行了哪些操作。在本文我们将继续介绍观察者线程切换,也就是将线程由子线程切换回UI线程。

    继续在前面的基础上修改代码,在订阅线程切换方法后调用observeOn(AndroidSchedulers.mainThread())将线程切换回主线程:

    Observable.create(new ObservableOnSubscribe<Integer>() {
        @Override
        public void subscribe(ObservableEmitter<Integer> emitter) {
            Log.i(TAG, "subscribe--运行线程:" + Thread.currentThread().getName());
            emitter.onNext(1);
            emitter.onNext(2);
            try {
                TimeUnit.MILLISECONDS.sleep(1000);
            } catch (InterruptedException e) {
                e.printStackTrace();
            }
            emitter.onNext(3);
            emitter.onComplete();
        }
    })
        //将线程由UI线程切换到子线程执行IO请求
        .subscribeOn(Schedulers.io())
        //将线程切换回UI线程,方面后续操作更新UI界面
        .observeOn(AndroidSchedulers.mainThread())
        .subscribe(observer);
    

    看下运行结果:

    I/MainActivity: onSubscribe--运行线程:main
    I/MainActivity: subscribe--运行线程:RxCachedThreadScheduler-1
    I/MainActivity: onNext: 1 --运行线程:main
    I/MainActivity: onNext: 2 --运行线程:main
    I/MainActivity: onNext: 3 --运行线程:main
    I/MainActivity: onComplete--运行线程:main
    

    可以看到,subscribe方法运行在子线程中(也就是订阅线程运行在名为RxCachedThreadScheduler-1的一个子线程中,上文提到该线程是由RxJava实现的一个工厂类创建的),而observer运行在名为main的线程中,这个main线程就是UI线程。


    看完了输出结果,接下来就看看这个observeOn(AndroidSchedulers.mainThread())是如何将线程切换到UI线程的,点进去看下:

    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler) {
        //当出现异常时,默认无延迟发送错误。bufferSize()是缓冲区大小,RxJava设置了一个默认大小,为128。
        return observeOn(scheduler, false, bufferSize());
    }
    
    @CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        //包装类,保存上游observable
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    

    熟悉的套路,跟调用subscribe.on方法时类似,只是多了一个验证缓冲区大小不为空的代码,这些我们都略过,直接看ObservableObserveOn这个类:

    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
        final boolean delayError;
        final int bufferSize;
        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //如果传入的调度器是TrampolineScheduler,则不切换线程,在当前线程调度
            //但是调度的任务并不是马上执行,而是等待当前任务执行完毕再执行
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                //创建工作者worker
                Scheduler.Worker w = scheduler.createWorker();
                //上游的subscribe,该方法会触发上游的subscribeActual,
                //ObserveOnObserver也是一个包装类,保存下游的observer
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
        //......代码省略
    }
    

    熟悉的装饰器模式:

    • 1、ObservableObserveOn继承了AbstractObservableWithUpstream,其继承的ObservableSource类型的source成员变量用于保存上游的observable。
    • 2、AndroidSchedulers.mainThread()为本次传入的scheduler,负责将线程切换到UI线程。
    • 3、下游调用subscribe方法是触发=>当前observable的subscribeActual方法=>触发上游observable的subscribe方法=>传入参数包装类ObserveOnObserver(包装了下游的观察者observer)。

    这里简要介绍下步骤3:
    这里的source是上游的observable对象,source.subscribe()方法实际调用的是上游observable对象的subscribeActual方法,并将下游observer对象的包装类ObserveOnObserver作为参数传递进去,在上游observable对象的subscribeActual方法内,调用ObserveOnObserver包装类中的onSubscribe,onNext等方法,进而调用下游observer的onSubscribe,onNext等方法。

    接下来看下ObserveOnObserver这个下游observer包装类:

    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
            implements Observer<T>, Runnable {
        private static final long serialVersionUID = 6576896619930983584L;
        //下游observer
        final Observer<? super T> actual;
        //调度工作者
        final Scheduler.Worker worker;
        //当订阅任务执行出错时,是否延迟发送错误消息,默认为false,也就是不延迟
        final boolean delayError;
        //缓冲区大小,缓存上游发送的事件
        final int bufferSize;
        //存储上游observable发出的数据队列
        SimpleQueue<T> queue;
        //存储管理下游observer的订阅状态disposable
        Disposable s;
        //订阅任务执行出错时,存储错误信息
        Throwable error;
        //订阅任务是否终止
        volatile boolean done;
        //订阅任务是否被取消
        volatile boolean cancelled;
        //任务执行模式----同步还是异步
        int sourceMode;
        //是否输出融合(通常情况下该选项为false)
        boolean outputFused;
    
        ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
            this.actual = actual;
            this.worker = worker;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        public void onSubscribe(Disposable s) {
            //当前的disposable为null,上游subscribe产生的disposable不为null,则验证通过
            if (DisposableHelper.validate(this.s, s)) {
                this.s = s;
                //如果订阅时获取的disposable对象s是QueueDisposable类型的
                if (s instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    //新建QueueDisposable队列并将订阅时获取的disposable对象s强转为QueueDisposable,然后赋值给queue
                    QueueDisposable<T> qd = (QueueDisposable<T>) s;
                    //获取任务运行模式
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    //判断运行模式,并且调用下游observer的onSubscribe方法将当前observer在订阅时产生的disposable传递给下游observer
                    if (m == QueueDisposable.SYNC) {
                        sourceMode = m;
                        queue = qd;
                        //为true 使得接下来的onXXX等方法均不会执行
                        done = true;
                        actual.onSubscribe(this);
                        //worker直接调度任务
                        schedule();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        sourceMode = m;
                        queue = qd;
                        actual.onSubscribe(this);
                        //在异步模式下,等待onXXX方法中的worker调度
                        return;
                    }
                }
                //否则创建一个支持单一生产者单一消费者的队列
                queue = new SpscLinkedArrayQueue<T>(bufferSize);
                //调用下游observer的onSubscribe方法将当前observer在订阅时产生的disposable传递给下游observer
                actual.onSubscribe(this);
            }
        }
    
        //......代码省略
    }
    

    onSubscribe方法调用后,就开始执行onXXX方法了,首先是onNext方法,这个方法可以反复调用:

    @Override
    public void onNext(T t) {
        //订阅模式是同步模式或者执行过onComplete/onError方法时,此时done为true,直接返回
        if (done) {
            return;
        }
    
        if (sourceMode != QueueDisposable.ASYNC) {
            //将上游数据源发射的数据添加到缓存队列中
            queue.offer(t);
        }
        //开始worker调度任务
        //(这里面调用了handler,将数据发送到主线程所在的消息队列,进而更新UI界面,这里稍后分析)
        schedule();
    }
    

    其次是onError和onComplete,这两个方法只能执行一次并且是互斥的:

    @Override
    public void onError(Throwable t) {
        //如果任务状态已经是终止状态,再执行该任务是就会抛出异常
        if (done) {
            RxJavaPlugins.onError(t);
            return;
        }
        error = t;
        //设置任务状态为终止状态
        done = true;
        //worker任务调度
        schedule();
    }
    
    @Override
    public void onComplete() {
        //如果任务是终止状态,直接返回
        if (done) {
            return;
        }
        //设置任务状态为终止状态
        done = true;
        //worker任务调度
        schedule();
    }
    

    这里onNext,onComplete,onError最后都调用schedule()来调度任务:

    //调用worker.schedule(this)开始任务调度
    void schedule() {
        //这里通过getAndIncrement() == 0原子性的保证了worker.schedule(this)在调度完之前不会再次被调度
        if (getAndIncrement() == 0) {
            worker.schedule(this);
        }
    }
    

    上面在执行worker.schedule(this)时传入了this,也就是当前对象ObserveOnObserver,ObserveOnObserver类实现了Runnable接口,因此worker.schedule(this)调度的任务就是自己run()实现方法中的任务:

    @Override
    public void run() {
        //outputFused通常情况下为false
        if (outputFused) {
            drainFused();
        } else {
            drainNormal();
        }
    }
    

    drainFused()通常情况下不会执行,我们只需要关注drainNormal()方法即可,在查看该方法之前,先看下drainNormal()内部调用的一个验证方法checkTerminated(boolean d, boolean empty, Observer<? super T> a),该方法主要是检测任务是否已终止:

    boolean checkTerminated(boolean d, boolean empty, Observer<? super T> a) {
        //如果订阅被取消,清空数据缓存队列
        if (cancelled) {
            queue.clear();
            return true;
        }
        
        //这个d就是done
        if (d) {
            //done为true时,有两种情况,在onNext调度完毕后执行onComplete或onError
            Throwable e = error;
            if (delayError) {
                //如果是延迟发送错误的情况,必须等到queue(缓存上游observable发出的数据)为空的情况下才能发送错误(有错误的情况下)
                if (empty) {
                    if (e != null) {
                        a.onError(e);
                    } else {
                        a.onComplete();
                    }
                    //终止worker任务调度
                    worker.dispose();
                    return true;
                }
            } else {
                //不延迟发送错误时,直接调用
                if (e != null) {
                    //如果任务执行出错,即调用了onNext方法,清空queue
                    queue.clear();
                    //调用下游observer的onError
                    a.onError(e);
                    //终止worker调度
                    worker.dispose();
                    return true;
                } else
                if (empty) {
                    //任务正常执行,未出现错误
                    //调用下游observer的onComplete,并终止worker调度
                    a.onComplete();
                    worker.dispose();
                    return true;
                }
            }
        }
        //否则返回false,任务还未终止
        return false;
    }
    

    任务结束情况分为以下两种

    • 1、订阅被取消,即cancelled==true,此时任务为终止状态。
    • 2、任务运行标记done==true,即onNext调用完毕,调用onComplete正常结束;或者在onNext调用过程中出现错误,调用了onNext,此时是异常结束,会发送异常信息。

    说完了这个方法,我们继续看drainNormal()这个方法:

    void drainNormal() {
        //这个变量只是一个控制变量,用来确保drainNormal()方法能被原子性调用
        int missed = 1;
    
        final SimpleQueue<T> q = queue;
        final Observer<? super T> a = actual;
        
        //外层死循环
        for (;;) {
            //根据数据缓存队列是否为空检查任务是否已终止
            //为空就会直接跳出外层循环,方法执行结束,内层的循环也不会执行
            if (checkTerminated(done, q.isEmpty(), a)) {
                return;
            }
    
            //内层死循环,负责具体的任务处理
            for (;;) {
                boolean d = done;
                T v;
    
                try {
                    //从队列中获取数据
                    v = q.poll();
                } catch (Throwable ex) {
                    //任务执行出现错误,抛出致命错误信息,结束循环,终止本次任务
                    Exceptions.throwIfFatal(ex);
                    //终止订阅
                    s.dispose();
                    //清空缓存队列
                    q.clear();
                    a.onError(ex);
                    //停止worker调度
                    worker.dispose();
                    return;
                }
                boolean empty = v == null;
                //判断从队列中取出的数据是否为空,即判断队列是否为空
                if (checkTerminated(d, empty, a)) {
                    return;
                }
                //队列中没有了数据,直接退出
                if (empty) {
                    break;
                }
                //调用下游observer的onNext(onComplete和onError均在checkTerminated方法里调用)
                a.onNext(v);
            }
            //这里主要是确保在同一时间只有一个worker.schedule(this)正在执行。
            //missed变量在方法最开始初始化为1,这里missed会被重置为0,这样下面的missed==0成立,当前任务结束。
            //addAndGet(-missed)方法也会将AtomicInteger内部的VALUE值设置为0。
            //同时,run()方法中的判定方法getAndIncrement() == 0成立,继续执行下一个worker.schedule(this)。
            //如果程序没有走到这里,那么missed==0也就不成立,相应的run()方法中的getAndIncrement() == 0不成立,也就不会执行下一个worker.schedule(this)。
            //这样就原子性的确保了同一时间只有一个worker.schedule(this)正在执行,即同一时间只有一个drainNormal()方法在执行。
            missed = addAndGet(-missed);
            //程序走到这里,表示当前任务正常结束,退出循环。
            //run()方法继续执行,getAndIncrement() == 0成立,开始执行下一个worker.schedule(this)。
            if (missed == 0) {
                break;
            }
        }
    }
    

    到这里,ObserveOnObserver这个下游observer包装类也就介绍的差不多了,简要总结下:

    • 1、构造方法接收了外部创建的工作者worker,负责任务调度。
    • 2、内部定义了一个缓存队列,缓存上游observable发射的数据。
    • 3、ObserveOnObserver的父类继承了AtomicInteger类,这样在多线程环境中,可以通过AtomicInteger的CAS操作确保线程安全。
    • 4、ObserveOnObserver类实现了Runnable接口,在run()方法内,使用worker进行任务调度,并通过getAndIncrement()==0来确保worker任务调度的原子性操作。
    • 5、drainNormal()为实际调度执行的方法,其内部声明了一个局部变量missed,并初始化为1。然后开始死循环,在外层循环末尾处(内层循环正常结束,消息队列中的数据都被取了出来并处理完毕,此时消息队列为空,结束内层循环),调用missed = addAndGet(-missed)后,ObserveOnObserver内部的VALUE值被重置为0,最后判定missed==0成立,程序跳出死循环,drainNormal()方法执行完毕。最后run()方法继续执行,此时getAndIncrement()==0成立,开始下一个worker任务调度。
      如果在循环途中因为某些原因(任务已执行完毕或出现异常)而中途退出循环没能执行到循环末尾,那么其内部的VALUE值就不为0,getAndIncrement()==0不成立,不会执行worker调度,因此整个订阅也就结束了。

    在上一篇文章中,订阅线程最终是通过调度器来执行具体切换过程的。同样的,对于观察者线程切换执行的也是类似的过程。前面分析道,ObservableObserveOn构造方法接收我们传入的调度器scheduler,并通过scheduler创建工作者worker,将其传入到ObserveOnObserver的构造方法中,最后在run()方法中执行具体的任务调度。因此观察者的线程切换肯定是发生在worker的调度过程中。

    先从observeOn(AndroidSchedulers.mainThread())的参数AndroidSchedulers.mainThread()开始,点进去看下:

    public final class AndroidSchedulers {
    
        private static final class MainHolder {
            //构造一个与UI线程关联的Handler,并将其作为参数构造HandlerScheduler调度器
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        /** A {@link Scheduler} which executes actions on the Android main thread. */
        //返回与主线程相关的scheduler调度器
        public static Scheduler mainThread() {
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
    
        /** A {@link Scheduler} which executes actions on {@code looper}. */
        //自己指定线程Looper自定义观察者线程切换
        public static Scheduler from(Looper looper) {
            if (looper == null) throw new NullPointerException("looper == null");
            return new HandlerScheduler(new Handler(looper));
        }
    
        private AndroidSchedulers() {
            throw new AssertionError("No instances.");
        }
    }
    

    AndroidSchedulers.mainThread()最终返回HandlerScheduler对象,HandlerScheduler也是继承自Scheduler,其构造方法接收一个Handler类型的参数,这个Handler通过Looper.getMainLooper()与UI线程关联起来,这样通过Handler发送的消息就能被UI线程接收,从而更新UI界面。具体返回HandlerScheduler对象的步骤与上文所介绍的订阅线程切换生成IoScheduler一致,不再详述。

    熟悉Handler的童鞋都知道,Handler在安卓的消息机制中占有重要的地位,它贯穿着整个安卓的体系,在很多地方我们都能见到它的身影。在刚开始接触安卓开发的时候我们都写过类似下面的代码:

    private Handler handler = new Handler(){
        @Override
        public void handleMessage(Message msg) {
            super.handleMessage(msg);
            //更新UI线程代码
            //获取子线程发送的message消息里面的请求数据,更新UI界面
            //操作省略
        }
    };
    
    public void getData(){
        new Thread(new Runnable() {
            @Override
            public void run() {
                //1、网络请求数据并返回
                //2、获取Message对象,并将请求到的数据用Message包裹起来
                //3、调用handler的sendMessage等方法发送Message
            }
        }).start();
    }
    

    随着现在各种各样网络请求框架的出现,大大简化了我们网络请求更新UI的操作,上面的代码我们很少再去写了,但并不意味着它就不重要了,尤其是Handler。其实很多的网络请求框架都只不过是将上面的操作封装起来了而已,RxJava虽然与网络请求无关,但在观察者线程切换里面同样也是将上面的过程封装起来,方便我们使用。

    我们接着来看HandlerScheduler这个类:

    final class HandlerScheduler extends Scheduler {
        private final Handler handler;
    
        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    
        //......代码省略
    
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
        //......代码省略
    }
    

    HandlerScheduler继承自Scheduler,并实现了createWorker()方法生成任务调度worker,这里返回的是一个HandlerWorker对象,前面提到的worker.schedule(this)中的worker实际上就是这个HandlerWorker。

    private static final class HandlerWorker extends Worker {
        //保存外部传进来的Handler,这里保存的是与UI线程关联的Handler,具体参见上面介绍
        private final Handler handler;
        //事件订阅状态标志位
        private volatile boolean disposed;
    
        HandlerWorker(Handler handler) {
            this.handler = handler;
        }
    
        //worker.schedule(this)最终调用的方法,这里delay为0,表示handler无延迟发送消息
        @Override
        public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
            //如果订阅已终止,返回带有终止状态的disposable
            if (disposed) {
                return Disposables.disposed();
            }
            //对任务做一些自己的处理(默认情况下没做任何处理)
            run = RxJavaPlugins.onSchedule(run);
            //包装类
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            //获取message,设置其target成员为handler,callback成员为scheduled
            Message message = Message.obtain(handler, scheduled);
            message.obj = this; // Used as token for batch disposal of this worker's runnables.
            //将消息发送到UI关联的消息队列中,此时handler中的queue是UI线程中的queue
            //(参考new Handler(Looper.getMainLooper()))
            handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
            // Re-check disposed state for removing in case we were racing a call to dispose().
            //再次检查订阅是否被终止,若已被终止,移除handler中的callback,并返回带有终止状态的disposable
            if (disposed) {
                handler.removeCallbacks(scheduled);
                return Disposables.disposed();
            }
    
            return scheduled;
        }
        
        //订阅终止
        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacksAndMessages(this /* token */);
        }
    
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    

    在HandlerWorker中,将任务run用ScheduledRunnable包装起来,在设置到Message的callback中,生成一个Message,熟悉handler的都知道,handler通过dispatchMessage(Message msg)方法来进行消息的分发处理,这个方法的处理顺序如下:

    • 1、判断message中的callback(该callback是一个Runnable)是否为空,若不为空,调用handleCallback(msg),在handleCallback(msg)中,直接执行message.callback.run()。若为空,进入步骤2。
    • 2、判断handler中的callback(该callback是Handler内部的一个内部接口,由我们自己实现)是否为空,若不为空,执行callback.handleMessage(msg)。若为空,进入步骤3。
    • 3、若步骤1和步骤2中的判断均不成立,执行handleMessage(msg),也就是我们前面写的样例代码。
      在这里由于Message中的callback不为空,因此执行执行callback中的run()方法,也就是前面提到的ObserveOnObserver(实现了Runnable接口)内部的run()方法,进而调用drainNormal()方法,最终调用下游observer的onNext。

    生成Message后,调用handler的sendMessageDelayed方法发送消息(这里delay参数为0,因此是立即发送),由于这里的handler是与UI线程关联在一起的,因此ObserveOnObserver(实现了Runnable接口)内部的run()方法就被发送到了UI线程中的消息队列中,最终通过handler的dispatchMessage(Message msg)方法调用handleCallback(msg),最后调用message.callback.run()执行run()方法里面的代码,完成UI线程更新。

    在文章的最后,我们来看下这个run任务的包装类ScheduledRunnable:

    private static final class ScheduledRunnable implements Runnable, Disposable {
        private final Handler handler;
        private final Runnable delegate;
    
        private volatile boolean disposed;
    
        //保存外部传入的handler和runnable任务
        ScheduledRunnable(Handler handler, Runnable delegate) {
            this.handler = handler;
            this.delegate = delegate;
        }
    
        @Override
        public void run() {
            try {
                //执行run方法,即传入的ObserveOnObserver(实现了Runnable接口)内部的run()方法
                delegate.run();
            } catch (Throwable t) {
                IllegalStateException ie =
                        new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                RxJavaPlugins.onError(ie);
                Thread thread = Thread.currentThread();
                thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
            }
        }
    
        //订阅终止,从handler中移除该runnable
        //(实际上是从UI线程内部的队列中将包装这个runnable的message移除,如果这个message还未处理的话)
        @Override
        public void dispose() {
            disposed = true;
            handler.removeCallbacks(this);
        }
    
        @Override
        public boolean isDisposed() {
            return disposed;
        }
    }
    

    至此,整个观察者线程切换也就介绍完了,最后我们再来梳理下思路:

    • 1、observeOn(AndroidSchedulers.mainThread())返回一个ObservableObserveOn对象,该对象内部(source)保存了上游传递过来的observable。
    • 2、下游执行subscribe时,实际上执行的是ObservableObserveOn对象内部的subscribeActual方法,该方法内部首先判断调度器的类型(具体参看上面介绍),这里传入的调度器类型是HandlerScheduler。
    • 3、将下游observer对象用ObserveOnObserver包装起来,生成ObserveOnObserver对象。
    • 4、执行source.subscribe方法,并将步骤3中生成的ObserveOnObserver对象作为参数传递进去。
    • 5、步骤4中的subscribe方法实际上执行的是上游observable的subscribeActual方法,在该方法中ObserveOnObserver内部的onSubscribe方法首先被执行,在这个onSubscribe方法内部调用下游observer的onSubscribe方法,建立订阅关系。
    • 6、执行完onSubscribe方法后,ObserveOnObserver内部的onNext和onComplete或者onNext依次被执行(onNext可被反复调用)。
    • 7、在ObserveOnObserver内部的onNext方法中,将上游发射的数据缓存到队列中,然后调用schedule方法,通过HandlerScheduler生成的worker对象(HandlerWorker)开始调度任务,通过步骤1中调用AndroidSchedulers.mainThread()时生成的关联UI线程的handler将Runnable任务发送到UI线程的消息队列中。
    • 8、ObserveOnObserver实现了Runnable接口,在其实现方法run()中,最终调用了drainNormal()方法,该方法内有两个嵌套的循环,且都是死循环。外层循环结束的条件是当次订阅任务被终止(外部取消或者任务执行完毕或任务执行出错),内层循环结束的条件是当次订阅任务执行完毕或执行出现错误。在内层循环内部,不断的从缓存队列中取出数据,最后调用下游observer的onNext方法。在步骤7中,Runnable任务最后被发送到了UI线程内的消息队列中,因此这个run()方法也是运行在UI线程,最终在UI线程中我们可以用这些数据更新UI界面。
    • 9、ObserveOnObserver内部的onComplete和onError执行过程与onNext类似,区别是onComplete和onNext只会调用一次并且是互斥调用(参见上面源码分析)。同样,最终也会调用drainNormal()方法,只不过在drainNormal()方法的外层死循环内,首先调用的是checkTerminated方法,用来判断当前订阅任务是否已被终止,下游observer的onComplete和onError就是在这个方法里调用的。

    下一章RxJava2笔记(五、订阅流程梳理以及线程切换次数有效性)将对前面做一个流程梳理,以此来结束RxJava的学习。

    相关文章

      网友评论

        本文标题:RxJava2笔记(四、观察者线程切换)

        本文链接:https://www.haomeiwen.com/subject/aekvrqtx.html