美文网首页
RxJava map操作符和flatMap操作符的实现

RxJava map操作符和flatMap操作符的实现

作者: leilifengxingmw | 来源:发表于2018-10-25 23:16 被阅读54次

本篇文章的目的:分析RxJava map操作符和flatMap操作符的实现

map操作符的实现

Observable.create(new ObservableOnSubscribe<Integer>() {
            @Override
            public void subscribe(ObservableEmitter<Integer> emitter) throws Exception {
                emitter.onNext(1);
                emitter.onNext(2);
                emitter.onNext(3);
                emitter.onComplete();

            }
        }).map(new Function<Integer, String>() {
            @Override
            public String apply(Integer integer) throws Exception {
                return String.valueOf(integer);
            }
        }).subscribe(new Observer<String>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(String string) {
                Log.e(TAG, "onNext: " + string);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        });

先上个图


map.png

Observable的create()方法简化版

public static <T> Observable<T> create(ObservableOnSubscribe<T> source) {
        return new ObservableCreate<T>(source);
}

create()方法返回的Observable是一个ObservableCreate对象。

Observable的map()方法简化版

public final <R> Observable<R> map(Function<? super T, ? extends R> mapper) {
        return new ObservableMap<T, R>(this, mapper);
    }

Observable的subscribe()方法简化版

public final void subscribe(Observer<? super T> observer) {
        subscribeActual(observer);
}

我们进入到ObservableMap类去看

public void subscribeActual(Observer<? super U> t) {
    //1.
    source.subscribe(new MapObserver<T, U>(t, function));
}

这里的source就是create()方法返回的ObservableCreate对象。然后使用传入的下游的观察者和function构建了一个MapObserver对象。

然后source调用subscribe()方法。内部也是调用subscribeActual()方法

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        observer.onSubscribe(parent);

        try {
            source.subscribe(parent);
        } catch (Throwable ex) {
            Exceptions.throwIfFatal(ex);
            parent.onError(ex);
        }
    }

这个过程在前面的文章中已经分析过了,这里我们只需要知道,当数据发出的时候,会调用map操作符返回的MapObserver对象对应的onNext(),onError()和onComplete()方法。

MapObserver类简化版

static final class MapObserver<T, U> extends BasicFuseableObserver<T, U> {
        final Function<? super T, ? extends U> mapper;

        MapObserver(Observer<? super U> actual, Function<? super T, ? extends U> mapper) {
            super(actual);
            this.mapper = mapper;
        }

        @Override
        public void onNext(T t) {

           U v;
          //应用mapper以后,返回结果
           v = mapper.apply(t)

           downstream.onNext(v);
        }
    }

总结:map操操作符,在onNext() 方法中,获取上游发射的数据t,应用传入的function,获得对应类型的对象v,然后使用v作为参数,调用下游观察者的onNext()方法。还是比较简单的。

flatMap操作符的实现

Observable.create(new ObservableOnSubscribe<List<Integer>>() {
            @Override
            public void subscribe(ObservableEmitter<List<Integer>> emitter) throws Exception {
                 List<Integer> list1 = new ArrayList<>();
                list1.add(1);
                list1.add(2);
                list1.add(3);
                List<Integer> list2 = new ArrayList<>();
                list2.add(4);
                list2.add(5);
                list2.add(6);

                emitter.onNext(list1);
                emitter.onNext(list2);

                emitter.onComplete();

            }
        }).flatMap(new Function<List<Integer>, ObservableSource<Integer>>() {
            @Override
            public ObservableSource<Integer> apply(List<Integer> integers) throws Exception {
                return Observable.fromIterable(integers);
            }
        }).subscribe(new Observer<Integer>() {
            @Override
            public void onSubscribe(Disposable d) {
                Log.e(TAG, "onSubscribe: ");
            }

            @Override
            public void onNext(Integer integer) {
                Log.e(TAG, "onNext: " + integer);
            }

            @Override
            public void onError(Throwable e) {
                Log.e(TAG, "onError: " + e.getMessage());
            }

            @Override
            public void onComplete() {
                Log.e(TAG, "onComplete: ");
            }
        });

上张图


flatMap.png

Observable的flatMap()方法

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper) {
        return flatMap(mapper, false);
    }
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors) {
        return flatMap(mapper, delayErrors, Integer.MAX_VALUE);
    }
public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper, boolean delayErrors, int maxConcurrency) {
        return flatMap(mapper, delayErrors, maxConcurrency, bufferSize());
    }

方法重载也是狠,最终是调用了这个方法,返回一个ObservableFlatMap对象。

public final <R> Observable<R> flatMap(Function<? super T, ? extends ObservableSource<? extends R>> mapper,
            boolean delayErrors, int maxConcurrency, int bufferSize) {
        return new ObservableFlatMap<T, R>(this, mapper, delayErrors, maxConcurrency, bufferSize);
    }

当调用subscribe()方法把观察者和被观察者关联起来的时候会调用ObservableFlatMap的subscribeActual()方法。

ObservableFlatMap的subscribeActual()方法简化版

public void subscribeActual(Observer<? super U> t) {
        source.subscribe(new MergeObserver<T, U>(t, mapper, delayErrors, maxConcurrency, bufferSize));
    }

创建了一个MergeObserver对象,然后source(ObservableCreate对象)调用subscribe()方法。最终会进入到ObservableCreate的subscribeActual()方法。

ObservableCreate的subscribeActual()方法简化版

