RxSwift-Subject即攻也守

作者: Cooci_和谐学习_不急不躁 | 来源:发表于2019-08-12 16:19 被阅读16次

    在掌握前面序列以还有观察者的前提下,我们今天来看一个非常特殊的类型-Subject.为什么说它特殊呢?原因很简单:Subject既可以做序列,也可以做观察者!正是因为这一特性,所以在实际开发中被大量运用。下面我们一起来解读一下这个特殊的Subject

    即攻也守的原理

    首先我们来看看:SubjectType的原理!

    public protocol SubjectType : ObservableType {
          // 关联了观察者类型,具备这个类型的能力
        associatedtype SubjectObserverType : ObserverType
        func asObserver() -> SubjectObserverType
    }
    
    • SubjectType首先就是继承了ObservableType,具有序列特性
    • 关联了观察者类型,具备这个类型的能力
    • 下面我们通过一个具体类型来感受一下subject
    // 1:初始化序列
    let publishSub = PublishSubject<Int>() 
    // 2:发送响应序列
    publishSub.onNext(1)
    // 3:订阅序列
    publishSub.subscribe { print("订阅到了:",$0)}
        .disposed(by: disposbag)
    // 再次发送响应
    publishSub.onNext(2)
    publishSub.onNext(3)
    
    • 很明显能够订阅信号(序列最基本的能力)
    • 能够发送响应,又是观察者的能力
    • 查看底层源码分析

    订阅响应流程

    public override func subscribe -> Disposable {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }
    
    func _synchronized_subscribe -> Disposable  {
        // 省略不必要的代码
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }
    
    • self._observers.insert(observer.on): 通过一个集合添加进去所有的订阅事件,很明显在合适的地方一次性全部执行
    • 其中也返回这次订阅的销毁者,方便执行善后工作: synchronizedUnsubscribe->self._observers.removeKey(disposeKey)
    mutating func removeKey(_ key: BagKey) -> T? {
        if _key0 == key {
            _key0 = nil
            let value = _value0!
            _value0 = nil
            return value
        }
    
        if let existingObject = _dictionary?.removeValue(forKey: key) {
            return existingObject
        }
    
        for i in 0 ..< _pairs.count where _pairs[i].key == key {
            let value = _pairs[i].value
            _pairs.remove(at: i)
            return value
        }
        return nil
    }
    
    • 便利通过key获取响应bag中的value,执行集合移除
    • 因为没有相应持有关系,达到自动释放销毁

    发送信号流程

        public func on(_ event: Event<Element>) {
            dispatch(self._synchronized_on(event), event)
        }
    
    • 这个地方估计大家看起来麻烦恶心一点,但是你用心看不难体会
    • 这里主要调用了dispatch函数,传了两个参数:self._synchronized_on(event)event
    • 查看dispatch函数源码
    func dispatch<E>(_ bag: Bag) {
        bag._value0?(event)
    
        if bag._onlyFastPath {
            return
        }
    
        let pairs = bag._pairs
        for i in 0 ..< pairs.count {
            pairs[i].value(event)
        }
    
        if let dictionary = bag._dictionary {
            for element in dictionary.values {
                element(event)
            }
        }
    }
    
    • bag._value0?(event)首先执行事件的回调
    • 判断bag._onlyFastPath的情况,默认会开启快速通道!
    • 如果是开启慢速通道,需要从刚刚添加进bag包裹里面的匹配对挨个进行pairs[i].value(event),外界事件回调,然后拿回外界封装的闭包的闭包调用:element(event)
    func _synchronized_on(_ event: Event<E>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }
    
            return Observers()
        }
    }
    
    • 这里如果self._isDisposed || self._stopped成立就会返回一个空的集合,也就没有序列的响应
    • .completed, .error都会改变状态self._stopped = true,也就是说序列完成或者错误之后都无法再次响应了
    • .completed, .error还会移除添加在集合里面的内容

    其实如果你对前面序列的流程掌握了,这个subject的流程也不再话下,只是subject 把订阅流程和响应流程都内部实现,所以也就没有必要引入sink

    各种Subject

    PublishSubject

    可以不需要初始来进行初始化(也就是可以为空),并且它只会向订阅者发送在订阅之后才接收到的元素。

    // PublishSubject
    // 1:初始化序列
    let publishSub = PublishSubject<Int>() //初始化一个PublishSubject 装着Int类型的序列
    // 2:发送响应序列
    publishSub.onNext(1)
    // 3:订阅序列
    publishSub.subscribe { print("订阅到了:",$0)}
        .disposed(by: disposbag)
    // 再次发送响应
    publishSub.onNext(2)
    publishSub.onNext(3)
    
    • 信号:1是无法被订阅的,只接受订阅之后的响应

    BehaviorSubject

    通过一个默认初始值来创建,当订阅者订阅BehaviorSubject时,会收到订阅后Subject上一个发出的Event,如果还没有收到任何数据,会发出一个默认值。之后就和PublishSubject一样,正常接收新的事件。

    publish 稍微不同就是behavior这个家伙有个存储功能:存储上一次的信号

    // BehaviorSubject
    // 1:创建序列
    let behaviorSub = BehaviorSubject.init(value: 100)
    // 2:发送信号
    behaviorSub.onNext(2)
    behaviorSub.onNext(3)
    // 3:订阅序列
    behaviorSub.subscribe{ print("订阅到了:",$0)}
        .disposed(by: disposbag)
    // 再次发送
    behaviorSub.onNext(4)
    behaviorSub.onNext(5)
    // 再次订阅
    behaviorSub.subscribe{ print("订阅到了:",$0)}
        .disposed(by: disposbag)
    
    • 当没有信号的时候,会默认发送 信号:100
    • 只能储存一个信号:信号2 会被 信号3 覆盖
    • 订阅信号之前能够储存信号
    // 初始化
    public init(value: Element) {
          self._element = value
    }
    
    // 事件响应
    func _synchronized_on(_ event: Event<E>) -> Observers {
    
        switch event {
        case .next(let element):
            self._element = element
        case .error, .completed:
            self._stoppedEvent = event
        }
        return self._observers
    }
    
    • 初始化的时候带有一个属性保存一个信号
    • 事件响应:新事件会覆盖原来的事件
    • 其他流程和publish一样

    ReplaySubject

    ReplaySubject 发送源Observable 的所有事件无论observer什么时候开始订阅。

    // ReplaySubject
    // 1:创建序列
    let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
    // let replaySub = ReplaySubject<Int>.createUnbounded()
    
    // 2:发送信号
    replaySub.onNext(1)
    replaySub.onNext(2)
    replaySub.onNext(3)
    replaySub.onNext(4)
    
    // 3:订阅序列
    replaySub.subscribe{ print("订阅到了:",$0)}
        .disposed(by: disposbag)
    // 再次发送
    replaySub.onNext(7)
    replaySub.onNext(8)
    replaySub.onNext(9)
    
    • 一个bufferSize空间,想存储多少次响应就是多少次
    • 其他流程照旧
    • 源码里面就是相对于BehaviorSubject的储存属性变成了集合

    AsyncSubject

    AsyncSubject只发送由源Observable发送的最后一个事件,并且只在源Observable完成之后。(如果源Observable没有发送任何值,AsyncSubject也不会发送任何值。)

    // AsyncSubject
    // 1:创建序列
    let asynSub = AsyncSubject<Int>.init()
    // 2:发送信号
    asynSub.onNext(1)
    asynSub.onNext(2)
    // 3:订阅序列
    asynSub.subscribe{ print("订阅到了:",$0)}
        .disposed(by: disposbag)
    // 再次发送
    asynSub.onNext(3)
    asynSub.onNext(4)
    //        asynSub.onError(NSError.init(domain: "lgcooci", code: 10086, userInfo: nil))
    asynSub.onCompleted()
    
    • 我们普通序列发送回来,都不会响应!直到完成序列响应
    func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
        switch event {
        case .next(let element):
            self._lastElement = element
            return (Observers(), .completed)
        case .error:
            self._stoppedEvent = event
    
            let observers = self._observers
            self._observers.removeAll()
    
            return (observers, event)
        case .completed:
    
            let observers = self._observers
            self._observers.removeAll()
    
            if let lastElement = self._lastElement {
                self._stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            }
            else {
                self._stoppedEvent = event
                return (observers, .completed)
            }
        }
    }
    
    • 可以很清晰的看出,普通Next事件都是,元素的替换,根本没有响应出来
      *complete事件发送到时候,就会把最新保存的self._lastElement当成事件值传出去,响应.next(lastElement)
    • 如果没有保存事件就发送完成事件:.completed
    • error事件会移空整个响应集合:self._observers.removeAll()

    Variable

    Variable废弃了,这里贴出代码以供大家遇到老版本! 由于这个Variable的灵活性所以在开发里面应用非常之多!

    // Variable : 5.0已经废弃(BehaviorSubject 替换) - 这里板书 大家可以了解一下
    // 1:创建序列
    let variableSub = Variable.init(1)
    // 2:发送信号
    variableSub.value = 100
    variableSub.value = 10
    // 3:订阅信号        })
    
    variableSub.asObservable().subscribe{ print("订阅到了:",$0)}
        .disposed(by: disposbag)
    // 再次发送
    variableSub.value = 1000
    

    BehaviorRelay

    • 替换原来的Variable
    • 可以储存一个信号
    • 随时订阅响应
    • 响应发送的时候要注意:behaviorR.accept(20)
    let behaviorRelay = BehaviorRelay(value: 100)
    behaviorRelay.subscribe(onNext: { (num) in
        print(num)
    .disposed(by: disposbag)
    print("打印:\(behaviorRelay.value)")
    
    behaviorRelay.accept(1000)
    

    Subject在实际开发中,应用非常的广泛!平时很多时候都会在惆怅选择什么序列更合适,那么聪明的你一定要掌握底层的原理,并不说你背下特色就能真正开发的,因为如果后面一旦发生了BUG,你根本无法解决。作为iOS中高级发开人员一定要知其然,而知其所以然!碌碌无为的应用层开发毕竟走不长远!
    就问此时此刻还有谁?45度仰望天空,该死!我这无处安放的魅力!

    相关文章

      网友评论

        本文标题:RxSwift-Subject即攻也守

        本文链接:https://www.haomeiwen.com/subject/kfcujctx.html