Publish的作用,它可以在序列被订阅后并不会收到消息,只有经过connect后,才会收到消息,并且在被多次订阅后subscribeHandler只会被执行一次,这个是非常实用的,比如在subscribeHandler中发送请求,拿到数据后,发送消息,如果不用publish处理,会发送多次请求,浪费带宽,设计也不合理,这时祭出Publish,万事解决,还有谁,下面详细了解下它的使用与源码解析
使用
let ob = Observable<String>.create { (observe) -> Disposable in
print("我来了")
observe.onNext("h")
return Disposables.create{
print("Observable Disposables")
}
}.publish()
_ = ob.subscribe(onNext: {
print("订阅1:\($0)")
}).disposed(by: disposeBag)
_ = ob.subscribe(onNext: {
print("订阅2:\($0)")
}).disposed(by: disposeBag)
_ = ob.connect()
打印
我来了
订阅1:h
订阅2:h
源码解析
//Multicast.swift
extension ObservableType {
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
}
调用multicast方法,并传入一个PublishSubject
//Multicast.swift
extension ObservableType {
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
}
创建内部类ConnectableObservableAdapter时,传入source-源序列和makeSubject-前面闭包中生成的PublishSuject
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
typealias ConnectionType = Connection<Subject>
fileprivate let _source: Observable<Subject.Observer.Element>
fileprivate let _makeSubject: () -> Subject
fileprivate let _lock = RecursiveLock()
fileprivate var _subject: Subject?
// state
fileprivate var _connection: ConnectionType?
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source //源序列
self._makeSubject = makeSubject //前面用来创建PublishSubject的闭包
self._subject = nil
self._connection = nil
}
override func connect() -> Disposable {
return self._lock.calculateLocked {
if let connection = self._connection { //判断是否已经进行过订阅
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
fileprivate var lazySubject: Subject {
if let subject = self._subject {
return subject
}
let subject = self._makeSubject()
self._subject = subject
return subject
}
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
return self.lazySubject.subscribe(observer)
}
}
ConnectableObservableAdapter中重要点
- 声明了一个lazySubject,这个属性只会初始化一次,它的值就是_makeSubject闭包返回的PublishSubject
- 实现了subscribe方法,调用lazySubject.subscribe(observer)
- 实现了connect方法,方法内部创建了Connection对象,然后调用_source.subscribe(connection),也就是源序列的subscribe订阅方法,并传入connection
//Multicast.swift
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
typealias Element = Subject.Observer.Element
private var _lock: RecursiveLock
// state
private var _parent: ConnectableObservableAdapter<Subject>?
private var _subscription : Disposable?
private var _subjectObserver: Subject.Observer
private let _disposed = AtomicInt(0)
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
self._parent = parent
self._subscription = subscription
self._lock = lock
self._subjectObserver = subjectObserver
}
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self._disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
self._subjectObserver.on(event)
}
...
}
在调用connect方法时,会创建内部类Connection,以备后用,这个类中主要实现了on方法,on方法调用makeSubject的on方法
除PublishSubject和Observable序列处理逻辑外,剩下的关键代码已经罗列出来,剩下就来梳理下完整的流程
- 创建序列,生成内部类ConnectableObservableAdapter,保留源序列_source,通过muticast传入闭包生成序列_makeSubject,声明属性lazySubject,保证_makeSubject只创建一次,并实现subscribe、connect方法
- 订阅序列,调用1中subscribe,然后执行PublishSubject订阅流程
-
执行connect,创建内部类Connection,然后执行源序列_source的subscribe方法并传入内部类对象Connection。接下来会执行源序列的订阅流程,执行_subscribeHandler闭包,闭包中会执行onNext方法,接下来走到源序列的subscribe闭包中,也就是Connection中,接下来执行Connection的on方法,进而执行lazySubject的on方法,最终执行到lazySubject的subscribe闭包方法,结束一个完整流程
接下来依旧来看下思维导图
Publish
生活如此美好,今天就点到为止。。。
网友评论