美文网首页
RxSwift中的Subject攻受兼备

RxSwift中的Subject攻受兼备

作者: 简_爱SimpleLove | 来源:发表于2019-08-20 17:59 被阅读0次

在前面的RxSwift中的publish和connect函数这篇文章中,我们认识了一个类PublishSubject,既可以作为观察者,也可以作为订阅者。

PublishSubject其实就是Subject的一种。

那么Subject为什么既可以作为观察者,又可以作为订阅者呢?

我们来看看Subject都继承的关键协议SubjectType:

/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
    /// The type of the observer that represents this subject.
    ///
    /// Usually this type is type of subject itself, but it doesn't have to be.
    associatedtype SubjectObserverType : ObserverType

    /// Returns observer interface for subject.
    ///
    /// - returns: Observer interface for subject.
    func asObserver() -> SubjectObserverType
    
}
  • 官方注释:这个协议代表遵守这个协议的类,既可以是一个可观察序列,也可以是一个观察者
  • 观察者类型通常就是subject本身的类型,但也不一定必须是
  • 继承了ObservableType,所以具有序列的特性,即有subscribe方法,可以订阅观察者
  • 关联了一个观察者类型,也就具备了观察者的特性,即有on函数,可以响应事件

常见的Subject

PublishSubject

可以不需要初始来进行初始化(也就是可以为空),并且它只会向订阅者发送在订阅之后才接收到的元素。

        // 1:初始化序列
        let publishSub = PublishSubject<Int>() //初始化一个PublishSubject 装着Int类型的序列
        // 2:发送响应序列
        publishSub.onNext(1)
        // 3:订阅序列
        publishSub.subscribe { print("订阅到了:",$0)}
            .disposed(by: disposbag)
        publishSub.subscribe { print("订阅到了2:",$0)}
            .disposed(by: disposbag)
        // 再次发送响应
        publishSub.onNext(2)
        publishSub.onNext(3)
        /*
         订阅到了: next(2)
         订阅到了2: next(2)
         订阅到了: next(3)
         订阅到了2: next(3)
         */
  • 1不会被订阅,只会订阅到subscribe方法之后发送的事件
  • 发送的事件,可以被前面多个订阅依次响应
BehaviorSubject

通过一个默认初始值来创建。订阅前面有发送元素,就订阅最后一个元素,如果没有就订阅到默认值。
始终保存最后一个元素。

        // 1:创建序列
        let behaviorSub = BehaviorSubject.init(value: 100)
        // 2:发送信号
        behaviorSub.onNext(2)
        behaviorSub.onNext(3)
        // 3:订阅序列
        behaviorSub.subscribe{ print("front 订阅到了:",$0)}
            .disposed(by: disposbag)
        // 再次发送
        behaviorSub.onNext(4)
        behaviorSub.onNext(5)
        // 再次订阅
        behaviorSub.subscribe{ print("back  订阅到了:",$0)}
            .disposed(by: disposbag)
        /*
         front 订阅到了: next(3)
         front 订阅到了: next(4)
         front 订阅到了: next(5)
         back  订阅到了: next(5)
         */
  • 订阅前的元素,始终响应该订阅前的最后一个元素。订阅之后的就都正常响应
  • 如果订阅之前没有发送元素,就响应默认值value
  • 可以订阅原因,是因为初始化的时候,有个属性,始终用来保存初始值,或者最后一个元素
    public init(value: Element) {
        self._element = value  // 用来保存元素
}
ReplaySubject

升级版的BehaviorSubjectBehaviorSubject只能保存发送的最后一个元素,但是ReplaySubject可以保存倒数的多个发送元素,因为它里面的存储属性变成了集合。

        // 1:创建序列
        let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
        // 2:发送信号
        replaySub.onNext(1)
        replaySub.onNext(2)
        replaySub.onNext(3)
        replaySub.onNext(4)
        // 3:订阅序列
        replaySub.subscribe{ print("订阅到了:",$0)}
            .disposed(by: disposbag)
        // 再次发送
        replaySub.onNext(7)
        replaySub.onNext(8)
        replaySub.onNext(9)
        /*
         订阅到了: next(3)
         订阅到了: next(4)
         订阅到了: next(7)
         订阅到了: next(8)
         订阅到了: next(9)
         */
  • bufferSize能够存储的元素个数
  • 响应订阅之前发送的最后两个元素
  • 订阅之后的发送元素,都正常响应,不受限制
AsyncSubject

