美文网首页
RxSwift #03 | Subjects

RxSwift #03 | Subjects

作者: JeremyTechBlog | 来源:发表于2021-12-05 23:30 被阅读0次

    Observable 是 RxSwift 的基础,但它们本质上是只读(read-only)的。你只能通过订阅 observable,来获得它们产生的新事件的通知。

    举个栗子:

    Observable.create { observer in
        cache.calculateDiskStorageSize { result in
            switch result {
            case .success(let size):
                observer.onNext(Int64(size))
        case .failure(let error):
                observer.error(error)
          log.error("calculate cache failed with error:\\(error)")
        }
            observer.onCompleted()
        }
        return Disposables.create()
    }
    
    

    从上述代码中可以看出,Observable 在创建的时候,就已经确定了通过某种固定的逻辑,去发出事件,从而产生事件流。

    上面这个例子中的固定逻辑,指的就是计算 cache 大小。通过 cache 计算结果,来决定发出 onNext , error 或者 completable 事件。

    而在日常开发中,我们通常需要根据不同的逻辑,来决定发出事件。抽象来说,就是需要一个既能作为 Observable 又能作为 Observer 的东西。这种东西称为 Subject:

    let subject = PublishSubject<String>()
    
    subject.on(.next("Is anyone listening?"))
    
    let subscriptionOne = subject
      .subscribe(onNext: { string in
        print(string)
      })
    
    

    上述代码创建了一个 PublishSubject, 它的名字很贴切:就像一个报纸出版商一样,它接收信息,然后发布给订阅者。

    执行上述代码,会发现控制台中没有打印任何东西,这是因为:PublishSubject 只给当前的订阅者发出事件,如果一个 observer 是在事件发出之后才订阅的,那么将不会收到任何事件。

    什么是 Subject?

    Subject 既是一个 observable, 又是一个 observer。在上面的例子中,可以看到 subject 既可以接收事件,又可以被订阅。

    Observable 和 Subject 的区别,除了 Subject 既可以作为 Observable,又可以作为 Observer以外,也可以这么理解:

    Observable 已经把各种事件都定好了,比如发送网络请求,然后 Observer 在 subscribe 的时候,就触发这个网络请求,然后发送各种事件。

    Subject 则是事件没有定好,可以灵活地根据业务需求去进行触发,比如选择相片,比如发送网络请求,然后发送各种事件。

    在 RxSwift 中,有四种类型的 subject:

    • PublishSubject: 初始时是空的,只向订阅者发出新元素。
    • BehaviorSubject: 有一个初始值,并将其初始值和最新的元素发送给新的订阅者。
    • ReplaySubject: 初始化时需要有一个缓冲区大小,并将维持一个该大小的缓冲区,缓冲区内的元素都会发送给新的订阅者。
    • AsyncSubject: 只发出序列中的最后一个 next 事件,并且只在 subject 接收到 completed 事件时才发出。这是一种很少使用的 subject。

    RxSwift 中还提供了一种叫做 Relay 的概念(在使用是需要 import RxRelay),RxSwift 中提供了两种 relay:

    • PublishRelay
    • BehaviorRelay

    这两种 relay 包含着对应的 subject, 但只能接收和转发 next 事件,不能添加 completed 或者 error 事件,所以它们对于非终止序列来说是非常友好的。

    Relay 只能 accept 事件,不能发送 completed 或者 error 等终止事件,因此没有结束的概念。如果要正确释放 replay, 需要把它添加到 disposeBag 中。

    使用 PublishSubject

    let subject = PublishSubject<String>()
    
    subject.on(.next("Is anyone listening?"))
    
    let subscriptionOne = subject
      .subscribe(onNext: { string in
        print(string)
      })
    
    let subscriptionTwo = subject
      .subscribe { event in
        print("2)", event.element ?? event)
      }
    
    subject.onNext("3")
    
    /**
    output:
    3
    2) 3
    **/
    
    subscriptionOne.dispose()
    subject.onNext("4")
    
    /**
    output:
    2) 4
    **/
    
    

    当一个 publish subject 收到一个 completed 或 errror 事件,也就是终止事件,它将向新的订阅者发出该终止事件,它将不再发出 next 事件。

    而且,它将向后来的订阅者重新发出其终止事件。(Subject 被终止后,如果还有 observer 去 subscribe 它,那么 subject 会重复给这些 observers 发送终止事件

    // 1
    subject.onCompleted()
    
    // 2
    subject.onNext("5")
    
    // 3
    subscriptionTwo.dispose()
    
    let disposeBag = DisposeBag()
    
    // 4
    subject
      .subscribe {
        print("3)", $0.element ?? $0)
      }
      .disposed(by: disposeBag)
    
    subject.onNext("?")
    
    /**
    output:
    2) completed
    3) completed
    **/
    
    

    使用 BehaviorSubject

    顶部的第一行是 subject。

    第二行的第一个订阅者在 1 之后但在 2 之前订阅,所以它在订阅后立即收到 1,然后在主体发出 2 和 3 的时候收到。

    同样地,第二个订阅者在 2 之后但在 3 之前订阅,所以它在订阅后立即收到 2,然后在 3 被发出时收到。

    // 1
    enum MyError: Error {
      case anError
    }
    
    // 2
    func print<T: CustomStringConvertible>(label: String, event: Event<T>) {
      print(label, (event.element ?? event.error) ?? event)
    }
    
    // 3
    example(of: "BehaviorSubject") {
      // 4
      let subject = BehaviorSubject(value: "Initial value")
      let disposeBag = DisposeBag()
    
        subject
          .subscribe {
            print(label: "1)", event: $0)
          }
          .disposed(by: disposeBag
    
        /**
        1) Initial value
        **/
    
        subject.onNext("X")
    
        /**
        1) X
        **/
    
        // 1
        subject.onError(MyError.anError)
    
        // 2
        subject
          .subscribe {
            print(label: "2)", event: $0)
          }
          .disposed(by: disposeBag)
    
        /**
        1) anError
        2) anError
        **/
    }
    
    

    BehaviorSubject 向新的订阅者重放他们的最新值。这使得它们很适合用来模拟各种状态的转移,比如“请求正在加载中”→“请求完成”。

    那如果想要显示比上一个值还更多的内容呢,比如在搜索框上,需要显示最近使用的五个搜索值,这个时候就要用到 ReplaySubject 了。

    使用 ReplaySubject

    ReplaySubject 将暂时缓存、或缓冲他们发出的最新元素,直到达到你选择的指定大小。然后,他们将向新的订阅者重新发出该缓冲区内的元素。

    下面的大理石图描述了一个缓冲区大小为2的重放主体:

    第一个订阅者(中间一行)已经订阅了 replay subject(最上面一行),所以它在元素被发射出来的时候得到了元素。第二个订阅者(底线)在 2 之后订阅了,所以它得到了 1 和 2 的重放。

    请记住,当使用一个 replay subject 时,这个缓冲区是在内存中保存的,所以很有可能会导致太高的内存占用。比如你为某种类型的 replay subject 设置一个大的缓冲区大小,而这种类型的实例都会占用大量的内存,比如图像。

    另一件需要注意的事情是创建一个数组类型的 replay subject。每个发射的元素将是一个数组,所以缓冲区的大小将缓冲那么多数组。如果不小心的话,也很容易在这里产生内存压力。

    example(of: "ReplaySubject") {
      // 1
      let subject = ReplaySubject<String>.create(bufferSize: 2)
      let disposeBag = DisposeBag()
    
      // 2
      subject.onNext("1")
      subject.onNext("2")
      subject.onNext("3")
    
      // 3
      subject
        .subscribe {
          print(label: "1)", event: $0)
        }
        .disposed(by: disposeBag)
    
      subject
        .subscribe {
          print(label: "2)", event: $0)
        }
        .disposed(by: disposeBag)
    
    /**
    --- Example of: ReplaySubject ---
    1) 2
    1) 3
    2) 2
    2) 3
    **/
    
        subject.onNext("4")
        subject.onError(MyError.anError)
    
        subject
          .subscribe {
            print(label: "3)", event: $0)
          }
          .disposed(by: disposeBag)
    
    /**
    前两个订阅者将正常接收当前元素,因为当新元素被添加到主题时,他们已经被订阅了,而新的第三个订阅者将得到最后两个缓冲的元素重放给它。
    虽然最后订阅流中发出了一个 error 事件,但是缓冲区还在内存中,所以它还会把缓冲区之前的元素发给订阅者。
    
    1) 4
    2) 4
    1) anError
    2) anError
    3) 3
    3) 4
    3) anError
    **/
        }
    
    subject.dispose()
    
    // 因为 subject 在前面已经发出了 error 事件,所以它被终止并且释放了,这里再调用 dispose 会报错
    // 3) Object `RxSwift...ReplayMany<Swift.String>` was already disposed.
    
    

    使用 Relay

    在前面的介绍中,我们知道:Relay 实际上是对应 Subject 的一层封装——PublishRelay 是 PublishSubject 的封装,BehaviorRelay 是 BehaviorSubject 的封装。它和 Subject 不一样的地方在于:它只能通过 accept(_:) 方法接收并发出事件,它不能使用 onNext(_:) 发出事件,也不能使用 onCompleted() 或者 onError(_:) 去终止订阅流,因此,Relay 保证了永远不会终止。

    let relay = PublishRelay<String>()  
    let disposeBag = DisposeBag()
    relay.accept("Knock knock, anyone home?")
    relay
      .subscribe(onNext: {
        print($0)
      })
      .disposed(by: disposeBag)
    
    relay.accept("1")
    
    // output: 1
    
    relay.accept(MyError.anError)
    relay.onCompleted()
    // compile error
    
    

    相关文章

      网友评论

          本文标题:RxSwift #03 | Subjects

          本文链接:https://www.haomeiwen.com/subject/aozwxrtx.html