美文网首页Android开源框架源码解析
RxJava源码分析(2) 变换原理

RxJava源码分析(2) 变换原理

作者: 导火索 | 来源:发表于2018-05-26 09:02 被阅读5次

    RxJava源码分析基于RxJava1.3.8。

    在上一章节中,主要介绍了RxJava的基本使用并对该部分的源码做了详细分析。在这一章节中,将主要介绍RxJava的另一大核心功能:变换

    变换,就是将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

    在RxJava中,提供了许多针对不同场景实现变换功能的操作符,如下:

    • map()
    • flatMap(), concatMap(), and flatMapIterable()
    • switchMap()
    • scan()
    • groupBy()
    • buffer()
    • window()
    • cast()

    虽然,RxJava中提供了如此多的变换操作符,但是变换的原理基本都是一致的。在本章节将主要针对map()和flatMap()操作符来分析变换的原理。

    接下来,先通过一个实例来理解什么是变换以及map()实现变换的原理。

    map变换

    实例

    需求,根据图片url实现ImageView图片的加载。

    以下通过RxJava实现该操作:

    Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            // 1、传入图片路径
            subscriber.onNext("http://localhost:8080/...");
        }
    }).map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String s) {
            // 2、根据图片路径获取Bitmp并返回
            Bitmap bitmap = getImageBitmap(s);
            return bitmap;
        }
    }).subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            // 3、ImageView设置Bitmap
            imageView.setImageBitmap(bitmap);
        }
    });
    

    在上面RxJava代码中,功能很清晰就是通过url实现图片的加载的流程,并且这里就完成了一个变换的过程:我们只传入了一个图片url,但在map方法中完成图片url到Bitmap转换的过程,最终在<code>Action1.call(Bitmap)</code>中只需要设置bitmap给ImageView即可。

    所以在上述流程中,<code>map(Func1)</code>是完成变换的核心方法,下面我们就对该方法的源码做具体分析。

    为了便于上述代码的分析,我们将上面链式调用的代码改成非链式调用的代码,这要能更清楚的展示两个Observable对象:stringObservablemapObservable,如下:

    Observable<String> stringObservable = Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            // 1、传入图片路径
            subscriber.onNext("http://localhost:8080/...");
        }
    });
    Observable<Bitmap> mapObservable = stringObservable.map(new Func1<String, Bitmap>() {
        @Override
        public Bitmap call(String s) {
            // 2、根据图片路径获取Bitmp并返回
            Bitmap bitmap = getImageBitmap(s);
            return bitmap;
        }
    });
    mapObservable.subscribe(new Action1<Bitmap>() {
        @Override
        public void call(Bitmap bitmap) {
            // 3、ImageView设置Bitmap
            imageView.setImageBitmap(bitmap);
        }
    });
    

    map操作符源码分析

    首先,我们看下<code>map(Func1)</code>方法的具体实现,如下:

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {
        // 创建一个Observable对象并返回
        return unsafeCreate(new OnSubscribeMap<T, R>(this, func));
    }
    

    关于unsafeCreate()方法在上一章节已经讲解过,这里就不在重复了。

    在<code>map(Func1)</code>方法中,主要有两个功能:

    1. 创建并返回Observable对象,对于该Observable对象这里称之为mapObservable对象。
    2. 为mapObservable对象创建了一个OnSubscribe对象,也就是OnSubscribeMap对象(OnSubscribeMap对象是实现变换以及链式调用的核心)

    在通过<code>map(Func1)</code>方法创建并返回mapObservable对象后,该mapObservable就通过调用<code>subscribe()</code>方法开始整个事件。

    根据上章节对<code>subscribe()</code>方法的分析可知:在<code>subscribe()</code>方法内部会调用对应Observable对象(这里就是mapObservable对象)的<code>onSubscribe.call()</code>方法。而在mapObservable对象中onSubscribe实际就是指<code>OnSubscribeMap</code>对象,所以接下来就需要看看该类都做了些什么。

    OnSubscribeMap

    OnSubscribeMap源码如下:

    
    public final class OnSubscribeMap<T, R> implements OnSubscribe<R> {
    
        final Observable<T> source;
        final Func1<? super T, ? extends R> transformer;
    
        /**
         * 1、构造方法
         *
         * @param source        变换前Observable对象,也就是stringObservable
         * @param transformer   Func1对象,通过map方法传入的参数
         */
        public OnSubscribeMap(Observable<T> source, Func1<? super T, ? extends R> transformer) {
            this.source = source;
            this.transformer = transformer;
        }
    
        /**
         * 开始事件的方法
         *
         * @param o 观察者对象,该参数就是通过subscribe()方法传入的参数
         */
        @Override
        public void call(final Subscriber<? super R> o) {
            // 创建观察者对象,MapSubscriber
            MapSubscriber<T, R> parent = new MapSubscriber<T, R>(o, transformer);
            // 将创建的观察者对象添加到定于列表中
            o.add(parent);
            // 调用方法,这是整个事件的开始
            source.unsafeSubscribe(parent);
        }
    
        static final class MapSubscriber<T, R> extends Subscriber<T> {
        
        // 该内部类稍后在讲解
        }
    }
    
    

    这里,我们需要关注的是call()方法,在call()方法内部,最终会通过<code>source.unsafeSubscribe(parent)</code>开始整个事件的链式调用。

    这里,我们看下<code>source.unsafeSubscribe(parent)</code>方法的内部实现:

    public final Subscription unsafeSubscribe(Subscriber<? super T> subscriber) {
        try {
            subscriber.onStart();
            /**
             * 1、通过onObservableStart()方法获取OnSubscribe对象,也就是创建stringObservable对象是传入的OnSubscribe对象
             * 2、调用OnSubscribe对象的call方法
             */
            RxJavaHooks.onObservableStart(this, onSubscribe).call(subscriber);
            return RxJavaHooks.onObservableReturn(subscriber);
        } catch (Throwable e) {
            // 省略部分代码
        }
    }
    
    

    对于source变量,在<code>OnSubscribeMap</code>代码中就提到过,source变量就是指向引用stringObservable对象,这样虽然是通过mapObservable对象先调用subscribe()方法,但是最终还是stringObservable调用了subscribe()方法,这样也就保证了链式调用始终是从上而下执行的。

    MapSubscriber

    在上面的流程中,我们仅仅完成了链式调用的第一步,也就是stringObservable完成了call方法的调用,那么变换又是如何实现的呢?

    在<code>stringObservable.call(Subscriber)</code>方法调用时,我们需要注意此时<code>call(Subscriber)</code>方法中传入的参数实际已经变成了MapSubscriber(看OnSubscribeMap.call()方法)。

    MapSubscriber是OnSubscribeMap的一个静态内部类并且该类继承自Subscriber,所以它是一个观察者。

    MapSubscriber类代码如下:

    static final class MapSubscriber<T, R> extends Subscriber<T> {
        final Subscriber<? super R> actual;
    
        final Func1<? super T, ? extends R> mapper;
    
        boolean done;
    
        /**
         * 构造方法
         * @param actual  观察者对象,也就是subscribe()方法传入的参数
         * @param mapper  Func1对象,通过map方法传入的参数
         */
        public MapSubscriber(Subscriber<? super R> actual, Func1<? super T, ? extends R> mapper) {
            this.actual = actual;
            this.mapper = mapper;
        }
    
        @Override
        public void onNext(T t) {
            R result;
    
            try {
                // 1、调用Func1对象的call方法,实现事件变换
                result = mapper.call(t);
            } catch (Throwable ex) {
                Exceptions.throwIfFatal(ex);
                unsubscribe();
                onError(OnErrorThrowable.addValueAsLastCause(ex, t));
                return;
            }
    
            // 2、完成变换后,将变换后的结果传入到下一个观察者
            actual.onNext(result);
        }
    
        @Override
        public void onError(Throwable e) {
            if (done) {
                RxJavaHooks.onError(e);
                return;
            }
            done = true;
    
            actual.onError(e);
        }
    
    
        @Override
        public void onCompleted() {
            if (done) {
                return;
            }
            actual.onCompleted();
        }
    
        @Override
        public void setProducer(Producer p) {
            actual.setProducer(p);
        }
    
    }
    

    根据上述代码可知:<code>MapSubscriber.onNext()</code>方法就是实现变换的核心。在该方法中会根据传入的原始参数并通过<code>Func1.call(t)</code>完成变换功能,最终将变换的结果传递到下一个事件中去。

    这里,我们可以通过一张图来描述上述整个过程:


    这里写图片描述

    flatMap变换

    flatMap操作符:使用一个指定的函数对原始Observable发射的每一项数据执行变换操作,这个函数返回一个本身也发射数据的Observable,然后FlatMap合并这些Observables发射的数据,最后将合并后的结果当做它自己的数据序列发射。

    实例演示

    需求:输入学生列表中的每个学生的具体课程名称。

    通过RxJava代码实现:

    Observable
            .from(students)
            .flatMap(new Func1<Student, Observable<Course>>() {
                @Override
                public Observable<Course> call(Student student) {
                    return Observable.from(student.courseList);
                }
            })
            .subscribe(new Action1<Course>() {
                @Override
                public void call(Course course) {
                    System.out.println(course.courseName);
                }
            });
    

    同样的,为了便于分析源码,我们将上面链式调用的源码改为非链式调用的源码,如下:

    Observable<Student> fromObservable = Observable.from(students);
    Observable<Course> flatMapObservable = fromObservable.flatMap(new Func1<Student, Observable<Course>>() {
        @Override
        public Observable<Course> call(Student student) {
            return Observable.from(student.courseList);
        }
    });
    flatMapObservable.subscribe(new Action1<Course>() {
        @Override
        public void call(Course course) {
            System.out.println(course.courseName);
        }
    });
    

    flatMap操作符源码分析

    创建flatMapObservable

    首先,我们看下flatMapObservable的创建过程,内部代码如下:

    /**
     * 开始变换
     * 
     * @param func
     * @param <R>
     * @return
     */
    public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
        if (getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
        }
    
        /*
         * 1、通过map(func)方法创建一个Observable对象,该对象的onSubscribe类型为OnSubscribeMap
         * 2、通过merge()方法创键创建一个新的Observable对象,并返回最终通过该Observable调用subscribe()方法
         */
        return merge(map(func));
    }
    
    /**
     * 创建变换后的Observable对象
     * 
     * @param source    通过map(func)方法创建一个Observable对象,改对象的onSubscribe类型为OnSubscribeMap
     * @return
     */
    public static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
        if (source.getClass() == ScalarSynchronousObservable.class) {
            return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
        }
    
        /*
         * 1、OperatorMerge.<T>instance(false)获取一个OperatorMerge(HolderNoDelay.INSTANCE)对象,该对象实际就是一个Func1
         *
         * 2、通过map(func)创建的Observable对象调用lift()方法
         */
        return source.lift(OperatorMerge.<T>instance(false));
    }
    
    /*
     * 1、通过map(func)创建的Observable对象调用lift()方法
     *
     * @param operator   OperatorMerge.<T>instance(false)获取一个OperatorMerge对象,该对象实际就是一个Func1
     * @param <R>
     * @return
     */
    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
        /*
         *  这里onSubscribe实例是OnSubscribeMap
         */
        return unsafeCreate(new OnSubscribeLift<T, R>(onSubscribe, operator));
    }
    
    /**
     * 最终通过flatMap()方法创建的Observable,该类的onSubscribe类型为OnSubscribeLift
     *
     * @param f
     * @param <T>
     * @return
     */
    public static <T> Observable<T> unsafeCreate(OnSubscribe<T> f) {
        return new Observable<T>(RxJavaHooks.onCreate(f));
    }
    
    

    在flatMap操作符下,将创建一个新的Observable对象,该对象的onSubscribe对象实例为OnSubscribeLift,最终在该<code>Observable.subscribe()</code>方法调用时<code>OnSubscribeLift.call()</code>会被调用。

    接下来,就来看下订阅的过程吧。

    flatMapObservable订阅事件

    完成了flatMapObservable变换Observable的创建后,就要调用订阅方法subscribe()了,如下:

    flatMapObservable.subscribe(new Action1<Course>() {
        @Override
        public void call(Course course) {
            System.out.println(course.courseName);
        }
    });
    

    通过在上一章节中的介绍,我们了解到<code>flatMapObservable.subscribe(subscriber)</code>调用时,内部会调用<code>flatMapObservable.onSubscribe.call(subscriber)</code>,这里就需要确定<code>flatMapObservable.onSubscribe</code>的具体类型了。

    由上面flatMapObservable的创建可知,<code>flatMapObservable.onSubscribe</code>的具体类型就是<code>lift()</code>方法中创建的<code>OnSubscribeLift</code>。

    OnSubscribeLift源码如下:

    public final class OnSubscribeLift<T, R> implements Observable.OnSubscribe<R> {
        /**
         * parent类型,就是创建fromObservable时,创建的OnSubscribe对象
         *
         * OnSubscribeMap类型,
         */
        final Observable.OnSubscribe<T> parent;
        /**
         * OperatorMerge(HolderNoDelay.INSTANCE)对象
         */
        final Observable.Operator<? extends R, ? super T> operator;
    
        public OnSubscribeLift(Observable.OnSubscribe<T> parent, Observable.Operator<? extends R, ? super T> operator) {
            this.parent = parent;
            this.operator = operator;
        }
    
        @Override
        public void call(Subscriber<? super R> o) {
            try {
                // 创建一个MergeSubscriber,该对象被被o持有
                Subscriber<? super T> st = RxJavaHooks.onObservableLift(operator).call(o);
                try {
                    st.onStart();
                    //  OnSubscribeMap.call()被调用,这个里面其实就map操作符转换的结果
                    parent.call(st);
                } catch (Throwable e) {
                    // ....
                }
            } catch (Throwable e) {
                // ....
            }
        }
    }
    

    在<code>OnSubscribeLift.call(Subscriber)</code>方法中,最终会调用<code>parent.call(st)</code>,这个parent就是<code>fromObservable.onSubscribe</code>对象,这里指的就是OnSubscribeMap对象,<font color='red'>这样RxJava链式调用的第一步就完成了,其实还是map()操作符的变换</font>

    然而,与普通map操作符不同的是,在<code>parent.call(st)</code>方法中传入的参数为MergeSubscriber,这个参数最终会被MapSubscriber对象持有并在onNext()方法中被调用到。

    MergeSubscriber源码如下(该方法过长,这里就看下onNext方法),这个对象的作用是将完成转化的序列(这是是指Observable序列)依次发射出去并最终被调用。

    static final class MergeSubscriber<T> extends Subscriber<Observable<? extends T>> {
        
        // 通过onNext的参数就能知道,这个是处理转换后的Observable
        @Override
        public void onNext(Observable<? extends T> t) {
            if (t == null) {
                return;
            }
            if (t == Observable.empty()) {
                emitEmpty();
            } else
            if (t instanceof ScalarSynchronousObservable) {
                tryEmit(((ScalarSynchronousObservable<? extends T>)t).get());
            } else {
                // 1、创建一个Subscriber
                InnerSubscriber<T> inner = new InnerSubscriber<T>(this, uniqueId++);
                // 2、通过addInner()方法存储所有转换后的Subscriber,合并所有的Observable对象。
                addInner(inner);
                t.unsafeSubscribe(inner);
                // 3、将转换后的序列发出并处理,这里最终会调用我们subscribe()方法中传入的Subscriber.onNext()方法。
                emit();
            }
        }
    
    }
    
    

    这样关于flatmap变换操作的讲解就讲到这里了。

    总结

    通过map及flatMap操作符源码的分析可知,对于变换操作符的核心流程就是通过OnSubscribeLift类实现的。同时创建<code>MapSubscribe</code>及<code>OnSubscribeMap</code>这两个类,对于不同种类的变换不同的仅仅就是最终在OnSubscribeLift指定这两个类中具体的属性而已。

    相关文章

      网友评论

        本文标题:RxJava源码分析(2) 变换原理

        本文链接:https://www.haomeiwen.com/subject/tjosjftx.html