美文网首页
RxJava2 源码分析(二) : subscribeOn()线

RxJava2 源码分析(二) : subscribeOn()线

作者: ShawZ | 来源:发表于2019-05-27 18:02 被阅读0次

    前言

    上回讲到subscribe()订阅观察者,这回咱们来聊聊subscribeOn()方法,官方解释是:在指定的线程中订阅该被观察者,你肯定知道subscribeOn调用多次只有最上面的那一次是生效的,真的是这样吗?留着这个问题,start read the fucking code~~

    1.示例

     Observable.create(...)
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: first Thread = " + Thread.currentThread().getName());
                        }
                    })
                    .subscribeOn(Schedulers.computation())
                    .doOnNext(new Consumer<Integer>() {
                        @Override
                        public void accept(Integer integer) throws Exception {
                            Log.d(TAG, "accept: second Thread = " + Thread.currentThread().getName());
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Observer<Integer>() {...}
                    });
    
    
    //日志
    D: onSubscribe: Thread = main
    D: accept: first Thread = RxComputationThreadPool-2
    D: accept: second Thread = RxComputationThreadPool-2
    D: onNext: Thread = RxComputationThreadPool-2
    D: onComplete: Thread = RxComputationThreadPool-2
    

    我们先不关心just,doOnNext操作符,这里仅仅是用来打印日志,结合上回的分析开始本次源码分析~

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        //懂的自然懂~,这里直接返回ObservableSubscribeOn实例
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
        final Scheduler scheduler;
    
        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            //持有上游的Source对象,也就是ObservableCreate被观察者对象
            super(source);
            //持有我们传入的线程调度对象
            this.scheduler = scheduler;
        }
    }
    

    2.开始订阅

    有了上篇的分析,我们知道这里直接调用的是ObservableSubscribeOn的subscribeActual()方法

        public void subscribeActual(final Observer<? super T> observer) {
            //实例化SubscribeOnObserver观察者对象,传入observer进行持有
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(observer);
            //打印第一行日志 ---onSubscribe: Thread = main
            //因为当前线程是UI线程,所以是线程名为main
            observer.onSubscribe(parent);
            //这里没有直接订阅,而是先进行了线程切换,才开始继续’向上订阅‘
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    
        //SubscribeTask是ObservableSubscribeOn的内部类,实现了Runnable接口,注定要在线程里完成使命
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                //持有我们生成的SubscribeOnObserver对象
                this.parent = parent;
            }
    
            @Override
            public void run() {
                //source是指‘上游的’ObservableSource对象
                source.subscribe(parent);
            }
        }
    
        //接着再看scheduler.scheduleDirect()
        public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            //1.createWorker是Scheduler的抽象函数,我们外部传入的是Schedulers.io()对象,即IoScheduler实例
            //这里的Worker是EventLoopWorker类实例,持有线程池之类的,这里不究细节
            final Worker w = createWorker();
            //属性的钩子函数
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //DisposeTask实现Runnable,Disposable接口,进行一层封装
            DisposeTask task = new DisposeTask(decoratedRun, w);
            //将我们创建的task传进去
            w.schedule(task, delay, unit);
    
            return task;
        }
    
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            //首先判断是否已经被安排了(取消订阅了)
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
            //这里有很多线程池,Worker缓存池的东西,我们先不纠这些细节,跳过往下看scheduleActual()
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
    
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //我也不知道,为什么要将SubscribeTask对象包裹这么多次。。。。。。
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                //延迟默认是0
                if (delayTime <= 0) {
                    //最终看到这里,我们的SubscribeTask对象在线程池中终于被安排上了,好累~~
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    

    3.总结

    subscribeOn的职责

    1. 调用onSubscribe()开始订阅
    2. 切换指定线程线程
    3. 在指定线程中继续向上订阅

    后续的向上订阅到发射数据,都是在指定的线程中执行,这样也解释了我们之前打印的日志,在UI线程中订阅,切换到io线程中继续向上订阅,再次切换到Computation线程向上订阅,后续的发射发射数据都是RxComputationThreadPool线程中,也证明了调用多次subscribeOn()只有最上面那个有效。

    相关文章

      网友评论

          本文标题:RxJava2 源码分析(二) : subscribeOn()线

          本文链接:https://www.haomeiwen.com/subject/mvhktctx.html