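1 - How Scheduler is used in RxJava
In "RxJava Source Code Analysis: Subscription Process and Thread Switching" we saw that both the sending side and the receiving side rely on Scheduler when switching threads.
The sending side uses it like this: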
final Scheduler scheduler;
scheduler.scheduleDirect(new SubscribeTask(parent))
The receiving side uses it like this:
Scheduler.Worker worker = scheduler.createWorker();
...
final Scheduler.Worker worker;
worker.schedule(this);
2 - The Scheduler abstract class
public abstract class Scheduler {
    // Creates the concrete Worker that actually schedules and runs tasks
    public abstract Worker createWorker();

    // Starts the Scheduler so that it can accept Runnables for execution
    public void start() {
    }

    // Shuts down the Scheduler
    public void shutdown() {
    }

    // Schedules the Runnable for immediate execution
    public Disposable scheduleDirect(@NonNull Runnable run) {
        return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    // Schedules the Runnable for execution after a delay
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }

    // Schedules the Runnable periodically, after an initial delay
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        return periodicTask;
    }

    public <S extends Scheduler & Disposable> S when(@NonNull Function<Flowable<Flowable<Completable>>, Completable> combine) {
        return (S) new SchedulerWhen(combine, this);
    }

    public abstract static class Worker implements Disposable {
        public Disposable schedule(@NonNull Runnable run) {
            return schedule(run, 0L, TimeUnit.NANOSECONDS);
        }

        // The only abstract scheduling method: concrete Workers decide how and where the task runs
        public abstract Disposable schedule(@NonNull Runnable run, long delay, @NonNull TimeUnit unit);

        public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
            final SequentialDisposable first = new SequentialDisposable();
            final SequentialDisposable sd = new SequentialDisposable(first);
            final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
            final long periodInNanoseconds = unit.toNanos(period);
            final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
            final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
            Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                    periodInNanoseconds), initialDelay, unit);
            if (d == EmptyDisposable.INSTANCE) {
                return d;
            }
            first.replace(d);
            return sd;
        }
        ...
        // Other related code is omitted here, e.g. DisposeTask and PeriodicDirectTask, which wrap a Runnable
        // as a disposable Runnable and as a disposable Runnable that can run repeatedly, respectively
    }
}
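The code above shows that the real thread-scheduling logic of a Scheduler lives in the Worker class. createWorker() is abstract, so the Worker object is created by a concrete implementation class. Cancellation and periodic invocation are handled by helper classes such as DisposeTask and PeriodicDirectTask, while the actual hand-off to a thread is an abstract method (Worker.schedule(Runnable, long, TimeUnit)) that each concrete implementation provides.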
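The division of labour described above can be illustrated with a small JDK-only sketch. It is not RxJava code: MiniScheduler, SingleThreadMiniScheduler and MiniSchedulerDemo are hypothetical names invented for this example, and the wrapper that releases the worker after the task runs is only an assumption about the role DisposeTask plays, since that class is omitted from the listing.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified stand-in for Scheduler; not the RxJava class.
abstract class MiniScheduler {
    // Subclasses decide which Worker (and therefore which thread) runs tasks.
    abstract Worker createWorker();

    // "Run now" is simply "run with zero delay".
    void scheduleDirect(Runnable run) {
        scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
    }

    // Shared logic in the base class: create a worker, run the task, release the worker.
    void scheduleDirect(Runnable run, long delay, TimeUnit unit) {
        final Worker w = createWorker();
        w.schedule(() -> {
            try {
                run.run();
            } finally {
                w.dispose(); // assumed role of DisposeTask in this sketch
            }
        }, delay, unit);
    }

    abstract static class Worker {
        // The only scheduling method a concrete worker has to implement.
        abstract void schedule(Runnable run, long delay, TimeUnit unit);

        abstract void dispose();

        void schedule(Runnable run) {
            schedule(run, 0L, TimeUnit.NANOSECONDS);
        }
    }
}

// One possible concrete choice: every worker owns a single-threaded executor.
final class SingleThreadMiniScheduler extends MiniScheduler {
    @Override
    Worker createWorker() {
        final ScheduledExecutorService exec =
                Executors.newSingleThreadScheduledExecutor(r -> new Thread(r, "mini-worker"));
        return new Worker() {
            @Override
            void schedule(Runnable run, long delay, TimeUnit unit) {
                exec.schedule(run, delay, unit);
            }

            @Override
            void dispose() {
                exec.shutdown(); // lets the worker thread exit once queued tasks finish
            }
        };
    }
}

final class MiniSchedulerDemo {
    // Returns the name of the thread that actually ran the task.
    static String runOnce() {
        final AtomicReference<String> name = new AtomicReference<>();
        final CountDownLatch done = new CountDownLatch(1);
        new SingleThreadMiniScheduler().scheduleDirect(() -> {
            name.set(Thread.currentThread().getName());
            done.countDown();
        });
        try {
            if (!done.await(5, TimeUnit.SECONDS)) {
                throw new IllegalStateException("task did not run in time");
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println(runOnce()); // prints "mini-worker"
    }
}
```

The caller only talks to the base class, and the subclass alone decides where the task runs. This is the same split the listing shows between Scheduler.scheduleDirect and the abstract Worker.schedule.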
3 - A Scheduler implementation: IoScheduler
public final class IoScheduler extends Scheduler {
    final AtomicReference<CachedWorkerPool> pool;

    // Manages the pool of cached worker threads
    static final class CachedWorkerPool implements Runnable {
        CachedWorkerPool(long keepAliveTime, TimeUnit unit, ThreadFactory threadFactory) {
            ...
            ScheduledExecutorService evictor = null;
            Future<?> task = null;
            if (unit != null) {
                // Create the evictor: a single-threaded scheduled executor that runs this pool
                // (it implements Runnable) once per keep-alive interval
                evictor = Executors.newScheduledThreadPool(1, EVICTOR_THREAD_FACTORY);
                task = evictor.scheduleWithFixedDelay(this, this.keepAliveTime, this.keepAliveTime, TimeUnit.NANOSECONDS);
            }
            evictorService = evictor;
            evictorTask = task;
        }
    }

    @NonNull
    @Override
    public Worker createWorker() {
        // Create the concrete Worker object
        return new EventLoopWorker(pool.get());
    }

    // The concrete Worker class
    static final class EventLoopWorker extends Scheduler.Worker {
        private final CompositeDisposable tasks;
        private final CachedWorkerPool pool;
        private final ThreadWorker threadWorker;
        final AtomicBoolean once = new AtomicBoolean();

        EventLoopWorker(CachedWorkerPool pool) {
            this.pool = pool;
            this.tasks = new CompositeDisposable();
            this.threadWorker = pool.get();
        }

        @NonNull
        @Override
        public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
            // This is where the task is handed over to another thread
            if (tasks.isDisposed()) {
                // don't schedule, we are unsubscribed
                return EmptyDisposable.INSTANCE;
            }
            // The delayed execution itself is implemented in NewThreadWorker
            return threadWorker.scheduleActual(action, delayTime, unit, tasks);
        }
    }

    // The thread-backed worker subclass handed out by the pool
    static final class ThreadWorker extends NewThreadWorker {
        private long expirationTime;

        ThreadWorker(ThreadFactory threadFactory) {
            super(threadFactory);
            this.expirationTime = 0L;
        }
        ...
    }
}

// NewThreadWorker is a separate top-level class, shown here because ThreadWorker extends it
public class NewThreadWorker extends Scheduler.Worker implements Disposable {
    ...
    // Hands the task to the executor, immediately or after a delay
    public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
        Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
        if (parent != null) {
            if (!parent.add(sr)) {
                return sr;
            }
        }
        Future<?> f;
        try {
            if (delayTime <= 0) {
                f = executor.submit((Callable<Object>)sr);
            } else {
                f = executor.schedule((Callable<Object>)sr, delayTime, unit);
            }
            sr.setFuture(f);
        } catch (RejectedExecutionException ex) {
            if (parent != null) {
                parent.remove(sr);
            }
            RxJavaPlugins.onError(ex);
        }
        return sr;
    }
}
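The code above shows that a concrete Scheduler implementation essentially provides two things: a concrete thread pool, and a concrete Worker class that handles delayed execution and the hand-off of tasks to the pool's threads.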
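To make the "thread pool plus Worker" idea concrete, here is a minimal JDK-only sketch. PooledThread, SketchWorkerPool and SketchPoolDemo are hypothetical names, not RxJava classes. The reuse queue is an assumption about what a cached pool does, because the body of CachedWorkerPool is elided in the listing, and the eviction of expired threads is left out entirely. Only the "submit immediately or schedule with a delay" branch follows the scheduleActual code shown above.

```java
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical stand-in for a thread-backed worker: one thread behind a scheduled executor.
final class PooledThread {
    private static final AtomicInteger IDS = new AtomicInteger();

    final int id = IDS.incrementAndGet();

    private final ScheduledExecutorService executor =
            Executors.newScheduledThreadPool(1, r -> {
                Thread t = new Thread(r, "sketch-io-" + id);
                t.setDaemon(true); // do not keep the JVM alive for idle workers
                return t;
            });

    // Same branch as scheduleActual: submit right away, or schedule with a delay.
    Future<?> scheduleActual(Runnable run, long delay, TimeUnit unit) {
        if (delay <= 0) {
            return executor.submit(run);
        }
        return executor.schedule(run, delay, unit);
    }

    void shutdown() {
        executor.shutdownNow();
    }
}

// Hypothetical stand-in for a cached pool: reuse an idle thread, otherwise create one.
final class SketchWorkerPool {
    private final ConcurrentLinkedQueue<PooledThread> idle = new ConcurrentLinkedQueue<>();

    PooledThread get() {
        PooledThread cached = idle.poll();
        return cached != null ? cached : new PooledThread();
    }

    // Called when a worker is disposed, so its thread can serve the next worker.
    void release(PooledThread thread) {
        idle.offer(thread);
    }

    void shutdown() {
        PooledThread t;
        while ((t = idle.poll()) != null) {
            t.shutdown();
        }
    }
}

final class SketchPoolDemo {
    // Returns true when the second get() reuses the thread released by the first.
    static boolean reusesReleasedThread() {
        SketchWorkerPool pool = new SketchWorkerPool();
        PooledThread first = pool.get();
        try {
            // One immediate task and one delayed task on the same thread.
            first.scheduleActual(() -> { }, 0, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
            first.scheduleActual(() -> { }, 10, TimeUnit.MILLISECONDS).get(5, TimeUnit.SECONDS);
        } catch (Exception e) {
            throw new IllegalStateException(e);
        }
        pool.release(first);
        PooledThread second = pool.get();
        boolean same = first == second;
        pool.release(second);
        pool.shutdown();
        return same;
    }

    public static void main(String[] args) {
        System.out.println(reusesReleasedThread()); // prints "true"
    }
}
```

Reusing released threads keeps the pool from growing with every new worker. Without eviction, though, idle threads would live forever, which is the job of the evictor task scheduled in the CachedWorkerPool constructor.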
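Topics in the RxJava Source Code Analysis series:
- 1. RxJava Source Code Analysis: Introduction
- 2. RxJava Source Code Analysis: Subscription Process and Thread Switching
- RxJava Source Code Analysis: Operators
- RxJava Source Code Analysis: Backpressure
- RxJava Source Code Analysis: hook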