美文网首页主流框架开源库
主流开源库-Rxjava源码分析

主流开源库-Rxjava源码分析

作者: isLJli | 来源:发表于2020-04-15 08:34 被阅读0次

    从最简单的使用开始分析起:

    Observable.just("Url")
                  .subscribe(new Observer<String>() {
                      @Override
                      public void onSubscribe(Disposable d) {
                          
                      }
    
                      @Override
                      public void onNext(String value) {
    
                      }
    
                      @Override
                      public void onError(Throwable e) {
    
                      }
    
                      @Override
                      public void onComplete() {
    
                      }
                  });
    

    先看一下just("Url")f方法的去向:

    #Observable:
    @SchedulerSupport(SchedulerSupport.NONE)
      public static <T> Observable<T> just(T item) {
          ObjectHelper.requireNonNull(item, "The item is null");
          return RxJavaPlugins.onAssembly(new ObservableJust<T>(item));
      }
    

    看一下RxJavaPlugins.onAssembly()

    public static <T> Observable<T> onAssembly(Observable<T> source) {
          Function<Observable, Observable> f = onObservableAssembly;
          if (f != null) {
              return apply(f, source);
          }
          return source;
      }
    

    从return可以看出,它返回就是我们传给它的参数值,所以just()方法返回的就是new ObservableJust<T>(item),ObservableJust类,看看这个方法:

    //可以看到这个类继承了Observable
    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    
      private final T value;
      public ObservableJust(final T value) {
          this.value = value;
      }
    
      @Override
      protected void subscribeActual(Observer<? super T> s) {
          ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
          s.onSubscribe(sd);
          sd.run();
      }
    
      @Override
      public T call() {
          return value;
      }
    }
    

    just()的时候就是通过一个泛型,把参数值传递进来保存ObservableJust类中。

    第二,订阅方法subscribe()源码

    @Override
      public final void subscribe(Observer<? super T> observer) {
          ObjectHelper.requireNonNull(observer, "observer is null");
          try {
              observer = RxJavaPlugins.onSubscribe(this, observer);
    
              ObjectHelper.requireNonNull(observer, "Plugin returned null Observer");
              //重点方法
              subscribeActual(observer);
          } catch (NullPointerException e) { // NOPMD
              throw e;
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              RxJavaPlugins.onError(e);
              NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
              npe.initCause(e);
              throw npe;
          }
      }
    
    protected abstract void subscribeActual(Observer<? super T> observer);
    
    

    可以看到,把参数Observer传给了subscribeActual(observer),但是这个方法是Observable的抽象类,所以我们在其子类Observable中查看此方法实现:

    public final class ObservableJust<T> extends Observable<T> implements ScalarCallable<T> {
    
      private final T value;
      public ObservableJust(final T value) {
          this.value = value;
      }
    
      @Override
      protected void subscribeActual(Observer<? super T> s) {
          ScalarDisposable<T> sd = new ScalarDisposable<T>(s, value);
         //首先调用Observer.onSubscribe方法,并把sd传进去
          s.onSubscribe(sd);
         //随后,调用sd的run方法
          sd.run();
      }
    
      @Override
      public T call() {
          return value;
      }
    }
    

    从代码中得知subscribeActual()方法,先把Observer的实例和参数value创建ScalarDisposable对象,然后调用Observer的onSubscribe方法,然后执行sd.run()方法

    最后我们查看一下ScalarDisposable的run方法整个流程就走完了:

    public static final class ScalarDisposable<T>
      extends AtomicInteger
      implements QueueDisposable<T>, Runnable {
          ....
          final Observer<? super T> observer;
    
          final T value;
          
          public ScalarDisposable(Observer<? super T> observer, T value) {
              this.observer = observer;
              this.value = value;
          }
    
        .....
         
          @Override
          public void run() {
              if (get() == START && compareAndSet(START, ON_NEXT)) {
                 //调用observer接口的onNext()方法,并把我们传的参数写进去。
                 observer.onNext(value);
                  if (get() == ON_NEXT) {
                      lazySet(ON_COMPLETE);
                      observer.onComplete();//调用observer接口的onComplete方法
                  }
              }
          }
      }
    

    所以,在Obdervable类中subscribe()方法中没有直接调用Observer的方法,而是通过两个类中去调用。

    (二)有时我们会使用new Consumer这种简单的写法
    我们来分析一下:

    首先Consumer是一个接口,里面有一个accept方法。
    public interface Consumer<T> {
      void accept(T t) throws Exception;
    }
    
    接着我们从subscribe(Consumer)中点进去看一源码:
    
    public final Disposable subscribe(Consumer<? super T> onNext) {
          return subscribe(onNext, Functions.ERROR_CONSUMER, Functions.EMPTY_ACTION, Functions.emptyConsumer());
      }
    我们发现它把我们实例化的Consumer作为参数,传到另一个subscribe中。
    其中还有Funtions类中静态量,点进去发现两个是Consumer接口实例类,还有一个是Action。
    public static final Consumer<Throwable> ERROR_CONSUMER = new Consumer<Throwable>() {
          @Override
          public void accept(Throwable error) {
              RxJavaPlugins.onError(error);
          }
      };
    public static final Action EMPTY_ACTION = new Action() {
          @Override
          public void run() { }
    
          @Override
          public String toString() {
              return "EmptyAction";
          }
      };
    static final Consumer<Object> EMPTY_CONSUMER = new Consumer<Object>() {
          @Override
          public void accept(Object v) { }
    
          @Override
          public String toString() {
              return "EmptyConsumer";
          }
      };
    
     
    我们看看return的subscribe方法:
    
    public final Disposable subscribe(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
              Action onComplete, Consumer<? super Disposable> onSubscribe) {
          ObjectHelper.requireNonNull(onNext, "onNext is null");
          ObjectHelper.requireNonNull(onError, "onError is null");
          ObjectHelper.requireNonNull(onComplete, "onComplete is null");
          ObjectHelper.requireNonNull(onSubscribe, "onSubscribe is null");
          //我们发现这个类实现了Observer接口,并传进去我们的Consumer实现类。
          LambdaObserver<T> ls = new LambdaObserver<T>(onNext, onError, onComplete, onSubscribe);
         //这里才是真正订阅我们的Obsever
          subscribe(ls);
    
          return ls;
      }
    

    从上面的分析我们知道,LambadaObserver类中的onNext等四个方法会接收我们传递的参数,然后我们这个参数通过Consumer实例传给方法。

    public final class LambdaObserver<T> extends AtomicReference<Disposable> implements Observer<T>, Disposable {
    
      private static final long serialVersionUID = -7251123623727029452L;
      final Consumer<? super T> onNext;
      final Consumer<? super Throwable> onError;
      final Action onComplete;
      final Consumer<? super Disposable> onSubscribe;
    
      public LambdaObserver(Consumer<? super T> onNext, Consumer<? super Throwable> onError,
              Action onComplete,
              Consumer<? super Disposable> onSubscribe) {
          super();
          this.onNext = onNext;
          this.onError = onError;
          this.onComplete = onComplete;
          this.onSubscribe = onSubscribe;
      }
    
      @Override
      public void onSubscribe(Disposable s) {
          if (DisposableHelper.setOnce(this, s)) {
              try {
                  onSubscribe.accept(this);
              } catch (Throwable ex) {
                  Exceptions.throwIfFatal(ex);
                  s.dispose();
                  RxJavaPlugins.onError(ex);
              }
          }
      }
    
      @Override
      public void onNext(T t) {
          try {
              onNext.accept(t);
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              onError(e);
          }
      }
    
      @Override
      public void onError(Throwable t) {
          dispose();
          try {
              onError.accept(t);
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              RxJavaPlugins.onError(e);
              RxJavaPlugins.onError(t);
          }
      }
    
      @Override
      public void onComplete() {
          dispose();
          try {
              onComplete.run();
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              RxJavaPlugins.onError(e);
          }
      }
    
      @Override
      public void dispose() {
          DisposableHelper.dispose(this);
      }
    
      @Override
      public boolean isDisposed() {
          return get() == DisposableHelper.DISPOSED;
      }
    }
    

    变换操作符map:

    这是经过两次变换的map代码

    .map(new Function<String, Bitmap>() {
                      @Override
                      public Bitmap apply(String s) throws Exception {
                          URL url= new URL(s);
                          HttpURLConnection urlConnection = (HttpURLConnection) url.openConnection();
                          InputStream inputStream= urlConnection.getInputStream();
                          Bitmap bitmap= BitmapFactory.decodeStream(inputStream);
                          return bitmap;
                      }
                  })
                  .map(new Function<Bitmap, Bitmap>() {
                      @Override
                      public Bitmap apply(Bitmap bitmap) throws Exception {
                          return createWatermark(bitmap,"Rxjava");
                      }
                  })
    

    事件流,只关心上游和下游。
    定义了两个泛型的接口:

    public interface Function<T, R> {
     
      R apply(T t) throws Exception;
    }
    
      泛型方法前面要<R>,并传入一个Function实例,返回一个ObservableMap
      public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
          ObjectHelper.requireNonNull(mapper, "mapper is null");
         // onAeesmbly()方法前面分析过返回它的参数,这里传了一个this:就是调用方法上一个Observable,还有function实例
          return RxJavaPlugins.onAssembly(new ObservableMap<T, R>(this, mapper));
      }
    
    
    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
      final Function<? super T, ? extends U> function;
    
      //这里保存上一个调用方法的Observable,和此次function实例
      public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
          super(source);
          this.function = function;
      }
    
     //subscribeActual方法的功能就是把我们传递的参数给Observer的onNext方法。
     //如果有多个map操作,就会遍历这个这个方法,
     //一直到找到一个source(如ObservableJust)可以在subscribe()里的subscribeActual能真正把我们传递的参数给Observer的onNext方法
      @Override
      public void subscribeActual(Observer<? super U> t) {
          source.subscribe(new MapObserver<T, U>(t, function));
      }
    //正向:ObservableJust.subscribe(new MapObserver<T, U>(t, function)) :我们手动的url传给了下一个的MapObserver.onNext()参数。
    
    
    
      static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
          final Function<? super T, ? extends U> mapper;
    
         //这里将订阅的Observer,和当前的function实例传进来
          MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
              super(actual);
              this.mapper = mapper;
          }
    
          @Override
          public void onNext(T t) {
              if (done) {
                  return;
              }
              if (sourceMode != NONE) {
                  actual.onNext(null);
                  return;
              }
              //做事件的变换赋值
              U v;
              try {
                  v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
              } catch (Throwable ex) {
                  fail(ex);
                  return;
              }
             //给订阅的Observer传递此次变换后的数据
              actual.onNext(v);
          }
    
          @Override
          public int requestFusion(int mode) {
              return transitiveBoundaryFusion(mode);
          }
    
          @Override
          public U poll() throws Exception {
              T t = qs.poll();
              return t != null ? ObjectHelper.<U>requireNonNull(mapper.apply(t), "The mapper function returned a null value.") : null;
          }
      }
    }
    
    大体流程图

    Subscribe.on()源码:

    .subscribeOn(Schedulers.io())// 上面之前的执行在子线程中
    .observeOn(AndroidSchedulers.mainThread())// 下面之后的执行在主线程中
    

    我们直接看重点部分:

    // 创建了一个 ObservableSubscribeOn 
    public final Observable<T> subscribeOn(Scheduler scheduler) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      return RxJavaPlugins.onAssembly(new ObservableSubscribeOn<T>(this, scheduler));
    }
    
    // 主要看 subscribeActual 方法
    public final class ObservableSubscribeOn<T> extends AbstractObservableWithUpstream<T, T> {
      final Scheduler scheduler;
    
      public ObservableSubscribeOn(ObservableSource<T> source, Scheduler scheduler) {
          super(source);
          this.scheduler = scheduler;
      }
    
      @Override
      public void subscribeActual(final Observer<? super T> s) {
          // 创建了一个 SubscribeOnObserver ,也就是把 SubscribeOnObserver 进行了一层包装
          // 代理设计模式
          final SubscribeOnObserver<T> parent = new SubscribeOnObserver<T>(s);
          // 调用代理的 Observer 的 onSubscribe 方法
          s.onSubscribe(parent);
          // 把下面这个代码变为两行,容易看懂一点
          // parent.setDisposable(scheduler.scheduleDirect(new SubscribeTask(parent)));
          Disposable disposable = scheduler.scheduleDirect(new SubscribeTask(parent));
          parent.setDisposable(disposable);
      }
    }
    // 看到这里差不多要明白了 implements Runnable 看样子要开线程的节奏
    final class SubscribeTask implements Runnable {
      private final SubscribeOnObserver<T> parent;
    
      SubscribeTask(SubscribeOnObserver<T> parent) {
          this.parent = parent;
      }
    
      @Override
      public void run() {
          source.subscribe(parent);
      }
    }
    
    // Schedulers.io() 返回 IoHolder.DEFAULT
    static final class IOTask implements Callable<Scheduler> {
          @Override
          public Scheduler call() throws Exception {
              return IoHolder.DEFAULT;
          }
    }
    // 单例设计模式 - 静态内部类
    static final class IoHolder {
      static final Scheduler DEFAULT = new IoScheduler();
    }
    
    //scheduler.scheduleDirect最终调用这个方法生成Disposable
    // 线程池 + 线程 + Runnable
    @NonNull
    public Disposable scheduleDirect(@NonNull Runnable run, long delay, @NonNull TimeUnit unit) {
      // 创建 线程池
      final Worker w = createWorker();
    
      final Runnable decoratedRun = RxJavaPlugins.onSchedule(run);
      // 代理
      DisposeTask task = new DisposeTask(decoratedRun, w);
      // 利用线程池去执行任务
      w.schedule(task, delay, unit);
    
      return task;
    }
    // 在子线程中执行 source.subscribe(parent); // 又要往上走,这是子线程处理的逻辑
    

    observerOn源码
    如果你不调用observeOn()方法,那么最后的一个Observer实例也是运行在上一个的线程中
    直接看重点:

    // 创建了一个 ObservableObserveOn 
    public final Observable<T> observeOn(Scheduler scheduler, boolean delayError, int bufferSize) {
      ObjectHelper.requireNonNull(scheduler, "scheduler is null");
      ObjectHelper.verifyPositive(bufferSize, "bufferSize");
      return RxJavaPlugins.onAssembly(new ObservableObserveOn<T>(this, scheduler, delayError, bufferSize));
    }
    
    // 主要还是看 subscribeActual 方法
    public final class ObservableObserveOn<T> extends AbstractObservableWithUpstream<T, T> {
      
      @Override
      protected void subscribeActual(Observer<? super T> observer) {
         // ...... 省略部分代码
         // 创建一个 Scheduler.Worker(HandlerWorker) 
         Scheduler.Worker w = scheduler.createWorker();
         // Worker对象和下一个observer传进去
         source.subscribe(new ObserveOnObserver<T>(observer, w, delayError, bufferSize));
      }
    }
    // 最后的 onNext 是 schedule() 
    static final class ObserveOnObserver<T> extends BasicIntQueueDisposable<T>
      implements Observer<T>, Runnable {
    
      void schedule() {
          if (getAndIncrement() == 0) {
            //通过Handler切换到主线程中执行任务
              worker.schedule(this);
          }
      }
         //handlerMessage要处理的内容。
          public void run() {
              if (outputFused) {
                  drainFused();
              } else {
                  drainNormal();
              }
          }
    
    1. void drainNormal() {
              int missed = 1;
    
              final SimpleQueue<T> q = queue;
              final Observer<? super T> a = actual;
    
              for (;;) {
                  if (checkTerminated(done, q.isEmpty(), a)) {
                      return;
                  }
    
                  for (;;) {
                      boolean d = done;
                      T v;
    
                      try {
                          v = q.poll();
                      } catch (Throwable ex) {
                          Exceptions.throwIfFatal(ex);
                          s.dispose();
                          q.clear();
                          a.onError(ex);
                          return;
                      }
                      boolean empty = v == null;
    
                      if (checkTerminated(d, empty, a)) {
                          return;
                      }
    
                      if (empty) {
                          break;
                      }
    
                      a.onNext(v);
                  }
    
                  missed = addAndGet(-missed);
                  if (missed == 0) {
                      break;
                  }
              }
          }
    
          2.void drainFused() {
              int missed = 1;
    
              for (;;) {
                  if (cancelled) {
                      return;
                  }
    
                  boolean d = done;
                  Throwable ex = error;
    
                  if (!delayError && d && ex != null) {
                      actual.onError(error);
                      worker.dispose();
                      return;
                  }
    
                  actual.onNext(null);
    
                  if (d) {
                      ex = error;
                      if (ex != null) {
                          actual.onError(ex);
                      } else {
                          actual.onComplete();
                      }
                      worker.dispose();
                      return;
                  }
    
                  missed = addAndGet(-missed);
                  if (missed == 0) {
                      break;
                  }
              }
          }
    
    }
    // MAIN_THREAD 的 Scheduler 
    public final class AndroidSchedulers {
      private static final class MainHolder {
          // new Handler(Looper.getMainLooper()) 创建一个主线程的 Handler 对象
          static final Scheduler MAIN_THREAD= new HandlerScheduler(new Handler(Looper.getMainLooper()));
      }
      //所以AndroidSchedulers.mainThread()最后返回是HandlerScheduler对象
     public static Scheduler mainThread() {
          return RxAndroidPlugins.onMainThreadScheduler(MAIN_THREAD);
      }
    }
    final class HandlerScheduler extends Scheduler {
      private final Handler handler;
      HandlerScheduler(Handler handler) {
          this.handler = handler;
      }
      @Override
      public Worker createWorker() {
          return new HandlerWorker(handler);
      }
    }
    
    // Handler 切换到主线程
    private static final class HandlerWorker extends Worker {
      @Override
      public Disposable schedule(Runnable run, long delay, TimeUnit unit) {
        //这种写法在内部执行了scheduled.run
          Message message = Message.obtain(handler, scheduled);
          message.obj = this; // Used as token for batch disposal of this worker's runnables.
          // 但是 handler 并没有复写 handleMessage 方法,那是怎么调用了方法?一切都在 Handler 源码中
          handler.sendMessageDelayed(message, Math.max(0L, unit.toMillis(delay)));
      }
    }
    

    相关文章

      网友评论

        本文标题:主流开源库-Rxjava源码分析

        本文链接:https://www.haomeiwen.com/subject/pqwtvhtx.html