美文网首页
RaJava 切换源码解析

RaJava 切换源码解析

作者: Wynne丶XXW | 来源:发表于2019-03-03 19:17 被阅读0次

    RxJava 订阅流程

    简单示例

    Observable.create(new ObservableOnSubscribe<Integer>() {
                @Override
                public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                    Log.d("XXW", "subscribe");
                    emitter.onNext(1);
                    emitter.onComplete();
                }
            }).subscribe(new Observer<Integer>() {
                @Override
                public void onSubscribe(Disposable d) {
    
                }
    
                @Override
                public void onNext(Integer s) {
                    LogUtil.d("onNext " + s);
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onComplete() {
    
                }
            });
    

    打印结果

    D/XXW: onSubscribe  Thread Name : main
    D/XXW: subscribe
    D/XXW: subscribe Thread Name : RxCachedThreadScheduler-1
    D/XXW: onNext 1
    D/XXW: onNext 2
    D/XXW: onNext 3
    D/XXW: onNext 4
    

    切换线程

    subscribeOn

    上一篇文章我们分析了RxJava的订阅流程,接下来我们来看RxJava如何通过一句话来切换线程的实现. 首先我们先Schedulers.io()方法, 这个只会对被观察者的线程进行影响. 直接看源码

    public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    上篇文章分析过, onAssmbly会直接返回一个Observable类, 所以我们直接看ObservableSubscribeOn这个Observable的子类.我看的时候就猜会不会和ObservableCreate一样,走Observable的抽象方法 subscribeActual方法

     public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    果然这里和之前分析的一样, 还是会调用subscribeActual这个方法, 和之前一样 我们还是分析他做了几件事情

    1. 包装observer
    2. 调用下游Observer的onSubscribe方法, 所以这里是没有切换线程的 还是当前线程(UI线程) ,从打印结果也可以验证.
    3. 调用scheduler的schedulerDirrect方法
    4. 创立SubscribeTask类

    我们从下往上分析,先看SubscribeTask类

     final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
    

    这里我们会发现, 我们这个只是一个实现了Runnable的类, 我们之后再看这里,先看scheduleDirect方法

    
     public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            DisposeTask task = new DisposeTask(decoratedRun, w);
    
            w.schedule(task, delay, unit);
    
            return task;
        }
    
    1. 通过createWorker创建一个Worker类,
    2. 创建一个DisposeTask的线程
    3. 调用schedule方法
      第一步,会发现这个createWorker是一个抽象方法, 所以我们又要去找他的子类, 因为我们SubscribeOn 是传入的一个Schedulers.io()方法.,所以我们先看看这个实现.
    public static Scheduler io() {
            return RxJavaPlugins.onIoScheduler(IO);
        }
    
    IO = RxJavaPlugins.initIoScheduler(new IOTask());
    
     static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                return IoHolder.DEFAULT;
            }
        }
    
    
    static final class IoHolder {
            static final Scheduler DEFAULT = new IoScheduler();
        }
    
    

    通过源码 知道这个子类其实就是IoScheduler类, 所以我们IoScheduler的createWorker方法.

    public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
    

    发现创建了一个EventLoopWorker的类, 先不管 继续看schedule方法,

     static final class EventLoopWorker extends Scheduler.Worker {
            private final CompositeDisposable tasks;
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
    
            final AtomicBoolean once = new AtomicBoolean();
    
            //初始化
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                //创建ThreadWorker
                this.threadWorker = pool.get();
            }
    
            @Override
            public void dispose() {
                if (once.compareAndSet(false, true)) {
                    tasks.dispose();
    
                    // releasing the pool should be the last action
                    pool.release(threadWorker);
                }
            }
    
            @Override
            public boolean isDisposed() {
                return once.get();
            }
    
            @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
    
                调用ThreadWorker的scheduleActural方法.
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        }
    
    
    static final class ThreadWorker extends NewThreadWorker {
            private long expirationTime;
    
            ThreadWorker(ThreadFactory threadFactory) {
                super(threadFactory);
                this.expirationTime = 0L;
            }
    
            public long getExpirationTime() {
                return expirationTime;
            }
    
            public void setExpirationTime(long expirationTime) {
                this.expirationTime = expirationTime;
            }
        }
    

    从上面代码注释可以发现,. 我们在之前createWorker的时候 已经创建好了EventLoopWorker对象, 而且初始化的时候也创建一个叫ThreadWorker的对象, 我们的schedule方法其实最后就是调用的它的schedualActual方法. 从上面代码ThreadWoeker是没有schedualActual方法的. 所以我们猜测只有NewThreadWorker有

    @NonNull
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    

    当我们看到这个方法, 我就立马看到executor, 这不就是线程池吗, 所以我立马看了一下成员变量发现如下代码

    public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
    

    我们接着看scheduleActual方法, 我们之前创建的线程SubscribeTask 这个时候被当作一个run传进来, 然后封装两层后被添加到一个队列中,然后通过强转后就调用了 我们的run方法, 去执行上一个Observable的subscribe方法, 即subscribeActual方法,

    observerOn

    observerOn 我们一般都是传入AndroidSchedulers.mainThread() ,我们先从这个Scheduler这个子类看

     private static final class MainHolder {
    
            static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
        }
    
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                new Callable<Scheduler>() {
                    @Override public Scheduler call() throws Exception {
                        return MainHolder.DEFAULT;
                    }
                });
    
        /** A {@link Scheduler} which executes actions on the Android main thread. */
        public static Scheduler mainThread() {
            return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
        }
    

    显而易见,我们创建了一个叫HandlerScheduler的子类, 比较有趣的是 这里创建了一个Handler,而且还是主线程的(Looper.getMainLooper). 我们在看observerOn方法.

    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    
     public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    
        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    我们会发现 这个和之前的subscribeOn方法很类似, 所以我们猜测到最后还是到MainHolder 里面进行切换. 这里就不进行深入研究了, 有兴趣的同学可以自己研究研究.

    相关文章

      网友评论

          本文标题:RaJava 切换源码解析

          本文链接:https://www.haomeiwen.com/subject/yuymuqtx.html