3-RxJava Source Code Analysis --- Scheduler

Author: 土人徐 | Published: 2018-09-12 10:04

1 - How Scheduler is used in RxJava: code examples

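In "1. RxJava Source Code Analysis --- Subscription Process and Thread Switching" we saw that both the sending side and the receiving side rely on a Scheduler when switching threads.
The sending side (subscribeOn) uses it like this: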

final Scheduler scheduler;
scheduler.scheduleDirect(new SubscribeTask(parent))

The receiving side (observeOn) uses it like this:

Scheduler.Worker worker = scheduler.createWorker();
    
    ...
    
final Scheduler.Worker worker;
worker.schedule(this);

2 - The Scheduler abstract class

public abstract class Scheduler {
    // Creates the concrete Worker object that performs the actual scheduling and execution
    public abstract Worker createWorker();
    
    // Starts the Scheduler so that it can accept Runnable tasks for execution
    public void start() {

    }
    
    // Shuts down the Scheduler
    public void shutdown() {

    }
    
    // Schedules the Runnable for immediate execution
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }
    
    // Schedules the Runnable for execution after a delay
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        DisposeTask task = new DisposeTask(decoratedRun, w);

        w.schedule(task, delay, unit);

        return task;
    }
    
    // Schedules the Runnable to run periodically after an initial delay
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final Worker w = createWorker();

        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);

        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }

        return periodicTask;
    }
    
    
    public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
        return (S) new SchedulerWhen(combine, this);
    }
    
    
    public abstract static class Worker implements Disposable {
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
        
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);
        
        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            final SequentialDisposable first = new SequentialDisposable();

            final SequentialDisposable sd = new SequentialDisposable(first);

            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);

            final long periodInNanoseconds = unit.toNanos(period);
            final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
            final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);

            Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                    periodInNanoseconds), initialDelay, unit);

            if (d == EmptyDisposable.INSTANCE) {
                return d;
            }
            first.replace(d);

            return sd;
        }
        
        ...
        
        // Other related code is omitted here: DisposeTask and PeriodicDirectTask mainly wrap a Runnable as a disposable Runnable and as a disposable Runnable that can run repeatedly
    }
}

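From the code above we can see that the real thread-scheduling logic of Scheduler lives in the Worker class. Because createWorker() is abstract, the Worker object is created by a concrete implementation class. Cancellation and periodic invocation are handled by helper classes such as DisposeTask and PeriodicDirectTask, while the actual hand-off to a thread goes through the abstract method schedule(Runnable, long, TimeUnit), which each concrete implementation provides.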
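The delegation described above can be illustrated with a minimal sketch built only on java.util.concurrent. All names here (MiniScheduler, MiniWorker, SingleThreadScheduler, runOnce) are hypothetical stand-ins invented for illustration, not RxJava classes, and the sketch leaves out plugins, Disposable return values, and periodic scheduling.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;

final class MiniSchedulerDemo {
    // Hypothetical stand-in for Scheduler.Worker: the only place that touches threads.
    abstract static class MiniWorker {
        abstract void schedule(Runnable run, long delay, TimeUnit unit);
        abstract void dispose();
    }

    // Hypothetical stand-in for Scheduler: generic logic here, thread choice left to subclasses.
    abstract static class MiniScheduler {
        abstract MiniWorker createWorker();

        // The no-delay overload forwards to the delayed one with a delay of zero.
        void scheduleDirect(Runnable run) {
            scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
        }

        void scheduleDirect(Runnable run, long delay, TimeUnit unit) {
            final MiniWorker w = createWorker();
            // Wrap the task so the one-shot worker is released after it runs
            // (similar in spirit to DisposeTask, but greatly simplified).
            w.schedule(() -> {
                try { run.run(); } finally { w.dispose(); }
            }, delay, unit);
        }
    }

    // A concrete scheduler: each worker owns one single-threaded scheduled executor.
    static final class SingleThreadScheduler extends MiniScheduler {
        @Override
        MiniWorker createWorker() {
            final ScheduledExecutorService exec =
                    Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "mini-worker"));
            return new MiniWorker() {
                @Override
                void schedule(Runnable run, long delay, TimeUnit unit) {
                    exec.schedule(run, delay, unit);
                }
                @Override
                void dispose() { exec.shutdown(); }
            };
        }
    }

    // Runs one task through the scheduler and returns the name of the thread that executed it.
    static String runOnce() {
        final String[] threadName = new String[1];
        final CountDownLatch done = new CountDownLatch(1);
        new SingleThreadScheduler().scheduleDirect(() -> {
            threadName[0] = Thread.currentThread().getName();
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) throw new IllegalStateException("task did not run");
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return threadName[0];
    }
}
```

Calling MiniSchedulerDemo.runOnce() from any thread returns the worker thread's name rather than the caller's, which is the thread switch in miniature: the base class never creates a thread itself and only hands work to whatever Worker the subclass supplies.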

3 - A look at IoScheduler, a concrete Scheduler implementation

public final class IoScheduler extends Scheduler {
    final AtomicReference<CachedWorkerPool> pool;
    
    // Thread pool management class
    static final class CachedWorkerPool implements Runnable {
        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            ... 
            
            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                // Create a single-threaded scheduled pool that runs this pool object (a Runnable) every keepAliveTime
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }
    }
    
    @NonNull
    @Override
    public Worker createWorker() {
        // Create the concrete Worker object
        return new EventLoopWorker(pool.get());
    }

    // The concrete Worker class
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;

        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            // The actual thread-switching call
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            
            // The actual delayed scheduling is implemented in NewThreadWorker
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }
    
    // Thread-backed worker subclass
    static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
        ...
    }
    
    
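    // Note: in the RxJava source, NewThreadWorker is a separate top-level class; it is shown inline here for convenience
    public class NewThreadWorker extends Scheduler.Worker implements Disposable {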
        ...
        
        // The actual implementation of thread switching and delayed execution
        public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
            Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
            ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
    
            if (parent != null) {
                if (!parent.add(sr)) {
                    return sr;
                }
            }
    
            Future<?> f;
            try {
                if (delayTime <= 0) {
                    f = executor.submit((Callable<Object>)sr);
                } else {
                    f = executor.schedule((Callable<Object>)sr, delayTime, unit);
                }
                sr.setFuture(f);
            } catch (RejectedExecutionException ex) {
                if (parent != null) {
                    parent.remove(sr);
                }
                RxJavaPlugins.onError(ex);
            }
    
            return sr;
        }
    }
}

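From the code above we can see that a concrete Scheduler implementation essentially provides a concrete thread pool and a concrete Worker class, which together handle task delays and the thread-switching logic.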
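The core of scheduleActual, choosing between submit and schedule based on the delay and refusing work once the worker is disposed, can be sketched with the JDK alone. DelayedWorkerSketch and its methods are hypothetical names invented for illustration. Returning null stands in for EmptyDisposable.INSTANCE, and the ScheduledRunnable wrapper and parent container are left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;

final class DelayedWorkerSketch {
    private final ScheduledExecutorService executor = Executors.newScheduledThreadPool(1);
    private final AtomicBoolean disposed = new AtomicBoolean();

    // Returns a Future for the task, or null if the worker no longer accepts work.
    Future<?> schedule(Runnable action, long delay, TimeUnit unit) {
        if (disposed.get()) {
            return null; // don't schedule, the worker has been disposed
        }
        try {
            if (delay <= 0) {
                return executor.submit(action);            // run as soon as a thread is free
            } else {
                return executor.schedule(action, delay, unit); // run after the delay
            }
        } catch (RejectedExecutionException ex) {
            return null; // the executor was shut down concurrently
        }
    }

    void dispose() {
        if (disposed.compareAndSet(false, true)) {
            executor.shutdownNow();
        }
    }

    // Schedules a task with the given delay and reports how long it took to run, in milliseconds.
    static long delayedElapsedMillis(long delayMillis) {
        DelayedWorkerSketch worker = new DelayedWorkerSketch();
        try {
            long start = System.nanoTime();
            AtomicLong finished = new AtomicLong();
            Future<?> f = worker.schedule(() -> finished.set(System.nanoTime()),
                    delayMillis, TimeUnit.MILLISECONDS);
            f.get(5, TimeUnit.SECONDS);
            return TimeUnit.NANOSECONDS.toMillis(finished.get() - start);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            worker.dispose();
        }
    }

    // Returns true if a disposed worker refuses new tasks.
    static boolean rejectsAfterDispose() {
        DelayedWorkerSketch worker = new DelayedWorkerSketch();
        worker.dispose();
        return worker.schedule(() -> { }, 0L, TimeUnit.MILLISECONDS) == null;
    }
}
```

Both branches go to the same executor, so the delay only changes when the task runs, not which thread runs it. The disposed check before scheduling plays the role of the tasks.isDisposed() test in EventLoopWorker.schedule.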
