问题:如果一个网络请求的数据被多个地方多次使用,我们会进行多次订阅,结果会造成多次的网络请求,代码如下:
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}
.publish()
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
// 所以在订阅一次 会出现什么问题?
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
如果没有添加 .publish() 和 _ = netOB.connect(),则网络请求data序列的创建会执行两次,而这是我们不需要的。
publish是如何做到网络共享的呢?且看源码分析:
首先,因为肯定会用到RXSwift的核心逻辑,这里放一张RXSwift的核心逻辑的思维导图以供参考:
RXSwift核心逻辑简化图.png
ok,上源码:
----------------------------publish进来---------------------------------
//封装了一个multicast
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
----------------------------核心方法,老长了---------------------------------
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
typealias ConnectionType = Connection<Subject>
fileprivate let _source: Observable<Subject.Observer.Element>
fileprivate let _makeSubject: () -> Subject
fileprivate let _lock = RecursiveLock()
fileprivate var _subject: Subject?
// state
fileprivate var _connection: ConnectionType?
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source
self._makeSubject = makeSubject
self._subject = nil
self._connection = nil
}
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
fileprivate var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
//自己重写一个subscribe
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
return self.lazySubject.subscribe(observer)
}
}
ConnectableObservableAdapter
序列继承的是ConnectableObservable
,并没有继承produce
,没有继承produce
??是的,问题很严重,因为没有produce
,意味着就没有subscribe
函数的具体实现,继承ConnectableObservable
我们只有一个subscribe
的抽象函数,怎么办?答案是自己重写一个subscribe
.
先总结一下序列的创建:
- 首先是正常的序列创建
create
,创建的序列保存了闭包1️⃣;- 然后通过
.publish()
,得到一个ConnectableObservableAdapter
序列,也就是我们的netOB
;netOB
序列,没有继承produce
,自己重写了subscribe
;self.lazySubject.subscribe(observer)
中的序列self.lazySubject
是懒加载的,意味着每次执行重写的subscribe
,调用的是同一个序列。
然后就是订阅了,订阅
---->创建一个观察者observer,保存闭包
---->然后subscribe
,是调用的重写的self.lazySubject.subscribe
方法,最后走的是PublishSubject
的subscribe
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
到这里,其实只做了两件事情:
let key = self._observers.insert(observer.on)
---->观察者回调的收集,这样就可以等接收到信号发送后统一响应,而且self._lock.lock()
与self._lock.unlock()
的处理还能保证响应的执行顺序不会打乱;return SubscriptionDisposable(owner: self, key: key)
---->垃圾袋的处理,这个先不管。
说明:因为没有对应sink.on
方法的执行,或者说是没有具体闭包1️⃣的调用,也可以理解为是没有消息发送onNext
,就是create
保存的闭包没法调用。解决的方法就是调用_ = netOB.connect()
.大家可以尝试一下,如果只写.publish()
不写_ = netOB.connect()
,是得不到如何的回调的,没有任何的打印出现。
下面讲解connect()
方法:netOB.connect()
就是序列 ConnectableObservableAdapter
内的connect
方法,上面已经贴出来了,这里再放一次代码:
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
这个方法,内部是这样的:
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
...
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
self._parent = parent
self._subscription = subscription
self._lock = lock
self._subjectObserver = subjectObserver
}
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self._disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
self._subjectObserver.on(event)
}
...
}
传过来的观察者是 self.lazySubject.asObserver()
创建的,self.lazySubject
只有一个,意味着观察者只有一个,然后实现了on
方法,on
方法如下:
public func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
dispatch(self._synchronized_on(event), event)
}
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
后面的逻辑就是和RXSwift
一模模一样样的了,就不再赘叙了。
最后的思考:
- 为什么
publish
能实现只网络请求一次?因为观察者只有一个,所以对闭包1️⃣的调用只有一次;(类似于有多个block
实现,但是我在调用了一次self.block()
;
详细点解释就是我们通过self._subjectObserver.on(event)
发起了响应--->接收的是有netOB.subscribe
创建保存的两个闭包
,so。- 如果不用
懒加载lazySubject
行不行?答案是可以的,只是用了更好,为啥子,自己探究下🙃。publish
和connetc
被多次调用会有什么影响?(懒加载起到了什么作用,connect创建流程分析就可以知道答案了)- 为什么
publish
新创建的序列是publishSubject
类型?见后面博客Subject
的分析。
网友评论