美文网首页
07RXSwift的publish状态共享

07RXSwift的publish状态共享

作者: 越来越胖了 | 来源:发表于2019-08-10 23:17 被阅读0次

问题:如果一个网络请求的数据被多个地方多次使用,我们会进行多次订阅,结果会造成多次的网络请求,代码如下:

 let netOB = Observable<Any>.create { (observer) -> Disposable in
            sleep(2)// 模拟网络延迟
            print("我开始请求网络了")
            observer.onNext("请求到的网络数据")
            observer.onNext("请求到的本地")
            observer.onCompleted()
            return Disposables.create {
                print("销毁回调了")
            }
            }
            .publish()
        
        netOB.subscribe(onNext: { (anything) in
            print("订阅1:",anything)
        })
            
            .disposed(by: disposeBag)
        
        // 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
        // 所以在订阅一次 会出现什么问题?
        netOB.subscribe(onNext: { (anything) in
            print("订阅2:",anything)
        })
            .disposed(by: disposeBag)
         _ = netOB.connect()

如果没有添加 .publish() 和 _ = netOB.connect(),则网络请求data序列的创建会执行两次,而这是我们不需要的。

publish是如何做到网络共享的呢?且看源码分析:

首先,因为肯定会用到RXSwift的核心逻辑,这里放一张RXSwift的核心逻辑的思维导图以供参考:


RXSwift核心逻辑简化图.png

ok,上源码:

----------------------------publish进来---------------------------------
//封装了一个multicast
 public func publish() -> ConnectableObservable<Element> {
        return self.multicast { PublishSubject() }
    }

public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
        -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
        return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
    }
----------------------------核心方法,老长了---------------------------------
final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
    typealias ConnectionType = Connection<Subject>

    fileprivate let _source: Observable<Subject.Observer.Element>
    fileprivate let _makeSubject: () -> Subject

    fileprivate let _lock = RecursiveLock()
    fileprivate var _subject: Subject?

    // state
    fileprivate var _connection: ConnectionType?

    init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
        self._source = source
        self._makeSubject = makeSubject
        self._subject = nil
        self._connection = nil
    }

    override func connect() -> Disposable {
        return self._lock.calculateLocked {
            if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }

    fileprivate var lazySubject: Subject {
        if let subject = self._subject {
            return subject
        }

        let subject = self._makeSubject()
        self._subject = subject
        return subject
    }
//自己重写一个subscribe
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        return self.lazySubject.subscribe(observer) 
    }
}

ConnectableObservableAdapter序列继承的是ConnectableObservable,并没有继承produce,没有继承produce??是的,问题很严重,因为没有produce,意味着就没有subscribe函数的具体实现,继承ConnectableObservable我们只有一个subscribe的抽象函数,怎么办?答案是自己重写一个subscribe.

先总结一下序列的创建:

  1. 首先是正常的序列创建create,创建的序列保存了闭包1️⃣;
  2. 然后通过.publish(),得到一个ConnectableObservableAdapter序列,也就是我们的netOB
  3. netOB序列,没有继承produce,自己重写了subscribe
  4. self.lazySubject.subscribe(observer) 中的序列self.lazySubject是懒加载的,意味着每次执行重写的subscribe,调用的是同一个序列。

然后就是订阅了,订阅---->创建一个观察者observer,保存闭包---->然后subscribe,是调用的重写的self.lazySubject.subscribe方法,最后走的是PublishSubjectsubscribe

 public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        self._lock.lock()
        let subscription = self._synchronized_subscribe(observer)
        self._lock.unlock()
        return subscription
    }

    func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        if let stoppedEvent = self._stoppedEvent {
            observer.on(stoppedEvent)
            return Disposables.create()
        }
        
        if self._isDisposed {
            observer.on(.error(RxError.disposed(object: self)))
            return Disposables.create()
        }
        
        let key = self._observers.insert(observer.on)
        return SubscriptionDisposable(owner: self, key: key)
    }

到这里,其实只做了两件事情:

  1. let key = self._observers.insert(observer.on)---->观察者回调的收集,这样就可以等接收到信号发送后统一响应,而且self._lock.lock()self._lock.unlock()的处理还能保证响应的执行顺序不会打乱;
  2. return SubscriptionDisposable(owner: self, key: key)---->垃圾袋的处理,这个先不管。
    说明:因为没有对应sink.on方法的执行,或者说是没有具体闭包1️⃣的调用,也可以理解为是没有消息发送onNext,就是create保存的闭包没法调用。解决的方法就是调用_ = netOB.connect().大家可以尝试一下,如果只写.publish()不写_ = netOB.connect(),是得不到如何的回调的,没有任何的打印出现。

下面讲解connect()方法:netOB.connect()就是序列 ConnectableObservableAdapter内的connect方法,上面已经贴出来了,这里再放一次代码:

override func connect() -> Disposable {
        return self._lock.calculateLocked {
            if let connection = self._connection {
                return connection
            }

            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }

let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)这个方法,内部是这样的:

final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
   ...
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
        self._parent = parent
        self._subscription = subscription
        self._lock = lock
        self._subjectObserver = subjectObserver
    }
    func on(_ event: Event<Subject.Observer.Element>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        if event.isStopEvent {
            self.dispose()
        }
        self._subjectObserver.on(event)
    }
...
}

传过来的观察者是 self.lazySubject.asObserver()创建的,self.lazySubject只有一个,意味着观察者只有一个,然后实现了on方法,on方法如下:

 public func on(_ event: Event<Element>) {
        #if DEBUG
            self._synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self._synchronizationTracker.unregister() }
        #endif
        dispatch(self._synchronized_on(event), event)
    }

    func _synchronized_on(_ event: Event<Element>) -> Observers {
        self._lock.lock(); defer { self._lock.unlock() }
        switch event {
        case .next:
            if self._isDisposed || self._stopped {
                return Observers()
            }
            
            return self._observers
        case .completed, .error:
            if self._stoppedEvent == nil {
                self._stoppedEvent = event
                self._stopped = true
                let observers = self._observers
                self._observers.removeAll()
                return observers
            }

            return Observers()
        }
    }

后面的逻辑就是和RXSwift一模模一样样的了,就不再赘叙了。

最后的思考:

  1. 为什么publish能实现只网络请求一次?因为观察者只有一个,所以对闭包1️⃣的调用只有一次;(类似于有多个block实现,但是我在调用了一次self.block();
    详细点解释就是我们通过self._subjectObserver.on(event)发起了响应--->接收的是有netOB.subscribe创建保存的两个闭包,so。
  2. 如果不用懒加载lazySubject行不行?答案是可以的,只是用了更好,为啥子,自己探究下🙃。
  3. publishconnetc被多次调用会有什么影响?(懒加载起到了什么作用,connect创建流程分析就可以知道答案了)
  4. 为什么publish新创建的序列是publishSubject类型?见后面博客Subject的分析。

相关文章

网友评论

      本文标题:07RXSwift的publish状态共享

      本文链接:https://www.haomeiwen.com/subject/eppdjctx.html