publish使用
let subject = PublishSubject<Any>()
subject.subscribe{print("00:\($0)")}
.disposed(by: disposeBag)
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}.publish()
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
// 我们有时候不止一次网络订阅,因为有时候我们的数据可能用在不同的额地方
// 所以在订阅一次 会出现什么问题?
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
/*打印结果:
我开始请求网络了
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
订阅1: 请求到的本地
订阅2: 请求到的本地
销毁回调了
*/
我们看到网络只会请求一次。这种请求一次,订阅到多个不同的地方的场景很多。所以我们有必要了解一下publish
是怎么实现的。
探索publish
的源码,还是从RxSwift的流程:创建序列,订阅序列,发送响应入手分析。
publish的序列创建
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
我们看到publish
实际返回的是multicast
public func multicast<Subject: SubjectType>(_ subject: Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: { subject })
}
返回类型为ConnectableObservableAdapter
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source
self._makeSubject = makeSubject
self._subject = nil
self._connection = nil
}
1.保存源序列_source
2.保存初始化序列PublishSubject()
至_makeSubject
序列的订阅
当序列订阅时,调用ConnectableObservableAdapter
的subscribe(observer)
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
return self.lazySubject.subscribe(observer)
}
这里订阅lazySubject
序列,这是一个懒加载方式,每次订阅的都是序列_subject
,即创建中保存的_makeSubject
。
fileprivate var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
订阅_subject
时,将会调用PublishSubject.subscribe(observer)
let subscription = self._synchronized_subscribe(observer)
调用_synchronized_subscribe(observer)
func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if let stoppedEvent = self._stoppedEvent {
observer.on(stoppedEvent)
return Disposables.create()
}
if self._isDisposed {
observer.on(.error(RxError.disposed(object: self)))
return Disposables.create()
}
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
}
调用self._observers.insert(observer.on)
mutating func insert(_ element: T) -> BagKey {
let key = _nextKey
_nextKey = BagKey(rawValue: _nextKey.rawValue &+ 1)
if _key0 == nil {
_key0 = key
_value0 = element
return key
}
_onlyFastPath = false
if _dictionary != nil {
_dictionary![key] = element
return key
}
if _pairs.count < arrayDictionaryMaxSize {
_pairs.append((key: key, value: element))
return key
}
_dictionary = [key: element]
return key
}
publish
订阅时,将observer.on
的回调方法保存起来,如果是首次订阅将key
保存在_key0
,observer.on
保存在_value0
,不是首次保存订阅,那么就保存在_dictionary
中。
key
是BagKey
类型:
struct BagKey {
fileprivate let rawValue: UInt64
}
publish
将所有订阅的观察者回调方法保存起来,以备后续发起响应时,调用所有观察者的回调方法。
发送响应
使用connect
发送响应
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
self._lock.calculateLocked
加了递归锁
final func calculateLocked<T>(_ action: () -> T) -> T {
self.lock(); defer { self.unlock() }
return action()
}
然后执行action()
,就是下列代码:
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
1.保证只有一个_connection
,当_connection
存在,直接返回_connection
。保证下列的原序列_source
永远只被订阅一次,那么外界网络请求的闭包就只会执行一次。
2.当_connection
为nil
时,会创建
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
(1)创建Connection
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
self._parent = parent
self._subscription = subscription
self._lock = lock
self._subjectObserver = subjectObserver
}
保存ConnectableObservableAdapter
到_parent
,self.lazySubject.asObserver()
保存至_subscription
(2)保存Connection
至_connection
(3)订阅源序列_source
,connection
是观察者. 将会来到AnonymousObservable.run
,然后再到sink.run
,最终会调用到_subscribeHandler
执行到外界的闭包中。
在外界闭包中,observer.onNext("请求到的网络数据")
发起响应,原source
的观察者是Connection
,Connection.on
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self._disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
self._subjectObserver.on(event)
}
Connection.on
调用self._subjectObserver.on(event)
self._subjectObserver
就是publish
创建时保存的那个PublishSubject()
self._subjectObserver.on(event)
等同于PublishSubject.on(event)
public func on(_ event: Event<Element>) {
#if DEBUG
self._synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self._synchronizationTracker.unregister() }
#endif
dispatch(self._synchronized_on(event), event)
}
1.调用_synchronized_on
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers
case .completed, .error:
if self._stoppedEvent == nil {
self._stoppedEvent = event
self._stopped = true
let observers = self._observers
self._observers.removeAll()
return observers
}
return Observers()
}
}
_synchronized_on
中返回self._observers
,即订阅时保存的所有的observer.on
2.dispatch(self._synchronized_on(event), event)
调用dispatch
func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>) {
bag._value0?(event)
if bag._onlyFastPath {
return
}
let pairs = bag._pairs
for i in 0 ..< pairs.count {
pairs[i].value(event)
}
if let dictionary = bag._dictionary {
for element in dictionary.values {
element(event)
}
}
}
bag._value0?(event)
首先调用第一个订阅者的回调方法。
然后循环执行_pairs
、_dictionary
中所有保存的回调方法。
以上就是publish
所有流程的源码解析。
总结:
1.publish
创建,实际返回是ConnectableObservableAdapter
序列,保存了源序列,并且创建保存PublishSubject()
序列。
2.publish
订阅时,订阅的是中间层ConnectableObservableAdapter
的PublishSubject()
序列,并保存所有的回调方法observer.on
3.connect
时,实际上是订阅了源序列,观察者为我们自己创建的Connection
对象,这个时候将会来到创建源序列时的闭包,我们就是在这个闭包中请求网络.
4.给源序列发送响应时,实际上会来到观察者Connection.on
方法,Connection
中将会对订阅时保存的_observers
一一执行。
以上就是publish
序列的所有流程。
网友评论