美文网首页
RxJava之timer和interval操作符源码解析

RxJava之timer和interval操作符源码解析

作者: 103style | 来源:发表于2019-05-20 10:22 被阅读0次

转载请以链接形式标明出处:
本文出自:103style的博客

timer 操作符

  • timer 操作符实际上返回的是一个 ObservableTimer对象。两个参数的方法默认在 Schedulers.computation()中工作。

     public static Observable<Long> timer(long delay, TimeUnit unit) {
         return timer(delay, unit, Schedulers.computation());
     }
     public static Observable<Long> timer(long delay, TimeUnit unit, Scheduler scheduler) {
         return RxJavaPlugins.onAssembly(new ObservableTimer(Math.max(delay, 0L), unit, scheduler));
     }
    
  • ObservableTimer 源码:

    • 构建了 TimerObserver 对象。
    • 执行 观察者 的 onSubscribe 方法。
    • 通过scheduler.scheduleDirect(ios, delay, unit) 返回一个 Disposable 对象。
    • 将返回的 Disposable 对象传给 TimerObserver 对象的 setResource 方法
    public final class ObservableTimer extends Observable<Long> {
        final Scheduler scheduler;
        final long delay;
        final TimeUnit unit;
        public ObservableTimer(long delay, TimeUnit unit, Scheduler scheduler) {
            this.delay = delay;
            this.unit = unit;
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(Observer<? super Long> observer) {
            TimerObserver ios = new TimerObserver(observer);
            observer.onSubscribe(ios);
            Disposable d = scheduler.scheduleDirect(ios, delay, unit);
            ios.setResource(d);
        }
        ...
    }
    
  • TimerObserver对象源码:

    static final class TimerObserver extends AtomicReference<Disposable>
    implements Disposable, Runnable {
    
        final Observer<? super Long> downstream;
    
        TimerObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }
        ...
        @Override
        public void run() {
            if (!isDisposed()) {
                downstream.onNext(0L);
                lazySet(EmptyDisposable.INSTANCE);
                downstream.onComplete();
            }
        }
    
        public void setResource(Disposable d) {
            DisposableHelper.trySet(this, d);
        }
    }
    
  • 首先看 TimerObserversetResource(Disposable d)方法 里的 DisposableHelper.trySet(this, d);

    public static boolean trySet(AtomicReference<Disposable> field, Disposable d) {
        if (!field.compareAndSet(null, d)) {
            if (field.get() == DISPOSED) {
                d.dispose();
            }
            return false;
        }
        return true;
    }
    
    • d 不为 null,直接 return true;否则判断 是否为 DISPOSED 状态,是的话调用传进来的 Disposable 对象(也就是之前 Scheduler 构建的 DisposeTask 对象)的 dispose 方法。
  • scheduler.scheduleDirect(ios, delay, unit) 方法:

    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        DisposeTask task = new DisposeTask(decoratedRun, w);
        w.schedule(task, delay, unit);
        return task;
    }
    
    • 首先创建了一个 Worker,因为默认是 Schedulers.computation()中工作,查看源码可知 实际调用的是 ComputationSchedulercreateWorker 方法 。
      Schedulers

      ...
      static final class ComputationHolder {
          static final Scheduler DEFAULT = new ComputationScheduler();
      }
      ...
      static {
          COMPUTATION = RxJavaPlugins.initComputationScheduler(new ComputationTask());
          ...
      }
      
      static final class ComputationTask implements Callable<Scheduler> {
          @Override
          public Scheduler call() throws Exception {
              return ComputationHolder.DEFAULT;
          }
      }
      

      RxJavaPlugins

      public static Scheduler initComputationScheduler(@NonNull Callable<Scheduler> defaultScheduler) {
          ObjectHelper.requireNonNull(defaultScheduler, "Scheduler Callable can't be null");
          Function<? super Callable<Scheduler>, ? extends Scheduler> f = onInitComputationHandler;
          if (f == null) {
              return callRequireNonNull(defaultScheduler);
          }
          return applyRequireNonNull(f, defaultScheduler); // JIT will skip this
      }
      
      static Scheduler callRequireNonNull(@NonNull Callable<Scheduler> s) {
          try {
              return ObjectHelper.requireNonNull(s.call(), "Scheduler Callable result can't be null");
          } catch (Throwable ex) {
              throw ExceptionHelper.wrapOrThrow(ex);
          }
      }
      

      f 默认为 null,所以返回的是 callRequireNonNull(defaultScheduler),然后实际调用的是 ComputationTaskcall 方法。返回的即为 ComputationScheduler 对象。

    • ComputationSchedulercreateWorker 方法 。

      public ComputationScheduler() {
          this(THREAD_FACTORY);
      }
      
      public ComputationScheduler(ThreadFactory threadFactory) {
          this.threadFactory = threadFactory;
          this.pool = new AtomicReference<FixedSchedulerPool>(NONE);
          start();
      }
      
      public Worker createWorker() {
          return new EventLoopWorker(pool.get().getEventLoop());
      }
      
      • pool.get()通过构造函数我们可知返回的为 NONE = new FixedSchedulerPool(0, THREAD_FACTORY); 所以 pool.get().getEventLoop() 返回的为 SHUTDOWN_WORKER = new PoolWorker(new RxThreadFactory("RxComputationShutdown"));。实际上是创建了一个 executorExecutors.newScheduledThreadPool(1, factory) ,即 factoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象
        FixedSchedulerPool(int maxThreads, ThreadFactory threadFactory) {
            this.cores = maxThreads;
            this.eventLoops = new PoolWorker[maxThreads];
            for (int i = 0; i < maxThreads; i++) {
                this.eventLoops[i] = new PoolWorker(threadFactory);
            }
        }
        public PoolWorker getEventLoop() {
            int c = cores;
            if (c == 0) {
                return SHUTDOWN_WORKER;
            }
            return eventLoops[(int)(n++ % c)];
        }
        
      • 所以createWorker 返回的是:poolWorkerfactoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象
        static final class EventLoopWorker extends Scheduler.Worker {
            private final ListCompositeDisposable serial;
            private final CompositeDisposable timed;
            private final ListCompositeDisposable both;
            private final PoolWorker poolWorker;
        
            volatile boolean disposed;
        
            EventLoopWorker(PoolWorker poolWorker) {
                this.poolWorker = poolWorker;
                this.serial = new ListCompositeDisposable();
                this.timed = new CompositeDisposable();
                this.both = new ListCompositeDisposable();
                this.both.add(serial);
                this.both.add(timed);
            }
        ...
        
    • decoratedRun 即为 TimerObserver 对象。

      public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
          final Worker w = createWorker();
          final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
          DisposeTask task = new DisposeTask(decoratedRun, w);
          w.schedule(task, delay, unit);
          return task;
      }
      
    • 然后构建了一个 DisposeTask 对象。

      static final class DisposeTask implements Disposable, Runnable, SchedulerRunnableIntrospection {
          final Runnable decoratedRun;
          final Worker w;
          Thread runner;
      
          DisposeTask(@NonNull Runnable decoratedRun, @NonNull Worker w) {
              this.decoratedRun = decoratedRun;
              this.w = w;
          }
      
          @Override
          public void run() {
              runner = Thread.currentThread();
              try {
                  decoratedRun.run();
              } finally {
                  dispose();
                  runner = null;
              }
          }
          ...
      }
      
    • createWorker 返回的 poolWorkerfactoryRxThreadFactory("RxComputationShutdown")单线程线程池对象PoolWorker对象,并执行 schedule 方法。
      实际上是执行了 单线程线程池对象 Executors.newScheduledThreadPool(1, factory)schedule(task, delayTime, unit)方法,并将返回值 Future 对象 传给ScheduledRunnablesetFuture 方法。

      public Disposable schedule(@NonNull final Runnable action, long delayTime, @NonNull TimeUnit unit) {
          if (disposed) {
              return EmptyDisposable.INSTANCE;
          }
          return scheduleActual(action, delayTime, unit, null);
      }
      
      public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
          Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
          ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
          ...
          Future<?> f;
          try {
              if (delayTime <= 0) {
                  f = executor.submit((Callable<Object>)sr);
              } else {
                  f = executor.schedule((Callable<Object>)sr, delayTime, unit);
              }
              sr.setFuture(f);
          } catch (RejectedExecutionException ex) {
              ...
              RxJavaPlugins.onError(ex);
          }
          return sr;
      }
      
  • 线程池的schedule(task, delayTime, unit) 方法实际时延时 delayTime 执行 taskrun 方法。即为 执行 TimerObserver 对象的 run 方法。

    public void subscribeActual(Observer<? super Long> observer) {
        TimerObserver ios = new TimerObserver(observer);
        observer.onSubscribe(ios);
        Disposable d = scheduler.scheduleDirect(ios, delay, unit);
        ios.setResource(d);
    }
    
  • TimerObserver 对象的 run 方法: 即执行了 观察者onNext(0L)onComplete()

    public void run() {
        if (!isDisposed()) {
            downstream.onNext(0L);
            lazySet(EmptyDisposable.INSTANCE);
            downstream.onComplete();
        }
    }
    

