美文网首页RxSwift源码解析
RxSwift PublishSubject 分析

RxSwift PublishSubject 分析

作者: 狼性刀锋 | 来源:发表于2018-10-10 16:47 被阅读50次

PublishSubject继承类和协议

public final class PublishSubject<Element>
    : Observable<Element>
    , SubjectType
    , Cancelable
    , ObserverType
    , SynchronizedUnsubscribeType


  • Observable: 被观察者

  • Cancelable: 可以被dispose

  • SynchronizedUnsubscribeType 可以取消订阅

  • ObserverType: 观察者

  • SubjectType:

    1. asObserver
    2. subscribe

这个协议很好的描述对象即可以作为Observer ,也可以作为 Observerable 的特性

Observers Type

要理解PublishSubject的核心,得先理解Observers 这个类型

typealias Observers = AnyObserver<Element>.s

extension AnyObserver {
    /// Collection of `AnyObserver`s
    typealias s = Bag<(Event<Element>) -> ()>
}

about Bag

Bag: 是一个自定义容器,类似于Dictionary, 之所以自定义实现主要是出于性能考虑,有兴趣的可以研究下,这里先留个坑

总的来说Observers 是一个 Bag 容器, 元素的类型为 (Event<Element>) -> ()

订阅

    public override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
        _lock.lock()
        let subscription = _synchronized_subscribe(observer)
        _lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == E {
        if let stoppedEvent = _stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if _isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = _observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }


一个同步锁, 几个异常处理, 剩下的就是核心代码:
let key = _observers.insert(observer.on) : 将 Observer.on 函数塞入Bag容器

return SubscriptionDisposable(owner: self, key: key)
返回SubscriptionDisposable

struct SubscriptionDisposable<T: SynchronizedUnsubscribeType> : Disposable {
    private let _key: T.DisposeKey
    private weak var _owner: T?

    init(owner: T, key: T.DisposeKey) {
        _owner = owner
        _key = key
    }

    func dispose() {
        _owner?.synchronizedUnsubscribe(_key)
    }
}


SubscriptionDisposable 功能比较简单,只是对其进行简单的封装,那么这里为什么不让PublishSubject 直接支持Disposable协议呢?这是为了保证设计的一致性,只有被订阅的Observable才能被dispose, 如果让PublishSubject直接支持Disposable协议就有损这个设计原则,固这里选择返回SubscriptionDisposable

on 事件

    public func on(_ event: Event<Element>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        dispatch(_synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<E>) -> Observers {
        _lock.lock(); defer { _lock.unlock() }
        switch event {
        case .next(_):
            if _isDisposed || _stopped {
                return Observers()
            }
            
            return _observers
        case .completed, .error:
            if _stoppedEvent == nil {
                _stoppedEvent = event
                _stopped = true
                let observers = _observers
                _observers.removeAll()
                return observers
            }

            return Observers()
        }
    }


@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> ()>, _ event: Event<E>) {
    if bag._onlyFastPath {
        bag._value0?(event)
        return
    }

    let value0 = bag._value0
    let dictionary = bag._dictionary

    if let value0 = value0 {
        value0(event)
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}


这段逻辑复杂一点,先讲一下Bag容器

  • 数据量为1 的时候 使用,直接使用value0 存储元素
  • 数据量在(1,30)的时候, 使用 ContiguousArray 存储元素, value0 仍然存储0号元素
  • 超过30的数据,使用Dictionary 存储超出的元素

理解了这些就明白dispatch 干了些啥了, 简单说就是

 Bag.foreach { $0(event) }

一个PublishSubject 可以被多次订阅,每次订阅的时候将Observer.on 塞入Bag容器,在执行PublishSubject.on 事件的时候, 会执行所有Observeron 事件。

Unsubscribe

    func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
        _lock.lock()
        _synchronized_unsubscribe(disposeKey)
        _lock.unlock()
    }

    func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
        _ = _observers.removeKey(disposeKey)
    }

这个就很简单,unsubscribe的时候,将ObserverBag容器中移除就可以。

相关文章

网友评论

    本文标题:RxSwift PublishSubject 分析

    本文链接:https://www.haomeiwen.com/subject/safmaftx.html