protected void subscribeActual(Observer<? super T> observer) {
        CreateEmitter<T> parent = new CreateEmitter<T>(observer);
        //1. 调用观察者的onSubscribe()方法
        observer.onSubscribe(parent);
        //2. 发射数据
        source.subscribe(parent);
   
    }
  1. 调用观察者的onSubscribe()方法

MergeObserver的onSubscribe()方法

public void onSubscribe(Disposable d) {
            if (DisposableHelper.validate(this.upstream, d)) {
                //为upstream赋值,并调用下游的onSubscribe()方法
                this.upstream = d;
                downstream.onSubscribe(this);
            }
        }
  1. 发射数据

ObservableCreate对象发射数据这个过程在前面的文章中已经分析过了,这里我们只需要知道,当数据发出的时候,会调用flatMap操作符返回MergeObserver对象对应的onNext(),onError()和onComplete()方法。

               List<Integer> list1 = new ArrayList<>();
                list1.add(1);
                list1.add(2);
                list1.add(3);

                List<Integer> list2 = new ArrayList<>();
                list2.add(4);
                list2.add(5);
                list2.add(6);

                emitter.onNext(list1);
                emitter.onNext(list2);

                emitter.onComplete();

MergeObserver的onNext方法简化版

public void onNext(T t) {
        ObservableSource<? extends U> p;
        //应用传入的mapper,在这个例子中,p是一个ObservableFromIterable对象
        p = mapper.apply(t)
        subscribeInner(p);
    }

MergeObserver的subscribeInner方法简化版

void subscribeInner(ObservableSource<? extends U> p) {
        for (;;) {
                //1. 创建一个InnerObserver对象
                InnerObserver<T, U> inner = new InnerObserver<T, U>(this, uniqueId++);
                //2. 添加inner
                if (addInner(inner)) {
                    //3. 如果添加成功,p就订阅inner对象。
                    p.subscribe(inner);
                }
                //跳出循环
                break;
         }
    }
  1. 创建一个InnerObserver对象
    InnerObserver类是ObservableFlatMap的一个静态内部类,暂且不去管。

  2. 添加inner,内部操作就是把创建好的InnerObserver对象添加到observers中去。

final AtomicReference<InnerObserver<?, ?>[]> observers;
  1. 如果添加成功,p就订阅inner对象。正常情况下添加是成功的,这里的p就是我们应用flatMap后返回的对象。在上面的例子中就是Observable.fromIterable(integers)返回的对象。
    Observable的fromIterable()方法简化版
public static <T> Observable<T> fromIterable(Iterable<? extends T> source) {
        return new ObservableFromIterable<T>(source);
    }

返回一个ObservableFromIterable对象。

ObservableFromIterable的subscribeActual方法简化版

public void subscribeActual(Observer<? super T> observer) {
        Iterator<? extends T> it;
        it = source.iterator();
      
        boolean hasNext= it.hasNext();
     
        if (!hasNext) {
            EmptyDisposable.complete(observer);
            return;
        }

        FromIterableDisposable<T> d = new FromIterableDisposable<T>(observer, it);
        //1. 首先调用observer的onSubscribe()方法
        observer.onSubscribe(d);
    }
  1. 首先调用observer的onSubscribe()方法

InnerObserver的onSubscribe()方法简化版

 public void onSubscribe(Disposable d) {
            if (DisposableHelper.setOnce(this, d)) {
                if (d instanceof QueueDisposable) {
                    @SuppressWarnings("unchecked")
                    QueueDisposable<U> qd = (QueueDisposable<U>) d;
                    //1. m的取值是SYNC
                    int m = qd.requestFusion(QueueDisposable.ANY | QueueDisposable.BOUNDARY);
                    if (m == QueueDisposable.SYNC) {
                        fusionMode = m;
                        //2. 为queue赋值
                        queue = qd;
                        done = true;
                        //3 . 调用parent.drain()
                        parent.drain();
                        return;
                    }
                    if (m == QueueDisposable.ASYNC) {
                        fusionMode = m;
                        queue = qd;
                    }
                }
            }
        }
  1. m的取值是SYNC
    FromIterableDisposable的requestFusion()方法
public int requestFusion(int mode) {
            if ((mode & SYNC) != 0) {
                fusionMode = true;
                return SYNC;
            }
            return NONE;
        }

返回结果是SYNC

//2. 为queue赋值
这里明确一点即可,我们能从queue中取出我们想要的数据。

3 . 调用parent.drain()方法
这里的parent就是MergeObserver,我们回到MergeObserver类。
在这个例子中,drain()方法会被调用3次。在onNext() 方法中调用2次,然后我们发现在MergeObserver的onComplete中调用一次。

public void onComplete() {
            if (done) {
                return;
            }
            done = true;
            drain();
        }

drain()方法有点复杂,就不一行一行代码的去分析了,不过这个方法确实是flatMap操作符的精髓所在。在经过几次debug一步一步调试之后,大致看懂了这个方法的作用如下。

  1. 在onNext() 方法中调用drain()方法的时候,会从queue中取出我们的数据,然后调用下游的onNext()方法。(在这个例子中,MergeObserver的onNext() 方法会被调用2次,每次调用下游的observer的onNext() 方法3次)。

  2. 在onComplete() 方法中调用drain()方法,就直接调用下游observer的onComplete() 方法。

总结:自己分析的还是很粗糙的,在后续的学习和研究过程中会不断完善。

相关文章

网友评论

      本文标题:RxJava map操作符和flatMap操作符的实现

      本文链接:https://www.haomeiwen.com/subject/eicftqtx.html