publish
一般都和connect
连用。一般用于网络请求序列,有多次订阅的时候,只是进行一次网络请求。
如下:
let netOB = Observable<Any>.create { (observer) -> Disposable in
sleep(2)// 模拟网络延迟
print("我开始请求网络了")
observer.onNext("请求到的网络数据")
observer.onNext("请求到的本地")
observer.onCompleted()
return Disposables.create {
print("销毁回调了")
}
}
.publish()
// 走的协议ObservableType的subscribe方法
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
_ = netOB.connect()
打印结果:
我开始请求网络了
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
订阅1: 请求到的本地
订阅2: 请求到的本地
销毁回调了
如果注释掉publish
和connect
,打印结果如下:
我开始请求网络了
订阅1: 请求到的网络数据
订阅1: 请求到的本地
销毁回调了
我开始请求网络了
订阅2: 请求到的网络数据
订阅2: 请求到的本地
销毁回调了
分析
从上面打印可以看出来:
-
当注释掉
publish
和connect
的时候,会有两次网络请求,并且当一个订阅1
都响应了两个事件的结束过后,订阅2
才开始请求网络并响应了两个事件。 -
如果只是注释掉
connect
就没有打印(后面分析知道是源序列没有发送事件)。 -
当都不注释
publish
和connect
的时候,只会进行一次网络请求,并且发送的事件会依次响应给订阅1
和订阅2
,然后下一个事件再依次响应给订阅1
和订阅2
。
我们继续还是通过代码的流程来进行分析:
1、肯定首先来到源序列的创建create
方法中,初始化一个可观察序列AnonymousObservable
,并保存闭包。
init(_ subscribeHandler: @escaping SubscribeHandler) {
self._subscribeHandler = subscribeHandler
}
2、来到publish
方法中
/**
- returns: A connectable observable sequence that shares a single subscription to the underlying sequence.
*/
public func publish() -> ConnectableObservable<Element> {
return self.multicast { PublishSubject() }
}
传了一个参数PublishSubject()
过去,并返回了一个multicast
可连接可观察序列(共享对基础序列的单个订阅。分析后面知道,它的意思是对源序列订阅一次,其响应结果可被传递到后面的多个订阅)
3、来到multicast
的实现
public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
-> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
}
传进去两个参数:source
即源序列,makeSubject
即PublishSubject()
。
返回一个ConnectableObservableAdapter
序列
4、来到ConnectableObservableAdapter
的初始化方法
final private class ConnectableObservableAdapter<Subject: SubjectType>
: ConnectableObservable<Subject.Element> {
init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
self._source = source // 保存源序列,即调用publish方法的那个序列
self._makeSubject = makeSubject // 保存了初始化的时候,传进来的PublishSubject()
self._subject = nil // 用于保存subject,初始化为nil
self._connection = nil // 用于保存connection,初始化为nil
}
}
保存了传进来的两个参数source
和makeSubject
。注意ConnectableObservableAdapter
作为中间层,并没有直接或者间接继承自Producer
5、初始化完成过后,来的最外层的订阅:订阅1
// 走的协议ObservableType的subscribe方法
netOB.subscribe(onNext: { (anything) in
print("订阅1:",anything)
})
.disposed(by: disposeBag)
- 这里的
netOB
的实际类型其实是ConnectableObservableAdapter
。 - 订阅就来到
ObservableType
协议subscribe
的实现,在这里面创建了一个观察者AnonymousObserver
,并保存了事件处理闭包eventHandler
- 最后来到
self.asObservable().subscribe(observer)
这句,因为ConnectableObservableAdapter
没有继承Producer
,所以就只有走自己实现的subscribe
方法中去。
这里的
subscribe
并不像以前一样走到Producer
的subscribe
方法,然后走到sink
的run
方法,然后再走到源序列初始化的闭包_subscribeHandler
里面,走到发送事件流程。
6、来到ConnectableObservableAdapter
自己实现的subscribe
方法
fileprivate var lazySubject: Subject {
if let subject = self._subject { // 以后永远都是同一个subject,即lazySubject
return subject
}
let subject = self._makeSubject() // 第一次进来,进行赋值
self._subject = subject
return subject
}
// 没有间接继承Producer, 就只有自己实现subscribe
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
// 每次都是lazySubject订阅的观察者,即外面有多少subscribe就会来这里多少次
// 这里的观察者,是保存外面订阅处理事件闭包eventHandler的AnonymousObserver
return self.lazySubject.subscribe(observer)
}
- 这里的订阅方法中有一个
self.lazySubject
,看上面代码可以知道这是一个懒加载,而且被赋值一次过后,以后再进来不会再被赋值,也就是说始终都是:makeSubject
即PublishSubject()
-
ConnectableObservableAdapter
的subscribe
,即是PublishSubject()
的subscribe
方法 - 这里的观察者,是保存外面订阅处理事件闭包
eventHandler
的AnonymousObserver
7、来到PublishSubject()
的subscribe
方法
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
self._lock.lock()
let subscription = self._synchronized_subscribe(observer)
self._lock.unlock()
return subscription
}
加了锁,保证了订阅观察者的顺序,来看_synchronized_subscribe
方法里面的核心代码:
let key = self._observers.insert(observer.on)
return SubscriptionDisposable(owner: self, key: key)
- 保存了
observer.on
方法,这里的observer
就是外面传进来的AnonymousObserver
,即每次订阅的事件处理方法 - 点
insert
进去看,知道,里面是用字典存储的 - 这里直接返回了
SubscriptionDisposable
,没有进行到发送事件,即没有走到源序列创建是闭包中的onNext
方法,一般情况下这里都是到发送事件 - 所以整个订阅流程只是将订阅创建的
AnonymousObserver.on
保存在PublishSubject()
的字典_observers
中,并没有走到事件发送。
8、接下来来到最外层的第二个订阅:订阅2
netOB.subscribe(onNext: { (anything) in
print("订阅2:",anything)
})
.disposed(by: disposeBag)
同订阅1
一样,最后也只是保存订阅2
中创建的AnonymousObserver.on
保存在PublishSubject()
的字典_observers
中,还并没有走到事件发送。
这时我们会好奇,那到底什么时候发生事件呢?这时就来到下面这重要的一步
9、来到最外层的connect
方法
_ = netOB.connect()
即ConnectableObservableAdapter
的connect
方法:
override func connect() -> Disposable {
return self._lock.calculateLocked {
// 避免外界多次调用connect方法的时候,返回不同的connection
if let connection = self._connection {
return connection
}
let singleAssignmentDisposable = SingleAssignmentDisposable()
let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
self._connection = connection
// connection是观察者,始终都是同一个观察者
// 源序列订阅connection,最后走到connection.on方法
let subscription = self._source.subscribe(connection)
singleAssignmentDisposable.setDisposable(subscription)
return connection
}
}
- 和之前保存的
_connection
判断,没有就创建,保证就算多次调用connect
,也始终返回同一个connection
- 这里先走
connection
的初始化,里面保存了PublishSubject()
init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
self._parent = parent // ConnectableObservableAdapter
self._subscription = subscription
self._lock = lock
self._subjectObserver = subjectObserver // self.lazySubject.asObserver() 即 PublishSubject()
}
- 然后又来到最关键的一步
let subscription = self._source.subscribe(connection)
,即源序列订阅connection
。和前面文章中源序列订阅中间层MapSink
一样。
因为源序列AnonymousObservable
继承自Producer
,所以后面的流程就是:
Producer
的subscribe
方法 -->Producer
的run
方法--> AnonymousObservable
的run
方法-->AnonymousObservableSink
的run
方法-->AnonymousObservable
的_subscribeHandler
-->发送事件onNext
方法-->观察者observer
的on
方法-->connection
的on
方法
10、connection
的on
方法
func on(_ event: Event<Subject.Observer.Element>) {
if isFlagSet(self._disposed, 1) {
return
}
if event.isStopEvent {
self.dispose()
}
self._subjectObserver.on(event)
}
上面知道_subjectObserver
就是PublishSubject
,所以来到PublishSubject
的on
方法。
11、PublishSubject
的on
方法中关键代码
func _synchronized_on(_ event: Event<Element>) -> Observers {
self._lock.lock(); defer { self._lock.unlock() }
switch event {
case .next:
if self._isDisposed || self._stopped {
return Observers()
}
return self._observers // 返回前面保存的所有订阅的事件处理
}
return self._observers
这句非常关键,意思是返回之前PublishSubject
保存的_observers
, 即AnonymousObserver.on
方法,也就是说依次响应前面的订阅处理事件(订阅1
和订阅2
中的打印)。
订阅1: 请求到的网络数据
订阅2: 请求到的网络数据
12、同理当第二个onNext
发送事件的时候,继续走第10和第11步,依次响应前面保存的AnonymousObserver.on
方法。
总结:
- 使用中间层
ConnectableObservableAdapter
将所有订阅事件AnonymousObserver.on
保存在PublishSubject
的一个字典中 - 直到调用
connect
的时候,才让源序列订阅一个中间类connection
,然后响应connection
的on
方法,然后走PublishSubject
的on
方法,依次响应原来保存的AnonymousObserver.on
- 所以当我们前面把
connect
方法注释过后,没有打印,因为注释过后,都没有走源序列的闭包,也就没有走发送事件 - 核心思想就是:用一个类将所有的订阅事件保存起来,然后让源序列去订阅这个类,然后将响应结果依次分给所以的订阅事件。
网友评论