美文网首页
RxJava源码分析(三)线程切换subscribeOn

RxJava源码分析(三)线程切换subscribeOn

作者: kakaxicm | 来源:发表于2018-08-12 17:52 被阅读0次

    引言

    前面我们梳理了RxJava扩展的观察者模式的实现,今天我们学习RxJava的第二块核心内容:订阅方法的线程切换subscribeOn。

    subscribeOn方法

    public final Observable<T> subscribeOn(Scheduler scheduler) {
            ObjectHelper.requireNonNull(scheduler, "scheduler is null");
            return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
        }
    

    根据scheduler对象和原来的Observable对象构造了新的ObservableSubscribeOn对象:

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T>{
        ...
    }
    

    继承自AbstractObservableWithUpstream,注意这里有两个泛型,结合名字意思,可以猜想这个类可能与Observable的变换相关。
    AbstractObservableWithUpstream:

    /**
     * Base class for operators with a source consumable.
     *
     * @param <T> the input source type
     * @param <U> the output type
     */
    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
    
        /** The source consumable Observable. */
        protected final ObservableSource<T> source;
    
        /**
         * Constructs the ObservableSource with the given consumable.
         * @param source the consumable Observable
         */
        AbstractObservableWithUpstream(ObservableSource<T> source) {
            this.source = source;
        }
    
        @Override
        public final ObservableSource<T> source() {
            return source;
        }
    
    }
    

    我们可以看到它内部封装了原ObservableSource<T>,而继承自Observable<U>,其中T为原来的数据类型,U为转换后的数据类型。
    下面我们回头再看ObservableSubscribeOn:

    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
          final Scheduler scheduler;
          //构造方法传入原来的ObservableSource和Scheduler对象
          public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
            super(source);
            this.scheduler = scheduler;
        }
       @Override
        public void subscribeActual(final Observer<? super T> s) {
            //构造相应的SubscribeOnObserver对象,包装原观察者
            final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
            s.onSubscribe(parent);
            //核心代码: scheduler.scheduleDirect将 source.subscribe(parent)代码交给scheduler调度
            parent.setDisposable(scheduler.scheduleDirect(new Runnable() {
                @Override
                public void run() {
                    source.subscribe(parent);
                }
            }));
        }
    ...
    }
    

    其中的核心代码,是scheduler.scheduleDirect,将source.subscribe(parent)封装到Runnable方法中,交给scheduler调度,实现的订阅方法的线程切换。
    下面我们再看看核心类Scheduler:

    线程切换Scheduler和Worker

    Scheduler

    负责

    /**
     * A {@code Scheduler} is an object that specifies an API for scheduling
     * units of work with or without delays or periodically.
     * You can get common instances of this class in {@link io.reactivex.schedulers.Schedulers}.
     * 负责线程切换,支持延迟和周期任务调度
     */
    public abstract class Scheduler {
        //子类构造任务执行单元Worker
        public abstract Worker createWorker();
        //开始任务
        public void start() {
        }
        //停止任务
        public void shutdown() {
        }
          
        //立即执行任务
        public Disposable scheduleDirect(Runnable run) {
            return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }    
        
        //run代码交给Worker调度
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            //取Worker
            final Worker w = createWorker();
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //Worker任务调度
            w.schedule(new Runnable() {
                @Override
                public void run() {
                    try {
                        decoratedRun.run();
                    } finally {
                        w.dispose();
                    }
                }
            }, delay, unit);
            return w;
        }
    //调度周期性任务
    public Disposable schedulePeriodicallyDirect(Runnable run, long initialDelay, long period, TimeUnit unit) {
            final Worker w = createWorker();
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
            Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
            if (d == EmptyDisposable.INSTANCE) {
                return d;
            }
            return periodicTask;
       }
       ....
    }
    

    发现它的调度任务都是通过Worker实现,看看它有哪些东西:

    Worker

    public abstract static class Worker implements Disposable {
         //执行run代码
        public Disposable schedule(Runnable run) {
                return schedule(run, 0L, TimeUnit.NANOSECONDS);
         }
         //子类覆写具体的调度方法 
        public abstract Disposable schedule(Runnable run, long delay, TimeUnit unit);
    ....
    }
    

    主要的线程调度实现类

    IoScheduler

    这个类源码比较长,我们捡重点分析

    /**
     * Scheduler that creates and caches a set of thread pools and reuses them if possible.
     */
    public final class IoScheduler extends Scheduler {
    //CachedWorkerPool线程安全的引用,CachedWorkerPool维护
    final AtomicReference<CachedWorkerPool> pool;
    ...
    //初始化CachedWorkerPool
    public IoScheduler() {
            this.pool = new AtomicReference<CachedWorkerPool>(NONE);
            start();
        }
    
        @Override
        public void start() {
            CachedWorkerPool update = new CachedWorkerPool(KEEP_ALIVE_TIME, KEEP_ALIVE_UNIT);
            if (!pool.compareAndSet(NONE, update)) {
                update.shutdown();
            }
        }
       ...
       @Override
        //关键方法,根据ThreadWorker回收池构造EventLoopWorker
        public Worker createWorker() {
            return new EventLoopWorker(pool.get());
        }
        ....
    }
    

    再来看EventLoopWorker:

     static final class EventLoopWorker extends Scheduler.Worker {
            private final CachedWorkerPool pool;
            private final ThreadWorker threadWorker;
            EventLoopWorker(CachedWorkerPool pool) {
                this.pool = pool;
                this.tasks = new CompositeDisposable();
                //从缓存池中取
                this.threadWorker = pool.get();
            }
          ....
       
            @Override
            public Disposable schedule(Runnable action, long delayTime, TimeUnit unit) {
                if (tasks.isDisposed()) {
                    // don't schedule, we are unsubscribed
                    return EmptyDisposable.INSTANCE;
                }
                //最后调用threadWorker.scheduleActual执行run方法
                return threadWorker.scheduleActual(action, delayTime, unit, tasks);
            }
        }
    

    先看看pool.get()方法:

    static final class CachedWorkerPool implements Runnable{
         //未过期的闲置ThreadWorker队列
          private final ConcurrentLinkedQueue<ThreadWorker> expiringWorkerQueue;
           //执行清理过期ThreadWorker的线程池
           private final ScheduledExecutorService evictorService;
            CachedWorkerPool(long keepAliveTime, TimeUnit unit) {
                this.keepAliveTime = unit != null ? unit.toNanos(keepAliveTime) : 0L;
                this.expiringWorkerQueue = new ConcurrentLinkedQueue<ThreadWorker>();
               ...
                if (unit != null) {
                    evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                     //延时执行清理ThreadWorker方法
                    task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
                }
                evictorService = evictor;
                evictorTask = task;
            }
    
            @Override
            public void run() {
                evictExpiredWorkers();
            }
            
            //从回收池中取ThreadWorker
            ThreadWorker get() {
                if (allWorkers.isDisposed()) {
                    return SHUTDOWN_THREAD_WORKER;
                }
                while (!expiringWorkerQueue.isEmpty()) {
                    ThreadWorker threadWorker = expiringWorkerQueue.poll();
                    if (threadWorker != null) {
                        return threadWorker;
                    }
                }
    
                // No cached worker found, so create a new one.
                ThreadWorker w = new ThreadWorker(WORKER_THREAD_FACTORY);
                allWorkers.add(w);
                return w;
            }
            //回收threadWorker
            void release(ThreadWorker threadWorker) {
                // Refresh expire time before putting worker back in pool
                threadWorker.setExpirationTime(now() + keepAliveTime);
    
                expiringWorkerQueue.offer(threadWorker);
            }
            //清理过期的ThreadWorker
            void evictExpiredWorkers() {
                if (!expiringWorkerQueue.isEmpty()) {
                    long currentTimestamp = now();
    
                    for (ThreadWorker threadWorker : expiringWorkerQueue) {
                        if (threadWorker.getExpirationTime() <= currentTimestamp) {
                            if (expiringWorkerQueue.remove(threadWorker)) {
                                allWorkers.remove(threadWorker);
                            }
                        } else {
                            // Queue is ordered with the worker that will expire first in the beginning, so when we
                            // find a non-expired worker we can stop evicting.
                            break;
                        }
                    }
                }
            }
            ....
    }
    

    CachedWorkerPool维护了一个ThreadWorker回收池,EventLoopWorker从中取ThreadWorker来执行任务。
    我们再回头看threadWorker.scheduleActual方法如何调度任务的,依然是挑重点看:

    public class NewThreadWorker extends Scheduler.Worker implements Disposable {
         //线程池
        private final ScheduledExecutorService executor;
        ...
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, TimeUnit unit, DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            //封装原始的run
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                //runnable交给线程池调度
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                parent.remove(sr);
                RxJavaPlugins.onError(ex);
            }
            return sr;
        }
        ...
    }
    

    到目前为止我们走完了IO线程调度的大致流程,下面再看看主线程的调度HandlerScheduler.

    HandlerScheduler

    外部切换主线程使用MainHolder. DEFAULT对象:

    static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
    

    传入绑定MainLooper的Handler对象构造HandlerScheduler。

    final class HandlerScheduler extends Scheduler {
        private final Handler handler;
    
        HandlerScheduler(Handler handler) {
            this.handler = handler;
        }
    
        @Override
        public Disposable scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            if (run == null) throw new NullPointerException("run == null");
            if (unit == null) throw new NullPointerException("unit == null");
    
            run = RxJavaPlugins.onSchedule(run);
            //封装原始的Runnable对象
            ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
            handler.postDelayed(scheduled, Math.max(0L, unit.toMillis(delay)));
            return scheduled;
        }
    
        @Override
        public Worker createWorker() {
            return new HandlerWorker(handler);
        }
        
        //最终的调度类
        private static final class HandlerWorker extends Worker {
            private final Handler handler;
    
            private volatile boolean disposed;
    
            HandlerWorker(Handler handler) {
                this.handler = handler;
            }
    
            @Override
            public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
                if (run == null) throw new NullPointerException("run == null");
                if (unit == null) throw new NullPointerException("unit == null");
    
                if (disposed) {
                    return Disposables.disposed();
                }
    
                run = RxJavaPlugins.onSchedule(run);
    
                ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
    
                Message message = Message.obtain(handler, scheduled);
                message.obj = this; // Used as token for batch disposal of this worker's runnables.
                //发送给主线程执行run
                handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
    
                // Re-check disposed state for removing in case we were racing a call to dispose().
                if (disposed) {
                    handler.removeCallbacks(scheduled);
                    return Disposables.disposed();
                }
    
                return scheduled;
            }
    
            @Override
            public void dispose() {
                disposed = true;
                handler.removeCallbacksAndMessages(this /* token */);
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
        }
        //封装原始的Runnable对象
        private static final class ScheduledRunnable implements Runnable, Disposable {
            private final Handler handler;
            private final Runnable delegate;
    
            private volatile boolean disposed;
    
            ScheduledRunnable(Handler handler, Runnable delegate) {
                this.handler = handler;
                this.delegate = delegate;
            }
    
            @Override
            public void run() {
                try {
                    delegate.run();
                } catch (Throwable t) {
                    IllegalStateException ie =
                        new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                    RxJavaPlugins.onError(ie);
                    Thread thread = Thread.currentThread();
                    thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
                }
            }
    
            @Override
            public void dispose() {
                disposed = true;
                handler.removeCallbacks(this);
            }
    
            @Override
            public boolean isDisposed() {
                return disposed;
            }
        }
    }
    

    分析完订阅方法的线程切换,我们可以考虑这个问题:订阅方法连续切换为什么总是以第一次调用为准?
    我的理解如下:
    1.在subscribeActual()里开启了Scheduler的工作,source.subscribe(parent);,从这一句开始切换了线程,所以在这之上的代码都是在切换后的线程里的了。
    2.连续切换时,执行订阅操作时,最上面的切换(scheduleDirect)最后执行,此时source.subscribe(parent)所在的线程变成了最上面的subscribeOn(xxxx)指定的线程
    3.发送数据的方法,最终还是在最上面的source.subscribe(parent)中执行。
    接下来的博客我们继续分析观察者方法的线程调度。

    相关文章

      网友评论

          本文标题:RxJava源码分析(三)线程切换subscribeOn

          本文链接:https://www.haomeiwen.com/subject/ddbsbftx.html