美文网首页Rx
RxSwift源码分析(11)——publish

RxSwift源码分析(11)——publish

作者: 无悔zero | 来源:发表于2020-10-13 19:14 被阅读0次

    今天分析一下publish这个高阶函数,这个函数使用起来有点像网络请求,需要启动连接才会进行相关订阅、发送和响应。可以直接来看下面的例子:

    let ob = Observable<Any>.create { (observer) -> Disposable in
        observer.onNext("连接后才订阅、发送和响应")
        observer.onCompleted()
        return Disposables.create()
    }.publish()  //注意
    ob.subscribe { (text) in
        print(text)
    }.disposed(by: disposeBag)
    _ = ob.connect()  //连接
    
    1. 我们先来看publish()源码:
    extension ObservableType {
        public func publish() -> ConnectableObservable<Element> {
            return self.multicast { PublishSubject() }
        }
    }
    
    public final class PublishSubject<Element>
        : Observable<Element>
        , SubjectType
        , Cancelable
        , ObserverType
        , SynchronizedUnsubscribeType {
        ...
    }
    

    publish函数其实就是创建了ConnectableObservableAdapter序列返回:

    extension ObservableType {
        ...
        public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
            -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
            return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
        }
    }
    

    ConnectableObservableAdapter其实也继承了ObservableObservableType,所以也是一个序列:

    final private class ConnectableObservableAdapter<Subject: SubjectType>
        : ConnectableObservable<Subject.Element> {
        ...
        init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
            self._source = source  //保存源序列
            self._makeSubject = makeSubject  //保存PublishSubject
            self._subject = nil
            self._connection = nil
        }
        ...
    }
    
    public class ConnectableObservable<Element>
        : Observable<Element>
        , ConnectableObservableType {
        ...
    }
    
    public class Observable<Element> : ObservableType {
        ...
    }
    
    1. 然后序列开始订阅:
    ob.subscribe { (text) in
        ...
    }
    

    根据RxSwift核心逻辑,来到ConnectableObservableAdaptersubscribe函数,可以看到有一个lazySubjectlazySubject只会创建一次,有利于节省内存和减少性能消耗:

    final private class ConnectableObservableAdapter<Subject: SubjectType>
        : ConnectableObservable<Subject.Element> {
        ...
        //只创建一次
        fileprivate var lazySubject: Subject {
            if let subject = self._subject {
                return subject
            }
    
            let subject = self._makeSubject()
            self._subject = subject
            return subject
        }
    
        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
            return self.lazySubject.subscribe(observer)
        }
    }
    
    1. lazySubject订阅后来到:
    public final class PublishSubject<Element>
        : Observable<Element>
        , SubjectType
        , Cancelable
        , ObserverType
        , SynchronizedUnsubscribeType {
        public typealias SubjectObserverType = PublishSubject<Element>
    
        typealias Observers = AnyObserver<Element>.s
        ...
        private var _observers = Observers()  //默认初始化
        ...
        public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            self._lock.lock()
            let subscription = self._synchronized_subscribe(observer)
            self._lock.unlock()
            return subscription
        }
    
        func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            ...
            //重点
            let key = self._observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
        ...
    }
    

    _synchronized_subscribe函数的代码很容易迷惑人,其实它的重点在self._observers.insert(observer.on)

    1. self._observers是默认初始化的,具有保存的作用,它把observer.on保存下来了:
    struct Bag<T> : CustomDebugStringConvertible {
        ...
            mutating func insert(_ element: T) -> BagKey {
            let key = _nextKey
    
            _nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)
    
            if _key0 == nil {
                _key0 = key
                _value0 = element
                return key
            }
    
            _onlyFastPath = false
    
            if _dictionary != nil {
                _dictionary![key] = element
                return key
            }
    
            if _pairs.count < arrayDictionaryMaxSize {
                _pairs.append((key: key, value: element)) //保存了observer.on
                return key
            }
            
            _dictionary = [key: element]
            
            return key
        }
        ...
    }
    
    1. 保存之后,下一步就是在适当的时候调用connect
    ob.connect()
    

    进入源码,我们可以看到先创建connection,然后用self._source源序列来进行subscribe订阅connection

    final private class ConnectableObservableAdapter<Subject: SubjectType>
        : ConnectableObservable<Subject.Element> {
        ...
        override func connect() -> Disposable {
            return self._lock.calculateLocked {
                if let connection = self._connection {
                    return connection
                }
    
                let singleAssignmentDisposable = SingleAssignmentDisposable()
                let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
                self._connection = connection
                let subscription = self._source.subscribe(connection)
                singleAssignmentDisposable.setDisposable(subscription)
                return connection
            }
        }
        ...
    }
    
    final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
        ...
        init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
            self._parent = parent
            self._subscription = subscription
            self._lock = lock
            self._subjectObserver = subjectObserver
        }
        ...
    }
    
    1. 根据RxSwift核心逻辑,最终来到connectionon函数:
    final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
        ...
        func on(_ event: Event<Subject.Observer.Element>) {
            if isFlagSet(self._disposed, 1) {
                return
            }
            if event.isStopEvent {
                self.dispose()
            }
            self._subjectObserver.on(event)
        }
        ...
    }
    
    1. 然后调用self._subjectObserver.on(event)=>
      Connection._subjectObserver.on=>
      ConnectableObservableAdapter.lazySubject.on=>
      ConnectableObservableAdapter._makeSubject.on=>
      PublishSubject.on
    public final class PublishSubject<Element>
        : Observable<Element>
        , SubjectType
        , Cancelable
        , ObserverType
        , SynchronizedUnsubscribeType {
        ...
        public func on(_ event: Event<Element>) {
            ...
            dispatch(self._synchronized_on(event), event)
        }
    
        func _synchronized_on(_ event: Event<Element>) -> Observers {
            self._lock.lock(); defer { self._lock.unlock() }
            switch event {
            case .next:
                if self._isDisposed || self._stopped {
                    return Observers()
                }
                
                return self._observers
            case .completed, .error:  
                ...
                return Observers()
            }
        }
        ...
    }
    

    然后通过_synchronized_on函数拿到self._observers后执行dispatch(self._synchronized_on(event), event)

    1. 最后遍历调用保存的observer.on
    @inline(__always)
    func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
        bag._value0?(event)
    
        if bag._onlyFastPath {
            return
        }
    
        let pairs = bag._pairs
        for i in 0 ..< pairs.count {
            pairs[i].value(event)
        }
    
        if let dictionary = bag._dictionary {
            for element in dictionary.values {
                element(event)
            }
        }
    }
    
    1. 根据RxSwift核心逻辑observer.on就是AnonymousObserver.on,最终来到外面的响应闭包:
    ob.subscribe { (text) in
        print(text)
    }
    

    整个流程简单来说就是:
    1.创建序列,用publish包装一下;
    2.然后通过subscribe函数保存observer.on
    3.最后调用connect来进行源序列的订阅,以此唤起发送和响应。

    相关文章

      网友评论

        本文标题:RxSwift源码分析(11)——publish

        本文链接:https://www.haomeiwen.com/subject/apeupktx.html