RxJava2: just and fromArray

Author: CODERLIHAO | Published: 2018-10-09 15:09

    All code in this article was run against RxJava 2.1.13.

         Observable.just(1,2,3,4).subscribe(integer -> System.out.println(integer));
         Observable.fromArray(1,2,3,4,5).subscribe(integer -> System.out.println(integer));
    

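    The multi-argument overloads of just() delegate to fromArray(), so we start with a brief look at just().
    Here is how the four-argument call above is dispatched: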

        @SuppressWarnings("unchecked")
        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> just(T item1, T item2, T item3, T item4) {
           ...
           return fromArray(item1, item2, item3, item4);
        }
    

    The key step is the call to fromArray(), which creates an ObservableFromArray object when more than one item is passed:

        @CheckReturnValue
        @SchedulerSupport(SchedulerSupport.NONE)
        public static <T> Observable<T> fromArray(T... items) {
            ObjectHelper.requireNonNull(items, "items is null");
            if (items.length == 0) {
                return empty();
            } else
            if (items.length == 1) {
                return just(items[0]);
            }
            return RxJavaPlugins.onAssembly(new ObservableFromArray<T>(items));
        }
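
    The branch taken by fromArray() depends only on items.length, and that length is decided by how Java binds the varargs parameter. The sketch below is a stand-alone model of that branching, not RxJava code: FromArrayPaths and pathFor are hypothetical names, and the returned strings only label the branch. It is meant to show that an Integer[] is spread into the varargs parameter, while an int[] is bound as a single item of type int[].

```java
// Hypothetical stand-alone model; FromArrayPaths and pathFor are illustrative
// names and are not part of RxJava.
class FromArrayPaths {

    // Mirrors the three branches of fromArray(T... items) in the listing above
    // and returns a label for the branch that would be taken.
    @SafeVarargs
    static <T> String pathFor(T... items) {
        if (items.length == 0) {
            return "empty()";
        } else if (items.length == 1) {
            return "just(items[0])";
        }
        return "ObservableFromArray(" + items.length + " items)";
    }

    public static void main(String[] args) {
        System.out.println(pathFor());                         // empty()
        System.out.println(pathFor(1));                        // just(items[0])
        System.out.println(pathFor(1, 2, 3, 4));               // ObservableFromArray(4 items)
        // A reference-type array is passed through as the varargs array itself.
        System.out.println(pathFor(new Integer[] {1, 2, 3}));  // ObservableFromArray(3 items)
        // A primitive array cannot be a T[], so T becomes int[] and there is one item.
        System.out.println(pathFor(new int[] {1, 2, 3}));      // just(items[0])
    }
}
```

    In practice this means a primitive array should be boxed (or converted to a list) before it is handed to fromArray(), otherwise the whole array arrives as one element.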
    

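    ObservableFromArray extends Observable, and Observable provides a family of subscribe() overloads. As soon as an observer subscribes to the Observable (that is, subscribe() is called), subscribeActual(Observer<? super T> observer) is invoked.
    Every subclass of Observable supplies its own implementation of subscribeActual().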
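
    The relationship between subscribe() and subscribeActual() is a template method: a final public entry point that forwards to a hook each subclass overrides. The following is a minimal sketch of that pattern in plain Java. MiniObserver, MiniObservable and MiniRange are hypothetical miniature types invented for illustration; the real Observable.subscribe() does more (plugin hooks, error handling) than this model.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical miniature model of the subscribe()/subscribeActual() split;
// none of these types are RxJava classes.
class TemplateMethodDemo {

    interface MiniObserver<T> {
        void onNext(T value);
        void onComplete();
    }

    abstract static class MiniObservable<T> {
        // Public entry point: checks the argument, then calls the subclass hook.
        public final void subscribe(MiniObserver<? super T> observer) {
            if (observer == null) {
                throw new NullPointerException("observer is null");
            }
            subscribeActual(observer);
        }

        // Each concrete source decides how its items are produced.
        protected abstract void subscribeActual(MiniObserver<? super T> observer);
    }

    // A source that emits count consecutive integers beginning at start.
    static final class MiniRange extends MiniObservable<Integer> {
        private final int start;
        private final int count;

        MiniRange(int start, int count) {
            this.start = start;
            this.count = count;
        }

        @Override
        protected void subscribeActual(MiniObserver<? super Integer> observer) {
            for (int i = 0; i < count; i++) {
                observer.onNext(start + i);
            }
            observer.onComplete();
        }
    }

    // Subscribes a recording observer and returns the events it received.
    static List<String> collect(MiniObservable<Integer> source) {
        List<String> events = new ArrayList<>();
        source.subscribe(new MiniObserver<Integer>() {
            @Override public void onNext(Integer value) { events.add("onNext:" + value); }
            @Override public void onComplete() { events.add("onComplete"); }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(collect(new MiniRange(5, 3)));
        // prints [onNext:5, onNext:6, onNext:7, onComplete]
    }
}
```
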
    Here is the source of ObservableFromArray:

    
    public final class ObservableFromArray<T> extends Observable<T> {
      final T[] array;
      public ObservableFromArray(T[] array) {
          this.array = array;
      }
      @Override
      public void subscribeActual(Observer<? super T> s) {
          FromArrayDisposable<T> d = new FromArrayDisposable<T>(s, array);
    
          s.onSubscribe(d);
    
          if (d.fusionMode) {
              return;
          }
    
          d.run();
      }
    
      static final class FromArrayDisposable<T> extends BasicQueueDisposable<T> {
    
          final Observer<? super T> actual;
    
          final T[] array;
    
          int index;
    
          boolean fusionMode;
    
          volatile boolean disposed;
    
          FromArrayDisposable(Observer<? super T> actual, T[] array) {
              this.actual = actual;
              this.array = array;
          }
    
          @Override
          public int requestFusion(int mode) {
              if ((mode & SYNC) != 0) {
                  fusionMode = true;
                  return SYNC;
              }
              return NONE;
          }
    
          @Nullable
          @Override
          public T poll() {
              int i = index;
              T[] a = array;
              if (i != a.length) {
                  index = i + 1;
                  return ObjectHelper.requireNonNull(a[i], "The array element is null");
              }
              return null;
          }
    
          @Override
          public boolean isEmpty() {
              return index == array.length;
          }
    
          @Override
          public void clear() {
              index = array.length;
          }
    
          @Override
          public void dispose() {
              disposed = true;
          }
    
          @Override
          public boolean isDisposed() {
              return disposed;
          }
    
          void run() {
              T[] a = array;
              int n = a.length;
    
              for (int i = 0; i < n && !isDisposed(); i++) {
                  T value = a[i];
                  if (value == null) {
                      actual.onError(new NullPointerException("The " + i + "th element is null"));
                      return;
                  }
                  actual.onNext(value);
              }
              if (!isDisposed()) {
                  actual.onComplete();
              }
          }
      }
    }
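
    Besides run(), the listing contains a pull-based path: when requestFusion() grants SYNC, subscribeActual() returns early and the downstream is expected to fetch items through poll() until it returns null. The sketch below models only that queue side in plain Java. ArrayQueueModel and drain are hypothetical names, and the drain loop is an assumed stand-in for whatever consumer performs the polling.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone model of the poll()/isEmpty()/clear() trio from the
// listing above; it does not use or reproduce RxJava types.
class PollModelDemo {

    static final class ArrayQueueModel<T> {
        private final T[] array;
        private int index;

        ArrayQueueModel(T[] array) {
            this.array = array;
        }

        // Returns the next element, or null once the array is exhausted.
        T poll() {
            int i = index;
            if (i != array.length) {
                index = i + 1;
                T value = array[i];
                if (value == null) {
                    throw new NullPointerException("The array element is null");
                }
                return value;
            }
            return null;
        }

        boolean isEmpty() {
            return index == array.length;
        }

        // Skips everything that has not been polled yet.
        void clear() {
            index = array.length;
        }
    }

    // Pulls items until poll() signals exhaustion with null.
    static <T> List<T> drain(ArrayQueueModel<T> queue) {
        List<T> out = new ArrayList<>();
        T item;
        while ((item = queue.poll()) != null) {
            out.add(item);
        }
        return out;
    }

    public static void main(String[] args) {
        ArrayQueueModel<String> queue = new ArrayQueueModel<>(new String[] {"a", "b", "c"});
        System.out.println(queue.poll());     // a
        System.out.println(drain(queue));     // [b, c]
        System.out.println(queue.isEmpty());  // true
    }
}
```

    Because null marks the end of the queue, a null array element cannot be returned as a value, which is why poll() rejects it.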
    

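    Subscribing runs subscribeActual(), which calls run() unless the downstream has requested synchronous fusion (fusionMode is true). run() simply walks the array: it passes each element to onNext(), reports a null element through onError(), and calls onComplete() at the end if the Disposable has not been disposed.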
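
    To make the effect of the disposed flag and the null check concrete, here is a small plain-Java model of the run() loop. Sink, Emitter and emit are hypothetical names introduced for this sketch, and disposing from inside onNext() is just one assumed way of cancelling mid-stream.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-alone model of the run() loop; not RxJava code.
class RunLoopDemo {

    interface Sink<T> {
        void onNext(T value);
        void onError(Throwable error);
        void onComplete();
    }

    static final class Emitter<T> {
        private final T[] array;
        private volatile boolean disposed;

        Emitter(T[] array) {
            this.array = array;
        }

        void dispose() {
            disposed = true;
        }

        // Same shape as run() in the listing: stop on dispose, fail on null.
        void run(Sink<? super T> sink) {
            for (int i = 0; i < array.length && !disposed; i++) {
                T value = array[i];
                if (value == null) {
                    sink.onError(new NullPointerException("The " + i + "th element is null"));
                    return;
                }
                sink.onNext(value);
            }
            if (!disposed) {
                sink.onComplete();
            }
        }
    }

    // Emits items and disposes once the value disposeAt has been received;
    // pass a value that never occurs (for example -1) to run to completion.
    static List<String> emit(Integer[] items, int disposeAt) {
        List<String> events = new ArrayList<>();
        Emitter<Integer> emitter = new Emitter<>(items);
        emitter.run(new Sink<Integer>() {
            @Override public void onNext(Integer value) {
                events.add("onNext:" + value);
                if (value == disposeAt) {
                    emitter.dispose();
                }
            }
            @Override public void onError(Throwable error) {
                events.add("onError:" + error.getMessage());
            }
            @Override public void onComplete() {
                events.add("onComplete");
            }
        });
        return events;
    }

    public static void main(String[] args) {
        System.out.println(emit(new Integer[] {1, 2, 3, 4}, -1));
        // prints [onNext:1, onNext:2, onNext:3, onNext:4, onComplete]
        System.out.println(emit(new Integer[] {1, 2, 3, 4}, 2));
        // prints [onNext:1, onNext:2]
        System.out.println(emit(new Integer[] {1, null, 3}, -1));
        // prints [onNext:1, onError:The 1th element is null]
    }
}
```

    Note that after a dispose neither the remaining items nor onComplete() are delivered, and that a null element ends the sequence with onError() instead of onComplete().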
