美文网首页
07RXSwift的publish状态共享

07RXSwift的publish状态共享

作者: 越来越胖了 | 来源:发表于2019-08-10 23:17 被阅读0次

    问题:如果一个网络请求的数据被多个地方多次使用,我们会进行多次订阅,结果会造成多次的网络请求,代码如下:

     let netOB = Observable<Any>.create { (observer) -> Disposable in
                sleep(2)// 模拟网络延迟
                print("我开始请求网络了")
                observer.onNext("请求到的网络数据")
                observer.onNext("请求到的本地")
                observer.onCompleted()
                return Disposables.create {
                    print("销毁回调了")
                }
                }
                .publish()
            
            netOB.subscribe(onNext: { (anything) in
                print("订阅1:",anything)
            })
                
                .disposed(by: disposeBag)
            
            // 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
            // 所以在订阅一次 会出现什么问题?
            netOB.subscribe(onNext: { (anything) in
                print("订阅2:",anything)
            })
                .disposed(by: disposeBag)
             _ = netOB.connect()
    

    如果没有添加 .publish() 和 _ = netOB.connect(),则网络请求data序列的创建会执行两次,而这是我们不需要的。

    publish是如何做到网络共享的呢?且看源码分析:

    首先,因为肯定会用到RXSwift的核心逻辑,这里放一张RXSwift的核心逻辑的思维导图以供参考:


    RXSwift核心逻辑简化图.png

    ok,上源码:

    ----------------------------publish进来---------------------------------
    //封装了一个multicast
     public func publish() -> ConnectableObservable<Element> {
            return self.multicast { PublishSubject() }
        }
    
    public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
            -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
            return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
        }
    ----------------------------核心方法,老长了---------------------------------
    final private class ConnectableObservableAdapter<Subject: SubjectType>
        : ConnectableObservable<Subject.Element> {
        typealias ConnectionType = Connection<Subject>
    
        fileprivate let _source: Observable<Subject.Observer.Element>
        fileprivate let _makeSubject: () -> Subject
    
        fileprivate let _lock = RecursiveLock()
        fileprivate var _subject: Subject?
    
        // state
        fileprivate var _connection: ConnectionType?
    
        init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
            self._source = source
            self._makeSubject = makeSubject
            self._subject = nil
            self._connection = nil
        }
    
        override func connect() -> Disposable {
            return self._lock.calculateLocked {
                if let connection = self._connection {
                    return connection
                }
    
                let singleAssignmentDisposable = SingleAssignmentDisposable()
                let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
                self._connection = connection
                let subscription = self._source.subscribe(connection)
                singleAssignmentDisposable.setDisposable(subscription)
                return connection
            }
        }
    
        fileprivate var lazySubject: Subject {
            if let subject = self._subject {
                return subject
            }
    
            let subject = self._makeSubject()
            self._subject = subject
            return subject
        }
    //自己重写一个subscribe
        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
            return self.lazySubject.subscribe(observer) 
        }
    }
    

    ConnectableObservableAdapter序列继承的是ConnectableObservable,并没有继承produce,没有继承produce??是的,问题很严重,因为没有produce,意味着就没有subscribe函数的具体实现,继承ConnectableObservable我们只有一个subscribe的抽象函数,怎么办?答案是自己重写一个subscribe.

    先总结一下序列的创建:

    1. 首先是正常的序列创建create,创建的序列保存了闭包1️⃣;
    2. 然后通过.publish(),得到一个ConnectableObservableAdapter序列,也就是我们的netOB
    3. netOB序列,没有继承produce,自己重写了subscribe
    4. self.lazySubject.subscribe(observer) 中的序列self.lazySubject是懒加载的,意味着每次执行重写的subscribe,调用的是同一个序列。

    然后就是订阅了,订阅---->创建一个观察者observer,保存闭包---->然后subscribe,是调用的重写的self.lazySubject.subscribe方法,最后走的是PublishSubjectsubscribe

     public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            self._lock.lock()
            let subscription = self._synchronized_subscribe(observer)
            self._lock.unlock()
            return subscription
        }
    
        func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            if let stoppedEvent = self._stoppedEvent {
                observer.on(stoppedEvent)
                return Disposables.create()
            }
            
            if self._isDisposed {
                observer.on(.error(RxError.disposed(object: self)))
                return Disposables.create()
            }
            
            let key = self._observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    
    

    到这里,其实只做了两件事情:

    1. let key = self._observers.insert(observer.on)---->观察者回调的收集,这样就可以等接收到信号发送后统一响应,而且self._lock.lock()self._lock.unlock()的处理还能保证响应的执行顺序不会打乱;
    2. return SubscriptionDisposable(owner: self, key: key)---->垃圾袋的处理,这个先不管。
      说明:因为没有对应sink.on方法的执行,或者说是没有具体闭包1️⃣的调用,也可以理解为是没有消息发送onNext,就是create保存的闭包没法调用。解决的方法就是调用_ = netOB.connect().大家可以尝试一下,如果只写.publish()不写_ = netOB.connect(),是得不到如何的回调的,没有任何的打印出现。

    下面讲解connect()方法:netOB.connect()就是序列 ConnectableObservableAdapter内的connect方法,上面已经贴出来了,这里再放一次代码:

    override func connect() -> Disposable {
            return self._lock.calculateLocked {
                if let connection = self._connection {
                    return connection
                }
    
                let singleAssignmentDisposable = SingleAssignmentDisposable()
                let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
                self._connection = connection
                let subscription = self._source.subscribe(connection)
                singleAssignmentDisposable.setDisposable(subscription)
                return connection
            }
        }
    

    let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)这个方法,内部是这样的:

    final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
       ...
    init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
            self._parent = parent
            self._subscription = subscription
            self._lock = lock
            self._subjectObserver = subjectObserver
        }
        func on(_ event: Event<Subject.Observer.Element>) {
            if isFlagSet(self._disposed, 1) {
                return
            }
            if event.isStopEvent {
                self.dispose()
            }
            self._subjectObserver.on(event)
        }
    ...
    }
    

    传过来的观察者是 self.lazySubject.asObserver()创建的,self.lazySubject只有一个,意味着观察者只有一个,然后实现了on方法,on方法如下:

     public func on(_ event: Event<Element>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            dispatch(self._synchronized_on(event), event)
        }
    
        func _synchronized_on(_ event: Event<Element>) -> Observers {
            self._lock.lock(); defer { self._lock.unlock() }
            switch event {
            case .next:
                if self._isDisposed || self._stopped {
                    return Observers()
                }
                
                return self._observers
            case .completed, .error:
                if self._stoppedEvent == nil {
                    self._stoppedEvent = event
                    self._stopped = true
                    let observers = self._observers
                    self._observers.removeAll()
                    return observers
                }
    
                return Observers()
            }
        }
    

    后面的逻辑就是和RXSwift一模模一样样的了,就不再赘叙了。

    最后的思考:

    1. 为什么publish能实现只网络请求一次?因为观察者只有一个,所以对闭包1️⃣的调用只有一次;(类似于有多个block实现,但是我在调用了一次self.block();
      详细点解释就是我们通过self._subjectObserver.on(event)发起了响应--->接收的是有netOB.subscribe创建保存的两个闭包,so。
    2. 如果不用懒加载lazySubject行不行?答案是可以的,只是用了更好,为啥子,自己探究下🙃。
    3. publishconnetc被多次调用会有什么影响?(懒加载起到了什么作用,connect创建流程分析就可以知道答案了)
    4. 为什么publish新创建的序列是publishSubject类型?见后面博客Subject的分析。

    相关文章

      网友评论

          本文标题:07RXSwift的publish状态共享

          本文链接:https://www.haomeiwen.com/subject/eppdjctx.html