美文网首页
RxSwift-publish源码解析

RxSwift-publish源码解析

作者: king_jensen | 来源:发表于2019-08-14 13:27 被阅读0次

    publish使用

    let subject = PublishSubject<Any>()
            subject.subscribe{print("00:\($0)")}
                .disposed(by: disposeBag)
            
            let netOB = Observable<Any>.create { (observer) -> Disposable in
                    sleep(2)// 模拟网络延迟
                    print("我开始请求网络了")
                    observer.onNext("请求到的网络数据")
                    observer.onNext("请求到的本地")
                    observer.onCompleted()
                    return Disposables.create {
                        print("销毁回调了")
                    }
                }.publish()
            
            netOB.subscribe(onNext: { (anything) in
                    print("订阅1:",anything)
                })
                .disposed(by: disposeBag)
    
            // 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
            // 所以在订阅一次 会出现什么问题?
            netOB.subscribe(onNext: { (anything) in
                    print("订阅2:",anything)
                })
                .disposed(by: disposeBag)
            
            _ = netOB.connect()
      /*打印结果:
             我开始请求网络了
             订阅1: 请求到的网络数据
             订阅2: 请求到的网络数据
             订阅1: 请求到的本地
             订阅2: 请求到的本地
             销毁回调了
             */
    

    我们看到网络只会请求一次。这种请求一次,订阅到多个不同的地方的场景很多。所以我们有必要了解一下publish是怎么实现的。
    探索publish的源码,还是从RxSwift的流程:创建序列,订阅序列,发送响应入手分析。

    publish的序列创建

    public func publish() -> ConnectableObservable<Element> {
            return self.multicast { PublishSubject() }
        }
    

    我们看到publish实际返回的是multicast

      public func multicast<Subject: SubjectType>(_ subject: Subject)
            -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
            return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: { subject })
        }
    

    返回类型为ConnectableObservableAdapter

    init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
            self._source = source
            self._makeSubject = makeSubject
            self._subject = nil
            self._connection = nil
        }
    

    1.保存源序列_source
    2.保存初始化序列PublishSubject()_makeSubject

    序列的订阅

    当序列订阅时,调用ConnectableObservableAdaptersubscribe(observer)

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
            return self.lazySubject.subscribe(observer)
        }
    

    这里订阅lazySubject序列,这是一个懒加载方式,每次订阅的都是序列_subject,即创建中保存的_makeSubject

    fileprivate var lazySubject: Subject {
            if let subject = self._subject {
                return subject
            }
    
            let subject = self._makeSubject()
            self._subject = subject
            return subject
        }
    

    订阅_subject时,将会调用PublishSubject.subscribe(observer)

    let subscription = self._synchronized_subscribe(observer)
    

    调用_synchronized_subscribe(observer)

    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            if let stoppedEvent = self._stoppedEvent {
                observer.on(stoppedEvent)
                return Disposables.create()
            }
            
            if self._isDisposed {
                observer.on(.error(RxError.disposed(object: self)))
                return Disposables.create()
            }
            
            let key = self._observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    

    调用self._observers.insert(observer.on)

    mutating func insert(_ element: T) -> BagKey {
            let key = _nextKey
    
            _nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)
    
            if _key0 == nil {
                _key0 = key
                _value0 = element
                return key
            }
    
            _onlyFastPath = false
    
            if _dictionary != nil {
                _dictionary![key] = element
                return key
            }
    
            if _pairs.count < arrayDictionaryMaxSize {
                _pairs.append((key: key, value: element))
                return key
            }
            
            _dictionary = [key: element]
            
            return key
        }
    

    publish订阅时,将observer.on的回调方法保存起来,如果是首次订阅将key保存在_key0,observer.on保存在_value0,不是首次保存订阅,那么就保存在_dictionary中。
    keyBagKey类型:

    struct BagKey {
        fileprivate let rawValue: UInt64
    }
    

    publish将所有订阅的观察者回调方法保存起来,以备后续发起响应时,调用所有观察者的回调方法。

    发送响应

    使用connect发送响应

    override func connect() -> Disposable {
            return self._lock.calculateLocked {
                if let connection = self._connection {
                    return connection
                }
    
                let singleAssignmentDisposable = SingleAssignmentDisposable()
                let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
                self._connection = connection
                let subscription = self._source.subscribe(connection)
                singleAssignmentDisposable.setDisposable(subscription)
                return connection
            }
        }
    

    self._lock.calculateLocked加了递归锁

     final func calculateLocked<T>(_ action: () -> T) -> T {
            self.lock(); defer { self.unlock() }
            return action()
        }
    

    然后执行action(),就是下列代码:

    if let connection = self._connection {
                    return connection
                }
    
                let singleAssignmentDisposable = SingleAssignmentDisposable()
                let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
                self._connection = connection
                let subscription = self._source.subscribe(connection)
                singleAssignmentDisposable.setDisposable(subscription)
                return connection
    

    1.保证只有一个_connection,当_connection存在,直接返回_connection。保证下列的原序列_source永远只被订阅一次,那么外界网络请求的闭包就只会执行一次。
    2.当_connectionnil时,会创建

     let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
           self._connection = connection
      let subscription = self._source.subscribe(connection)
    

    (1)创建Connection

     init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
            self._parent = parent
            self._subscription = subscription
            self._lock = lock
            self._subjectObserver = subjectObserver
        }
    

    保存ConnectableObservableAdapter_parent,self.lazySubject.asObserver()保存至_subscription
    (2)保存Connection_connection
    (3)订阅源序列_source,connection是观察者. 将会来到AnonymousObservable.run,然后再到sink.run,最终会调用到_subscribeHandler执行到外界的闭包中。

    在外界闭包中,observer.onNext("请求到的网络数据")发起响应,原source的观察者是Connection,Connection.on

    func on(_ event: Event<Subject.Observer.Element>) {
            if isFlagSet(self._disposed, 1) {
                return
            }
            if event.isStopEvent {
                self.dispose()
            }
            self._subjectObserver.on(event)
        }
    

    Connection.on调用self._subjectObserver.on(event)
    self._subjectObserver就是publish创建时保存的那个PublishSubject()
    self._subjectObserver.on(event)等同于PublishSubject.on(event)

     public func on(_ event: Event<Element>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            dispatch(self._synchronized_on(event), event)
        }
    

    1.调用_synchronized_on

       func _synchronized_on(_ event: Event<Element>) -> Observers {
            self._lock.lock(); defer { self._lock.unlock() }
            switch event {
            case .next:
                if self._isDisposed || self._stopped {
                    return Observers()
                }
                
                return self._observers
            case .completed, .error:
                if self._stoppedEvent == nil {
                    self._stoppedEvent = event
                    self._stopped = true
                    let observers = self._observers
                    self._observers.removeAll()
                    return observers
                }
    
                return Observers()
            }
        }
    

    _synchronized_on中返回self._observers,即订阅时保存的所有的observer.on

    2.dispatch(self._synchronized_on(event), event)调用dispatch

    func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
        bag._value0?(event)
    
        if bag._onlyFastPath {
            return
        }
    
        let pairs = bag._pairs
        for i in 0 ..< pairs.count {
            pairs[i].value(event)
        }
    
        if let dictionary = bag._dictionary {
            for element in dictionary.values {
                element(event)
            }
        }
    }
    

    bag._value0?(event)首先调用第一个订阅者的回调方法。
    然后循环执行_pairs_dictionary中所有保存的回调方法。
    以上就是publish所有流程的源码解析。

    总结:

    1.publish创建,实际返回是ConnectableObservableAdapter序列,保存了源序列,并且创建保存PublishSubject()序列。
    2.publish订阅时,订阅的是中间层ConnectableObservableAdapterPublishSubject()序列,并保存所有的回调方法observer.on
    3.connect时,实际上是订阅了源序列,观察者为我们自己创建的Connection对象,这个时候将会来到创建源序列时的闭包,我们就是在这个闭包中请求网络.
    4.给源序列发送响应时,实际上会来到观察者Connection.on方法,Connection中将会对订阅时保存的_observers一一执行。
    以上就是publish序列的所有流程。

    相关文章

      网友评论

          本文标题:RxSwift-publish源码解析

          本文链接:https://www.haomeiwen.com/subject/zruwjctx.html