美文网首页
RxSwift (三)Observable的创建,订阅,销毁

RxSwift (三)Observable的创建,订阅,销毁

作者: 孔雨露 | 来源:发表于2019-08-10 00:08 被阅读0次

    @TOC

    可观察的序列Observable

    通过前面博客对Rxswift的源码分析,我们知道在Rxswift中一条主线思想就是万物皆序列,这里的序列就是我们的可观察序列,也可以称之为观察者。所以使用Rxswift总会跟观察者Observer打交道,这里重温一下观察者的定义。

    本篇博客主要讲解Observer的创建,订阅,销毁的用法,不深入讲解它的底层实现,不讲解它的源码,如要了解源码实现过程分析请参考我之前的博客:序列的核心逻辑

    Observable定义

    观察者(Observable)的作用就是监听事件,然后对这个事件做出响应。或者说任何响应事件的行为都是观察者。比如:

    • 当我们点击按钮,弹出一个提示框。那么这个“弹出一个提示框”就是观察者Observer<Void>
    • 当我们请求一个远程的json 数据后,将其打印出来。那么这个“打印 json 数据”就是观察者 Observer<JSON>
    image

    Observable的创建,订阅,销毁

    Observable创建

    在 subscribe 方法中创建Observable

    1. 创建观察者最直接的方法就是在 Observablesubscribe 方法后面描述当事件发生时,需要如何做出响应。
    2. 比如下面的样例,观察者就是由后面的 onNextonErroronCompleted 这些闭包构建出来的。
    3. 实例1

    代码:

    let observable = Observable.of("A", "B", "C")
              
    observable.subscribe(onNext: { element in
        print(element)
    }, onError: { error in
        print(error)
    }, onCompleted: {
        print("completed")
    })
    

    运行结果:

    A
    B
    C
    completed
    

    在 bind 方法中创建Observable

    1. 实例2
      下面代码我们创建一个定时生成索引数的 Observable 序列,并将索引数不断显示在 label 标签上

    代码:

    import UIKit
    import RxSwift
    import RxCocoa
     
    class ViewController: UIViewController {
         
        @IBOutlet weak var label: UILabel!
         
        let disposeBag = DisposeBag()
         
        override func viewDidLoad() {
             
            //Observable序列(每隔1秒钟发出一个索引数)
            let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
     
            observable
                .map { "当前索引数:\($0 )"}
                .bind { [weak self](text) in
                    //收到发出的索引数后显示到label上
                    self?.label.text = text
                }
                .disposed(by: disposeBag)
        }
    }
    

    使用 AnyObserver 创建Observable

    AnyObserver 可以用来描叙任意一种观察者。

    1. AnyObserver配合 subscribe 方法使用
      比如上面实例1我们可以改成如下代码:
    //观察者
    let observer: AnyObserver<String> = AnyObserver { (event) in
        switch event {
        case .next(let data):
            print(data)
        case .error(let error):
            print(error)
        case .completed:
            print("completed")
        }
    }
     
    let observable = Observable.of("A", "B", "C")
    observable.subscribe(observer)
    
    1. AnyObserver配合bindTo 方法使用
      也可配合 Observable 的数据绑定方法(bindTo)使用。比如上面的实例2可以改成如下代码:
    import UIKit
    import RxSwift
    import RxCocoa
     
    class ViewController: UIViewController {
         
        @IBOutlet weak var label: UILabel!
        let disposeBag = DisposeBag()
        override func viewDidLoad() {
             
            //观察者
            let observer: AnyObserver<String> = AnyObserver { [weak self] (event) in
                switch event {
                case .next(let text):
                    //收到发出的索引数后显示到label上
                    self?.label.text = text
                default:
                    break
                }
            }
             
            //Observable序列(每隔1秒钟发出一个索引数)
            let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
            observable
                .map { "当前索引数:\($0 )"}
                .bind(to: observer)
                .disposed(by: disposeBag)
        }
    }
    

    使用 Binder 创建Observable

    1. 相较于AnyObserver 的大而全,Binder 更专注于特定的场景。Binder 主要有以下两个特征:
    • 不会处理错误事件
    • 确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)

    一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。

    针对上面的实例2 中的label 标签的文字显示就是一个典型的 UI 观察者。它在响应事件时,只会处理 next 事件,而且更新 UI 的操作需要在主线程上执行。那么这种情况下更好的方案就是使用 Binder改用 Binder 会简单许多。

    使用Binder改造后的代码如下:

    import UIKit
    import RxSwift
    import RxCocoa
     
    class ViewController: UIViewController {
         
        @IBOutlet weak var label: UILabel!
        let disposeBag = DisposeBag()
      
        override func viewDidLoad() {      
            //观察者
            let observer: Binder<String> = Binder(label) { (view, text) in
                //收到发出的索引数后显示到label上
                view.text = text
            }
            //Observable序列(每隔1秒钟发出一个索引数)
            let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
            observable
                .map { "当前索引数:\($0 )"}
                .bind(to: observer)
                .disposed(by: disposeBag)
        }
    }
    

    使用工厂方法创建Observable

    在Observable的扩展类里提供了一系列创建被观察对象的工厂化方法。主要有下面这些:

    • never() :创建一个Never序列,该序列不会发出任何事件,也不会终止。
    • empty(): 创建一个Empty序列,该序列只发出completed事件。
    • error(): 创建一个错误的序列,该序列以’error’事件终止。即创建一个不会发送任何条目并且立即终止错误的Observerable序列
    • just(): 创建一个Just序列,该序列只包含一个元素。
    • of(): 创建一个新的被观察序列的对象,它包含可变数量的元素。
    • from(): 通过数组来创建一个被观察序列。从一个序列(如Array/Dictionary/Set)中创建一个Observer
    • range():在指定范围内生成一个被观察的整数序列,发出事件n次。即创建一个Observable序列,它会发出一系列连续的整数,然后终止。
    • repeatElement(): 生成一个被观察的序列,重复发出指定元素n次。
    • generate(): 创建一个被观察的序列,只要提供的条件为真,就发出状态值。
    • deferred(): 为每个订阅事件的观察者都创建一个新的被观察的序列。(一对一的关系)
    • doOn(): 在订阅的被观察者的事件执行之前,先执行do后面和要执行的订阅事件对应的方法。
    • create(): 通过指定的方法实现来自定义一个被观察的序列。
    • timer(): 获取计时器Observable序列
    • interval(): 底层就是封装timer

    针对上面的工厂方法,下面来举例说明

    never创建序列

    never()方法创建一个永远不会发出 Event(也不会终止)的 Observable 序列。
    实例4
    代码:

    let disposeBag = DisposeBag()
    let neverSequence = Observable<String>.never()
    _ = neverSequence.subscribe {_ in
        print("This will never be printed")
    }.addDisposableTo(disposeBag)
    

    结果:

    不会输出任何结果。
    
    empty创建序列

    empty()方法创建一个空内容的 Observable 序列。
    实例5
    代码:

    let disposeBag = DisposeBag()
        Observable<Int>.empty().subscribe {
            event in
            print(event)
        }.addDisposableTo(disposeBag)
    

    结果:

    completed
    
    error创建序列

    error()方法创建一个不做任何操作,而是直接发送一个错误的Observable 序列。
    实例6
    代码:

    let disposeBag = DisposeBag()
        Observable<Int>.error(TestError.test)
            .subscribe { print($0) }
            .addDisposableTo(disposeBag)
    

    结果:

    error(test)
    
    just创建序列

    just()方法通过传入一个默认值来初始化。

    实例7
    代码:

    let disposeBag = DisposeBag()
    Observable.just("1").subscribe { event in
                print(event)
            }.addDisposableTo(disposeBag)
    

    结果:

    next(1)
    completed
    
    of创建序列

    of()方法可以接受可变数量的参数(必需要是同类型的)
    实例8
    代码:

    let disposeBag = DisposeBag()
    Observable.of("1", "2", "3", "4").subscribe(onNext: { element in
         print(element)
    }).addDisposableTo(disposeBag)
    

    结果:

    1
    2
    3
    4
    
    from创建序列

    from()方法需要一个数组参数。

    实例9
    代码:

    let disposeBag = DisposeBag()
    Observable.from(["1", "2", "3", "4"]).subscribe(onNext: { print($0) }).addDisposableTo(disposeBag)
    

    结果:

    1
    2
    3
    4
    
    range创建序列

    range()方法通过指定起始和结束数值,创建一个以这个范围内所有值作为初始值的Observable序列。
    实例10
    代码:

    let disposeBag = DisposeBag()
    Observable.range(start: 1, count: 10).subscribe { print($0) }.addDisposableTo(disposeBag)
    

    结果:

    next(1)
    next(2)
    next(3)
    next(4)
    next(5)
    next(6)
    next(7)
    next(8)
    next(9)
    next(10)
    completed
    
    repeatElement创建序列

    repeatElement()方法创建一个可以无限发出给定元素的 Event的 Observable 序列(永不终止)。
    实例11
    代码:

    let disposeBag = DisposeBag()
    Observable.repeatElement("1")
    .take(3)
    .subscribe(onNext: { print($0) })
    .addDisposableTo(disposeBag)
    

    结果:

    1
    1
    1
    
    generate创建序列

    generate()方法创建一个只有当提供的所有的判断条件都为 true 的时候,才会给出动作的 Observable 序列。
    实例12
    代码:

    let disposeBag = DisposeBag()
    Observable.generate(
               initialState: 0,
               condition: { $0 < 3 },
               iterate: { $0 + 1 }
            )
            .subscribe(onNext: { print($0) })
            .addDisposableTo(disposeBag)
    

    结果:

    0
    1
    2
    
    deferred创建序列

    实例13
    代码:

    let disposeBag = DisposeBag()
        var count = 1
        let deferredSequence = Observable<String>.deferred {
            print("Creating \(count)")
            count += 1
    
            return Observable.create { observer in
                print("Emitting...")
                observer.onNext("1")
                observer.onNext("2")
                observer.onNext("3")
                return Disposables.create()
            }
        }
    
        deferredSequence
            .subscribe(onNext: { print($0) })
            .addDisposableTo(disposeBag)
    
        deferredSequence
            .subscribe(onNext: { print($0) })
            .addDisposableTo(disposeBag)
    

    结果:

    Creating 1
    Emitting...
    1
    2
    3
    Creating 2
    Emitting...
    1
    2
    3
    
    deferred创建序列

    deferred()方法相当于是创建一个 Observable 工厂,通过传入一个 block 来执行延迟 Observable序列创建的行为,而这个 block 里就是真正的实例化序列对象的地方。

    实例14
    代码:

    let disposeBag = DisposeBag()
        var count = 1
        let deferredSequence = Observable<String>.deferred {
            print("Creating \(count)")
            count += 1
    
            return Observable.create { observer in
                print("Emitting...")
                observer.onNext("1")
                observer.onNext("2")
                observer.onNext("3")
                return Disposables.create()
            }
        }
    
        deferredSequence
            .subscribe(onNext: { print($0) })
            .addDisposableTo(disposeBag)
    
        deferredSequence
            .subscribe(onNext: { print($0) })
            .addDisposableTo(disposeBag)
    

    结果:

    Creating 1
    Emitting...
    1
    2
    3
    Creating 2
    Emitting...
    1
    2
    3
    
    doOn创建序列

    实例15
    代码:

    let disposeBag = DisposeBag()
    Observable.of("1", "2", "3", "4")
              .do(onNext: { print("Intercepted:", $0) }, 
                  onError { print("Intercepted error:", $0) },
                  onCompleted: { print("Completed")  })
              .subscribe(onNext { print($0) },
                         onCompleted: { print("结束") })
              .addDisposableTo(disposeBag)
    

    结果:

    Intercepted: 1
    1
    Intercepted: 2
    2
    Intercepted: 3
    3
    Intercepted: 4
    4
    Completed
    结束
    
    create创建序列

    create()方法接受一个 block 形式的参数,任务是对每一个过来的订阅进行处理。
    代码:
    实例16

    let disposeBag = DisposeBag()
    let myJust = { (element: String) -> Observable<String> in
        return Observable.create { observer in  
        observer.on(.next(element))
        observer.on(.completed)
        return Disposables.create()
        }
    }
    myJust("1").subscribe { print($0) }.addDisposableTo(disposeBag)
    

    结果:

    next(1)
    completed
    
    timer创建序列

    timer()方法有两种用法,一种是创建的 Observable序列在经过设定的一段时间后,产生唯一的一个元素。另一种是创建的 Observable 序列在经过设定的一段时间后,每隔一段时间产生一个元素。

    实例17

    方式一:
    //5秒种后发出唯一的一个元素0
    let observable = Observable<Int>.timer(5, scheduler: MainScheduler.instance)
    observable.subscribe { event in
        print(event)
    }
    
    方式二:
    //延时5秒种后,每隔1秒钟发出一个元素
    let observable = Observable<Int>.timer(5, period: 1, scheduler: MainScheduler.instance)
    observable.subscribe { event in
        print(event)
    }
    
    interval创建序列

    interval()方法创建的 Observable 序列每隔一段设定的时间,会发出一个索引数的元素。而且它会一直发送下去。

    实例18

    let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
    observable.subscribe { event in
        print(event)
    }
    

    Observer订阅

    有了 Observable,我们还要使用 subscribe() 方法来订阅它,接收它发出的 Event。

    订阅Observable有两种方式:

    1. 我们使用 subscribe() 订阅了一个Observable 对象,该方法的 block 的回调参数就是被发出的 event 事件,如果想要获取到这个事件里的数据,可以通过 event.element 得到。
      例如:
    let observable = Observable.of("A", "B", "C")
    observable.subscribe { event in
        print(event)
        //print(event.element)
    }
    

    结果:
    next(A)
    next(B)
    next(C)
    completed

    1. RxSwift 还提供了另一个 subscribe方法,它可以把 event 进行分类,通过不同的 block 回调处理不同类型的 event.同时会把 event 携带的数据直接解包出来作为参数,方便我们使用。subscribe() 方法的 onNextonErroronCompletedonDisposed 这四个回调 block 参数都是有默认值的,即它们都是可选的。所以我们也可以只处理 onNext而不管其他的情况。

    实例19
    代码如下:

    let observable = Observable.of("A", "B", "C")
             
    observable.subscribe(onNext: { element in
        print(element)
    }, onError: { error in
        print(error)
    }, onCompleted: {
        print("completed")
    }, onDisposed: {
        print("disposed")
    })
    

    Observable销毁

    一个 Observable 序列被创建出来后它不会马上就开始被激活从而发出 Event,而是要等到它被某个人订阅了才会激活它。而Observable 序列激活之后要一直等到它发出了.error或者 .completed的 event 后,它才被终结。dispose()就是用来销毁Observable。

    • dispose()
      (1)使用该方法我们可以手动取消一个订阅行为。
      (2)如果我们觉得这个订阅结束了不再需要了,就可以调用 dispose()方法把这个订阅给销毁掉,防止内存泄漏。
      (3)当一个订阅行为被dispose 了,那么之后 observable 如果再发出 event,这个已经 dispose 的订阅就收不到消息了。

    实例20
    代码:

    let observable = Observable.of("A", "B", "C")
             
    //使用subscription常量存储这个订阅方法
    let subscription = observable.subscribe { event in
        print(event)
    }
             
    //调用这个订阅的dispose()方法
    subscription.dispose()
    
    • DisposeBag
      除了 dispose()方法之外,我们更经常用到的是一个叫 DisposeBag 的对象来管理多个订阅行为的销毁:
    1. 我们可以把一个 DisposeBag对象看成一个垃圾袋,把用过的订阅行为都放进去。
    2. 而这个DisposeBag 就会在自己快要dealloc 的时候,对它里面的所有订阅行为都调用 dispose()方法。

    实例21

    代码:

    let disposeBag = DisposeBag()
             
    //第1个Observable,及其订阅
    let observable1 = Observable.of("A", "B", "C")
    observable1.subscribe { event in
        print(event)
    }.disposed(by: disposeBag)
     
    //第2个Observable,及其订阅
    let observable2 = Observable.of(1, 2, 3)
    observable2.subscribe { event in
        print(event)
    }.disposed(by: disposeBag)
    

    相关文章

      网友评论

          本文标题:RxSwift (三)Observable的创建,订阅,销毁

          本文链接:https://www.haomeiwen.com/subject/kyqljctx.html