Transformer 在RxJava中的使用

作者: fengzhizi715 | 来源:发表于2017-07-15 12:00 被阅读1704次
    Transformer.jpeg

    Transformer 用途

    Transformer,顾名思义是转换器的意思。早在 RxJava1.x 版本就有了Observable.Transformer、Single.Transformer和Completable.Transformer,在2.x版本中变成了ObservableTransformer、SingleTransformer、CompletableTransformer、FlowableTransformer和MaybeTransformer。其中,FlowableTransformer和MaybeTransformer是新增的。由于 RxJava2 将Observable拆分成 Observable 和 Flowable,所以多了一个FlowableTransformer。同时,Maybe是 RxJava2 新增的一个类型,所以多了MaybeTransformer。

    Transformer 能够将一个 Observable/Flowable/Single/Completable/Maybe 对象转换成另一个 Observable/Flowable/Single/Completable/Maybe 对象,和调用一系列的内联操作符是一模一样的。

    举个简单的例子,写一个transformer()方法将一个发射整数的Observable转换为发射字符串的Observable。

    public static <String> ObservableTransformer<Integer, java.lang.String> transformer() {
            return new ObservableTransformer<Integer, java.lang.String>() {
                @Override
                public ObservableSource<java.lang.String> apply(@NonNull Observable<Integer> upstream) {
                    return upstream.map(new Function<Integer, java.lang.String>() {
                        @Override
                        public java.lang.String apply(@NonNull Integer integer) throws Exception {
                            return java.lang.String.valueOf(integer);
                        }
                    });
                }
            };
        }
    

    接下来是使用transformer()方法,通过标准的RxJava的操作。

    Observable.just(123,456)
           .compose(transformer())
           .subscribe(new Consumer<String>() {
                  @Override
                   public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                       System.out.println("s="+s);
                        }
                    });
    

    最后打印了二次,分别是

    s=123
    s=456
    

    通过这个例子,可以简单和直观地了解到Transformer的作用。

    其实,在大名鼎鼎的图片加载框架 Glide 以及 Picasso 中也有类似的transform概念,能够将图形进行变换。

    跟compose操作符相结合

    compose操作于整个数据流中,能够从数据流中得到原始的Observable<T>/Flowable<T>...
    当创建Observable/Flowable...时,compose操作符会立即执行,而不像其他的操作符需要在onNext()调用后才执行。

    关于compose操作符,老外的这篇文章不错Don't break the chain: use RxJava's compose() operator
    国内也有相应的翻译【译】避免打断链式结构:使用.compose( )操作符

    常用的场景

    1. 切换到主线程

    对于网络请求,我们经常会做如下的操作来切换线程。

    .subscribeOn(Schedulers.io())
    .observeOn(AndroidSchedulers.mainThread())
    

    于是,我做了一个简单的封装。

    import io.reactivex.FlowableTransformer
    import io.reactivex.ObservableTransformer
    import io.reactivex.android.schedulers.AndroidSchedulers
    import io.reactivex.schedulers.Schedulers
    
    /**
     * Created by Tony Shen on 2017/7/13.
     */
    object RxJavaUtils {
    
        @JvmStatic
        fun <T> observableToMain():ObservableTransformer<T, T> {
    
            return ObservableTransformer{
                upstream ->
                upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
            }
        }
    
        @JvmStatic
        fun <T> flowableToMain(): FlowableTransformer<T, T> {
    
            return FlowableTransformer{
                upstream ->
                upstream.subscribeOn(Schedulers.io())
                        .observeOn(AndroidSchedulers.mainThread())
            }
        }
    }
    

    上面这段代码是Kotlin写的,为啥不用Java?个人习惯把一些工具类来用Kotlin来编写,而且使用lambda表达式也更为直观。

    对于Flowable切换到主线程的操作,可以这样使用

    .compose(RxJavaUtils.flowableToMain())
    

    2. RxLifecycle中的LifecycleTransformer

    trello出品的RxLifecycle能够配合Android的生命周期,防止App内存泄漏,其中就使用了LifecycleTransformer。
    知乎也做了一个类似的RxLifecycle,能够做同样的事情。

    在我的项目中也使用了知乎的RxLifecycle,根据个人的习惯和爱好,我对LifecycleTransformer稍微做了一些修改,将五个Transformer合并成了一个。

    import org.reactivestreams.Publisher;
    
    import io.reactivex.Completable;
    import io.reactivex.CompletableSource;
    import io.reactivex.CompletableTransformer;
    import io.reactivex.Flowable;
    import io.reactivex.FlowableTransformer;
    import io.reactivex.Maybe;
    import io.reactivex.MaybeSource;
    import io.reactivex.MaybeTransformer;
    import io.reactivex.Observable;
    import io.reactivex.ObservableSource;
    import io.reactivex.ObservableTransformer;
    import io.reactivex.Single;
    import io.reactivex.SingleSource;
    import io.reactivex.SingleTransformer;
    import io.reactivex.annotations.NonNull;
    import io.reactivex.functions.Function;
    import io.reactivex.functions.Predicate;
    import io.reactivex.processors.BehaviorProcessor;
    
    /**
     * Created by Tony Shen on 2017/5/25.
     */
    
    public class LifecycleTransformer<T> implements ObservableTransformer<T, T>,
            FlowableTransformer<T, T>,
            SingleTransformer<T, T>,
            MaybeTransformer<T, T>,
            CompletableTransformer {
    
        private final BehaviorProcessor<Integer> lifecycleBehavior;
    
        private LifecycleTransformer() throws IllegalAccessException {
            throw new IllegalAccessException();
        }
    
        public LifecycleTransformer(@NonNull BehaviorProcessor<Integer> lifecycleBehavior) {
            this.lifecycleBehavior = lifecycleBehavior;
        }
    
        @Override
        public CompletableSource apply(Completable upstream) {
            return upstream.ambWith(
                    lifecycleBehavior.filter(new Predicate<Integer>() {
                        @Override
                        public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                            return event == LifecyclePublisher.ON_DESTROY_VIEW ||
                                    event == LifecyclePublisher.ON_DESTROY ||
                                    event == LifecyclePublisher.ON_DETACH;
                        }
                    }).take(1).flatMapCompletable(new Function<Integer, Completable>() {
                        @Override
                        public Completable apply(Integer flowable) throws Exception {
                            return Completable.complete();
                        }
                    })
            );
        }
    
        @Override
        public Publisher<T> apply(final Flowable<T> upstream) {
            return upstream.takeUntil(
                    lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                        @Override
                        public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                            return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                    event != LifecyclePublisher.ON_DESTROY &&
                                    event != LifecyclePublisher.ON_DETACH;
                        }
                    })
            );
        }
    
        @Override
        public MaybeSource<T> apply(Maybe<T> upstream) {
            return upstream.takeUntil(
                    lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                        @Override
                        public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                            return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                    event != LifecyclePublisher.ON_DESTROY &&
                                    event != LifecyclePublisher.ON_DETACH;
                        }
                    })
            );
        }
    
        @Override
        public ObservableSource<T> apply(Observable<T> upstream) {
            return upstream.takeUntil(
                    lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                        @Override
                        public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                            return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                    event != LifecyclePublisher.ON_DESTROY &&
                                    event != LifecyclePublisher.ON_DETACH;
                        }
                    }).toObservable()
            );
        }
    
        @Override
        public SingleSource<T> apply(Single<T> upstream) {
            return upstream.takeUntil(
                    lifecycleBehavior.skipWhile(new Predicate<Integer>() {
                        @Override
                        public boolean test(@LifecyclePublisher.Event Integer event) throws Exception {
                            return event != LifecyclePublisher.ON_DESTROY_VIEW &&
                                    event != LifecyclePublisher.ON_DESTROY &&
                                    event != LifecyclePublisher.ON_DETACH;
                        }
                    })
            );
        }
    }
    
    

    3. 缓存的使用

    对于缓存,我们大致都会这样写

    cache.put(key,value);
    

    更优雅一点的做法是使用AOP,大致会这样写

    @Cacheable(key = "...")
    getValue() {
        ....
    }
    

    如果你想在RxJava的链式调用中也使用缓存,还可以考虑使用transformer的方式,下面我写了一个简单的方法

    /**
     * Created by Tony Shen on 2017/7/13.
     */
    
    public class RxCache {
    
        public static <T> FlowableTransformer<T, T> transformer(final String key, final Cache cache) {
            return new FlowableTransformer<T, T>() {
                @Override
                public Publisher<T> apply(@NonNull Flowable<T> upstream) {
    
                    return upstream.map(new Function<T, T>() {
                        @Override
                        public T apply(@NonNull T t) throws Exception {
                            cache.put(key,(Serializable) t);
                            return t;
                        }
                    });
    
                }
            };
        }
    }
    

    结合上述三种使用场景,封装了一个方法用于获取内容,在这里网络框架使用Retrofit。虽然Retrofit本身支持通过Interceptor的方式来添加Cache,但是可能某些业务场景下还是想用自己的Cache,那么可以采用下面类似的封装。

        /**
         * 获取内容
         * @param fragment
         * @param param
         * @param cacheKey
         * @return
         */
        public Flowable<ContentModel> getContent(Fragment fragment,ContentParam param,String cacheKey) {
    
            return apiService.loadVideoContent(param)
                    .compose(RxLifecycle.bind(fragment).<ContentModel>toLifecycleTransformer())
                    .compose(RxJavaUtils.<ContentModel>flowableToMain())
                    .compose(RxCache.<ContentModel>transformer(cacheKey,App.getInstance().cache));
        }
    

    4. 追踪RxJava的使用

    初学者可能会对RxJava内部的数据流向会感到困惑,所以我写了一个类用于追踪RxJava的使用,对于调试代码还蛮有帮助的。

    先来看一个简单的例子

    Observable.just("tony","cafei","aaron")
                    .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                            System.out.println("s="+s);
                        }
                    });
    

    下图显示了上面代码中的数据流向。


    第一次做Trace.png

    然后,再刚才代码的基础上加一个map操作符,把小写的字符串都转换成大写。

    Observable.just("tony","cafei","aaron")
                    .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                    .map(new Function<String, String>() {
    
    
                        @Override
                        public String apply(@io.reactivex.annotations.NonNull String s) throws
                                Exception {
                            return s.toUpperCase();
                        }
                    })
                    .compose(RxTrace.<String>logObservable("second",RxTrace.LOG_NEXT_DATA))
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                            System.out.println("s="+s);
                        }
                    });
    

    看看这一次数据是怎样流向的,由于显示器不够大,其实截图还少了一点内容:(,但是能够看明白日志的展示。


    第二次做Trace.png

    最后,加上监测onComlete和OnTerminate

    Observable.just("tony","cafei","aaron")
                    .compose(RxTrace.<String>logObservable("first",RxTrace.LOG_SUBSCRIBE|RxTrace.LOG_NEXT_DATA))
                    .map(new Function<String, String>() {
    
    
                        @Override
                        public String apply(@io.reactivex.annotations.NonNull String s) throws
                                Exception {
                            return s.toUpperCase();
                        }
                    })
                    .compose(RxTrace.<String>logObservable("second",RxTrace.LOG_NEXT_DATA))
                    .compose(RxJavaUtils.<String>observableToMain())
                    .compose(RxTrace.<String>logObservable("third",RxTrace.LOG_COMPLETE|RxTrace.LOG_TERMINATE))
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@io.reactivex.annotations.NonNull String s) throws Exception {
                            System.out.println("s="+s);
                        }
                    });
    

    上面已经展示过的截图就不显示了,就展示最后的onComlete和OnTerminate。


    第三次做Trace.png

    最后,我已经把RxTrace的代码放到https://github.com/fengzhizi715/SAF-Kotlin-log
    为何不单独开一个repository呢?它只有一个类,我就懒得创建了:(

    总结

    compose操作符和Transformer结合使用,一方面让代码看起来更加简洁化,另一方面能够提高代码的复用性。RxJava提倡链式调用,compose能够防止链式被打破。

    相关文章

      网友评论

      • Mr_panmin:大神,买了你的《RxJava2.x实战》这本书,发现书中也有这篇blog的内容,书中也没介绍LifecycleTransformer这个类如何使用,而且也没找到源码中的LifecyclePublisher是从哪儿import的,求大神解惑,谢谢
        Mr_panmin:@fengzhizi715 谢谢大神,我去学习下
        fengzhizi715:hi,源码在这里:https://github.com/fengzhizi715/SAF/tree/master/saf-rxlifecycle

        可以通过 compile 'com.safframework:saf-rxlifecycle:1.1.2' 来引入
      • 王神仙:很好
      • 开发者头条_程序员必装的App:感谢分享!已推荐到《开发者头条》:https://toutiao.io/posts/8nrpka 欢迎点赞支持!
        欢迎订阅《Tony沈哲的独家号》https://toutiao.io/subjects/5688

      本文标题:Transformer 在RxJava中的使用

      本文链接:https://www.haomeiwen.com/subject/ectnhxtx.html