美文网首页
rxjava源码解析

rxjava源码解析

作者: 帽子lucio | 来源:发表于2019-01-10 17:34 被阅读0次

线程切换原理

image.png
  • 案例
apiService.getBoundAppInfo(pageNumber)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .bindLifecycle(owner)
                .subscribe {  }
  • subscribeOn切换子线程

先看subscribe的执行,最后会执行Observable中subscribeActual

 @SchedulerSupport(SchedulerSupport.NONE)
    @Override
    public final void subscribe(Observer<? super T> observer) {
        ObjectHelper.requireNonNull(observer, "observer is null");
        try {
            observer = RxJavaPlugins.onSubscribe(this, observer);

            ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");

            subscribeActual(observer);
        } catch (NullPointerException e) { // NOPMD
            throw e;
        } catch (Throwable e) {
            Exceptions.throwIfFatal(e);
            // can't call onError because no way to know if a Disposable has been set or not
            // can't call onSubscribe because the call might have set a Subscription already
            RxJavaPlugins.onError(e);

            NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
            npe.initCause(e);
            throw npe;
        }
    }

    protected abstract void subscribeActual(Observer<? super T> observer);

ObservableSubscribeOn中subscribeOn切换为子线程

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }

而ObservableSubscribeOn会继承Observable,从而执行subscribeActual方法

  @Override
    public void subscribeActual(final Observer<? super T> s) {
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        s.onSubscribe(parent);
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }

source.subscribe即Observable的subscribe,可见ObservableSubscribeOn是对Observable的监听
这里scheduler即subscribeOn(Schedulers.io())中的ioSchedulers,获取之后传入ObserveOnObserver
接着分析scheduler.scheduleDirect(new SubscribeTask(parent))

@NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

  @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }

w.schedule(task, delay, unit);会执行DisposeTask的run方法,同时执行IoSchedule的schedule方法,而Schedulers.io()时就创建了线程池,并在scheduleActual中执行

 @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }

            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
 @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);

        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }

        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }

        return sr;
    }
  • observeOn切换主线程

创建ObservableObserveOn对象

@CheckReturnValue
    @SchedulerSupport(SchedulerSupport.CUSTOM)
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
        ObjectHelper.requireNonNull(scheduler, "scheduler is null");
        ObjectHelper.verifyPositive(bufferSize, "bufferSize");
        return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }

执行subscribeActual方法

 @Override
    protected void subscribeActual(Observer<? super T> observer) {
        if (scheduler instanceof TrampolineScheduler) {
            source.subscribe(observer);
        } else {
            Scheduler.Worker w = scheduler.createWorker();

            source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
        }
    }

接着看ObserveOnObserver中的next

@Override
public void onNext(T t) {
            if (done) {
                return;
            }

            if (sourceMode != QueueDisposable.ASYNC) {
                queue.offer(t);
            }
            schedule();
        }

 void schedule() {
            if (getAndIncrement() == 0) {
                worker.schedule(this);
            }
        }

worker.schedule(this);中传入this,机会执行ObserveOnObserver中run方法
run()方法中执行了 a.onNext(v);即切换到主线程之后执行了onNext

相关文章

  • RxPermissions 源码解析之举一反三

    [toc] RxPermissions 源码解析 简介 RxPermissions 是基于 RxJava 开发的用...

  • RxJava2框架源码分析三(map篇)

    1.回顾 上篇已经讲了RxJava2创建操作符create源码解析,不清楚的可以查看RxJava2框架源码分析二(...

  • Rxjava2.x源码解析(二): 线程切换

    上一篇文章Rxjava2.x源码解析(一): 订阅流程中我们讲了 RxJava2 的订阅部分的源码。但 RxJav...

  • RxJava 2 源码解析之线程切换

    在分析RxJava2的线程切换源码之前,再看看在上一篇RxJava 2 源码解析之创建-订阅-变换-发布里总结的流...

  • rxjava2理解

    本文从建立模型的角度分析rxjava2的源码实现,适合看了众多rxjava2源码解析还是一头雾水的同学,附带少量代...

  • Rxjava源码解析

    先上代码: 上面是Rxjava最简单的实现模型。从链式调用的返回值来看: 所以最后的调用对象是 从上面的返回值可以...

  • RxJava源码解析

    基本框架 Observable (可观察者,即被观察者) Observer (观察者) subscribe (订阅...

  • Rxjava源码解析

    这边文章主要记录使用Rxjava过程中对map方法以及flatmap方法的源码理解,自认为也是RxJava的一个精...

  • Rxjava源码解析

    Rxjava本质上是一个异步操作库。是一个能让你用非常简单的逻辑 去处理那些繁琐复杂任务的 异步的操作库。 一、观...

  • rxjava源码解析

    线程切换原理 案例 subscribeOn切换子线程 先看subscribe的执行,最后会执行Observable...

网友评论

      本文标题:rxjava源码解析

      本文链接:https://www.haomeiwen.com/subject/hvoorqtx.html