美文网首页
RxJava 源码分析系列(四) -操作符变换原理

RxJava 源码分析系列(四) -操作符变换原理

作者: 琼珶和予 | 来源:发表于2018-09-09 15:48 被阅读0次

      我们继续来学习RxJava的源码,今天主要会学习RxJava中的操作符变换原理。
      本文重点讲解ObservableFlowableObservable非常的像,所以就不做讲解。

    1.概述

      就Observable而言,操作符变化主要涉及的两个类:AbstractObservableWithUpstreamObservableOperator
      其中,我们可以看到的是,RxJava给我们的变换操作符,包括map,flatMapbuffer等操作符等继承于AbstractObservableWithUpstream类。
      而ObservableOperator接口主要用于自定义操作符,然后配合lift操作符,美滋滋🤓🤓。
      AbstractObservableWithUpstreamObservableOperator都有一个共同的特点,那就是进行变换时,都会找一个中间的ObserverObservable先将数据传递到中间的ObserveronNext方法,然后在onNext方法进行变换,变换之后,在传递给我们的Observer。这一点,我们必须了解。

    2. AbstractObservableWithUpstream

      我们先来看看AbstractObservableWithUpstream这个类,看一下这个类为我们做了哪些事情。

    abstract class AbstractObservableWithUpstream<T, U> extends Observable<U> implements HasUpstreamObservableSource<T> {
        protected final ObservableSource<T> source;
        AbstractObservableWithUpstream(ObservableSource<T> source) {
            this.source = source;
        }
        @Override
        public final ObservableSource<T> source() {
            return source;
        }
    }
    

      首先,我们看到的是,AbstractObservableWithUpstream是包权限,也就是RX官方也不希望我们来使用这个类。
      这个类表示的意思非常简单,实现了HasUpstreamObservableSource接口,并且实现了source方法,返回了一个ObservableSource对象。
      我们感觉AbstractObservableWithUpstream根本没做什么事情,是的,我也这么觉得😂😂。
      为了更好理解操作变换原理,我们来简单的看看map操作符。关于操作符的理解,本文只是简单做一个理解,如果后续有需要的话,可以对某些操作符做单独的分析。

    (1).map操作符

      map操作符主要涉及到的是ObservableMap类,主要是在这个类对中间Observer进行了subscribe。我们来看看:

    public final class ObservableMap<T, U> extends AbstractObservableWithUpstream<T, U> {
        final Function<? super T, ? extends U> function;
    
        public ObservableMap(ObservableSource<T> source, Function<? super T, ? extends U> function) {
            super(source);
            this.function = function;
        }
    
        @Override
        public void subscribeActual(Observer<? super U> t) {
            source.subscribe(new MapObserver<T, U>(t, function));
        }
    }
    

      我们看到,在subscribeActual方法里面进行了subscribe操作,然后ObservableonNext等操作都会执行到MapObserver相应的操作当中来。
      我们知道,map操作主要是将一个数据从一种类型转换为另一个种类型。而这种实现主要是在onNext方法里面进行的,我们来看看MapObserveronNext方法:

            public void onNext(T t) {
                if (done) {
                    return;
                }
    
                if (sourceMode != NONE) {
                    actual.onNext(null);
                    return;
                }
    
                U v;
    
                try {
                    v = ObjectHelper.requireNonNull(mapper.apply(t), "The mapper function returned a null value.");
                } catch (Throwable ex) {
                    fail(ex);
                    return;
                }
                actual.onNext(v);
            }
    

      在onNext方法里面,调用了Functionapply方法将一个数据从一个类型转换为另一个类型,然后在调用我们的ObserveronNext方法将转换之后的数据传递下去。这就是整个map转换的过程。
      从这里,可以应证前面的一点,那就是需要一个中间Observer来实现。这种实现方法相当于是一个组合式的代理模式,代理对象是中间的Observer,真正做操作的是我们的Observer

    3. ObservableOperator

      我们来看看ObservableOperator类,看看我们怎么通过ObservableOperator来自定义一个操作符。之前说过,ObservableOperator需要跟lift操作符来实现。所以,我们先来看看lift操作符:

        public final <R> Observable<R> lift(ObservableOperator<? extends R, ? super T> lifter) {
            ObjectHelper.requireNonNull(lifter, "onLift is null");
            return RxJavaPlugins.onAssembly(new ObservableLift<R, T>(this, lifter));
        }
    

      好吧,我们来看看ObservableLift

    public final class ObservableLift<R, T> extends AbstractObservableWithUpstream<T, R> {
        final ObservableOperator<? extends R, ? super T> operator;
    
        public ObservableLift(ObservableSource<T> source, ObservableOperator<? extends R, ? super T> operator) {
            super(source);
            this.operator = operator;
        }
    
        @Override
        public void subscribeActual(Observer<? super R> s) {
            Observer<? super T> observer;
            try {
                observer = ObjectHelper.requireNonNull(operator.apply(s), "Operator " + operator + " returned a null Observer");
            } catch (NullPointerException e) { // NOPMD
                throw e;
            } catch (Throwable e) {
                Exceptions.throwIfFatal(e);
                RxJavaPlugins.onError(e);
    
                NullPointerException npe = new NullPointerException("Actually not, but can't throw other exceptions due to RS");
                npe.initCause(e);
                throw npe;
            }
    
            source.subscribe(observer);
        }
    }
    

      在这个类里面,主要是在subscribeActual方法里面做了一次apply的操作,通过这个操作,我们获取了一个新的Observer,这个新的Observer就是我们的Observer,然后再subscribe。这个也是非常的简单。现在我们来通过一个小小的案例来实现一个map操作符。

    (1).实现Map操作符

      我们先定义了一个ObserverMap用来将一个U类型转换为T类型

    public class ObserverMap<T, U> implements ObservableOperator<T, U> {
    
      private Function<U, T> mFunction;
    
      public ObserverMap(Function<U, T> function) {
        this.mFunction = function;
      }
    
      @Override
      public Observer<? super U> apply(final Observer<? super T> observer) throws Exception {
        return new Observer<U>() {
          @Override
          public void onSubscribe(Disposable d) {
            observer.onSubscribe(d);
          }
    
          @Override
          public void onNext(U u) {
            observer.onNext(mFunction.apply(u));
          }
    
          @Override
          public void onError(Throwable e) {
            observer.onError(e);
          }
    
          @Override
          public void onComplete() {
            observer.onComplete();
          }
        };
      }
    }
    

      整个类也是非常的简单,只在onNext方法中做了一次类型转换。
      然后,我们看一下,是怎么使用的:

        Observable.create(new ObservableOnSubscribe<Integer>() {
          @Override
          public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
            emitter.onNext(1);
            emitter.onNext(2);
          }
        }).lift(new ObserverMap<>(new Function<Integer, String>() {
          @Override
          public String apply(Integer input) {
            return "input = " + String.valueOf(input);
          }
        })).subscribe(new Consumer<String>() {
          @Override
          public void accept(String s) throws Exception {
            Log.i("pby123", s);
          }
        });
    

    4.总结

      总的来说,这篇文章是非常的简单的,这里的讲解主要对后面打一个基础,后续有可能会对一些操作符进行分析,同时讲解一下这个,相信大家以后看到相关的代码不会懵逼。

    相关文章

      网友评论

          本文标题:RxJava 源码分析系列(四) -操作符变换原理

          本文链接:https://www.haomeiwen.com/subject/boargftx.html