美文网首页
2019-11-08 Rxjava 源码解析<2>

2019-11-08 Rxjava 源码解析<2>

作者: 猫KK | 来源:发表于2019-11-08 20:39 被阅读0次

    上一篇描述了如何走到observer的onNext方法,下面加上进程调度,再来看看如何调度的

            var sources = object : ObservableOnSubscribe<String> {
                override fun subscribe(emitter: ObservableEmitter<String>) {
                    emitter.onNext("下一步")
                    emitter.onComplete()
                }
            }
            var observable = Observable.create(sources)
            var observable1 = observable.subscribeOn(Schedulers.io())
            var observable2 = observable1.observeOn(AndroidSchedulers.mainThread())
            var observer = object :Observer<String>{
                override fun onComplete() {
                }
    
                override fun onSubscribe(d: Disposable) {
                }
    
                override fun onNext(t: String) {
                }
    
                override fun onError(e: Throwable) {
                }
            }
            observable2.subscribe(observer)
    

    前面已经分析了observable的创建过程,下面来分析observable1的创建过程,也就是observable.subscribeOn(Schedulers.io())干了什么,通过前面的分析知道observable就是ObservableCreate对象,但是ObservableCreate中没有subscribeOn方法,所以到父类中找

        public final Observable<T> subscribeOn(Scheduler scheduler) {
            //判断scheduler是否为null
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            //判断onObservableAssembly是否为null
            //第一次默认为null,所以返回的是ObservableSubscribeOn
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    所以observable1就是一个ObservableSubscribeOn对象,里面保存有observable和scheduler,所以当这时不考虑observeOn(AndroidSchedulers.mainThread()),这一段代时,直接调用observable1.subscribe(observer)是,就像下面这样

            var sources = object : ObservableOnSubscribe<String> {
                override fun subscribe(emitter: ObservableEmitter<String>) {
                    emitter.onNext("下一步")
                    emitter.onComplete()
                }
            }
            var observable = Observable.create(sources)
            var observable1 = observable.subscribeOn(Schedulers.io())
    //        var observable2 = observable1.observeOn(AndroidSchedulers.mainThread())
            var observer = object : Observer<String> {
                override fun onComplete() {
                }
    
                override fun onSubscribe(d: Disposable) {
                }
    
                override fun onNext(t: String) {
                }
    
                override fun onError(e: Throwable) {
                }
            }
            observable1.subscribe(observer)
    

    通过前面分析,当调用observable1.subscribe(observer)时,就会回调到ObservableSubscribeOn的subscribeActual方法中

        //ObservableSubscribeOn 的subscribeActual方法
        public void subscribeActual(final Observer<? super T> observer) {
            //创建SubscribeOnObserver对象
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
           //调用observer的onSubscribe方法
            observer.onSubscribe(parent);
            //重要的是括号里面的内容
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    到上面,重要的是scheduler.scheduleDirect(new SubscribeTask(parent))这一句代码,其中scheduler是Schedulers.io()返回的对象,SubscribeTask是一个实现Runnable的接口,先来看scheduler.scheduleDirect()方法做了什么

        public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            //创建一个Worker
            final Worker w = createWorker();
            //判断onScheduleHandler是否为null,默认为null,所以返回自身
           //这里的run就是前面的SubscribeTask
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //创建DisposeTask,将Worker 和SubscribeTask传进去
            //DisposeTask实现了Runnable
            DisposeTask task = new DisposeTask(decoratedRun, w);
            //调用schedule方法
            w.schedule(task, delay, unit);
            //返回task
            return task;
        }
    

    createWorker()是一个抽象方法,所以要看scheduler的实例对象,看Schedulers.io()返回什么

        public static Scheduler io() {
            //判断onIoHandler是否为null默认为null,所以返回IO
            return RxJavaPlugins.onIoScheduler(IO);
        }
    

    所以看IO是什么

    public final class Schedulers {
    
        @NonNull
        static final Scheduler IO;
        //...
    
        static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
        }
        //...
        static {
            //....
            IO = RxJavaPlugins.initIoScheduler(new IOTask());
        }
    }
    

    RxJavaPlugins.initIoScheduler(new IOTask())这一段代码最后会调用IOTask.call 方法

        static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        }
    

    最后又会回调到IoHolder.DEFAULT中,所以最后返回的就是IoScheduler对象,
    来看IoScheduler的构造方法做了什么

        public IoScheduler() {
            //调用自身WORKER_THREAD_FACTORY是在static初始化的
            this(WORKER_THREAD_FACTORY);
        }
    
        public IoScheduler(ThreadFactory threadFactory) {
            //将threadFactory赋值
            this.threadFactory = threadFactory;
            //创建AtomicReference,用来持有CachedWorkerPool的引用
            this.pool = new AtomicReference<CachedWorkerPool>(NONE);
            //调用start方法
            start();
        }
    
        @Override
        public void start() {
            //创建CachedWorkerPool
            CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT, threadFactory);
            //将线程池放到pool中
            if (!pool.compareAndSet(NONE, update)) {
                update.shutdown();
            }
        }
    

    所以IoScheduler的构造方法主要是创建一个线程池,并保存,继续看createWorker()方法

        public Worker createWorker() {
            //创建EventLoopWorker,并将前面创建的线程池对象传过去
            return new EventLoopWorker(pool.get());
        }
    

    看看EventLoopWorker构造方法做了什么

        EventLoopWorker(CachedWorkerPool pool) {
                //赋值
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                //获取threadWorker对象
                this.threadWorker = pool.get();
            }
    
        ThreadWorker get() {
                //判断是否解绑
                if (allWorkers.isDisposed()) {
                    return SHUTDOWN_THREAD_WORKER;
                }
                //从expiringWorkerQueue队列中获取,获取到就返回
                while (!expiringWorkerQueue.isEmpty()) {
                    ThreadWorker threadWorker = expiringWorkerQueue.poll();
                    if (threadWorker != null) {
                        return threadWorker;
                    }
                }
    
                // 没有获取到,创建一个返回
                ThreadWorker w = new ThreadWorker(threadFactory);
                //添加到allWorkers中
                allWorkers.add(w);
                return w;
            }
    

    往前看

         public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            //创建一个Worker,这里的Worker就是EventLoopWorker对象
            final Worker w = createWorker();
           //...
            //调用schedule方法
            w.schedule(task, delay, unit);
            //返回task
            return task;
        }
    

    继续看EventLoopWorker.schedule()方法

            @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                //判断是否解绑
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
                //调用scheduleActual方法
                //其中threadWorker是构造方法时初始化的
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
    

    前面在EventLoopWorker的构造方法中已经介绍过threadWorker是如何初始化的了,看threadWorker.scheduleActual()方法做了什么

        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            //检查并判断,返回自身
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //创建ScheduledRunnable对象,并将decoratedRun传过去
            //ScheduledRunnable实现了Runnable
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
            //调用parent.add方法,判断返回值
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    

    parent 是前面传过来的,回溯一下很容易就知道是CompositeDisposable对象,看add()方法

        public boolean add(@NonNull Disposable disposable) {
            //判null
            ObjectHelper.requireNonNull(disposable, "disposable is null");
           //判断是否解绑,默认false
            if (!disposed) {
                synchronized (this) {
                    if (!disposed) {
                        //将disposable添加到OpenHashSet中去,返回true
                        OpenHashSet<Disposable> set = resources;
                        if (set == null) {
                            set = new OpenHashSet<Disposable>();
                            resources = set;
                        }
                        set.add(disposable);
                        return true;
                    }
                }
            }
            disposable.dispose();
            return false;
        }
    

    所以前面的parent.add(sr)返回true,所以继续往下走

         public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
           //....
            Future<?> f;
            try {
                //判断是否延迟,根据上面可以知道delayTime为0
                if (delayTime <= 0) {
                    //执行submit方法,executor是在构造方法中初始化的
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    

    继续看executor怎么初始化的

    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        private final ScheduledExecutorService executor;
    
        volatile boolean disposed;
    
        public NewThreadWorker(ThreadFactory threadFactory) {
            //初始化
            executor = SchedulerPoolFactory.create(threadFactory);
        }
    //....
    }
    
        public static ScheduledExecutorService create(ThreadFactory factory) {
            //返回一个线程池
            final ScheduledExecutorService exec = Executors.newScheduledThreadPool(1, factory);
            tryPutIntoPool(PURGE_ENABLED, exec);
            return exec;
        }
    

    注意,前面说的threadWorker是NewThreadWorker的子类,所以executor就是一个线程池,executor.submit方法就会走传过去的Runnable的run方法,所以就会走到ScheduledRunnable.run 方法中

        public void run() {
            lazySet(THREAD_INDEX, Thread.currentThread());
            try {
                try {
                   //actual就是ScheduledRunnable构造方法中传过来的
                    actual.run();
                } catch (Throwable e) {
                    // Exceptions.throwIfFatal(e); nowhere to go
                    RxJavaPlugins.onError(e);
                }
            } finally {
                lazySet(THREAD_INDEX, null);
                Object o = get(PARENT_INDEX);
                if (o != PARENT_DISPOSED && compareAndSet(PARENT_INDEX, o, DONE) && o != null) {
                    ((DisposableContainer)o).delete(this);
                }
    
                for (;;) {
                    o = get(FUTURE_INDEX);
                    if (o == SYNC_DISPOSED || o == ASYNC_DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                        break;
                    }
                }
            }
        }
    

    所以最后会回到我们传过来的Runnable的run 方法中,往前看,我们传过来的Runnable是什么

        //ObservableSubscribeOn中
        @Override
        public void subscribeActual(final Observer<? super T> observer) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
    
            observer.onSubscribe(parent);
            //SubscribeTask就是我们说的传过去的Runnable对象
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    所以走了一大圈,回到了SubscribeTask的run方法中

        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                //调用source.subscribe方法,parent 就是SubscribeOnObserver
                //source就是前面的Observable.create创建的对象
                source.subscribe(parent);
            }
        }
    

    又走到source.subscribe方法,其中source是Observable.create创建的对象,所以到了第一篇讲的内容里面了,所以回到下面这里

            var sources = object : ObservableOnSubscribe<String> {
                override fun subscribe(emitter: ObservableEmitter<String>) {
                    //走了一大圈,回到了这里,当前的emitter对象为SubscribeOnObserver
                    emitter.onNext("下一步")
                    emitter.onComplete()
                }
            }
            var observable = Observable.create(sources)
    

    走了一圈,回到了这里,不过需要注意的是,因为是在SubscribeTask的run方法中调用subscribe的方法,所以当前subscribe是运行在run方法中的,也就是说是运行在子线程中,所以我们的sources的subscribe方法就是在子线程中运行的,当调用emitter.onNext()时,就会到SubscribeOnObserver的onNext()方法,看SubscribeOnObserver做了什么

            @Override
            public void onNext(T t) {
                downstream.onNext(t);
            }
    
            @Override
            public void onError(Throwable t) {
                downstream.onError(t);
            }
    
            @Override
            public void onComplete() {
                downstream.onComplete();
            }
    

    直接调用downstream.onNext,其中downstream就是observer对象,这样就到到observer的onNext中。

    相关文章

      网友评论

          本文标题:2019-11-08 Rxjava 源码解析<2>

          本文链接:https://www.haomeiwen.com/subject/bkswbctx.html