美文网首页
RxJava(六、变换)

RxJava(六、变换)

作者: 敲代码的本愿 | 来源:发表于2016-10-12 20:49 被阅读91次

    目录

    一、Is what 是什么
    二、Concept 概念
    三、Basic realization 基本实现
    四、Scheduler 线程控制(上)
    五、Scheduler 线程控制(下)
    六、变换

    因个人学习需要,故文章内容均为网上摘抄整理,感谢创作者的辛勤,源文章地址请看文末。

    变换

    将事件序列中的对象或整个序列进行加工处理,转换成不同的事件或事件序列。

    RxJava核心功能之一:提供了对事件序列进行变换的支持。
    

    分类

    变换有多种,即可以针对事件对象,也可以针对整个事件队列。

    基类
    • lift(Operator):对事件项和事件序列变换

    • compose(Transformer):对 Observable 自身变换

    拓展类######
    • map():事件对象的直接变换,变换中常用的一种。
      map() 示意图
    • flatMap():很有用,但很难理解的变换,以下详解。

    相同点:把传入的参数转化后返回另一个对象。

    不同点:flatMap() 中返回的是个 Observable 对象,该对象并不是被直接发送到 Subscriber 的回调方法中。

    API

    //示例1 map()
    Observable.just("images/logo.png") // 输入类型 String
        .map(new Func1<String, Bitmap>() {
            @Override
            public Bitmap call(String filePath) { // 参数类型 String
                return getBitmapFromPath(filePath); // 返回类型 Bitmap
            }
        })
        .subscribe(new Action1<Bitmap>() {
            @Override
            public void call(Bitmap bitmap) { // 参数类型 Bitmap
                showBitmap(bitmap);
            }
        });
    

    其中,Func1 类是RxJava的一个接口,用于包装含有一个参数的方法。

    FuncX 和 ActionX 相似,拥有多个,用于不同参数个数的方法。
    区别:FuncX包装的是有返回值的方法
    

    map()

    map() 方法将参数中的 String 对象转换成 Bitmap 对象后返回,经过 map() 方法后,事件的参数类型由 String 转为 Bitmap

    flatMap()

    原理
    1. 使用传入的事件对象创建一个 Observable 对象;
    2. 并不发送这个 Observable ,而是将它激活,于是它开始发送事件;
    3. 每个创建出来的 Observable 发送的事件,都被汇入同一个 Observable ,而这个 Observable 负责将这些事件同意交给 Subscriber 的回调方法。

    以上三个步骤,把事件拆成两级,通过一组新建的 Observable 将初始的对象『铺平』之后通过统一路径分发下去,而这个『铺平』就是 flatMap() 所谓的flat

    flatMap() 示意图
    示例

    现有一个数据结构(学生)

    //1. 打印出一组学生的名字
    Student[] students = ...;
    Subscriber<String> subscriber = new Subscriber<String>() {
        @Override
        public void onNext(String name) {
            Log.d(tag, name);
        }
        ...
    };
    Observable.from(students)
        .map(new Func1<Student, String>() {
            @Override
            public String call(Student student) {
                return student.getName();
            }
        })
        .subscribe(subscriber);
    
    //2. 打印出每个学生所需要修的所有课程的名称
    //需求区别:每个学生只有一个名字,但却有多个课程
    //第一种实现
    Student[] students = ...;
    Subscriber<Student> subscriber = new Subscriber<Student>() {
        @Override
        public void onNext(Student student) {
            List<Course> courses = student.getCourses();
            for (int i = 0; i < courses.size(); i++) {
                Course course = courses.get(i);
                Log.d(tag, course.getName());
            }
        }
        ...
    };
    Observable.from(students)
        .subscribe(subscriber);
    
    //第二种实现
    Student[] students = ...;
    Subscriber<Course> subscriber = new Subscriber<Course>() {
        @Override
        public void onNext(Course course) {
            Log.d(tag, course.getName());
        }
        ...
    };
    Observable.from(students)
        .flatMap(new Func1<Student, Observable<Course>>() {
            @Override
            public Observable<Course> call(Student student) {
                return Observable.from(student.getCourses());
            }
        })
        .subscribe(subscriber);
    
    扩展

    可以在嵌套的 Observable 中添加异步代码,flatMap() 也常用于嵌套的异步操作。

    例:嵌套的网络请求。代码(Retrofit + RxJava):

    networkClient.token() // 返回 Observable<String>,在订阅时请求 token,并在响应后发送 token
    .flatMap(new Func1<String, Observable<Messages>>() {
        @Override
        public Observable<Messages> call(String token) {
            // 返回 Observable<Messages>,在订阅时请求消息列表,并在响应后发送请求到的消息列表
            return networkClient.messages();
        }
    })
    .subscribe(new Action1<Messages>() {
        @Override
        public void call(Messages messages) {
            // 处理显示消息列表
            showMessages(messages);
        }
    });
    

    传统的嵌套请求需要使用嵌套的 Callback 来实现。通过 flatMap() ,把嵌套的请求写在一条链中,从而保持程序逻辑的清晰。

    throttleFirst(): 在每次事件触发后的一定时间间隔内丢弃新的事件。常用作去抖动过滤。
    
    //按钮的点击监听器
    RxView.clickEvents(button) // RxBinding 代码,后面的文章有解释 
    .throttleFirst(500, TimeUnit.MILLISECONDS) // 设置防抖间隔为 500ms 
    .subscribe(subscriber); 妈妈再也不怕我的用户手抖点开两个重复的界面啦。
    

    变换的原理: lift()

    各种变换虽然功能不同,但实质上都是针对事件序列的处理和再发送。而在RxJava的内部,它们是基于同一个基础变换方法:lift(Operator)

    lift()的内部实现(仅核心代码):

    // 注意:这不是 lift() 的源码,而是将源码中与性能、兼容性、扩展性有关的代码剔除后的核心代码。
    // 如果需要看源码,可以去 RxJava 的 GitHub 仓库下载。
    public <R> Observable<R> lift(Operator<? extends R, ? super T> operator) {
        return Observable.create(new OnSubscribe<R>() {
            @Override
            public void call(Subscriber subscriber) {
                Subscriber newSubscriber = operator.call(subscriber);
                newSubscriber.onStart();
                onSubscribe.call(newSubscriber);
            }
        });
    }
    

    其中,生成了一个新的Observable并返回,而且创建新Observale所用的参数OnSubscribe的回调方法call()中的实现和Observable.subscribe()类似,但是并不一样,关键在于第二行onSubscribe.call(subscriber)中的onSubscribe所指代的对象不同:

    • subscribe()中这句话的onSubscribe指的是Observable中的onSubscribe对象,这个没有问题,但是list()之后的情况就复杂了。
    • 当含有list()时:
    1. lift()创建了一个Observable后,加上之前原始的Obervable,已经有两个Observable了;
    2. 同样的,新Observable里的新OnSubscribe加上原始Observable中的原始OnSubscribe,也就有了两个OnSubscribe;
    3. 当用户调用经过lift()后的Observablesubscribe()的时候,使用的是lift()所返回的新的Observable,于是它所触发的 onSubscribe.call(subscriber),也是用的新Observable中的新OnSubscribe,即在lift()中生成的那个OnSubscribe
    4. 而这个新OnSubscribecall()方法中的onSubscribe,就是指的原始Observable中的原始OnSubscribe,在这个call()方法里,新OnSubscribe利用operator.call(subscriber)生成了一个新的Subscriber(Operator 就是在这里,通过自己的 call() 方法将新 Subscriber 和原始 Subscriber 进行关联,并插入自己的『变换』代码以实现变换),然后利用这个新Subscriber向原始 Observable进行订阅。

    这样实现了lift()过程,像一种代理机制,通过事件拦截和处理实现事件序列的变换

    简而言之:在 Observable 执行了 lift(Operator) 方法之后,会返回一个新的 Observable,这个新的 Observable 会像一个代理一样,负责接收原始的 Observable 发出的事件,并在处理后发送给 Subscriber

    示意图:


    流程图 (静态) 流程图 (动态) 两次和多次的 lift()
    示例
    //具体的 Operator 的实现
    //将事件中的 Integer 对象转换成 String
    observable.lift(new Observable.Operator<String, Integer>() {
        @Override
        public Subscriber<? super Integer> call(final Subscriber<? super String> subscriber) {
            // 将事件序列中的 Integer 对象转换为 String 对象
            return new Subscriber<Integer>() {
                @Override
                public void onNext(Integer integer) {
                    subscriber.onNext("" + integer);
                }
    
                @Override
                public void onCompleted() {
                    subscriber.onCompleted();
                }
    
                @Override
                public void onError(Throwable e) {
                    subscriber.onError(e);
                }
            };
        }
    });
    

    注意:讲述 lift() 的原理是为了更好地了解 RxJava ,从而更好地使用它。然而不管是否理解了 lift() 的原理,RxJava 都不建议开发者自定义 Operator 来直接使用 lift(),而是建议尽量使用已有的 lift() 包装方法(如 map()flatMap() 等)进行组合来实现需求,因为直接使用 lift() 非常容易发生一些难以发现的错误。

    compose:对Observable整体的变换

    //有多个 Observable ,都需要应用一组相同的 lift() 变换
    //第一种方法
    observable1
        .lift1()
        .lift2()
        .lift3()
        .lift4()
        .subscribe(subscriber1);
    observable2
        .lift1()
        .lift2()
        .lift3()
        .lift4()
        .subscribe(subscriber2);
    observable3
        .lift1()
        .lift2()
        .lift3()
        .lift4()
        .subscribe(subscriber3);
    observable4
        .lift1()
        .lift2()
        .lift3()
        .lift4()
        .subscribe(subscriber1);
    
    //第二种方法
    private Observable liftAll(Observable observable) {
        return observable
            .lift1()
            .lift2()
            .lift3()
            .lift4();
    }
    ...
    liftAll(observable1).subscribe(subscriber1);
    liftAll(observable2).subscribe(subscriber2);
    liftAll(observable3).subscribe(subscriber3);
    liftAll(observable4).subscribe(subscriber4);
    

    第二种方法可读性、可维护性提高了。可是被方法包起来,这种方式对于 Observale 的灵活性似乎增添了那么点限制。

    //使用 compose() 解决
    //第三种方法
    public class LiftAllTransformer implements Observable.Transformer<Integer, String> {
        @Override
        public Observable<String> call(Observable<Integer> observable) {
            return observable
                .lift1()
                .lift2()
                .lift3()
                .lift4();
        }
    }
    ...
    Transformer liftAll = new LiftAllTransformer();
    observable1.compose(liftAll).subscribe(subscriber1);
    observable2.compose(liftAll).subscribe(subscriber2);
    observable3.compose(liftAll).subscribe(subscriber3);
    observable4.compose(liftAll).subscribe(subscriber4);
    

    第三种方法,Observable 可以利用传入的 Transformer 对象的 call 方法直接对自身进行处理,也就不必包在方法里。

    参考
    给 Android 开发者的 RxJava 详解

    相关文章

      网友评论

          本文标题:RxJava(六、变换)

          本文链接:https://www.haomeiwen.com/subject/qxniyttx.html