interval 系列操作符

  • interval系列 包含 intervalintervalRange两个操作符,包含以下 6 个方法:

    • interval(long period, TimeUnit unit)
    • interval(long initialDelay, long period, TimeUnit unit)
    • interval(long period, TimeUnit unit, Scheduler scheduler)
    • interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler)
    • intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit)
    • intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler)

    分别返回的是 ObservableIntervalObservableIntervalRange 对象,默认的 SchedulerSchedulers.computation()

public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit) {
    return interval(initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> interval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableInterval(Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit) {
    return intervalRange(start, count, initialDelay, period, unit, Schedulers.computation());
}
public static Observable<Long> intervalRange(long start, long count, long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
    return RxJavaPlugins.onAssembly(new ObservableIntervalRange(start, end, Math.max(0L, initialDelay), Math.max(0L, period), unit, scheduler));
}
  • ObservableInterval 源码:

    • 构建了 IntervalObserver 对象。
    • 因为默认Schedulers.computation() 所以 sch instanceof TrampolineScheduler不成立,除非我们手动传参 SchedulerSchedulers.trampoline()
    • 和前面的 ObservableTimer类似, 即为调用 ObservableIntervalrun 方法。只是返回的为PeriodicDirectTask对象。
    • setResourceObservableTimer类似,就不再赘述了。
    public final class ObservableInterval extends Observable<Long> {
        final Scheduler scheduler;
        final long initialDelay;
        final long period;
        final TimeUnit unit;
    
        public ObservableInterval(long initialDelay, long period, TimeUnit unit, Scheduler scheduler) {
            this.initialDelay = initialDelay;
            this.period = period;
            this.unit = unit;
            this.scheduler = scheduler;
        }
    
        @Override
        public void subscribeActual(Observer<? super Long> observer) {
            IntervalObserver is = new IntervalObserver(observer);
            observer.onSubscribe(is);
    
            Scheduler sch = scheduler;
            if (sch instanceof TrampolineScheduler) {
                Worker worker = sch.createWorker();
                is.setResource(worker);
                worker.schedulePeriodically(is, initialDelay, period, unit);
            } else {
                Disposable d = sch.schedulePeriodicallyDirect(is, initialDelay, period, unit);
                is.setResource(d);
            }
        }
        ...
    }
    
  • sch.schedulePeriodicallyDirect(is, initialDelay, period, unit) 实际调用的为 schedulePeriodically方法:

    • interval 的间隔时间转化为 Nanoseconds
    • 然后设置 第一次的 响应时间为 当前时间+ 间隔时间 的 纳秒数。
    • 里面又将 PeriodicDirectTask对象 包装成 PeriodicTask 对象。
    public Disposable schedulePeriodicallyDirect(@NonNull Runnable run, long initialDelay, long period, @NonNull TimeUnit unit) {
        final Worker w = createWorker();
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
        PeriodicDirectTask periodicTask = new PeriodicDirectTask(decoratedRun, w);
        Disposable d = w.schedulePeriodically(periodicTask, initialDelay, period, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        return periodicTask;
    }
    
    public Disposable schedulePeriodically(@NonNull Runnable run, final long initialDelay, final long period, @NonNull final TimeUnit unit) {
        final SequentialDisposable first = new SequentialDisposable();
        final SequentialDisposable sd = new SequentialDisposable(first);
        final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
    
        final long periodInNanoseconds = unit.toNanos(period);
        final long firstNowNanoseconds = now(TimeUnit.NANOSECONDS);
        final long firstStartInNanoseconds = firstNowNanoseconds + unit.toNanos(initialDelay);
    
        Disposable d = schedule(new PeriodicTask(firstStartInNanoseconds, decoratedRun, firstNowNanoseconds, sd,
                periodInNanoseconds), initialDelay, unit);
        if (d == EmptyDisposable.INSTANCE) {
            return d;
        }
        first.replace(d);
        return sd;
    }
    
  • PeriodicTask 对象的 run 方法

    • decoratedRun.run(); 又调用了 PeriodicDirectTask对象的 run 方法.
    • run 方法的最后 sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS)); 这里又 重复执行 这个任务,直到 IntervalObserver对象 isDisposed()true
     final class PeriodicTask implements Runnable, SchedulerRunnableIntrospection {
        final Runnable decoratedRun;
        final SequentialDisposable sd;
        final long periodInNanoseconds;
        long count;
        long lastNowNanoseconds;
        long startInNanoseconds;
    
        PeriodicTask(long firstStartInNanoseconds, @NonNull Runnable decoratedRun,
                long firstNowNanoseconds, @NonNull SequentialDisposable sd, long periodInNanoseconds) {
            this.decoratedRun = decoratedRun;
            this.sd = sd;
            this.periodInNanoseconds = periodInNanoseconds;
            lastNowNanoseconds = firstNowNanoseconds;
            startInNanoseconds = firstStartInNanoseconds;
        }
    
        @Override
        public void run() {
            decoratedRun.run();
            if (!sd.isDisposed()) {
                long nextTick;
                long nowNanoseconds = now(TimeUnit.NANOSECONDS);
                // If the clock moved in a direction quite a bit, rebase the repetition period
                if (nowNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS < lastNowNanoseconds
                        || nowNanoseconds >= lastNowNanoseconds + periodInNanoseconds + CLOCK_DRIFT_TOLERANCE_NANOSECONDS) {
                    nextTick = nowNanoseconds + periodInNanoseconds;
                    /*
                     * Shift the start point back by the drift as if the whole thing
                     * started count periods ago.
                     */
                    startInNanoseconds = nextTick - (periodInNanoseconds * (++count));
                } else {
                    nextTick = startInNanoseconds + (++count * periodInNanoseconds);
                }
                lastNowNanoseconds = nowNanoseconds;
    
                long delay = nextTick - nowNanoseconds;
                sd.replace(schedule(this, delay, TimeUnit.NANOSECONDS));
            }
        }
        ...
    }
    
  • PeriodicDirectTaskrun 方法:

    • 实际调用的即为IntervalObserverrun()
    static final class PeriodicDirectTask
    implements Disposable, Runnable, SchedulerRunnableIntrospection {
        final Runnable run;
        final Worker worker;
        volatile boolean disposed;
    
        PeriodicDirectTask(@NonNull Runnable run, @NonNull Worker worker) {
            this.run = run;
            this.worker = worker;
        }
    
        @Override
        public void run() {
            if (!disposed) {
                try {
                    run.run();
                } catch (Throwable ex) {
                    Exceptions.throwIfFatal(ex);
                    worker.dispose();
                    throw ExceptionHelper.wrapOrThrow(ex);
                }
            }
        }
        ...
    }
    
  • IntervalObserverrun()

    • 调用 观察者onNext 方法
    static final class IntervalObserver
    extends AtomicReference<Disposable>
    implements Disposable, Runnable {
        final Observer<? super Long> downstream;
        long count;
    
        IntervalObserver(Observer<? super Long> downstream) {
            this.downstream = downstream;
        }
    
        @Override
        public void run() {
            if (get() != DisposableHelper.DISPOSED) {
                downstream.onNext(count++);
            }
        }
    }
    
  • 然后直到我们直接调用 dispose() 方法结束流程。

以上

相关文章

网友评论

      本文标题:RxJava之timer和interval操作符源码解析

      本文链接:https://www.haomeiwen.com/subject/grphzqtx.html