美文网首页
RxJava的线程切换

RxJava的线程切换

作者: 凯玲之恋 | 来源:发表于2021-02-27 21:36 被阅读0次

    1 线程切换例子

            new Thread() {
                @Override
                public void run() {
                    Log.d(TAG, "Thread run() 所在线程为 :" + Thread.currentThread().getName());
                    Observable
                            .create(new ObservableOnSubscribe<String>() {
                                @Override
                                public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                                    Log.d(TAG, "Observable subscribe() 所在线程为 :" + Thread.currentThread().getName());
                                    emitter.onNext("文章1");
                                    emitter.onNext("文章2");
                                    emitter.onComplete();
                                }
                            })
                            .subscribeOn(Schedulers.io())
                            .observeOn(AndroidSchedulers.mainThread())
                            .subscribe(new Observer<String>() {
                                @Override
                                public void onSubscribe(Disposable d) {
                                    Log.d(TAG, "Observer onSubscribe() 所在线程为 :" + Thread.currentThread().getName());
                                }
    
                                @Override
                                public void onNext(String s) {
                                    Log.d(TAG, "Observer onNext() 所在线程为 :" + Thread.currentThread().getName());
                                }
    
                                @Override
                                public void onError(Throwable e) {
                                    Log.d(TAG, "Observer onError() 所在线程为 :" + Thread.currentThread().getName());
                                }
    
                                @Override
                                public void onComplete() {
                                    Log.d(TAG, "Observer onComplete() 所在线程为 :" + Thread.currentThread().getName());
                                }
                            });
                }
            }.start();
    

    输出结果为:

    Thread run() 所在线程为 :Thread-2
    Observer onSubscribe() 所在线程为 :Thread-2
    Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1
    Observer onNext() 所在线程为 :main
    Observer onNext() 所在线程为 :main
    Observer onComplete() 所在线程为 :main
    
    • Observer(观察者)的onSubscribe()方法运行在当前线程中。
    • Observable(被观察者)中的subscribe()运行在subscribeOn()指定的线程中。
    • Observer(观察者)的onNext()和onComplete()等方法运行在observeOn()指定的线程中。

    2 Scheduler类型

    通过Schedulers类我们可以获取到各种Scheduler的子类。RxJava提供了以下这些线程调度类供我们使用:

    Scheduler类型 使用方式 含义 使用场景
    IoScheduler Schedulers.io() io操作线程 读写SD卡文件,查询数据库,访问网络等IO密集型操作
    NewThreadScheduler Schedulers.newThread() 创建新线程 耗时操作等
    SingleScheduler Schedulers.single() 单例线程 只需一个单例线程时
    ComputationScheduler Schedulers.computation() CPU计算操作线程 图片压缩取样、xml,json解析等CPU密集型计算
    TrampolineScheduler Schedulers.trampoline() 当前线程 需要在当前线程立即执行任务时
    HandlerScheduler AndroidSchedulers.mainThread() Android主线程 更新UI等

    2.1 Schedulers类的io()

        @NonNull
        static final Scheduler IO;
    
        @NonNull
        public static Scheduler io() {
            //1.直接返回一个名为IO的Scheduler对象
            return RxJavaPlugins.onIoScheduler(IO);
        }
    
        static {
            //省略无关代码
    
            //2.IO对象是在静态代码块中实例化的,这里会创建按一个IOTask()
            IO = RxJavaPlugins.initIoScheduler(new IOTask());
        }
    
        static final class IOTask implements Callable<Scheduler> {
            @Override
            public Scheduler call() throws Exception {
                //3.IOTask中会返回一个IoHolder对象
                return IoHolder.DEFAULT;
            }
        }
    
        static final class IoHolder {
            //4.IoHolder中会就是new一个IoScheduler对象出来
            static final Scheduler DEFAULT = new IoScheduler();
        }
    

    可以看到,Schedulers.io()中使用了静态内部类的方式来创建出了一个单例IoScheduler对象出来,这个IoScheduler是继承自Scheduler的

    3 subscribeOn()源码分析

     .subscribeOn(Schedulers.io())
    

    subscribeOn()方法要传入一个Scheduler类对象作为参数,Scheduler是一个调度类,能够延时或周期性地去执行一个任务。

        public final Observable<T> subscribeOn(Scheduler scheduler) {
            //省略无关代码
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    
        public final Observable<T> subscribeOn(Scheduler scheduler) {
            //省略无关代码
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    可以看到,首先会将当前的Observable(其具体实现为ObservableCreate)包装成一个新的ObservableSubscribeOn对象。
    放个图:


    3.3 ObservableSubscribeOn类的构造方法

        public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
    

    也就是把source和scheduler这两个保存一下,后面会用到。

    然后subscribeOn()方法就完了。好像也没做什么,就是重新包装一下对象而已,然后将新对象返回。即将一个旧的被观察者包装成一个新的被观察者

    3.4 ObservableSubscribeOn类的subscribeActual()

    RxJava 的消息订阅
    由于我们调用subscribeOn()之后,ObservableCreate对象被包装成了一个新的ObservableSubscribeOn对象了。因此我们就来看看ObservableSubscribeOn类中的subscribeActual()方法:

        @Override
        public void subscribeActual(final Observer<? super T> s) {
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
    
            s.onSubscribe(parent);
    
            parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
        }
    

    subscribeActual()中同样也将我们自定义的Observer给包装成了一个新的SubscribeOnObserver对象。同样,放张图:


    然后就是调用Observer的onSubscribe()方法,可以看到,到目前为止,还没出现过任何线程相关的东西,所以Observer的onSubscribe()方法就是运行在当前线程中

    然后我们重点看下最后一行代码,首先创建一个SubscribeTask对象,然后就是调用scheduler.scheduleDirect().。

    3.5 SubscribeTask类

        //SubscribeTask是ObservableSubscribeOn的内部类
        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
    
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
    
            @Override
            public void run() {
                //这里的source就是我们自定义的Observable对象,即ObservableCreate
                source.subscribe(parent);
            }
        }
    

    很简单的一个类,就是实现了Runnable接口,然后run()中调用Observer.subscribe()

    3.6 Scheduler类的scheduleDirect()

        public Disposable scheduleDirect(@NonNull Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }
    

    往下看:

        public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
    
            //createWorker()在Scheduler类中是个抽象方法,所以其具体实现在其子类中
            //因此这里的createWorker()应当是在IoScheduler中实现的。
            //Worker中可以执行Runnable
            final Worker w = createWorker();
    
            //实际上decoratedRun还是这个run对象,即SubscribeTask
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            //将Runnable和Worker包装成一个DisposeTask
            DisposeTask task = new DisposeTask(decoratedRun, w);
    
            //Worker执行这个task
            w.schedule(task, delay, unit);
    
            return task;
        }
    

    我们来看下创建Worker和Worker执行任务的过程。

    3.7 IoScheduler的createWorker()和schedule()

        final AtomicReference<CachedWorkerPool> pool;
    
        public Worker createWorker() {
            //就是new一个EventLoopWorker,并且传一个Worker缓存池进去
            return new EventLoopWorker(pool.get());
        }
    
        static final class EventLoopWorker extends Scheduler.Worker {
            private final CompositeDisposable tasks;
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
    
            final AtomicBoolean once = new AtomicBoolean();
    
            //构造方法
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                //从缓存Worker池中取一个Worker出来
                this.threadWorker = pool.get();
            }
    
            @NonNull
            @Override
            public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
                //省略无关代码
    
                //Runnable交给threadWorker去执行
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        }
    

    注意,不同的Scheduler类会有不同的Worker实现,因为Scheduler类最终是交到Worker中去执行调度的。

    3.9 CachedWorkerPool的get()

        static final class CachedWorkerPool implements Runnable {
            ThreadWorker get() {
                if (allWorkers.isDisposed()) {
                    return SHUTDOWN_THREAD_WORKER;
                }
                while (!expiringWorkerQueue.isEmpty()) {
                    //如果缓冲池不为空,就从缓存池中取threadWorker
                    ThreadWorker threadWorker = expiringWorkerQueue.poll();
                    if (threadWorker != null) {
                        return threadWorker;
                    }
                }
    
                //如果缓冲池中为空,就创建一个并返回。
                ThreadWorker w = new ThreadWorker(threadFactory);
                allWorkers.add(w);
                return w;
            }
        }
    

    3.9 NewThreadWorker的scheduleActual()

    我们再来看下threadWorker.scheduleActual()。
    ThreadWorker类没有实现scheduleActual()方法,其父类NewThreadWorker实现了该方法,我们点进去看下:

    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
        private final ScheduledExecutorService executor;
    
        volatile boolean disposed;
    
        public NewThreadWorker(ThreadFactory threadFactory) {
            //构造方法中创建一个ScheduledExecutorService对象,可以通过ScheduledExecutorService来使用线程池
            executor = SchedulerPoolFactory.create(threadFactory);
        }
    
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            //这里的decoratedRun实际还是run对象
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //将decoratedRun包装成一个新对象ScheduledRunnable
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            //省略无关代码
    
            if (delayTime <= 0) {
                //线程池中立即执行ScheduledRunnable
                f = executor.submit((Callable<Object>)sr);
            } else {
                //线程池中延迟执行ScheduledRunnable
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
    
            //省略无关代码
    
            return sr;
        }
    }
    

    这里的executor就是使用线程池去执行任务,最终SubscribeTask的run()方法会在线程池中被执行,即Observable的subscribe()方法会在IO线程中被调用。

    Observable subscribe() 所在线程为 RxCachedThreadScheduler-1
    

    3.10 简单总结

    • Observer(观察者)的onSubscribe()方法运行在当前线程中,因为在这之前都没涉及到线程切换。

    • 如果设置了subscribeOn(指定线程),那么Observable(被观察者)中subscribe()方法将会运行在这个指定线程中去。

    3.11 时序图

    3.12 多次设置subscribeOn()的问题

    如果我们多次设置subscribeOn(),那么其执行线程是在哪一个呢?先来看下例子

            //省略前后代码,看重点部分
            .subscribeOn(Schedulers.io())//第一次
            .subscribeOn(Schedulers.newThread())//第二次
            .subscribeOn(AndroidSchedulers.mainThread())//第三次
    

    其输出结果为:

    Observable subscribe() 所在线程为 :RxCachedThreadScheduler-1
    
    

    即只有第一次的subscribeOn()起作用了。这是为什么呢?
    我们知道,每调用一次subscribeOn()就会把旧的被观察者包装成一个新的被观察者,经过了三次调用之后,就变成了下面这个样子:


    同时,我们知道,被观察者被订阅时是从最外面的一层通知到里面的一层,那么当传到上图第三层时,也就是ObservableSubscribeOn(第一次)那一层时,管你之前是在哪个线程,subscribeOn(Schedulers.io())都会把线程切到IO线程中去执行,所以多次设置subscribeOn()时,只有第一次生效。

    4 observeOn()

        //指定在Android主线程中执行
        .observeOn(AndroidSchedulers.mainThread())
    

    4.1 Observable类的observeOn()

        public final Observable<T> observeOn(Scheduler scheduler) {
            return observeOn(scheduler, false, bufferSize());
        }
    
        public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
            //省略无关代码
            return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
        }
    

    同样,这里也是新包装一个ObservableObserveOn对象,注意,这里包装的旧被观察者是ObservableSubscribeOn对象了,因为之前调用过subscribeOn()包装了一层了,所以现在是如下图所示:


    RxJavaPlugins.onAssembly()也是原样返回。

    我们看看ObservableObserveOn的构造方法。

    4.2 ObservableObserveOn类的构造方法

        public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
            super(source);
            this.scheduler = scheduler;
            this.delayError = delayError;
            this.bufferSize = bufferSize;
        }
    

    4.3 ObservableObserveOn的subscribeActual()

    和subscribeOn()差不多,我们就直接来看ObservableObserveOn的subscribeActual()方法了。

        @Override
        protected void subscribeActual(Observer<? super T> observer) {
            //判断是否当前线程
            if (scheduler instanceof TrampolineScheduler) {
                //是当前线程的话,直接调用里面一层的subscribe()方法
                //即调用ObservableSubscribeOn的subscribe()方法
                source.subscribe(observer);
            } else {
                //创建Worker
                //本例子中的scheduler为AndroidSchedulers.mainThread()
                Scheduler.Worker w = scheduler.createWorker();
                //这里会将Worker包装到ObserveOnObserver对象中去
                //注意:source.subscribe没有涉及到Worker,所以还是在之前设置的线程中去执行
                //本例子中source.subscribe就是在IO线程中执行。
                source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
            }
        }
    

    同样,这里也将observer给包装了一层,如下图所示:

    source.subscribe()中将会把事件逐一发送出去,我们这里只看下ObserveOnObserver中的onNext()方法的处理,onComplete()等就不看了,实际上都差不多。

    4.4 ObserveOnObserver的onNext()

            @Override
            public void onNext(T t) {
                //省略无关代码
                if (sourceMode != QueueDisposable.ASYNC) {
                    //将信息存入队列中
                    queue.offer(t);
                }
                schedule();
            }
    

    就是调用schedule()而已。

    4.5 ObserveOnObserver的schedule()

            void schedule() {
                if (getAndIncrement() == 0) {
                    //ObserveOnObserver同样实现了Runnable接口,所以就把它自己交给worker去调度了
                    worker.schedule(this);
                }
            }
    

    Android主线程调度器里面的代码就不分析了,里面实际上是用handler来发送Message去实现的,感兴趣的可以看下。
    既然ObserveOnObserver实现了Runnable接口,那么就是其run()方法会在主线程中被调用。
    我们来看下ObserveOnObserver的run()方法:

    4.6 ObserveOnObserver的run()

            @Override
            public void run() {
                //outputFused默认是false
                if (outputFused) {
                    drainFused();
                } else {
                    drainNormal();
                }
            }
    

    这里会走到drainNormal()方法

    4.7 ObserveOnObserver的drainNormal()

            void drainNormal() {
                int missed = 1;
                //存储消息的队列
                final SimpleQueue<T> q = queue;
                //这里的actual实际上是SubscribeOnObserver
                final Observer<? super T> a = actual;
    
                //省略无关代码
    
                //从队列中取出消息
                v = q.poll();
    
                //...
    
                //这里调用的是里面一层的onNext()方法
                //在本例子中,就是调用SubscribeOnObserver.onNext()
                a.onNext(v);
    
                //...
            }
    

    至于SubscribeOnObserver.onNext(),里面也没切换线程的逻辑,就是调用里面一层的onNext(),所以最终会调用到我们自定义的Observer中的onNext()方法。

    因此,Observer的onNext()方法就在observeOn()中指定的线程中给调用了,在本例中,就是在Android主线程中给调用。

    4.8 简单总结

    1. 如果设置了observeOn(指定线程),那么Observer(观察者)中的onNext()、onComplete()等方法将会运行在这个指定线程中去。
    2. subscribeOn()设置的线程不会影响到observeOn()。

    4.9 observeOn()时序图:

    参考

    详解 RxJava 的消息订阅和线程切换原理

    相关文章

      网友评论

          本文标题:RxJava的线程切换

          本文链接:https://www.haomeiwen.com/subject/htyifltx.html