RxJava (2): How Thread Switching Works

Author: 壹元伍角叁分 | Published: 2021-07-07 16:37

    I. How subscribeOn(Schedulers.io()) works

      Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(ObservableEmitter<String> e) throws Exception {
    
                        }
                    })
                    .subscribeOn(Schedulers.io())
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

    Source code analysis

    1. Schedulers.io() indicates that the code above it in the chain (the source's subscribe logic) runs on a worker thread

      IO = RxJavaPlugins.initIoScheduler(new IOTask());
      
      static final class IOTask implements Callable<Scheduler> {
          @Override
          public Scheduler call() throws Exception {
              return IoHolder.DEFAULT;
          }
      }
      
      static final class IoHolder {
          static final Scheduler DEFAULT = new IoScheduler();
      }
      
      public IoScheduler() {
          this(WORKER_THREAD_FACTORY);
      }
      
      public IoScheduler(ThreadFactory threadFactory) {
          this.threadFactory = threadFactory;
          this.pool = new AtomicReference<CachedWorkerPool>(NONE);
          start();
      }
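
The holder idiom used by IoHolder and IOTask can be sketched in plain Java. This is a minimal sketch, not RxJava code: LazyHolderSketch, FakeScheduler, Holder, and Task are hypothetical names introduced only for illustration. It shows that the instance is constructed once, on first access, and that every later call returns the same object.

```java
import java.util.concurrent.Callable;

class LazyHolderSketch {
    static int created = 0; // counts constructions, for demonstration only

    // Hypothetical stand-in for a Scheduler implementation; not an RxJava class.
    static final class FakeScheduler {
        FakeScheduler() {
            created++;
        }
    }

    // The JVM initializes this nested class only when DEFAULT is first read.
    static final class Holder {
        static final FakeScheduler DEFAULT = new FakeScheduler();
    }

    // Plays the role of IOTask: a Callable that returns the holder's instance.
    static final class Task implements Callable<FakeScheduler> {
        @Override
        public FakeScheduler call() {
            return Holder.DEFAULT;
        }
    }

    public static void main(String[] args) {
        Task task = new Task();
        System.out.println("created before call(): " + created);
        FakeScheduler first = task.call();
        FakeScheduler second = task.call();
        System.out.println("same instance: " + (first == second));
        System.out.println("created after call(): " + created);
    }
}
```

The sketch only illustrates the idiom; it makes no claim about when RxJava itself first triggers the holder.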
      
    2. .subscribeOn(Schedulers.io()): takes the ObservableCreate and Schedulers.io() as arguments, creates a new ObservableSubscribeOn object, and returns it

      public final Observable<T> subscribeOn(Scheduler scheduler) {
          ObjectHelper.requireNonNull(scheduler, "scheduler is null");
          return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
      }
      

      The source field in ObservableSubscribeOn is the ObservableCreate

      public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
          final Scheduler scheduler;
      
          public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
              super(source);
              this.scheduler = scheduler;
          }
      }
      
    3. .subscribe(new Observer<String>())

      .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
              Log.d(TAG, "onSubscribe: terminal observer received onSubscribe");
          }
      
          @Override
          public void onNext(String s) {
              Log.d(TAG, "onNext: terminal observer received onNext s=" + s);
          }
      
          @Override
          public void onError(Throwable e) {
      
          }
      
          @Override
          public void onComplete() {
              Log.d(TAG, "onComplete: terminal observer received onComplete");
          }
      });
      

      The terminal observer is passed in as the argument. subscribe() is a method of Observable, the parent class of ObservableSubscribeOn; what actually gets called is ObservableSubscribeOn.subscribeActual(observer)

      @Override
      public void subscribeActual(final Observer<? super T> s) {
          //wrap the terminal observer in a SubscribeOnObserver
          final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
          //first call the terminal observer's onSubscribe method
          s.onSubscribe(parent);
          //the key call is scheduler.scheduleDirect(new SubscribeTask(parent))
          parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
      }
      
    4. scheduler.scheduleDirect(new SubscribeTask(parent))

      1. First, a SubscribeTask object is created. It implements the Runnable interface, and its run method calls source.subscribe(parent). Since source is the ObservableCreate, run() effectively calls ObservableCreate.subscribe(parent).

        final class SubscribeTask implements Runnable {
            private final SubscribeOnObserver<T> parent;
        
            SubscribeTask(SubscribeOnObserver<T> parent) {
                this.parent = parent;
            }
        
            @Override
            public void run() {
                source.subscribe(parent);
            }
        }
        
      2. scheduler is the IoScheduler. scheduleDirect() is a method of Scheduler, the parent class of IoScheduler. It creates a Worker through createWorker(), which is implemented in IoScheduler; IoScheduler's createWorker() creates and returns an EventLoopWorker object.

      public Disposable scheduleDirect(@NonNull Runnable run) {
          return scheduleDirect(run, 0L, TimeUnit.NANOSECONDS);
      }
      
      public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
          //createWorker() is implemented in IoScheduler, where it creates and returns an EventLoopWorker.
          final Worker w = createWorker();
          //wrap the task again
          final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
          DisposeTask task = new DisposeTask(decoratedRun, w);
          //execution actually starts here: EventLoopWorker.schedule(task, delay, unit)
          w.schedule(task, delay, unit);
      
          return task;
      }
      
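      //IoScheduler.EventLoopWorker
      @NonNull
      @Override
      public Disposable schedule(@NonNull Runnable action, long delayTime, @NonNull TimeUnit unit) {
          if (tasks.isDisposed()) {
              // don't schedule, we are unsubscribed
              return EmptyDisposable.INSTANCE;
          }
      
          return threadWorker.scheduleActual(action, delayTime, unit, tasks);
      }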
      
      @NonNull
      public ScheduledRunnable scheduleActual(final Runnable run, long delayTime, @NonNull TimeUnit unit, @Nullable DisposableContainer parent) {
          Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
          //wrap once more
          ScheduledRunnable sr = new ScheduledRunnable(decoratedRun, parent);
      
          if (parent != null) {
              if (!parent.add(sr)) {
                  return sr;
              }
          }
      
          Future<?> f;
          //hand the task to the thread pool here, so ObservableCreate.subscribe(parent) runs on a worker thread.
          try {
              if (delayTime <= 0) {
                  f = executor.submit((Callable<Object>)sr);
              } else {
                  f = executor.schedule((Callable<Object>)sr, delayTime, unit);
              }
              sr.setFuture(f);
          } catch (RejectedExecutionException ex) {
              if (parent != null) {
                  parent.remove(sr);
              }
              RxJavaPlugins.onError(ex);
          }
      
          return sr;
      }
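
The submit-or-schedule branch in scheduleActual can be tried out with a plain ScheduledExecutorService. The following is a minimal sketch under stated assumptions: ScheduleActualSketch, its methods, and the thread name "sketch-worker" are hypothetical and only mimic the shape of the code above; the disposable bookkeeping is left out.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

class ScheduleActualSketch {
    // Submits immediately when there is no delay, otherwise schedules with the delay.
    static Future<?> scheduleActual(ScheduledExecutorService executor, Runnable run,
                                    long delay, TimeUnit unit) {
        if (delay <= 0) {
            return executor.submit(run);
        }
        return executor.schedule(run, delay, unit);
    }

    // Runs one task through scheduleActual and reports which thread executed it.
    static String runAndGetThreadName() {
        ScheduledExecutorService executor = Executors.newScheduledThreadPool(1, r -> {
            Thread t = new Thread(r, "sketch-worker"); // hypothetical thread name
            t.setDaemon(true);
            return t;
        });
        AtomicReference<String> name = new AtomicReference<>();
        try {
            Future<?> f = scheduleActual(executor,
                    () -> name.set(Thread.currentThread().getName()),
                    0L, TimeUnit.NANOSECONDS);
            f.get(5, TimeUnit.SECONDS); // wait for the task to finish
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            executor.shutdown();
        }
        return name.get();
    }

    public static void main(String[] args) {
        System.out.println("caller thread: " + Thread.currentThread().getName());
        System.out.println("task thread:   " + runAndGetThreadName());
    }
}
```

The task body never runs on the caller's thread; that is the whole mechanism behind the thread switch.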
      

    Code execution flow:

    1. Observable.create() takes the custom source and returns an ObservableCreate object;
    2. ObservableCreate.subscribeOn(Schedulers.io()). Schedulers.io() returns an IoScheduler object, which is passed as the argument. subscribeOn() returns an ObservableSubscribeOn object that holds the ObservableCreate (as source) and the IoScheduler (as scheduler);
    3. ObservableSubscribeOn.subscribe(observer) is called with the custom terminal observer.
    4. In ObservableSubscribeOn.subscribeActual(observer), the terminal observer is wrapped in a SubscribeOnObserver (the parent variable above). The terminal observer's onSubscribe(parent) is called first, with that SubscribeOnObserver as the argument.
    5. Next comes IoScheduler.scheduleDirect(new SubscribeTask(parent)), which is the key call. It first creates a SubscribeTask, which implements Runnable. Its run method calls source.subscribe(parent), and since source is the ObservableCreate, run() calls ObservableCreate.subscribe(parent).
    6. scheduleDirect() is a method of Scheduler, the parent class of IoScheduler. It creates a Worker through createWorker(), which is implemented in IoScheduler; IoScheduler's createWorker() creates and returns an EventLoopWorker object.
    7. EventLoopWorker.schedule() calls threadWorker.scheduleActual(), which wraps the SubscribeTask once more and hands it to the thread pool. As a result, ObservableCreate.subscribe(parent) runs on a worker thread.
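
The flow above can be modeled without RxJava. This is a minimal sketch, assuming simplified Observer and Source interfaces and a single-thread executor named "sketch-io" in place of IoScheduler; all of these names are hypothetical. The wrapper returned by subscribeOn() does one thing: it runs source.subscribe(observer) as a task on the executor.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class SubscribeOnSketch {
    // Hypothetical, simplified stand-ins for Observer and ObservableSource.
    interface Observer<T> {
        void onNext(T t);
        void onComplete();
    }

    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    // Plays the role of ObservableSubscribeOn: keeps the upstream source and an
    // executor, and runs source.subscribe(...) as a task on that executor.
    static <T> Source<T> subscribeOn(Source<T> source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    static List<String> run() {
        ExecutorService io = Executors.newSingleThreadExecutor(r -> {
            Thread t = new Thread(r, "sketch-io"); // hypothetical thread name
            t.setDaemon(true);
            return t;
        });
        List<String> log = new ArrayList<>();
        CountDownLatch done = new CountDownLatch(1);

        Source<String> source = observer -> {
            observer.onNext("emitted on " + Thread.currentThread().getName());
            observer.onComplete();
        };

        subscribeOn(source, io).subscribe(new Observer<String>() {
            @Override
            public void onNext(String s) {
                log.add(s + ", received on " + Thread.currentThread().getName());
            }

            @Override
            public void onComplete() {
                done.countDown();
            }
        });

        try {
            done.await(5, TimeUnit.SECONDS); // wait until the worker thread has finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        io.shutdown();
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

Note that in this model the downstream callbacks also run on the worker thread, because nothing switches threads again after the subscription has been moved.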

    II. How observeOn(AndroidSchedulers.mainThread()) works

     Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(ObservableEmitter<String> e) throws Exception {
    
                        }
                    })
                    .observeOn(AndroidSchedulers.mainThread())
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
    
                        }
    
                        @Override
                        public void onNext(String s) {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onComplete() {
    
                        }
                    });
    

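    Source code analysis

    observeOn(AndroidSchedulers.mainThread()) indicates that the code below it in the chain runs on the main thread

    Subscription process

    1. AndroidSchedulers.mainThread() returns a HandlerScheduler object, which internally holds a Handler bound to the main thread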

      public final class AndroidSchedulers {
      
         private static final class MainHolder {
              //a HandlerScheduler that internally holds a Handler bound to the main thread
              static final Scheduler DEFAULT = new HandlerScheduler(new Handler(Looper.getMainLooper()));
          }
      
        private static final Scheduler MAIN_THREAD = RxAndroidPlugins.initMainThreadScheduler(
                  new Callable<Scheduler>() {
                      @Override public Scheduler call() throws Exception {
                          return MainHolder.DEFAULT;
                      }
                  });
                  
        public static Scheduler mainThread() {
               return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
         }
       }
      
      final class HandlerScheduler extends Scheduler {
          private final Handler handler;
      
          HandlerScheduler(Handler handler) {
              this.handler = handler;
          }
      }
      
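    2. .observeOn(AndroidSchedulers.mainThread()): takes the HandlerScheduler as the argument and returns an ObservableObserveOn object (the single-argument overload delegates to the three-argument one shown here)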

      public abstract class Observable<T> implements ObservableSource<T> {
      
          public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
              ObjectHelper.requireNonNull(scheduler, "scheduler is null");
              ObjectHelper.verifyPositive(bufferSize, "bufferSize");
              return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
          }
      }
      

      The scheduler field in ObservableObserveOn is the HandlerScheduler

      public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
          final Scheduler scheduler;//the HandlerScheduler, which holds a Handler bound to the main thread
          final boolean delayError;
          final int bufferSize;
          public ObservableObserveOn(ObservableSource<T> source, Scheduler scheduler, boolean delayError, int bufferSize) {
              super(source);
              this.scheduler = scheduler;
              this.delayError = delayError;
              this.bufferSize = bufferSize;
          }
      }
      
    3. .subscribe(new Observer<String>())

      .subscribe(new Observer<String>() {
          @Override
          public void onSubscribe(Disposable d) {
              Log.d(TAG, "onSubscribe: terminal observer received onSubscribe");
          }
      
          @Override
          public void onNext(String s) {
              Log.d(TAG, "onNext: terminal observer received onNext s=" + s);
          }
      
          @Override
          public void onError(Throwable e) {
      
          }
      
          @Override
          public void onComplete() {
              Log.d(TAG, "onComplete: terminal observer received onComplete");
          }
      });
      

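      The terminal observer is passed in as the argument. subscribe() is a method of Observable, the parent class of ObservableObserveOn, and ObservableObserveOn does not override it. subscribe() in turn calls subscribeActual(), which ObservableObserveOn does override, so the call ends up in ObservableObserveOn.subscribeActual(observer);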

      public final void subscribe(Observer<? super T> observer) {
          ObjectHelper.requireNonNull(observer, "observer is null");
          try {
              observer = RxJavaPlugins.onSubscribe(this, observer);
      
              ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
      
              subscribeActual(observer);//this is the key line
          } catch (NullPointerException e) { // NOPMD
              throw e;
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              RxJavaPlugins.onError(e);
      
              NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
              npe.initCause(e);
              throw npe;
          }
      }
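
The relationship between subscribe() and subscribeActual() is the template method pattern. A minimal sketch, assuming a hypothetical Base class in place of Observable and a String in place of the observer: the final entry point validates its argument and then dispatches to the subclass hook.

```java
import java.util.ArrayList;
import java.util.List;

class TemplateMethodSketch {
    // Hypothetical base class standing in for Observable.
    static abstract class Base {
        final List<String> calls = new ArrayList<>();

        // final: subclasses cannot replace the entry point, only the hook below.
        final void subscribe(String observer) {
            if (observer == null) {
                throw new NullPointerException("observer is null");
            }
            calls.add("Base.subscribe");
            subscribeActual(observer);
        }

        protected abstract void subscribeActual(String observer);
    }

    // Hypothetical subclass standing in for ObservableObserveOn.
    static final class ObserveOnLike extends Base {
        @Override
        protected void subscribeActual(String observer) {
            calls.add("ObserveOnLike.subscribeActual(" + observer + ")");
        }
    }

    static List<String> run() {
        Base observable = new ObserveOnLike();
        observable.subscribe("terminal");
        return observable.calls;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```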
      

      Now look at the implementation of subscribeActual in ObservableObserveOn. It calls source.subscribe(), where source is the ObservableCreate object saved earlier, so the actual call is ObservableCreate.subscribe().

      public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
      
         @Override
          protected void subscribeActual(Observer<? super T> observer) {
              if (scheduler instanceof TrampolineScheduler) {
                  source.subscribe(observer);
              } else {
                 //this branch is taken. HandlerScheduler.createWorker() returns a HandlerWorker object
                  Scheduler.Worker w = scheduler.createWorker();
                 //source is the ObservableCreate; the terminal observer and the HandlerWorker are wrapped in an ObserveOnObserver
                  source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
              }
          }
      }
      

      HandlerScheduler.createWorker() returns a HandlerWorker object. The handler in HandlerScheduler was defined earlier; it is a Handler bound to the main thread

      final class HandlerScheduler extends Scheduler {
         @Override
         public Worker createWorker() {
             return new HandlerWorker(handler);
         }
      }
      

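      source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize)). source is the ObservableCreate; the terminal observer and the HandlerWorker are wrapped in an ObserveOnObserver, which then arrives in ObservableCreate.subscribeActual()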

      public final class ObservableCreate<T> extends Observable<T> {
         @Override
         protected void subscribeActual(Observer<? super T> observer) {
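             CreateEmitter<T> parent = new CreateEmitter<T>(observer);//observer is the ObserveOnObserver; it gets wrapped in a CreateEmitter
             observer.onSubscribe(parent);//calls ObserveOnObserver.onSubscribe, which forwards to the terminal observer's onSubscribe
      
             try {
                 source.subscribe(parent);//calls the subscribe method of our custom source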
             } catch (Throwable ex) {
                 Exceptions.throwIfFatal(ex);
                 parent.onError(ex);
             }
         }
      }
      

    Event delivery process

    1. Simulate a call to onNext in the custom source

      new ObservableOnSubscribe<String>() {
          @Override
          public void subscribe(ObservableEmitter<String> observableEmitter) throws Exception {
              //observableEmitter is the CreateEmitter
              observableEmitter.onNext("test onNext()");
          }
      }
      
    2. CreateEmitter.onNext(T t) in turn calls ObserveOnObserver.onNext(t);

      static final class CreateEmitter<T> extends AtomicReference<Disposable> implements ObservableEmitter<T>, Disposable {
      
          final Observer<? super T> observer; // ObserveOnObserver
      
          CreateEmitter(Observer<? super T> observer) {
              this.observer = observer;
          }
      
          @Override
          public void onNext(T t) {
              if (t == null) {
                  onError(new NullPointerException("onNext called with null. Null values are generally not allowed in 2.x operators and sources."));
                  return;
              }
              if (!isDisposed()) {
                  observer.onNext(t);//ObserveOnObserver.onNext(t);
              }
          }
      }
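
The two guards in CreateEmitter.onNext (reject null, drop values after disposal) can be sketched as follows. This is a simplified model, not the library class: EmitterSketch, Emitter, and the two-method Observer interface are hypothetical, and the AtomicBoolean replaces the AtomicReference-based disposal used by the real class.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.AtomicBoolean;

class EmitterSketch {
    // Hypothetical, simplified downstream interface.
    interface Observer<T> {
        void onNext(T t);
        void onError(Throwable e);
    }

    // Forwards values until disposed and turns a null value into an error signal.
    static final class Emitter<T> {
        private final Observer<T> downstream;
        private final AtomicBoolean disposed = new AtomicBoolean();

        Emitter(Observer<T> downstream) {
            this.downstream = downstream;
        }

        void onNext(T t) {
            if (t == null) {
                onError(new NullPointerException("onNext called with null"));
                return;
            }
            if (!disposed.get()) {
                downstream.onNext(t);
            }
        }

        void onError(Throwable e) {
            // compareAndSet ensures at most one terminal signal is delivered.
            if (disposed.compareAndSet(false, true)) {
                downstream.onError(e);
            }
        }
    }

    static List<String> run() {
        List<String> log = new ArrayList<>();
        Emitter<String> emitter = new Emitter<>(new Observer<String>() {
            @Override
            public void onNext(String s) {
                log.add("next:" + s);
            }

            @Override
            public void onError(Throwable e) {
                log.add("error:" + e.getClass().getSimpleName());
            }
        });
        emitter.onNext("a");
        emitter.onNext(null); // becomes an error and disposes the emitter
        emitter.onNext("b");  // dropped: the emitter is already disposed
        return log;
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```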
      
    3. ObserveOnObserver.onNext(T t)

      static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
          final Observer<? super T> actual;//the terminal observer
          final Scheduler.Worker worker;//the HandlerWorker
          
          ObserveOnObserver(Observer<? super T> actual, Scheduler.Worker worker, boolean delayError, int bufferSize) {
              this.actual = actual;
              this.worker = worker;
              this.delayError = delayError;
              this.bufferSize = bufferSize;
          }
      
          @Override
          public void onNext(T t) {
              if (done) {
                  return;
              }
      
              if (sourceMode != QueueDisposable.ASYNC) {
                  queue.offer(t);
              }
              schedule();
          }
          
            void schedule() {
                  if (getAndIncrement() == 0) {
                      worker.schedule(this);//HandlerWorker.schedule(this)
                  }
              }
      }
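
The getAndIncrement() == 0 check in schedule() is a queue-drain guard: only the caller that moves the counter from 0 to 1 submits a drain task, and a running drain picks up values that arrive in the meantime. A minimal sketch of that idea, assuming a single-thread executor in place of the HandlerWorker; QueueDrainSketch, Drainer, and wip are hypothetical names, and the loop is a simplified version of what a drain method has to do.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

class QueueDrainSketch {
    static final class Drainer implements Runnable {
        final Queue<Integer> queue = new ConcurrentLinkedQueue<>();
        final AtomicInteger wip = new AtomicInteger(); // work-in-progress counter
        final List<Integer> received = Collections.synchronizedList(new ArrayList<>());
        final ExecutorService worker;

        Drainer(ExecutorService worker) {
            this.worker = worker;
        }

        void onNext(int value) {
            queue.offer(value); // enqueue first, then signal
            schedule();
        }

        void schedule() {
            // Only the 0 -> 1 transition submits a drain task.
            if (wip.getAndIncrement() == 0) {
                worker.execute(this);
            }
        }

        @Override
        public void run() {
            int missed = 1;
            for (;;) {
                Integer v;
                while ((v = queue.poll()) != null) {
                    received.add(v); // stands in for the downstream onNext
                }
                // Subtract the signals accounted for; exit only if none arrived meanwhile.
                missed = wip.addAndGet(-missed);
                if (missed == 0) {
                    return;
                }
            }
        }
    }

    static List<Integer> run() {
        ExecutorService worker = Executors.newSingleThreadExecutor();
        Drainer drainer = new Drainer(worker);
        for (int i = 1; i <= 5; i++) {
            drainer.onNext(i);
        }
        worker.shutdown(); // already submitted drain tasks still run
        try {
            worker.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new ArrayList<>(drainer.received);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

With a single producer and a FIFO queue, the values arrive downstream in emission order regardless of how many drain tasks were needed.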
      
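    4. HandlerWorker.schedule(this) posts a message to the main thread, where the ScheduledRunnable's run method is executed (the one-argument schedule() in Worker delegates to the three-argument overload shown here)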

      private static final class HandlerWorker extends Worker {
          private final Handler handler;//a Handler bound to the main thread
      
          HandlerWorker(Handler handler) {
              this.handler = handler;
          }
      
          @Override
          public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
              if (run == null) throw new NullPointerException("run == null");
              if (unit == null) throw new NullPointerException("unit == null");
      
              if (disposed) {
                  return Disposables.disposed();
              }
      
              run = RxJavaPlugins.onSchedule(run);
      
              //handler: the main-thread Handler. run: the ObserveOnObserver
              ScheduledRunnable scheduled = new ScheduledRunnable(handler, run);
      
              //create the message
              Message message = Message.obtain(handler, scheduled);
              message.obj = this; // Used as token for batch disposal of this worker's runnables.
      
              handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
      
              // Re-check disposed state for removing in case we were racing a call to dispose().
              if (disposed) {
                  handler.removeCallbacks(scheduled);
                  return Disposables.disposed();
              }
      
              return scheduled;
          }
      }
      
    5. ScheduledRunnable.run() in turn calls ObserveOnObserver.run()

       private static final class ScheduledRunnable implements Runnable, Disposable {
              private final Handler handler;
              private final Runnable delegate;
      
              ScheduledRunnable(Handler handler, Runnable delegate) {
                  this.handler = handler;
                  this.delegate = delegate;
              }
      
              @Override
              public void run() {
                  try {
                      delegate.run();//ObserveOnObserver.run()
                  } catch (Throwable t) {
                      IllegalStateException ie =
                          new IllegalStateException("Fatal Exception thrown on Scheduler.", t);
                      RxJavaPlugins.onError(ie);
                      Thread thread = Thread.currentThread();
                      thread.getUncaughtExceptionHandler().uncaughtException(thread, ie);
                  }
              }
      }
      
    6. ObserveOnObserver.run();

      static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T> implements Observer<T>, Runnable {
         @Override
         public void run() {
             if (outputFused) {
                 drainFused();
             } else {
                 drainNormal();
             }
          }
      }
      
       void drainFused() {
          ...
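          actual.onNext(null);//calls the terminal observer's onNext on the main thread (with a plain Observer downstream, outputFused is false and drainNormal() delivers each queued value through onNext instead)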
          ...
      }
      

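    Code execution flow:

    1. Observable.create() takes the custom source and returns an ObservableCreate object;
    2. ObservableCreate.observeOn(AndroidSchedulers.mainThread()). AndroidSchedulers.mainThread() returns a HandlerScheduler holding a main-thread Handler. observeOn() returns an ObservableObserveOn object that holds the ObservableCreate (as source) and the HandlerScheduler (as scheduler);
    3. ObservableObserveOn.subscribe(observer) calls ObservableObserveOn.subscribeActual(observer), which creates a HandlerWorker through HandlerScheduler.createWorker(), wraps the terminal observer and the HandlerWorker in an ObserveOnObserver, and calls ObservableCreate.subscribe() with it;
    4. ObservableCreate.subscribeActual() wraps the ObserveOnObserver in a CreateEmitter and calls the custom source's subscribe(emitter);
    5. When the custom source calls emitter.onNext(t), CreateEmitter forwards to ObserveOnObserver.onNext(t), which queues the value and calls HandlerWorker.schedule(this);
    6. HandlerWorker posts a Message carrying a ScheduledRunnable to the main-thread Handler. ScheduledRunnable.run() calls ObserveOnObserver.run(), which drains the queue and calls the terminal observer's onNext on the main thread.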
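
Both halves of the article (subscribe on one thread, deliver on another) can be combined in one plain-Java model. This is a minimal sketch under stated assumptions: two single-thread executors named "sketch-io" and "sketch-main" stand in for IoScheduler and the main-thread Handler, the interfaces are hypothetical, and observeOn posts one task per value instead of using the queue-drain loop of ObserveOnObserver.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class ThreadSwitchSketch {
    // Hypothetical, simplified stand-ins for Observer and ObservableSource.
    interface Observer<T> {
        void onNext(T t);
    }

    interface Source<T> {
        void subscribe(Observer<T> observer);
    }

    // Moves the subscription (and therefore the emission) onto the executor.
    static <T> Source<T> subscribeOn(Source<T> source, ExecutorService executor) {
        return observer -> executor.execute(() -> source.subscribe(observer));
    }

    // Wraps the downstream observer so each value is re-delivered on the executor.
    static <T> Source<T> observeOn(Source<T> source, ExecutorService executor) {
        return observer -> source.subscribe(
                value -> executor.execute(() -> observer.onNext(value)));
    }

    static ExecutorService named(String name) {
        return Executors.newSingleThreadExecutor(r -> new Thread(r, name));
    }

    static void shutdownAndWait(ExecutorService executor) {
        executor.shutdown(); // already submitted tasks still run
        try {
            executor.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    static List<String> run() {
        ExecutorService io = named("sketch-io");     // stands in for Schedulers.io()
        ExecutorService main = named("sketch-main"); // stands in for the main thread
        List<String> log = Collections.synchronizedList(new ArrayList<>());

        Source<String> source = observer -> {
            String emitter = Thread.currentThread().getName();
            observer.onNext("a from " + emitter);
            observer.onNext("b from " + emitter);
        };

        observeOn(subscribeOn(source, io), main)
                .subscribe(s -> log.add(s + " to " + Thread.currentThread().getName()));

        shutdownAndWait(io);   // all values have been posted to "main" after this
        shutdownAndWait(main); // all values have been delivered after this
        return new ArrayList<>(log);
    }

    public static void main(String[] args) {
        System.out.println(run());
    }
}
```

The emission happens on the executor given to subscribeOn, and the terminal callback runs on the executor given to observeOn, which mirrors the upstream/downstream split described in the two sections.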
