美文网首页iOS
ReactiveX 入门系列(一)

ReactiveX 入门系列(一)

作者: yuGodddddd | 来源:发表于2019-12-19 09:49 被阅读0次

    目的

    • 了解 ReactiveX 是什么?
    • 了解 ReactiveX 优劣势?
    • ReactiveX 中常用的概念?
    • ReactiveX 如何使用?
    • ReactiveX 操作符?
    • ReactiveX 使用场景是什么?

    ReactiveX 简义

    ReactiveX 的历史

    ReactiveX 是 Reactive Extensions 的缩写,一般简写为 Rx,最初是 LINQ 的一个扩展,由微软的架构师 Erik Meijer 领导的团队开发,在2012年11月开源,Rx是一个编程模型,目标是提供一致的编程接口,帮助开发者更方便的处理异步数据流,Rx库支持 .NET、JavaScript 和 C++,Rx 近几年越来越流行了,现在已经支持几乎全部的流行编程语言了,Rx的大部分语言库由 ReactiveX 这个组织负责维护,比较流行的有 RxJava/RxJS/Rx.NET

    下文 ReactiveX 简称 Rx

    Rx 简义

    什么是 Rx(简称:Rx)?

    ReactiveX.io 官网对其自身的介绍:

    An API for asynchronous programming with observable streams

    一个专注于异步编程与控制可观察数据(或者事件)流的API.

    我们可以通俗的理解为:

    1. An API: 它首先是一个编程接口规范,不同的语言提个不同的实现。像 RxJava、RxSwift、RxJs。
    2. For asynchronous programming:在异步编程中使用。比如子线程耗时网络请求。
    3. With observable streams: 基于可观察的事件流。比如观察者模式中的观察者对被观察着的监听。

    其核心设计思想:观察者模式、Iterator 模式、函数式编程

    观察者模式:即定义对象间一种一对多的依赖关系,当一个对象改变状态时,则所有依赖它的对象都会被改变。

    Iterator 模式: 即迭代器模式。

    函数式编程:即提供一系列函数样式的方法供快速开发。

    Rx 工作模式图:http://reactivex.io/

    开胃菜

    再开始了解 Rx 之前,我们先了解一下以下几个基本概念。

    • 什么是响应式编程(简)

    A = B + C

    有这么一段代码A被赋值为BC的值。

    在传统的命令式编程中,如果我们改变B的值,A的值并不会随之改变。

    而如果我们运用一种机制,当B或者C的值发现变化的时候,A的值也随之改变,这样就实现了”响应式“。

    或者说叫数据绑定

    http://wiki.jikexueyuan.com/project/android-weekly/issue-145/introduction-to-RP.html 响应式编程(Reactive Programming)介绍

    • 什么是函数式编程(简单)

    函数式编程思维, 就是 用计算(函数)来表示程序, 用计算(函数)的组合来表达程序的组合的思维方式.

    伪代码

    // 传统
    int abs(int i) {
        return i * 2;
    }
    
    int add_abs(int a, int b) {
        return abs(a) + abs(b)
    }
    
    int result = add_abs(2, 3);
    
    // 函数式
    def abs = {i -> i * 2} // 传入 i 返回 i*2
    def add_abs = {(a, b) -> abs(a)+abs(b)} // 传入a,b ,返回经过 a*2 + b*2 的值
    int a = add_adb(1, 2)
    
    能看到这里就是通过组合函数来达到计算结果的过程
    
    

    http://www.ruanyifeng.com/blog/2012/04/functional_programming.html

    • 函数式相应编程

    结合函数式编程以及响应式编程就得到了函数响应式编程

    例子

    我们如果有一个输入框,输入的文字达到预期时,弹出提示框

    传统的话我们的话会给这个输入框添加一个监听事件,用于监听文字的输入,然后在监听方法中,去做判断是否需要弹出提示框。

    input.setInputListener(inputListener) // 设置输入监听
    
    void inputListener(input) {
        if (!input.text.isEmpty()) {
                if input.text.count >= 100 {
                    alert.show("不能输入更多了")
                } 
        }   
    }
    

    如果我们使用函数响应式编程的话,就可以用一下代码来表示

    input.text
        .isEmpty
        .map {text.count >= 100} // 
        .bind {alert.show("不能输入更多了")}
            
    

    这样就实现了输入与提示框的绑定。

    而 Rx(RxJava) 的原理就是这种模式,在这个模式中有 4 个角色

    角色 作用
    被观察者(Observable) 产生事件
    观察者(Observer) 接收事件,并给出响应动作
    订阅(Subscribe) 连接 被观察者 & 观察者
    事件(Event) 被观察者 & 观察者 沟通的载体

    在Rx 中还有一种特殊的存在 Subject,它既是可监听序列也是观察者。它同时充当了 Observer 和 Observable 的角色。因为它是一个 Observer,它可以订阅一个或多个 Observable ;又因为它是一个 Observable ,它可以转发它收到( Observe )的数据,也可以发射新的数据。

    Rx N部曲

    Rx 的使用步骤

    被观察者 (Observable) 通过 订阅(Subscribe) 按顺序发送事件 给观察者 (Observer)

    观察者(Observer) 按顺序接收事件 & 作出对应的响应动作。

    1. 初始化 Observable
    2. 初始化 Observer
    3. 建立订阅关系
    4. 取消订阅
    // 初始化 Observable
    Observable<Integer> observable = Observable.create(observer -> {
                observer.onNext(1);
                observer.onNext(2);
                observer.onNext(3);
                observer.onComplete();
                        observer.onError
            });
    
    Observer observer = new Observer<Integer>() { // 初始化 Observer
             public void onNext(Integer integer) {
                // 收到消息
             }
    
             public void onError(Error e) {
                // 发生错误
             }
    
             public void onComplete() {
               // 完成订阅
             }
            }
    
    // 建立订阅关系
    observable.subscribe(observer);
    
    // 简化
     Observable
                    .create(new ObservableOnSubscribe<String>() {
                        @Override
                        public void subscribe(ObservableEmitter<String> emitter) throws Exception {
                            emitter.onNext(1);
                            emitter.onComplete();
                        }
                    })
                    .subscribe(new Observer<String>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                        }
    
                        @Override
                        public void onNext(String s) {
                            System.out.println(s);
                        }
    
                        @Override
                        public void onError(Throwable e) {
                            e.printStackTrace();
                        }
    
                        @Override
                        public void onComplete() {
                            System.out.println("接受完成");
                        }
                    });
    
    // 再次使用强大的 Rx 操作符进行再次简化
    Observable.just(1)
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            System.out.println(s);
                        }
                    });
    
    ?>
    // 其实还可以更简单
    Observable.just(1)
                .subScribe(System.out::println)
      
      
    

    onNext:用来发送数据,可多次调用,每调用一次发送一条数据
    onError:用来发送异常通知,只发送一次,若多次调用只发送第一条
    onComplete:用来发送完成通知,只发送一次,若多次调用只发送第一条

    Consumer

    Consumer 可以看做是对观察者Observer功能单一化之后的产物, 其函数accept只接收可观察对象发射的数据,不接收异常信息或完成信息。

    如果想接收异常或完成信息需要使用

     Observable.just("Hello World")
                    .subscribe(new Consumer<String>() {
                        @Override
                        public void accept(@NonNull String s) throws Exception {
                            System.out.println(s);
                        }
                    }, new Consumer<Throwable>() {
                        @Override
                        public void accept(@NonNull Throwable throwable) throws Exception {
                            throwable.printStackTrace();
                        }
                    }, new Action() {
                        @Override
                        public void run() throws Exception {
                            System.out.println("接收完成");
                        }
                    });
    

    第二个参数Consumer规定泛型<Throwable>通过函数accept接收异常信息。
    第三个参数Action也是对观察者Observer功能单一化之后的产物--行动,通过函数run接收完成信息,作出响应行动。

    Disposable

    public void onSubscribe(Disposable d) {
    }
    

    在观察者 Observer 与可观察对象Observable ,建立订阅关系后,回调这个方法,并且传过来一个 Disposable 类型的参数,可通过 Disposable 来控制 Observer 与 Observable 之间的订阅。
    无论观察者 Observer 以何种方式订阅可观察对象 Observable,都会生成一个 Disposable。

    Operator - 操作符

    其实质是函数式编程中的高阶函数,是对响应式编程的各个过程拆分封装后的产物。以便于我们操作数据流。

    常用操作符分类

    创建:创建一个可观察对象 Observable 并发射数据
    过滤:从 Observable 发射的数据中取出特定的值
    变换:对 Observable 发射的数据执行变换操作
    组合:组合多个 Observable ,例如:{1,2,3} + {4,5,6} --> {1,2,3,4,5,6}
    聚合:聚合多个 Observable ,例如:{1,2,3} + {4,5,6} --> {[1,4],[2,5],[3,6]}

    常用操作符

    创建
    • create

    • just

      just操作符可用来发送单条数据,数字,字符串,数组,对象,集合都可以当做单条数据发送。

      Observable.just("hello world");//发送一个字符串"hello world"
      Observable.just(1,2,3,4);//逐一发送1,2,3,4这四个整数
      
    • fromArray

      创建一个Observable,接受一个数组,并将数组中的数据逐一发送

    过滤操作
    • filter

      filter使用 Predicate 函数接口传入条件值,来判断Observable发射的每一个值是否满足这个条件,如果满足,则继续向下传递,如果不满足,则过滤掉。

    • distinct

      过滤掉重复的数据项,过滤规则为:只允许还没有发射过的数据项通过。

    变换
    • map

      对Observable发射的每一项数据应用一个函数,执行变换操作

    • flatMap

      将一个发射数据的 Observable 变换为多个 Observables ,然后将它们发射的数据合并后放进一个单独的 Observable

    组合
    • merge

    使用Merge操作符你可以将多个 Observables 的输出合并,就好像它们是一个单个的 Observable 一样。

    Merge可能会让合并的 Observables 发射的数据交错(有一个类似的操作符Concat不会让数据交错,它会按顺序一个接着一个发射多个 Observables 的发射物)。

    聚合
    • zip

    通过一个函数将多个Observables的发射物结合到一起,基于这个函数的结果为每个结合体发射单个数据项。

    Schedulers - 调度器

    如果你想给 Observable 操作符链添加多线程功能,你可以指定操作符(或者特定的 Observable )在特定的调度器( Scheduler / 线程 )上执行。

    当然在每个语言的实现或者平台下都有属于自己特殊的调度器,比如 Android 的AndroidSchedulers.mainThread(),Swift 上的 MainScheduler等。

    subscribeOn

    指定一个观察者在哪个调度器上观察这个 Observable

    若多次设定,则只有一次起作用。

    observeOn

    指定 Observable 自身在哪个调度器上执行

    若多次设定,每次均起作用。

    Rx 常用的场景?

    它有什么使用场景?

    场景 1:单请求异步处理

    在显示场景中 ui线程(主线程)不能做耗时操作,比如网络请求、大文件读取等。

    伪代码:

    // Api 封装类
    public class API {
            // 获取 token 的实现
            Observable<String> token(username, passowrd) {
                // 返回一个 Observable
                return Observable.create({ observer
                        request.get(xxx)
                                .callback({
                                    void success(Response response) {
                                      String token = ....// 解析返回值,获得 token
                                      observer.onNext(token)
                        observer.onComplete(token)  
                              }
                        
                                  void error(error error) {
                                            observer.onError(error)         
                                    }
                            })
                        
                })
            }
    }
    
    // 
    API.token()
      
      .subscribe{
      
        void onNext()
       
        void onError()
          
        void onComplete()
    }
    
    
    

    Swift:

    enum API {
        /// 通过用户名密码取得一个 token
        static func token(username: String, password: String) -> Observable<String> { ... }
    }
    
    
    RxMoyaProvider<API>()
        .request(.token("song", "123"))
        .subscribe{ event in {
        switch event  {
          case .Next(let response):
            // 获得请求结果
          case .Error(let error):
            // 发生错误
          case .Complete():
                // 完成请求
        }
      }}
    

    Java:

    public interface Api {
        
         @GET("getToken")
       Observable<ResponseBody> token(@Query("username") String username,
                                      @Query("password") String passowrd);
    }
    
    retrofit.create(Api.class)
        .token("song", "123")
        .subscribenOn(Schedulers.io()) ///在IO线程进行网络请求
        .observeOn(AndroidSchedulers.mainThread()) //回到主线程去处理请求结果
        .subscribe(new Observer<ResponseBody>() {
                        @Override
                        public void onSubscribe(Disposable d) {
                                // 开启订阅
                        }
    
                        @Override
                        public void onNext(ResponseBody responseBody) {
                                                    // 获得请求结果
                        }
    
                        @Override
                        public void onError(Throwable e) {
                                                    // 发生错误
                        }
    
                        @Override
                        public void onComplete() {
                                                    // 完成订阅
                        }
                    })
    

    场景 2:多异步请求连续调用

    比如先通过用户名密码取得 Token, 然后通过 Token 取得用户信息。

    传统方式实现:

    /// 用回调的方式封装接口
    enum API {
        /// 通过用户名密码取得一个 token
        static func token(username: String, password: String,
            success: (String) -> Void,
            failure: (Error) -> Void) { ... }
    
        /// 通过 token 取得用户信息
        static func userinfo(token: String,
            success: (UserInfo) -> Void,
            failure: (Error) -> Void) { ... }
    }
    
    /// 通过用户名和密码获取用户信息
    API.token(username: "123", password: "123",
        success: { token in
            API.userInfo(token: token,
                success: { userInfo in
                    print("获取用户信息成功: \(userInfo)")
                },
                failure: { error in
                    print("获取用户信息失败: \(error)")
            })
        },
        failure: { error in
            print("获取用户信息失败: \(error)")
    })
    

    Rx 实现:

    /// 用 Rx 封装接口
    enum API {
        /// 通过用户名密码取得一个 token
        static func token(username: String, password: String) -> Observable<String> { ... }
        /// 通过 token 取得用户信息
        static func userInfo(token: String) -> Observable<UserInfo> { ... }
    }
    
    API.token(username: "song", password: "123")
        .flatMapLatest(API.userInfo) // 将 Observable 的元素转换成其他的 Observable(将当前请求转换为另一请求)
        .subscribe(onNext: { userInfo in
            print("获取用户信息成功: \(userInfo)")
        }, onError: { error in
            print("获取用户信息失败: \(error)")
        })
        .disposed(by: disposeBag)
    

    当然这样可能感觉不到,但是当你当前的业务 需要有关系的连续请求 4 个、5个的时候,你就会发现这样可以避免回调地狱,从而使得代码易读,易维护。

    场景 3:多异步请求合并处理

    有时候在项目中,我们会碰到组合多个请求的结果后,再更新UI的情况,比如同时取得当前商品信息和评论

    Swift

    /// 用 Rx 封装接口
    enum API {
    
        /// 取得老师的详细信息
        static func info(id: Int) -> Observable<Teacher> { ... }
    
        /// 取得老师的评论
        static func comments(id: Int) -> Observable<[Comment]> { ... }
    }
    
    
    Observable.zip(
          API.info(id: 1),
          API.comments(id: 1)
        ).subscribe(onNext: { (teacher, comments) in
            print("获得当前商品信息: \(teacher)")
            print("获得当前商品评论: \(comments.count) 条")
        }, onError: { error in
            print("获取商品信息或评论失败: \(error)")
        })
        .disposed(by: disposeBag)
    
    

    java:

    MyService myService = retrofit.create(MyService.class);
    Observable getInfo = myService.getInfo();
    Observable getComments = myService.getComments();
    Observable.zip(getInfo,getComments)
                .subscribeOn(Schedulers.io())
                .observeOn(AndroidSchedulers.mainThread())
                .subscribe(this::updateUI, this::showError);
    

    题外话

    Swift: Combine (官方)

    Combine 可以使代码更加简洁、易于维护,也免除了饱受诟病的嵌套闭包和回调地狱。Combine 是 Reactive Programming 在 Swift 中的一个实现,更确切的说是对 ReactiveX (Reactive Extensions, 简称 Rx) 的实现,而这个实现正是基于观察者模式的。

    https://icodesign.me/posts/swift-combine/

    Kotlin: 协程(线程切换框架)
    https://space.bilibili.com/27559447?from=search&seid=9369351450622626138

    https://juejin.im/post/5a0ab91451882533d0229556

    参考文档:

    ReactiveX/RxJava: https://mcxiaoke.gitbooks.io/rxdocs/content/Intro.html
    RxSwift: https://beeth0ven.github.io/RxSwift-Chinese-Documentation/

    相关文章

      网友评论

        本文标题:ReactiveX 入门系列(一)

        本文链接:https://www.haomeiwen.com/subject/qsxmnctx.html