只响应发送的最后一个元素,而且只会在发送completed信号的时候,才会响应发送的最后一个元素。
如果在发送completed前,有error信号,那么将不会响应任何元素。

        // 1:创建序列
        let asynSub = AsyncSubject<Int>.init()
        // 2:发送信号
        asynSub.onNext(1)
        asynSub.onNext(2)
        // 3:订阅序列
        asynSub.subscribe{ print("订阅到了:",$0)}
            .disposed(by: disposbag)
        // 再次发送
        asynSub.onNext(3)
        asynSub.onNext(4)
//        asynSub.onError(NSError.init(domain: "lgcooci", code: 10086, userInfo: nil))
        asynSub.onCompleted()
        /*
         没有注释onError的打印:
         订阅到了: error(Error Domain=lgcooci Code=10086 "(null)")
         
         注释onError的打印:
         订阅到了: next(4)
         订阅到了: completed
         */
  • 如果中间有错误事件,那么只会响应错误,不会响应别的任何元素
  • 没有错误事件,只会响应最后一个元素
  • 关键代码如下:
        switch event {
        case .next(let element):
            self._lastElement = element
            return (Observers(), .completed)
        case .error:
            self._stoppedEvent = event

            let observers = self._observers
            self._observers.removeAll()  // 遇到错误,就移除所有元素

            return (observers, event)
        case .completed:

            let observers = self._observers
            self._observers.removeAll()

            if let lastElement = self._lastElement {
                self._stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement)) // 完成的时候,响应最后一个元素
            }
            else {
                self._stoppedEvent = event
                return (observers, .completed)
            }
        }
  • next事件的时候,只是保存当前的元素作为最后一个元素_lastElement
  • error事件的时候,会清空所以的元素
  • completed事件的时候,有最后一个元素_lastElement就响应它,不然就只是发送一个completed信号
Variable(已经被废弃)

有初始值value,通过给value赋值来发送元素。现在使用BehaviorRelay来代替。

        // 1:创建序列
        let variableSub = Variable.init(1)
        // 2:发送信号
        variableSub.value = 100
        variableSub.value = 10
        // 3:订阅信号
        variableSub.asObservable().subscribe{ print("订阅到了:",$0)}
            .disposed(by: disposbag)
        // 再次发送
        variableSub.value = 1000
          /*
         ℹ️ [DEPRECATED] `Variable` is planned for future deprecation. Please consider `BehaviorRelay` as a replacement. Read more at: https://git.io/vNqvx
         订阅到了: next(10)
         订阅到了: next(1000)
         */
  • variableSub.value = 1000赋值的方法来发送元素
  • 里面是用BehaviorSubject实现的,所以又默认值,并且始终保存最后一个元素
 public init(_ value: Element) {
        self._value = value
        self._subject = BehaviorSubject(value: value)
    }
BehaviorRelay

替换原来的Variable,是对BehaviorSubject的一层封装。
不能发送错误和完成信号。

        let behaviorRelay = BehaviorRelay(value:100)
//        behaviorRelay.subscribe(onNext: { (num) in
//            print(num)
//        })
//        .disposed(by: disposbag)
        print("front value:\(behaviorRelay.value)")
        behaviorRelay.accept(1000)
        print("back  value:\(behaviorRelay.value)") // 不需要写subscribe,就可以拿到最新值1000
        /*
         100 (放开注释过后,订阅里面打印的100)
         front value:100
         1000
         back  value:1000
         订阅到了: completed
         */
  • 有一个默认值value
  • behaviorRelay.accept(1000)accept来更新value
  • 可以behaviorRelay.value这样来随时拿到最新的value
  • 存储最后一个元素,behaviorRelay.value一般取最后一个元素value,没有就取默认值value
public final class BehaviorRelay<Element>: ObservableType {
    public typealias E = Element

    private let _subject: BehaviorSubject<Element>

    /// Accepts `event` and emits it to subscribers
    public func accept(_ event: Element) {
        self._subject.onNext(event)
    }

    /// Current value of behavior subject
    public var value: Element {
        // this try! is ok because subject can't error out or be disposed
        return try! self._subject.value()
    }

    /// Initializes behavior relay with initial value.
    public init(value: Element) {
        self._subject = BehaviorSubject(value: value)
    }
}
  • 就是对BehaviorSubject的一次封装
  • accept其实就是BehaviorSubjectonNext方法
  • 没有主动发送errorcompleted的方法

相关文章

网友评论

      本文标题:RxSwift中的Subject攻受兼备

      本文链接:https://www.haomeiwen.com/subject/xxfmsctx.html