美文网首页
RxJava中lift()变换原理

RxJava中lift()变换原理

作者: lxbnjupt | 来源:发表于2018-04-18 10:54 被阅读0次

    在RxJava中,map(),flatMap()虽然功能各有不同,但实质上都是针对事件序列的处理和再发送。而在它们内部,其实都是基于同一个基础的变换方法,即lift(Operator)。lift(Operator)方法是RxJava中所有操作符的基础,可以通过它做各种各样的变化,弄清楚它的原理,也方便我们理解其他操作符。

    1.Func1接口 和 Operator接口

    /**
     * Represents a function with one argument.
     */
    public interface Func1<T, R> extends Function {
        R call(T t);
    }
    
    /**
    * Operator function for lifting into an Observable.
    */
    public interface Operator<R, T> extends Func1<Subscriber<? super R>, Subscriber<? super T>> {
        // cover for generics insanity
    }
    

    Func1接口会按照泛型参数的顺序传入T,并返回R。按照Func1接口的定义,Operator接口会传入一个Subscriber<? super R>参数,并返回一个Subscriber<? super T>。

    看到Observable<T>,Operator<R, T>,Func1<T, R>这2个类的泛型参数,可能会有这样的疑问,即Operator的泛型参数顺序为什么是<R, T>,而不是<T, R>?其实,这里我们并不需要关心顺序是什么,只需要记住Operator<R, T>是按照泛型参数的顺序,传入一个Subscriber<R>参数,并返回一个Subscriber<T>。

    2.lift(Operator)内部实现原理

    public final <R> Observable<R> lift(final Operator<? extends R, ? super T> operator) {
            return new Observable<R>(new OnSubscribe<R>() {
                @Override
                public void call(Subscriber<? super R> o) {
                    try {
                        //也可以看作是Subscriber<? super T> st = operator.call(o);
                        Subscriber<? super T> st = hook.onLift(operator).call(o);
                        try {
                            // new Subscriber created and being subscribed with so 'onStart' it
                            st.onStart();
                            // 这里的onSubscribe是调用lift方法的Observable中的onSubscribe,即Observable<T>中的onSubscribe
                            onSubscribe.call(st);
                        } catch (Throwable e) {
                            // localized capture of errors rather than it skipping all operators 
                            // and ending up in the try/catch of the subscribe method which then
                            // prevents onErrorResumeNext and other similar approaches to error handling
                            Exceptions.throwIfFatal(e);
                            st.onError(e);
                        }
                    } catch (Throwable e) {
                        Exceptions.throwIfFatal(e);
                        // if the lift function failed all we can do is pass the error to the final Subscriber
                        // as we don't have the operator available to us
                        o.onError(e);
                    }
                }
            });
        }
    

    (1)假设已有一个Observable<T>,调用lift()方法,生成一个Observable<R>,此时就有了两个Observable和两个OnSubscribe对象。
    (2)然后调用Observable<R>的subscribe()方法,传入一个Subscriber<R>对象,此时触发Observable<R>.onSubscribe.call()方法,也就是上面lift()方法中的call()方法。
    (3)在上述lift()的call()方法中会调用onSubscribe.call()方法,注意这个onSubscribe是Observable<T>中的那个OnSubscribe<T>对象,它需要传入一个Subscriber<T>对象,这个对象是通过operator.call()方法生成的。正是这个Operator对象将两个Subscriber对象关联起来,OnSubscribe<T>在执行Subscriber<T>.onNext(T t)方法的时候也会执行Subscriber<R>.onNext(R r)。

    小结

    上面的Observable<T>是事件源,对它进行lift()变换得到新的Observable<R>,这个新的Observable<R>的回调已经固定,也就是上面lift()方法中的call()方法。此时调用Observable<R>的subscribe()方法,传入的Subscriber<R>是用户定义的事件监听者,它监听的是新的Observable<R>。这个Observable<R>的回调是固定的,它并不能产生新事件,所以得靠事件源Observable<T>。那么,此时通过Operator生成一个中间的Subscriber<T>对象,然后利用这个Subscriber<T>向原始Observable<T>进行订阅,然后原始Observable<T>就开始发送事件。但是这个Subscriber<T>对象并没有消耗事件,而是起着一个代理的作用,即接收事件源Observable<T>的事件,并将事件转给用户定义的Subscriber<R>。所以,Operator可以看做是一个生成代理的工具类,通过它生成一个代理Subscriber<T>,然后通过代理Subscriber<T>将事件发送给Subscriber<R>。
    精简版:在Observable执行了lift(Operator) 方法之后,会返回一个新的Observable,这个新的 Observable会像一个代理一样,负责接收原始的Observable 发出的事件,并在处理后发送给 Subscriber。

    3.举个例子:通过lift(Operator)将Integer变换为String

    Observable.create(new Observable.OnSubscribe<Integer>() {
    
                @Override
                public void call(Subscriber<? super Integer> subscriber) {
                    subscriber.onNext(121);
                    Log.e(TAG, "lift call Integer: 121");
                }
            }).lift(new Observable.Operator<String, Integer>() {
    
                @Override
                public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
                    return new Subscriber<Integer>() {
                        @Override
                        public void onCompleted() {
    
                        }
    
                        @Override
                        public void onError(Throwable e) {
    
                        }
    
                        @Override
                        public void onNext(Integer integer) {
                            Log.e(TAG, "Subscriber<Integer> onNext: " + integer);
                            Log.e(TAG, "lift transform: " + "Integer to String :" + integer);
                            subscriber.onNext("Integer to String " + integer);
                        }
                    };
                }
            }).subscribe(new Subscriber<String>() {
    
                @Override
                public void onCompleted() {
    
                }
    
                @Override
                public void onError(Throwable e) {
    
                }
    
                @Override
                public void onNext(String s) {
                    Log.e(TAG, "Subscriber<String> onNext: " + s);
                }
            });
    

    相关文章

      网友评论

          本文标题:RxJava中lift()变换原理

          本文链接:https://www.haomeiwen.com/subject/nhtvkftx.html