美文网首页
Rxjava源码解析

Rxjava源码解析

作者: 卓码 | 来源:发表于2016-04-11 14:48 被阅读0次

    这边文章主要记录使用Rxjava过程中对map方法以及flatmap方法的源码理解,自认为也是RxJava的一个精髓所在。
    有关RxJava的详细使用,网络已经有很多资料。这里推荐[匠心写作]的一篇文章

    http://gank.io/post/560e15be2dca930e00da1083

    下面进入正题,先看下map方法

    map方法解析:

    public final <R> Observable<R> map(Func1<? super T, ? extends R> func) {    
        return lift(new OperatorMap<T, R>(func));
    }
    

    OperatorMap是Operator接口的实现类,来看一下Operator接口

    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {    
        // cover for generics insanity
    }
    

    而Operator又继承了Func1,这个接口有一个只有一个方法R call(T t);
    看下OperatorMap的call方法实现

    public Subscriber<? super T> call(final Subscriber<? super R> o) {
            return new Subscriber<T>(o) {
    
                @Override
                public void onCompleted() {
                    o.onCompleted();
                }
    
                @Override
                public void onError(Throwable e) {
                    o.onError(e);
                }
    
                @Override
                public void onNext(T t) {
                    try {
                        o.onNext(transformer.call(t));
                    } catch (Throwable e) {
                        Exceptions.throwOrReport(e, this, t);
                    }
                }
    
            };
        }
    

    然后看lift函数

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return new Observable<R>(new OnSubscribe<R>() {
                @Override
                public void call(Subscriber<? super R> o) {
                    try {
                        Subscriber<? super T> st = hook.onLift(operator).call(o);
                        try {
                            // new Subscriber created and being subscribed with so 'onStart' it
                            st.onStart();
                            onSubscribe.call(st);
                        } catch (Throwable e) {
                            // localized capture of errors rather than it skipping all operators 
                            // and ending up in the try/catch of the subscribe method which then
                            // prevents onErrorResumeNext and other similar approaches to error handling
                            Exceptions.throwIfFatal(e);
                            st.onError(e);
                        }
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        // if the lift function failed all we can do is pass the error to the final Subscriber
                        // as we don't have the operator available to us
                        o.onError(e);
                    }
                }
            });
        }
    

    简单点来说呢,做了三件事
    1.创建新的Observable,负责接手原Observable发出的事件
    2.hook.onLift(operator).call(o) 会执行OperatorMap的call方法,返回一个新的Subscriber。
    3.onSubscribe.call(st)新的Subscriber会传给原Observable。在原Observable发送事件是会调用新Subscriber的onNext方法,会先执行transformer.call(t) 即map(Func1)方法中参数Func1的call方法,然后执行原Subscriber的onNext方法。

    faltmap方法解析

    还是老样子,先贴一下faltmap的源码

    public final <R> Observable<R> flatMap(Func1<? super T, ? extends Observable<? extends R>> func) {
            if (getClass() == ScalarSynchronousObservable.class) {
                return ((ScalarSynchronousObservable<T>)this).scalarFlatMap(func);
            }
            return merge(map(func));
    }
    

    这里会执行 merge(map(func)) 这里其实就是merge了刚才的map方法嘛。
    好了,map方法我就不说了,可以看下前面的解释,这里提示一点 这里的map方法返回的是Observable<Observable>,然后看下merge方法

    public final static <T> Observable<T> merge(Observable<? extends Observable<? extends T>> source) {
            if (source.getClass() == ScalarSynchronousObservable.class) {
                return ((ScalarSynchronousObservable<T>)source).scalarFlatMap((Func1)UtilityFunctions.identity());
            }
            return source.lift(OperatorMerge.<T>instance(false));
    }
    

    这里会执行到 source.lift(OperatorMerge.<T>instance(false))其实就是之前map方法里面介绍的lift方法。这里的source类型是Observable<Observable>可以理解成一个新Observable接收所有原Observable发出的事件,组成一个新的Observable。然后执行lift的方法,从之前map方法的分析知道,这里会先去执行Operator中的call方法,然后执行原Subscriber中的call方法。
    这里Operator为OperatorMerge,看下这个类中的call方法

    public Subscriber<Observable<? extends T>> call(final Subscriber<? super T> child) {
            MergeSubscriber<T> subscriber = new MergeSubscriber<T>(child, delayErrors, maxConcurrent);
            MergeProducer<T> producer = new MergeProducer<T>(subscriber);
            subscriber.producer = producer;
            
            child.add(subscriber);
            child.setProducer(producer);
            
            return subscriber;
    }
    

    这里经过一些操作,最终会走到MergeProducer中的request方法中

    public void request(long n) {
        if (n > 0) {
            if (get() == Long.MAX_VALUE) {
                return;
            }
            BackpressureUtils.getAndAddRequest(this, n);
            subscriber.emit();
        } else 
        if (n < 0) {
            throw new IllegalArgumentException("n >= 0 required");
        }
    }
    

    这里注意这句subscriber.emit()会发送所有的事件。这样就解释通了
    1.接收所有原Observable的事件,组成新的Observable
    2.新Observable发送所有事件
    3.原Subscriber接收到新事件后进行处理

    总结:

    不管是map还是flatmap,其实都是运用了转换的思想

    1. 截断原事件分发流程。
    2. 增加中间处理操作(map 增加了一个call方法回调,flatmap增加了一次事件收集再发送)。
    3. 回到原事件分发流程处理事件。

    相关文章

      网友评论

          本文标题:Rxjava源码解析

          本文链接:https://www.haomeiwen.com/subject/chomlttx.html