美文网首页
2019-11-08 Rxjava 源码解析<3>

2019-11-08 Rxjava 源码解析<3>

作者: 猫KK | 来源:发表于2019-11-11 16:36 被阅读0次

    上一篇,我们看了如何使用.subscribeOn(Schedulers.io())方法就能使subscribe在子线程中运行,接下来继续看,如何在切回主线程

        var sources = object : ObservableOnSubscribe<String> {
                override fun subscribe(emitter: ObservableEmitter<String>) {
                    emitter.onNext("下一步")
                    emitter.onComplete()
                }
            }
            var observable = Observable.create(sources)
            var observable1 = observable.subscribeOn(Schedulers.io())
            var observable2 = observable1.observeOn(AndroidSchedulers.mainThread())
            var observer = object :Observer<String>{
                override fun onComplete() {
                }
    
                override fun onSubscribe(d: Disposable) {
                }
    
                override fun onNext(t: String) {
                }
    
                override fun onError(e: Throwable) {
                }
            }
            observable2.subscribe(observer)
    

    前面都分析了,看observable1.observeOn(AndroidSchedulers.mainThread())做了什么

        public final Observable<T> observeOn(Scheduler scheduler) {
           //bufferSize() 缓冲大小
            return observeOn(scheduler, false, bufferSize());
        }
    
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            //做判断
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            //返回ObservableObserveOn对象
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    

    所以observable1.observeOn(AndroidSchedulers.mainThread())返回的是ObservableObserveOn对象,那么当调用 observable2.subscribe(observer)方法时,就是调用ObservableObserveOn.subscribe()方法,根据前两篇可以知道,就会进入ObservableObserveOn的subscribeActual()方法中

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //判断scheduler的类型
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    首先会判断scheduler的类型,scheduler的类型就是AndroidSchedulers.mainThread()返回的对象,看这个返回什么

        public static Scheduler mainThread() {
            //返回MAIN_THREAD的值
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
    
        //这里会调用call方法,所以返回MainHolder.DEFAULT
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        //静态内部类,所以返回的是HandlerScheduler对象
        private static final class MainHolder {
            static final Scheduler DEFAULT
                = new HandlerScheduler(new Handler(Looper.getMainLooper()), false);
        }
    

    通过上面可以知道scheduler是HandlerScheduler对象,其中HandlerScheduler是直接继承Scheduler,所以scheduler不是TrampolineScheduler类型,回到subscribeActual()方法中

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //前面分析了,这里是false,会进入else分支
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                //通过scheduler生成Worker
                Scheduler.Worker w = scheduler.createWorker();
                //绑定,其中source是上一层的observable,在这里就是observable1
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    根据前面的分析source.subscribe()就会进入ObserveOnObserver的onSubscribe方法中

        //ObserveOnObserver的onSubscribe方法
    
           @Override
            public void onSubscribe(Disposable d) {
                if (DisposableHelper.validate(this.upstream, d)) {
                    this.upstream = d;
                    //....
    
                    //初始化queue
                    queue = new SpscLinkedArrayQueue<T>(bufferSize);
                    //调用observer的onSubscribe方法,downstream就是我们的observer
                    downstream.onSubscribe(this);
                }
            }
    

    这个方法主要就是初始化queue,根据前面,可以知道,当调用onNext,也会来到这里的onNext

        //ObserveOnObserver的onNext
    
           @Override
            public void onNext(T t) {
                //判断是否停止,为false
                if (done) {
                    return;
                }
                //判断是否为不是异步
                if (sourceMode != QueueDisposable.ASYNC) {
                    //将该消息放入queue中
                    queue.offer(t);
                }
                //调度
                schedule();
            }
    
            void schedule() {
                if (getAndIncrement() == 0) {
                    //调用worker.schedule()方法
                    worker.schedule(this);
                }
            }
    

    又回到了worker.schedule方法,其中worker是通过HandlerScheduler.createWorker()生成的

        @Override
        public Worker createWorker() {
            //返回HandlerWorker对象
            return new HandlerWorker(handler, async);
        }
    

    当前worker是HandlerWorker对象,看schedule()方法

            //注意,这里是三个参数的方法,调用的时候是调用一个参数的方法
            //是因为在父类中一个参数的方法会调用三个参数的方法,所以来到这里
            @Override
            @SuppressLint("NewApi") // Async will only be true when the API is available to call.
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                //做判断
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
                //判断是否解绑
                if (disposed) {
                    return Disposables.disposed();
                }
               //检验run
                run = RxJavaPlugins.onSchedule(run);
                //创建ScheduledRunnable对象,其中ScheduledRunnable是实现Runnable
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
                //生成Message
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
                //async为false
                if (async) {
                    message.setAsynchronous(true);
                }
                //通过handler发送消息
                handler.sendMessageDelayed(message, unit.toMillis(delay));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    

    通过handler发送一个消息,所以会走到ScheduledRunnable的run方法中,注意,这里的handler的通过Looper.getMainLooper()方法获取,是主线程的handler,所以run是运行在主线程中的,这样就实现了从其他线程切换到主线程中

          //ScheduledRunnable的run方法
    
           @Override
            public void run() {
                try {
                    //调用delegate的run方法
                    delegate.run();
                } catch (Throwable t) {
                    RxJavaPlugins.onError(t);
                }
            }
    

    其中delegate就是我们传过来的run对象,也就是worker.schedule(this);传过来的this,所以又会回到ObserveOnObserver的run方法中

            @Override
            public void run() {
                //outputFused默认为false
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
        
            void drainNormal() {
                int missed = 1;
                //获取queue,是在onSubscribe方法中初始化的
                final SimpleQueue<T> q = queue;
                //获取Observer,也就是我们的observer
                final Observer<? super T> a = downstream;
                //死循环
                for (;;) {
                    if (checkTerminated(done, q.isEmpty(), a)) {
                        return;
                    }
                    //双重死循环
                    for (;;) {
                        //判断是否已经停止
                        boolean d = done;
                        T v;
    
                        try {
                            //获取onNext(T t)中传过来的参数
                            v = q.poll();
                        } catch (Throwable ex) {
                            Exceptions.throwIfFatal(ex);
                            disposed = true;
                            upstream.dispose();
                            q.clear();
                            a.onError(ex);
                            worker.dispose();
                            return;
                        }
                        //判断是否获取到
                        boolean empty = v == null;
                        //如果没有获取到,则调用observer.onComplete()方法,并跳出循环
                        if (checkTerminated(d, empty, a)) {
                            return;
                        }
                        //没有获取到,跳出循环
                        if (empty) {
                            break;
                        }
                        //调用observer.onNext并将参数传过去
                        a.onNext(v);
                    }
    
                    missed = addAndGet(-missed);
                    if (missed == 0) {
                        break;
                    }
                }
            }
    

    之后就走到我们自己的observer的onNext方法中,并且,此时是运行在主线程中的,这样就实现了从其他线程切换到主线程的功能。

    相关文章

      网友评论

          本文标题:2019-11-08 Rxjava 源码解析<3>

          本文链接:https://www.haomeiwen.com/subject/whgwbctx.html