RxJava Source Code Analysis

Author: 帽子lucio | Published: 2019-01-10 17:34

    How thread switching works

    • Example
    apiService.getBoundAppInfo(pageNumber)
                    .subscribeOn(Schedulers.io())
                    .observeOn(AndroidSchedulers.mainThread())
                    .bindLifecycle(owner)
                    .subscribe {  }
    
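    • subscribeOn switches to a background thread

    Start with subscribe. After its null checks and the plugin hook, it ends by calling subscribeActual, the abstract method that every Observable subclass implements.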

        @SchedulerSupport(SchedulerSupport.NONE)
        @Override
        public final void subscribe(Observer<? super T> observer) {
            ObjectHelper.requireNonNull(observer, "observer is null");
            try {
                observer = RxJavaPlugins.onSubscribe(this, observer);
    
                ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
    
                subscribeActual(observer);
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                // can't call onError because no way to know if a Disposable has been set or not
                // can't call onSubscribe because the call might have set a Subscription already
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
        }
    
        protected abstract void subscribeActual(Observer<? super T> observer);
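
The relationship between subscribe and subscribeActual is a template method. The following is only a minimal sketch of that shape in plain Java; MiniObservable, its just factory, and TemplateMethodDemo are hypothetical names invented for illustration and are not part of RxJava, and java.util.function.Consumer stands in for Observer.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for io.reactivex.Observable (not the real class).
abstract class MiniObservable<T> {
    // Public entry point: validates the argument, then delegates to the subclass.
    public final void subscribe(Consumer<? super T> observer) {
        if (observer == null) {
            throw new NullPointerException("observer is null");
        }
        subscribeActual(observer);
    }

    // Each concrete source or operator supplies its own subscription logic here.
    protected abstract void subscribeActual(Consumer<? super T> observer);

    // Hypothetical factory: a source that emits the given items synchronously.
    @SafeVarargs
    static <T> MiniObservable<T> just(T... items) {
        return new MiniObservable<T>() {
            @Override
            protected void subscribeActual(Consumer<? super T> observer) {
                for (T item : items) {
                    observer.accept(item);
                }
            }
        };
    }
}

class TemplateMethodDemo {
    // Subscribes to a two-item source and returns what the observer received.
    static List<String> collect() {
        List<String> received = new ArrayList<>();
        MiniObservable.just("a", "b").subscribe(received::add);
        return received;
    }

    public static void main(String[] args) {
        System.out.println(collect());
    }
}
```

Because subscribe is final, every operator in a chain is entered through the same checks, and only subscribeActual differs between operators.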
    

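    subscribeOn wraps the upstream Observable in an ObservableSubscribeOn, and that wrapper is what moves the subscription to a background thread.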

        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    ObservableSubscribeOn extends Observable, so subscribing to it runs its own subscribeActual method.

        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            s.onSubscribe(parent);
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
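
To make the idea concrete, here is a rough sketch of what this subscribeActual amounts to: the call that subscribes to the upstream is wrapped in a Runnable and handed to another thread. It assumes a plain ExecutorService in place of the Scheduler; MiniSource, MiniSubscribeOn, SubscribeOnDemo, and the thread name "sketch-io" are hypothetical names, not RxJava classes.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.Consumer;

// Hypothetical, simplified source interface standing in for an upstream Observable.
interface MiniSource<T> {
    void subscribe(Consumer<? super T> observer);
}

// Sketch of the subscribeOn idea: subscribing to the upstream happens on the executor.
final class MiniSubscribeOn<T> implements MiniSource<T> {
    private final MiniSource<T> source;
    private final ExecutorService executor; // assumption: stands in for the Scheduler

    MiniSubscribeOn(MiniSource<T> source, ExecutorService executor) {
        this.source = source;
        this.executor = executor;
    }

    @Override
    public void subscribe(Consumer<? super T> observer) {
        // Plays the role of SubscribeTask: its run() subscribes to the upstream.
        Runnable subscribeTask = () -> source.subscribe(observer);
        executor.execute(subscribeTask);
    }
}

class SubscribeOnDemo {
    // Returns the name of the thread on which the source emitted its item.
    static String emittingThreadName() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-io"));
        AtomicReference<String> seen = new AtomicReference<>();
        CountDownLatch done = new CountDownLatch(1);
        // The source emits the name of whatever thread it is subscribed on.
        MiniSource<String> source = observer -> observer.accept(Thread.currentThread().getName());
        new MiniSubscribeOn<>(source, io).subscribe(name -> {
            seen.set(name);
            done.countDown();
        });
        try {
            done.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            io.shutdown();
        }
        return seen.get();
    }

    public static void main(String[] args) {
        System.out.println(emittingThreadName());
    }
}
```

The source never mentions a thread itself; it simply runs on whichever thread executes the subscribe call, which is why moving that one call is enough.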
    

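    SubscribeTask is a Runnable whose run() calls source.subscribe(parent), that is, the subscribe method of the upstream Observable. ObservableSubscribeOn therefore subscribes to the upstream on whatever thread runs the task.
    The scheduler here is the IoScheduler passed in through subscribeOn(Schedulers.io()). It is stored in ObservableSubscribeOn and used to schedule the SubscribeTask (it is not handed to ObserveOnObserver, which belongs to observeOn).
    Next, look at scheduler.scheduleDirect(new SubscribeTask(parent)):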

        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    
        @NonNull
        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
            final Worker w = createWorker();
    
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            DisposeTask task = new DisposeTask(decoratedRun, w);
    
            w.schedule(task, delay, unit);
    
            return task;
        }
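
The shape of scheduleDirect (create a worker, wrap the task so the worker is disposed once the task finishes, then schedule it) can be sketched in plain Java. This is an illustration under assumptions, not RxJava's code: MiniWorker, MiniScheduler, and ScheduleDirectDemo are hypothetical names, and a single-threaded ExecutorService stands in for the worker's thread.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical worker that owns one single-threaded executor (not RxJava's Worker).
final class MiniWorker {
    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicBoolean disposed = new AtomicBoolean();

    void schedule(Runnable task) {
        if (!disposed.get()) {
            executor.execute(task);
        }
    }

    void dispose() {
        if (disposed.compareAndSet(false, true)) {
            executor.shutdown(); // a task that is already running may finish
        }
    }

    boolean isDisposed() {
        return disposed.get();
    }

    // Waits until the executor has been shut down and has finished its work.
    boolean awaitDone(long millis) {
        try {
            return executor.awaitTermination(millis, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}

final class MiniScheduler {
    // Mirrors the shape of scheduleDirect: new worker, wrapped task, schedule.
    static MiniWorker scheduleDirect(Runnable run) {
        MiniWorker worker = new MiniWorker();
        // Plays the role of DisposeTask: run the task, then release the worker.
        Runnable disposeTask = () -> {
            try {
                run.run();
            } finally {
                worker.dispose();
            }
        };
        worker.schedule(disposeTask);
        return worker;
    }
}

class ScheduleDirectDemo {
    // True if the task ran and the worker was disposed afterwards.
    static boolean runAndCheck() {
        AtomicBoolean ran = new AtomicBoolean();
        MiniWorker worker = MiniScheduler.scheduleDirect(() -> ran.set(true));
        boolean terminated = worker.awaitDone(5000);
        return terminated && ran.get() && worker.isDisposed();
    }

    public static void main(String[] args) {
        System.out.println(runAndCheck());
    }
}
```

Unlike the real method, this sketch returns the worker rather than a Disposable task, purely so the demo can wait for completion.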
    
    

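    w.schedule(task, delay, unit) does not run the task on the calling thread. With Schedulers.io(), w is the worker created by IoScheduler, so its schedule method (below) forwards to scheduleActual, which submits the task to the worker's executor. The threads behind that executor belong to the IoScheduler set up by Schedulers.io(); DisposeTask.run() then executes on one of them, and with it SubscribeTask.run().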

            @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
    
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
    
        @NonNull
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
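
The core of scheduleActual is the choice between submit and schedule on a ScheduledExecutorService. A minimal sketch of just that decision follows, using only java.util.concurrent; ScheduleActualSketch and the thread name "sketch-worker" are hypothetical, and where the real method reports a rejected task through RxJavaPlugins.onError, this sketch simply returns null.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ScheduleActualSketch {
    // Submits immediately when there is no delay, otherwise schedules for later.
    static Future<?> scheduleActual(ScheduledExecutorService executor, Runnable run,
                                    long delayTime, TimeUnit unit) {
        try {
            if (delayTime <= 0) {
                return executor.submit(run);
            }
            return executor.schedule(run, delayTime, unit);
        } catch (RejectedExecutionException ex) {
            // Assumption of this sketch: signal rejection with null.
            return null;
        }
    }

    // Runs one task without delay and returns the name of the thread that ran it.
    static String runOnce() {
        ScheduledExecutorService executor =
                Executors.newScheduledThreadPool(1, r -> new Thread(r, "sketch-worker"));
        AtomicReference<String> threadName = new AtomicReference<>();
        Future<?> future = scheduleActual(
                executor, () -> threadName.set(Thread.currentThread().getName()),
                0L, TimeUnit.NANOSECONDS);
        try {
            future.get(5, TimeUnit.SECONDS); // wait for the task to complete
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return threadName.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnce());
    }
}
```

The returned Future is what makes cancellation possible: in the real code it is stored on the ScheduledRunnable via sr.setFuture(f).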
    
    • observeOn switches to the main thread

    observeOn creates an ObservableObserveOn object:

        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.CUSTOM)
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            ObjectHelper.verifyPositive(bufferSize, "bufferSize");
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    

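    Subscribing to it runs its subscribeActual method, which creates a worker from the scheduler and wraps the downstream observer in an ObserveOnObserver: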

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            if (scheduler instanceof TrampolineScheduler) {
                source.subscribe(observer);
            } else {
                Scheduler.Worker w = scheduler.createWorker();
    
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    Next, look at onNext in ObserveOnObserver:

        @Override
        public void onNext(T t) {
            if (done) {
                return;
            }
    
            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }
    
        void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }
    

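    worker.schedule(this) passes in the ObserveOnObserver itself, which implements Runnable, so the worker ends up executing ObserveOnObserver.run().
    run(), through its drain loop, calls a.onNext(v), where a is the downstream observer. Because the worker here comes from AndroidSchedulers.mainThread(), onNext is delivered only after the switch to the main thread.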
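
The queue-plus-counter pattern in onNext and schedule can be sketched in plain Java. This is a simplified illustration, not RxJava's implementation: MiniObserveOn and ObserveOnDemo are hypothetical names, a single-threaded ExecutorService named "sketch-main" stands in for the main-thread worker, and ConcurrentLinkedQueue is swapped in for RxJava's internal queue.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Consumer;

// Hypothetical, simplified analogue of ObserveOnObserver.
final class MiniObserveOn<T> extends AtomicInteger implements Runnable {
    private final Consumer<? super T> downstream;
    private final ExecutorService worker; // assumption: stands in for Scheduler.Worker
    private final Queue<T> queue = new ConcurrentLinkedQueue<>();

    MiniObserveOn(Consumer<? super T> downstream, ExecutorService worker) {
        this.downstream = downstream;
        this.worker = worker;
    }

    // Called on the producer's thread: enqueue, then make sure a drain is pending.
    void onNext(T item) {
        queue.offer(item);
        schedule();
    }

    void schedule() {
        // Only the caller that moves the counter away from 0 starts a drain.
        if (getAndIncrement() == 0) {
            worker.execute(this);
        }
    }

    // Runs on the worker thread: deliver queued items to the downstream.
    @Override
    public void run() {
        int missed = 1;
        for (;;) {
            T item;
            while ((item = queue.poll()) != null) {
                downstream.accept(item);
            }
            // Subtract the requests handled so far; loop again if more arrived.
            missed = addAndGet(-missed);
            if (missed == 0) {
                break;
            }
        }
    }
}

class ObserveOnDemo {
    // Emits 1..3 from the calling thread and records where each item is delivered.
    static List<String> deliver() {
        ExecutorService main = Executors.newSingleThreadExecutor(r -> new Thread(r, "sketch-main"));
        List<String> log = Collections.synchronizedList(new ArrayList<>());
        CountDownLatch latch = new CountDownLatch(3);
        MiniObserveOn<Integer> observer = new MiniObserveOn<Integer>(item -> {
            log.add(item + "@" + Thread.currentThread().getName());
            latch.countDown();
        }, main);
        for (int i = 1; i <= 3; i++) {
            observer.onNext(i);
        }
        try {
            latch.await(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            main.shutdown();
        }
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(deliver());
    }
}
```

The counter guarantees that at most one drain is scheduled at a time, so items are delivered in order on the worker thread no matter how quickly the producer calls onNext.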

    Article link: https://www.haomeiwen.com/subject/hvoorqtx.html