美文网首页
RxJava之转换操作符源码介绍

RxJava之转换操作符源码介绍

作者: 103style | 来源:发表于2019-06-03 22:43 被阅读0次

    转载请以链接形式标明出处:
    本文出自:103style的博客

    转换相关的操作符 以及 官方介绍

    RxJava转换操作符 官方介绍 :Transforming Observables

    以下介绍我们就直接具体实现,中间流程请参考 RxJava之create操作符源码解析

    buffer

    • 官方示例:
      Observable.range(0, 10)
          .buffer(4)
          .subscribe((List<Integer> buffer) -> System.out.println(buffer));
      
      输出:
      [0, 1, 2, 3]
      [4, 5, 6, 7]
      [8, 9]
      
    • 返回对象的 ObservableBuffersubscribeActual 方法:
      单参数bufferskipcount 是相等的。
      protected void subscribeActual(Observer<? super U> t) {
          if (skip == count) {
              BufferExactObserver<T, U> bes = new BufferExactObserver<T, U>(t, count, bufferSupplier);
              if (bes.createBuffer()) {//1.0
                  source.subscribe(bes);
              }
          } else {
              source.subscribe(new BufferSkipObserver<T, U>(t, count, skip, bufferSupplier));
          }
      }
      
      • (1.0) createBuffer即新创建了一个ArrayList对象 buffer
    • onNext(T t)onComplete()方法:
      public void onNext(T t) {
          U b = buffer;
          if (b != null) {
              b.add(t);
              if (++size >= count) {//1.0
                  downstream.onNext(b);
                  size = 0;
                  createBuffer();
              }
          }
      }
      
      public void onComplete() {
          U b = buffer;
          if (b != null) {//2.0
              buffer = null;
              if (!b.isEmpty()) {
                  downstream.onNext(b);
              }
              downstream.onComplete();
          }
      }
      
      • (1.0) 每次调用onNext 就检查缓存的事件数是否 不小于 buffer操作符设置的 值,成立则将缓存的 buffer 数组 传给观察者的 onNext
      • (2.0) onComplete 是检查缓存的事件数是否不为空,成立则将缓存的 buffer 数组 传给观察者的 onNext,再调用观察者的 onComplete

    cast

    • 官方示例:
      Observable<Number> numbers = Observable.just(1, 4.0, 3f, 7, 12, 4.6, 5);
      numbers.filter((Number x) -> x instanceof Integer)
              .cast(Integer.class)
              .subscribe((Integer x) -> System.out.println(x));
      
      输出:
      1
      7
      12
      5
      
    • cast 是通过map操作符来实现的,我们直接看map
      public final <U> Observable<U> cast(final Class<U> clazz) {
          ObjectHelper.requireNonNull(clazz, "clazz is null");
          return map(Functions.castFunction(clazz));
      }
      
      • apply方法:
        public U apply(T t) throws Exception {
            return clazz.cast(t);
        } 
        

    map

    • 官方示例:
      Observable.just(1, 2, 3)
              .map(x -> x * x)
              .subscribe(System.out::println);
      
      输出:
      1
      4
      9
      
    • 返回对象的 ObservableMapsubscribeActual 方法:
      public void subscribeActual(Observer<? super U> t) {
          source.subscribe(new MapObserver<T, U>(t, function));
      }
      
    • 继续看 MapObserveronNext(T t)
      public void onNext(T t) {
          if (done) {
              return;
          }
          if (sourceMode != NONE) {
              downstream.onNext(null);
              return;
          }
          U v;
          try {
              v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
          } catch (Throwable ex) {
              fail(ex);
              return;
          }
          downstream.onNext(v);
      }
      
      • 即将 map 操作符 传入Function对象的 返回值 传递给 链式调用上一步的返回对象的 onNext(T t)

    concatMap

    RxJava之concatMap系列转换操作符介绍


    flatMap

    RxJava之flatMap系列转换操作符介绍


    flattenAsFlowable

    • 官方示例:
      Single<Double> source = Single.just(2.0);
      Flowable<Double> flowable = source.flattenAsFlowable(x -> {
          return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3));
      });
      flowable.subscribe(x -> System.out.println("onNext: " + x));
      
      输出:
      onNext: 2.0
      onNext: 4.0
      onNext: 8.0
      
    • 我们先看Single.just(2.0)
      public static <T> Single<T> just(final T item) {
          ObjectHelper.requireNonNull(item, "value is null");
          return RxJavaPlugins.onAssembly(new SingleJust<T>(item));
      }
      
    • SingleJustsubscribeActual
      protected void subscribeActual(SingleObserver<? super T> observer) {
          observer.onSubscribe(Disposables.disposed());
          observer.onSuccess(value);
      }
      
    • flattenAsFlowable返回对象的 SingleFlatMapIterableFlowablesubscribeActual 方法:
      protected void subscribeActual(Subscriber<? super R> s) {
          source.subscribe(new FlatMapIterableObserver<T, R>(s, mapper));
      }
      
    • 继续看 FlatMapIterableObserveronSubscribe(Disposable d)onSuccess(T value)
      public void onSubscribe(Disposable d) {
          if (DisposableHelper.validate(this.upstream, d)) {
              this.upstream = d;
              downstream.onSubscribe(this);//1.0
          }
      }
      public void onSuccess(T value) {
          Iterator<? extends R> iterator;
          boolean has;
          try {
              iterator = mapper.apply(value).iterator();//2.0 调用apply返回的Iterable对象的 iterator()方法。
              has = iterator.hasNext();//3.0
          } catch (Throwable ex) {
              Exceptions.throwIfFatal(ex);
              downstream.onError(ex);
              return;
          }
          if (!has) {
              downstream.onComplete();//3.1
              return;
          }
          this.it = iterator;//3.2
          drain();
      }
      
      • (1.0) 通过Flowable subscribe流程介绍 我们知道downstream.onSubscribe(this)即调用 FlowableInternalHelper.RequestMax.INSTANCEaccept方法:
        public enum RequestMax implements Consumer<Subscription> {
            INSTANCE;
            @Override
            public void accept(Subscription t) throws Exception {
                t.request(Long.MAX_VALUE);
            }
        }
        
        即:FlatMapIterableObserver.request(Long.MAX_VALUE): 即为设置变量requestedvalueLong.MAX_VALUEdrain()因为it 变量还是null,所以没做什么操作。
        public void request(long n) {
            if (SubscriptionHelper.validate(n)) {
                BackpressureHelper.add(requested, n);
                drain();
            }
        }
        
      • (2.0) 调用flattenAsFlowable传入的Functionapply返回的Iterable对象的 iterator()方法。
      • (3.0) 检查Iterable时候为空,(3.1) 为空直接onComplete()(3.2) 不为空则将 iterator()返回值赋值给当前的 it 变量,继续执行drain()
    • drain():
      void drain() {
          ...
          Subscriber<? super R> a = downstream;
          Iterator<? extends R> iterator = this.it;
          ...
          int missed = 1;
          for (; ; ) {
              if (iterator != null) {
                  long r = requested.get();
                  long e = 0L;
                  if (r == Long.MAX_VALUE) {//1.0
                      slowPath(a, iterator);
                      return;
                  }
                  ...
              }
              ...
          }
      }
      
      • 因为上一步downstream.onSubscribe(this)调用了request(Long.MAX_VALUE), 所以 (1.0) 这里条件成立,执行slowPath(downstream iterator)
    • slowPath(downstream iterator)
      void slowPath(Subscriber<? super R> a, Iterator<? extends R> iterator) {
          for (;;) {
              if (cancelled) {
                  return;
              }
              R v;
              try {
                  v = iterator.next();//1.0
              } catch (Throwable ex) {
                  Exceptions.throwIfFatal(ex);
                  a.onError(ex);
                  return;
              }
              a.onNext(v);//1.1
              if (cancelled) {
                  return;
              }
              boolean b;
              try {
                  b = iterator.hasNext();//1.2
              } catch (Throwable ex) {
                  Exceptions.throwIfFatal(ex);
                  a.onError(ex);
                  return;
              }
              if (!b) {
                  a.onComplete();//1.3
                  return;
              }
          }
      }
      
      • (1.0) 获取到的元素,(1.1)传递给downstreamonNext(1.2)然后判断是否还有其他元素,如果有则循环继续,没有的话即调用 downstreamonComplete 结束。

    flattenAsObservable

    • 官方示例:
      Single<Double> source = Single.just(2.0);
      Observable<Double> observable = source.flattenAsObservable(x -> {
          return Arrays.asList(x, Math.pow(x, 2), Math.pow(x, 3));
      });
      observable.subscribe(x -> System.out.println("onNext: " + x));
      
      输出:
      onNext: 2.0
      onNext: 4.0
      onNext: 8.0
      

    subscribeActual实现的逻辑和 flattenAsFlowable 类似,只是返回的对象为 SingleFlatMapIterableObservable,就不再赘述了。


    groupBy

    • 官方示例:

      Observable<String> animals = Observable.just(
          "Tiger", "Elephant", "Cat", "Chameleon", "Frog", "Fish", "Turtle", "Flamingo");
      animals.groupBy(animal -> animal.charAt(0), String::toUpperCase)
          .concatMapSingle(Observable::toList)
          .subscribe(System.out::println);
      

      输出:

      [TIGER, TURTLE]
      [ELEPHANT]
      [CAT, CHAMELEON]
      [FROG, FISH, FLAMINGO]
      
    • 我们来看看返回对象的ObservableGroupBy:

      public GroupByObserver(Observer<? super GroupedObservable<K, V>> actual, 
                             Function<? super T, ? extends K> keySelector, 
                             Function<? super T, ? extends V> valueSelector, 
                             int bufferSize, boolean delayError) {
          this.downstream = actual;
          this.keySelector = keySelector;
          this.valueSelector = valueSelector;
          this.bufferSize = bufferSize;
          this.delayError = delayError;
          this.groups = new ConcurrentHashMap<Object, GroupedUnicast<K, V>>();
          this.lazySet(1);
      }
      
      public void subscribeActual(Observer<? super GroupedObservable<K, V>> t) {
          source.subscribe(new GroupByObserver<T, K, V>(t, keySelector, valueSelector, bufferSize, delayError));
      } 
      
    • 继续看 GroupByObserveronNext(T t)

      public void onNext(T t) {
          K key;
          try {
              key = keySelector.apply(t);//1.0
          } catch (Throwable e) {
              ...
              return;
          }
          Object mapKey = key != null ? key : NULL_KEY;
          GroupedUnicast<K, V> group = groups.get(mapKey);
          if (group == null) {
              if (cancelled.get()) {
                  return;
              }
              group = GroupedUnicast.createWith(key, bufferSize, this, delayError);
              groups.put(mapKey, group);//2.0
              getAndIncrement();
              downstream.onNext(group);//3.0
          }
      
          V v;
          try {
              v = ObjectHelper.requireNonNull(valueSelector.apply(t), "The value supplied is null");//4.0
          } catch (Throwable e) {
              ...
              return;
          }
          group.onNext(v);//4.1
      }
      
      • (1.0) 通过keySelector.apply(t)即官方示例中的 animal.charAt(0)获取分组的 key
      • (2.0) 如果GroupedUnicast不存再这个key,则保存进去。
      • (3.0) 然后继续调用上一步操作符的 onNext方法,即官方示例中的just
      • (4.0) 通过valueSelector.apply(t)即官方示例中的 String::toUpperCase)获取值,(4.1)添加到ToListObservercollection中。
    • 最后通过 onComplete()输出:

      public void onComplete() {
          List<ObservableGroupBy.GroupedUnicast<K, V>> list = new ArrayList<ObservableGroupBy.GroupedUnicast<K, V>>(groups.values());
          groups.clear();
          for (ObservableGroupBy.GroupedUnicast<K, V> e : list) {
              e.onComplete();
          }
          downstream.onComplete();
      }
      

      ToListObserveronComplete():

      public void onComplete() {
          U c = collection;
          collection = null;
          downstream.onNext(c);
          downstream.onComplete();
      }
      

    scan

    • 官方示例:
      Observable.just(5, 3, 8, 1, 7)
              .scan(0, (partialSum, x) -> partialSum + x)
              .subscribe(System.out::println);
      
      输出:
      0
      5
      8
      16
      17
      24
      
    • 我们来看看返回对象的ObservableScanSeed:
      public ObservableScanSeed(ObservableSource<T> source, Callable<R> seedSupplier, BiFunction<R, ? super T, R> accumulator) {
          super(source);
          this.accumulator = accumulator;
          this.seedSupplier = seedSupplier;
      }
      
      @Override
      public void subscribeActual(Observer<? super R> t) {
          R r;
          try {
              r = ObjectHelper.requireNonNull(seedSupplier.call(), "The seed supplied is null");//1.0
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              EmptyDisposable.error(e, t);
              return;
          }
          source.subscribe(new ScanSeedObserver<T, R>(t, accumulator, r));
      }
      
      • (1.0) seedSupplier.call()即官方示例中的 0,即设置 r 的值为0.
    • 继续看ScanSeedObserveronNext(T t):
      public void onNext(T t) {
          if (done) {
              return;
          }
          R v = value;//1.0
          R u;
          try {
              u = ObjectHelper.requireNonNull(accumulator.apply(v, t), "The accumulator returned a null value");//2.0
          } catch (Throwable e) {
              Exceptions.throwIfFatal(e);
              upstream.dispose();
              onError(e);
              return;
          }
          value = u;//3.0
          downstream.onNext(u);//4.0
      }
      
      • (1.0) value 即为上一步设置的 r0.
      • (2.0) accumulator.apply(v, t) 即为官方示例中的 partialSum + x
      • (3.0) 更新value 的值
      • (4.0)accumulator.apply(v, t)传递给观察者的onNext

    switchMap

    • 官方示例:
      Observable.interval(0, 1, TimeUnit.SECONDS)
              .switchMap(x -> {
                  return Observable.interval(0, 750, TimeUnit.MILLISECONDS)
                          .map(y -> x);
              })
              .takeWhile(x -> x < 3)
              .blockingSubscribe(System.out::print);
      
      
      输出:
      001122
      
    • 我们来看看返回对象的ObservableSwitchMap:
      public ObservableSwitchMap(ObservableSource<T> source,
                                 Function<? super T, ? extends ObservableSource<? extends R>> mapper, int bufferSize,
                                         boolean delayErrors) {
          super(source);
          this.mapper = mapper;
          this.bufferSize = bufferSize;
          this.delayErrors = delayErrors;
      }
      
      @Override
      public void subscribeActual(Observer<? super R> t) {
          if (ObservableScalarXMap.tryScalarXMapSubscribe(source, t, mapper)) {
              return;
          }
          source.subscribe(new SwitchMapObserver<T, R>(t, mapper, bufferSize, delayErrors));
      }
      
    • 继续看SwitchMapObserveronNext:
      public void onNext(T t) {
          long c = unique + 1;
          unique = c;
          SwitchMapInnerObserver<T, R> inner = active.get();
          if (inner != null) {
              inner.cancel();
          }
          ObservableSource<? extends R> p;
          try {
              p = ObjectHelper.requireNonNull(mapper.apply(t), "The ObservableSource returned is null");//1.0
          } catch (Throwable e) {
              ...
              return;
          }
          SwitchMapInnerObserver<T, R> nextInner = new SwitchMapInnerObserver<T, R>(this, c, bufferSize);//2.0
          for (;;) {
              inner = active.get();
              if (inner == CANCELLED) {
                  break;
              }
              if (active.compareAndSet(inner, nextInner)) {
                  p.subscribe(nextInner);//3.0
                  break;
              }
          }
      }
      
      • (1.0) 通过mapper.apply(t)即官方示例中的 Observable.interval(0, 750, TimeUnit.MILLISECONDS).map(y -> x)返回的ObservableMap对象。
      • (2.0) 构建SwitchMapInnerObserver对象
      • (3.0) 用返回的ObservableMap订阅SwitchMapInnerObserver对象

    window

    • 官方示例:
      Observable.range(1, 10)
              // Create windows containing at most 2 items, and skip 3 items before starting a new window.
              .window(2, 3)
              .flatMapSingle(window -> {
                  return window.map(String::valueOf)
                          .reduce(new StringJoiner(", ", "[", "]"), StringJoiner::add);
              })
              .subscribe(System.out::println);
      
      输出:
      [1, 2]
      [4, 5]
      [7, 8]
      [10]
      
    • 我们来看看返回对象的ObservableWindow:
      public ObservableWindow(ObservableSource<T> source, long count, long skip, int capacityHint) {
          super(source);
          this.count = count;
          this.skip = skip;
          this.capacityHint = capacityHint;
      }
      
      @Override
      public void subscribeActual(Observer<? super Observable<T>> t) {
          if (count == skip) {
              source.subscribe(new WindowExactObserver<T>(t, count, capacityHint));
          } else {
              source.subscribe(new WindowSkipObserver<T>(t, count, skip, capacityHint));
          }
      }
      
    • 继续看WindowSkipObserveronNext:
      public void onNext(T t) {
          final ArrayDeque<UnicastSubject<T>> ws = windows;
          long i = index;
          long s = skip;
          if (i % s == 0 && !cancelled) {//3.0
              wip.getAndIncrement();
              UnicastSubject<T> w = UnicastSubject.create(capacityHint, this);
              ws.offer(w);
              downstream.onNext(w);
          }
          long c = firstEmission + 1;
          for (UnicastSubject<T> w : ws) {
              w.onNext(t);//1.0
          }
          if (c >= count) {
              ws.poll().onComplete();//2.0
              if (ws.isEmpty() && cancelled) {
                  this.upstream.dispose();
                  return;
              }
              firstEmission = c - s;
          } else {
              firstEmission = c;
          }
          index = i + 1;
      }
      
      • (1.0) 将元素存入 queue
      • (2.0) 当元素个数到达count时,就之前的元素全部输出
      • (3.0) 当元素个数到达skip时,就重新创建一个UnicastSubject来存储元素

    以上

    相关文章

      网友评论

          本文标题:RxJava之转换操作符源码介绍

          本文链接:https://www.haomeiwen.com/subject/bsdazqtx.html