美文网首页RxJavaAndroid开发经验谈Android开发
RxJava线程切换流程分析_subscribeOn

RxJava线程切换流程分析_subscribeOn

作者: 未见哥哥 | 来源:发表于2017-06-12 22:19 被阅读214次

    在上一小节中RxJava2_整体流程分析,有这么一个结论,那就是每一次调用 Observable 的操作符都会返回一个新的 Observable 对象,并且会通过构造的方式传入上一级创建的 Observable 对象,将其保存起来,下面是示例代码。那么接下来操作的 subscribeOn、observeOn 操作符都会分别创建新的 Observable 对象,并存储上一级创建的 observable。

    //上一级创建的 observable 对象:ObservableOnSubscribe
    Observable.create(new ObservableOnSubscribe<String>() {...}
    
    public final class ObservableCreate<T> extends Observable<T> {
        final ObservableOnSubscribe<T> source;
        //保存上一级创建的 Observable 对象 : ObservableOnSubscribe
        public ObservableCreate(ObservableOnSubscribe<T> source) {
            this.source = source;
        }
    }
    

    一、执行流程图

    RxJava2_执行流程分析图.png

    二、示例代码

    下面这段代码的功能就是在 subscribe 方法内部通过调用 getBitampFormServer 去请求一个 Bitmap 对象,这个方法是耗时操作,当前的操作应该在子线程中执行,得到 bmp 之后,根据结果分别去调用 onNext() /onError() 方法。而在订阅者中若是 onNext 被回调则表示成功获取到 bmp,对应地将其设置给对应的 mImageView 对象上,如果 onError 被回调了,那么表示加载 Bitmap 是失败的,对应的再做一些其它操作,这些操作应该在主线程中进行。本次通过从源码的角度探究的是 RxJava2 内部是如何进行线程切换操作的。本小节先分析 subscribeOn 如何去实现事件源在子线程中发射事件。也就是 ObservableOnSubscribe#subscribe 在子线程中去执行。

    Observable.create(new ObservableOnSubscribe<String>() {
        @Override
        public void subscribe(@NonNull ObservableEmitter<Bitmap> e) throws Exception {
         
            //该方法进行网络请求,是比较耗时的操作。
            Bitmap bmp = getBitampFormServer("uri");
            if(bmp!=null) {
                //获取 bmp 成功
                e.onNext(bmp);
                e.onComplete();
            }else{
                //如果从网络加载图片不成功,回调onError 来通知订阅者
                e.onError(new Exception("图片加载出错啦"));
            }
        }}) //事件源发射事件在子线程中运行
            .subscribeOn(Schedulers.io())
            //订阅者在主线程中接受事件
            .observeOn(AndroidSchedulers.mainThread())
            .subscribe(new Observer<Bitmap>() {
                @Override
                public void onSubscribe(Disposable d) {
                    Log.e("zeal", "onSubscribe");
                }
                @Override
                public void onNext(@NonNull Bitmap bmp) {
                    //设置显示在 ImageView 上
                    mImageView.setImageBitmap(bmp);             
                }
                @Override
                public void onError(@NonNull Throwable e) {
                    Log.e("zeal","error:"+e.toString());
                }
                @Override
                public void onComplete() {
                    Log.e("zeal", "onComplete");
                }
            });
    

    2、.subscribeOn(Schedulers.io()) 源码分析

    public final Observable<T> subscribeOn(Scheduler scheduler) {
        ...
        return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    

    2.1、Scheduler

    从下面的类注释可以知道,这个类是一个调度类,可以延时/周期性地去执行一个任务。可以从 Schedulers 这个类去获取 Scheduler 的实现子类对象,例如在频繁进行 io 操作就可以调用 Schedulers.io() ,如果是计算比较多的可以调用 Schedulers.computation()。

    /**
     * A {@code Scheduler} is an object that specifies an API for scheduling
     * units of work with or without delays or periodically.
     * You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
     */
    public abstract class Scheduler {}
    

    2.2、Schedulers.io()

    通过下面的 Schedulers.io() 源码跟踪,最终返回的是一个 IoScheduler 对象,这个对象实际上就是 Scheduler 的子类对象。那么就符合 subscribeOn(Scheduler) 参数的要求了。

    @NonNull
    public static Scheduler io() {
        //内部是 IO 
        return RxJavaPlugins.onIoScheduler(IO);
    }
    //-----------------------------------------------------
    @NonNull
    static final Scheduler IO;
    
    static {
        ...
        // IO 是在静态代码块中实例化的
        IO = RxJavaPlugins.initIoScheduler(new Callable<Scheduler>() {
            @Override
            public Scheduler call() throws Exception {
                //这里返回一个 IoHolder 对象。
                return IoHolder.DEFAULT;
            }
        });
        ...
    }
    //-----------------------------------------------------
    static final class IoHolder {
        static final Scheduler DEFAULT = new IoScheduler();
    }
    
    //-----------------------------------------------------
    //IoHolder 类定义中可以知道,该类是继承至 Scheduler
    public final class IoScheduler extends Scheduler {}
    

    2.3、subscribeOn 内部实现

    • subscribeOn(Scheduler scheduler)

    这个方法内部会通过创建一个 ObservableSubscribeOn 对象,根据之前的经验可知道,这个类肯定也是一个 Observable 的子类对象。因此对于 subscribe(observer) 方法而言,我们就只关心它真正调用的方法 subscribeActual(observer) 。

    • subscribeActual(observer)

    在subscribeActual 内部首先是对 observer 进行包装成 SubscribeOnObserver 对象。这里的 SubscribeOnObserver 不仅是一个 Observer ,而且具备一个连接器的作用 Disposable 。

    @Override
    public void subscribeActual(final Observer<? super T> s) {
        //包装 observer 对象
        final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
        //将连接器 parent 通过 onSubscribe 回调给 observer 对象
        s.onSubscribe(parent);
        //这里是通过 scheduler 去执行一个任务 SubscribeTask。
        parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
    }
    
    • SubscribeOnObserver

    这个类是对 observer 的包装,内部实现了 Observer 和 Disposable 接口。也就是说它既有订阅者的功能,也实现了连接器的功能。注意 actual 这个变量,它是下一级的 Observer 对象,为什么说是下一级呢?因为每次包装的 Observer 是一级级别往上被订阅的,当前的 Observer 都会包装下一级别的 Observer 对象。例如 SubscribeOnObserver 就封装了下一级的 Observer 对象,其实就是当前 Observer 接受到事件源发送过来的事件时,再调用包装的 Observer 回调给下一级,这样一级级传递下去知道最后一级 Observer。

    static final class SubscribeOnObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
        ...
        final Observer<? super T> actual;
        final AtomicReference<Disposable> s;
        SubscribeOnObserver(Observer<? super T> actual) {
            this.actual = actual;
            this.s = new AtomicReference<Disposable>();
        }
        @Override
        public void onSubscribe(Disposable s) {
            DisposableHelper.setOnce(this.s, s);
        }
        //发送事件
        @Override
        public void onNext(T t) {
            //回调给下一级
            actual.onNext(t);
        }
        //发送事件
        @Override
        public void onError(Throwable t) {
            //回调给下一级
            actual.onError(t);
        }
        //发送事件
        @Override
        public void onComplete() {
            //回调给下一级
            actual.onComplete();
        }
        @Override
        public void dispose() {
            DisposableHelper.dispose(s);
            DisposableHelper.dispose(this);
        }
        @Override
        public boolean isDisposed() {
            return DisposableHelper.isDisposed(get());
        }
        void setDisposable(Disposable d) {
            DisposableHelper.setOnce(this, d);
    
    • SubscribeTask(parent)

    SubscribeTask 它是一个 Runnbale ,因此我们把它理解为一个任务。首先关注是它的 run 方法,它内部实现很简单,就是**通知上一级的 Observable 通过 subscribe 这个方法进行订阅当前 observer **。下面会执行一大堆代码,其实都会为创建一个线程然后交给指定的线程池取执行这个任务,先记住这个任务的使命。那么既然是一个线程,那么肯定有一个地方需要执行这个线程的,接下来关注 scheduler.scheduleDirect 方法。

    final class SubscribeTask implements Runnable {
        private final SubscribeOnObserver<T> parent;
        SubscribeTask(SubscribeOnObserver<T> parent) {
            this.parent = parent;
        }
        @Override
        public void run() {
            //【核心代码,这段代码决定上一级observable订阅在哪个线程执行。】
            //source:就是上一级创建的 observable
            //parent 就是包装后的 observer
            source.subscribe(parent);
        }
    }
    

    开始寻找 SubscribeTask 这个线程实在哪里被执行的。

    • scheduler.scheduleDirect(new SubscribeTask(parent))

    刚才分析过 scheduler 就是 IoScheduler 对象了,跟踪源码发现,这个类并没有重写这个方法,因此直接进入 Scheduler 查看。

    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    //这里的 delay = 0,也就是马上执行这个任务。
    //【这个 run 就是我们的目标】
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        
        //核心代码 createWorker() 创建一个可以可以执行 run 的 worker 
        final Worker w = createWorker();
        //对 run 进行了包装,实际上还是 run 这个对象。【这个 run 就是我们的目标】
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        
        //decoratedRun 交给了 worker 去执行
        w.schedule(new Runnable() {
            @Override
            public void run() {
                try {
                    【我们的目标在此处被执行】
                    decoratedRun.run();
                } finally {
                    //事件源发射事件完毕之后,就关闭连接器。
                    w.dispose();
                }
            }
        }, delay, unit);
        return w;
    }
    
    • IoScheduler#createWorker();

    现在我们知道我们的任务是交给 worker.schedule() 去执行的。因为 Worker 是负责去执行调度的,因此不同的子类会有不同的 Worker 的实现,在 Scheduler 中通过 createWorker() 来获取子类实现的 Worker 对象。

    @Override
    public Worker createWorker() {
        return new EventLoopWorker(pool.get());
    }
    
    • Scheduler#Worker

    这个类具备延迟执行任务,周期性执行任务的功能。所有的执行都是基于 schedule() 方法,而这个方法是一个抽象方法,也就是它无法知道子类需要怎么执行这个任务,因为每一种调度器执行的方式 schedule 都不一样,因此交给子类去实现。

     @NonNull
     public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
    
    • EnentLooerWorker#schedule()

    有了 Worker 之后就要开始执行【我们的任务 action 啦】

    @Override
    public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
        if (tasks.isDisposed()) {
            // don't schedule, we are unsubscribed
            return EmptyDisposable.INSTANCE;
        }
        //【任务 action 】交给 threadWorker 去执行
        return threadWorker.scheduleActual(action, delayTime, unit, tasks);
    
    • threadWorker.scheduleActual

    threadWorker 是 ThreadWorker ,继承至 NewThreadWorker 。

    static final class ThreadWorker extends NewThreadWorker 
    
    //NewThreadWorker 内部维护一个线程池 executor。
    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        private final ScheduledExecutorService executor;
    
        volatile boolean disposed;
    
        public NewThreadWorker(ThreadFactory threadFactory) {
            executor = SchedulerPoolFactory.create(threadFactory);
        }
        
    
    //最终代码会走到这里
    @NonNull
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        //对 run 进行包装
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        try {
            //上面已经提到,delayTime = 0;所以这个任务会被立即执行,
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
    
    @Override
    public void run() {
        try {
            try {
                //执行原始的 run 方法。
                actual.run();
            } catch (Throwable e) {
                // Exceptions.throwIfFatal(e); nowhere to go
                RxJavaPlugins.onError(e);
            }
        } finally {
            Object o = get(PARENT_INDEX);
            if (o != DISPOSED && o != null && compareAndSet(PARENT_INDEX, o, DONE)) {
                ((DisposableContainer)o).delete(this);
            }
            for (;;) {
                o = get(FUTURE_INDEX);
                if (o == DISPOSED || compareAndSet(FUTURE_INDEX, o, DONE)) {
                    break;
                }
            }
        }
    }
    

    2.4、 结果

    f = executor.submit((Callable<Object>)sr); 这里执行了 SubscribeTask#run() 方法,也就是当前的订阅者 Observer 订阅了上一级的 Observable 。也就是上一级的 ObservableCreate.subscribe(observer) 被执行了。请注意它是在子线程中被执行的。如果想要了解接下来的事件源是怎么发送事件的可以参考RxJava2_整体流程分析

    相关文章

      网友评论

        本文标题:RxJava线程切换流程分析_subscribeOn

        本文链接:https://www.haomeiwen.com/subject/dasrqxtx.html