上一篇文章解读了创建Observable和订阅Observable的流程,今天继续解读dispose源码。
dispose即订阅取消,取消后相关内存资源释放,包括被观察者,观察者,之后无法再发送序列,也无法接收序列。
我们可以对dispose事件进行订阅,这样订阅取消后会调用对应闭包。我们还是拿上一篇文章的代码举例:
let observable = Observable<Int>.create { (anyObserver) -> Disposable in
anyObserver.onNext(1)
return Disposables.create { print("disposed 1")}
}
let dispose = observable.subscribe(onNext: { ele in
print(ele)
}, onDisposed: {
print("dispose2")
})
dispose.dispose()
当我们创建Observable时,在闭包中返回一个Disposables.create { print("disposed 1")}
或者Disposables.create()
, 如果订阅取消,会打印disposed 1
,我们同时通过onDisposed订阅了订阅取消的事件,这样还会打印disposed2
,那先打印哪个呢,经过测试先打印disposed1
再打印disposed 2
; 关于dispose,涉及到一下几个结构体或协议。
- 协议:
Disposable
和Cancelable
,在上篇文章已经对两个协议做了讲解。 - 结构体:
Disposables
,可以把它理解为一个工厂,创建各种类型的Disposable,它有如下几个工厂方法
// Create AnonymousDisposable
public static func create(with dispose: @escaping () -> Void) -> Cancelable {
return AnonymousDisposable(disposeAction: dispose)
}
// Create BinaryDisposable
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
return BinaryDisposable(disposable1, disposable2)
}
/// Creates a disposable with the given disposables.
public static func create(_ disposable1: Disposable, _ disposable2: Disposable, _ disposable3: Disposable) -> Cancelable {
return CompositeDisposable(disposable1, disposable2, disposable3)
}
/// Creates a disposable with the given disposables.
public static func create(_ disposable1: Disposable, _ disposable2: Disposable, _ disposable3: Disposable, _ disposables: Disposable ...) -> Cancelable {
var disposables = disposables
disposables.append(disposable1)
disposables.append(disposable2)
disposables.append(disposable3)
return CompositeDisposable(disposables: disposables)
}
/// Creates a disposable with the given disposables.
public static func create(_ disposables: [Disposable]) -> Cancelable {
switch disposables.count {
case 2:
return Disposables.create(disposables[0], disposables[1])
default:
return CompositeDisposable(disposables: disposables)
}
}
static public func create() -> Disposable {
return NopDisposable.noOp
}
分别创建了AnonymousDisposable
BinaryDisposable
CompositeDisposable
NopDisposable
,这些都直接或间接遵循了Disposeable协议,所以都实现了自己的dispose方法。AnonymousDisposable
BinaryDisposable
CompositeDisposable
都继承了DisposeBase
, DisposeBase
代码很简单,用来实现rxswift自身的引用计数:初始化时调用Resources.incrementTotal() 增加引用计数,释放时调用Resources.decrementTotal()
-
AnonymousDisposable 遵循了Cancelable, 保存一个闭包,当调用dispose时会调用这个闭包,我们例子中的代码
Disposables.create { print("disposed 1")}
创建的就是一个AnonymousDisposable,我们看看这个dispose代码:
private let _isDisposed = AtomicInt(0)
/// - returns: Was resource disposed.
public var isDisposed: Bool {
return isFlagSet(self._isDisposed, 1)
}
/// Calls the disposal action if and only if the current instance hasn't been disposed yet.
///
/// After invoking disposal action, disposal action will be dereferenced.
fileprivate func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
if let action = self._disposeAction {
self._disposeAction = nil
action()
}
}
}
_isDisposed 是一个AtomicInt,它是一个Int整型数据的封装,对Int操作是线程安全的,有兴趣自己看源码。
首先调用fetchOr(self._isDisposed, 1) 将_isDisposed的value 和 1进行或运算操作,返回原value,并写回value。如果原value == 0则返回true,这时将value修改为1,并执行action, 这样做的目的是为了防止action被多次调用,一般只调用一次。所以上面的 isDisposed方法实现中如果value==1 则返回true,表示已经释放资源,否则返回false。
- BinaryDisposable 同样遵循了Cancelable,包含两个Disposable,dispose时会调用这两个Disposable的dispose,代码如下:
func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
self._disposable1?.dispose()
self._disposable2?.dispose()
self._disposable1 = nil
self._disposable2 = nil
}
}
所以我们可以将两个AnonymousDisposable或者任意Disposable 包装到BinaryDisposable里。
- CompositeDisposable 可以包含任意多的Disposable,使用Bag这个结构体去存放所有Disposable对象,Bag使用ContiguousArray和 Dictionary保存所有Disposable,Bag的具体实现细节以后有机会再说吧!
-
NopDisposable 这个diposeable不做任何操作,它的dispose的实现是空的,当我们不需要释放资源时可以用这个类。
刚刚我们说到Bag,我们还有个类叫DisposeBag这个在开发中用的最多。开发中一般把disposable放入DispoaseBag,当DisposeBag被释放时,里面的所有disposable会调用dispose。我们把例子中的代码改下:
// 在当前类中定义bag,当当前对象被释放时bag被释放,此时dispose被释放。
let bag = DisposeBag()
// 将dispose放入bag
dispose.disposed(by: bag)
我们看看DisposeBag的源码,它也继承了DisposeBase,没有遵循Cancelable,包含一个数组_disposables = [Disposable]()
, 保存了所有disposable,实现了insert dispose等方法。在deinit方法中调用dispose方法,dispose方法中遍历数组释放所有disposeable。
下面开始讲解dispose的流程,当我们调用订阅onDisposed时,我们进入subscribe方法看看:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
/// 此处代码省略。。。
let observer = /// 此处代码省略。。。
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
- 如果外面实现了
onDisposed
方法,则创建Disposables.create(with: disposed)
并把onDisposed
传入,它创建AnonymousDisposable(disposeAction: dispose)
,否则Disposables.create()
,这个创建NopDisposable.noOp
,上面已经对AnonymousDisposable
和NopDisposable
解释过了,AnonymousDisposable
包含一个闭包,dispose时会调用这个闭包,NopDisposable
不做任何事情。将这个disposable传给Disposables.create(disposable1, disposable2)
, 另一个参数是self.asObservable().subscribe(observer)
的返回值。我们进入这个subscribe看代码:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
// 代码省略。。。
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
- 创建了一个
let disposer = SinkDisposer()
并返回,SinkDisposer
是一个类,遵循了Cancelable
,保存两个Disposable
,一个是sink
一个是subscription
,通过调用disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
,给这两个成员赋值。而sink
和subscription
又是通过self.run(observer, cancel: disposer)
返回, 我们看看这个run
方法:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
又调用了sink.run(self)
func run(_ parent: Parent) -> Disposable {
parent.subscribeHandler(AnyObserver(self))
}
subscription
是 subscribeHandler
闭包的返回值,即文章开头的例子代码中的Disposables.create { print("disposed 1")}
而sink
就是创建的AnonymousObservableSink对象,AnonymousObservableSink的父类Sink遵循了Disposable,所以它有一个dispose方法。
这样我们大概能搞明白这些Disposable之间的关联关系了,我画了一个关系图;如下:
按照这个关系图依次调用每个类的dispose方法, 从BinaryDisposeable开始:
- 最外面的就是
BinaryDisposable
,当调用它的dispose
时,先调用disposeable1
的dispose
,这时会依次调用sink
的dispose
和subscription
的dispose
,调用sink
的dispose
时会调用它保存的cancel
对象的dispose
,这个cancel
实际上是let disposer = SinkDisposer()
。
注意调用cancel的dispose时,因为cancel又回到了SinkDisposer的dispose,它会判断这个类是否已调用过dispose,如果是,则停止。否则会形成一个调用死循环。我们可以看看这块的代码:
func dispose() {
let previousState = fetchOr(self.state, DisposeState.disposed.rawValue)
if (previousState & DisposeState.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = self.sink else {
rxFatalError("Sink not set")
}
guard let subscription = self.subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
self.sink = nil
self.subscription = nil
}
}
if (previousState & DisposeState.disposed.rawValue) != 0
这句判断状态是否已经dispose 是就return。
- 接着我们调用
subscription
的dispose
时会调用它保存的闭包,打印disposed 1。
3 这条线完成后,调用disposeable2
的dispose
,它会打印dispose2,所以打印顺序就是先打印dispose1 再打印dispose2.
说完这种情况,我们再讨论另外一种dispose方式,我们在通过调用onCompleted 释放资源,在onNext后面加句onCompleted,代码如下:
let observable = Observable<Int>.create { (anyObserver) -> Disposable in
anyObserver.onNext(1)
anyObserver.onCompleted()
return Disposables.create() {
print("disposed1")
}
}
let dispose = observable.subscribe(onNext: { ele in
print(ele)
}, onDisposed: {
print("disposed2")
})
测试结果:先打印disposed2 后打印disposed1
分析下原因:
- 我们在调用observable.subscribe方法时,最终会调到anyObserver.onCompleted() 所在的闭包,这个过程上篇文章已经分析过了,然后调用onCompleted的时候,流程跟onNext基本一致,最后会调到ObservableType + extension的subscribe
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
}
else {
disposable = Disposables.create()
}
// 中间省略...
case .completed:
onCompleted?()
disposable.dispose()
}
- 这里不再啰嗦,注意调完onCompleted后会调用disposable.dispose(),这里明显会调到onDisposed回调,所以会先打印disposed2,这里的disposable就是图中的disposeable2。
- 之后有一个关键调用是在AnonymousObservableSink 的on方法:
func on(_ event: Event<Element>) {
#if DEBUG
self.synchronizationTracker.register(synchronizationErrorMessage: .default)
defer { self.synchronizationTracker.unregister() }
#endif
switch event {
case .next:
if load(self.isStopped) == 1 {
return
}
self.forwardOn(event)
case .error, .completed:
if fetchOr(self.isStopped, 1) == 0 {
self.forwardOn(event)
self.dispose()
}
}
}
但我们调用完forwardOn后,接着调用self.dispose 将当前sink dispose, 这时根据上面的关系图,会调用sink内的cancel的dispose. 然后调用SinkDisposer的dispose,这个时候会将state改成1,除此之外不会做其他事情,因为原 state == 0,
- 接下去整个run方法执行完了,在执行run后面的方法:
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self.sink = sink
self.subscription = subscription
let previousState = fetchOr(self.state, DisposeState.sinkAndSubscriptionSet.rawValue)
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeState.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
self.sink = nil
self.subscription = nil
}
}
- 在这个方法中因为第2步把state改成了1,所以执行sink.dispose subscription.dispose(), 然后按图中的箭头顺序往下调。执行
subscription.dispose
时打印dispose1,这样disposable1
执行完dispose了,结束!只剩下空壳BinaryDisposable. 已经无关紧要了。
总结
理顺各个dispose类的关系图,基本就没问题了。
网友评论