在使用RxSwift框架的过程中有一个步骤很重要,那就是销毁。一般序列的使用流程是创建、订阅、发送和响应、销毁。所以销毁是最后一步,如果销毁没有完成就会占用内存,大家都是知道的。销毁涉及最重要的两个类分别是:Disposable和DisposeBag,那我们来看看它们是怎么进行销毁的。
(一)通过添加进DisposeBag进行销毁
先把链式代码拆分更细一点,其他流程我们分析过,就跳过直接看销毁流程:
let observable = Observable<Any>.create { (anyObserver) -> Disposable in
anyObserver.onNext("发送响应")
return Disposables.create {
print("销毁1")
}
}
//销毁者
let dispose = observable.subscribe { (text) in
print("收到响应")
} onDisposed: {
print("销毁2")
}
dispose.disposed(by: disposbag)
- 首先创建了序列,然后进行订阅,在
subscribe
函数里创建了一个Disposable
,最终返回时又创建了一个Disposable
:
let observable = ...
let dispose = observable.subscribe { (text) in
print("收到响应")
}
extension ObservableType {
...
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable //1个
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed) //返回的是AnonymousDisposable
}
else {
disposable = Disposables.create() //返回的是NopDisposable,区别在于没有闭包
}
...
let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
let observer = AnonymousObserver<Element> { event in
...
}
//2个
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
-
Disposables.create( , )
返回的是一个BinaryDisposable
,BinaryDisposable
继承了Cancelable
协议,而Cancelable
协议继承了Disposable
协议,从而拥有了销毁的能力。BinaryDisposable
保存了disposable1
和disposable2
:
extension Disposables {
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
return BinaryDisposable(disposable1, disposable2)
}
}
private final class BinaryDisposable : DisposeBase, Cancelable {
private let _isDisposed = AtomicInt(0) //默认值
...
init(_ disposable1: Disposable, _ disposable2: Disposable) {
self._disposable1 = disposable1
self._disposable2 = disposable2
super.init()
}
...
}
public protocol Cancelable : Disposable {
/// Was resource disposed.
var isDisposed: Bool { get }
}
public protocol Disposable {
/// Dispose resource.
func dispose()
}
- 返回的
BinaryDisposable
添加到了DisposeBag
垃圾袋里,内部实际是添加到了self._disposables
数组里:
dispose.disposed(by: disposbag)
extension Disposable {
public func disposed(by bag: DisposeBag) {
bag.insert(self)
}
}
public final class DisposeBag: DisposeBase {
...
fileprivate var _disposables = [Disposable]()
fileprivate var _isDisposed = false
...
public func insert(_ disposable: Disposable) {
self._insert(disposable)?.dispose() //self._insert返回了nil,所以不会执行dispose()
}
private func _insert(_ disposable: Disposable) -> Disposable? {
self._lock.lock(); defer { self._lock.unlock() }
if self._isDisposed {
return disposable
}
self._disposables.append(disposable)
return nil
}
...
}
而disposbag
的初始化是这样的,一般我们会把它作为全局属性保存:
var disposeBag = DisposeBag()
- 现在只是进行添加,那什么时候回进行销毁呢?就是页面进行销毁的时候。页面进行销毁时,全局属性
disposeBag
就会进行销毁调用deinit
,从而进行销毁:
public final class DisposeBag: DisposeBase {
...
private func dispose() {
let oldDisposables = self._dispose()
for disposable in oldDisposables {
disposable.dispose()
}
}
private func _dispose() -> [Disposable] {
self._lock.lock(); defer { self._lock.unlock() }
let disposables = self._disposables
self._disposables.removeAll(keepingCapacity: false)
self._isDisposed = true
return disposables
}
deinit {
self.dispose()
}
}
最终通过self._disposables
函数获取所有添加的disposable
,然后遍历进行dispose()
。
- 而
dispose()
具体由BinaryDisposable
实现:
private final class BinaryDisposable : DisposeBase, Cancelable {
...
func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
self._disposable1?.dispose()
self._disposable2?.dispose()
self._disposable1 = nil
self._disposable2 = nil
}
}
}
- 首先会先进行标记和判断
fetchOr(self._isDisposed, 1) == 0
,利用的是位运算(位运算的好处,大家可以自行百度),初始值为0,后面一直标记为1:
final class AtomicInt: NSLock {
fileprivate var value: Int32
public init(_ value: Int32 = 0) {
self.value = value
}
}
...
@discardableResult
@inline(__always)
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
this.lock()
let oldValue = this.value
this.value |= mask
this.unlock()
return oldValue
}
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
...
private let _isDisposed = AtomicInt(0) //默认值
...
}
- 接着分别由保存的
self._disposable1
和self._disposable2
进行dispose()
。
5-1-1. 首先self._disposable1
是self.asObservable().subscribe(observer)
返回的SinkDisposer
:
class Producer<Element> : Observable<Element> {
...
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
// The returned disposable needs to release all references once it was disposed.
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
...
}
- 这里注意一下,
SinkDisposer
是通过setSinkAndSubscription
保存sink
和subscription
,并进行了一些标记和安全判断:
fileprivate final class SinkDisposer: Cancelable {
...
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self._sink = sink
self._subscription = subscription
let previousState = fetchOr(self._state, DisposeState.sinkAndSubscriptionSet.rawValue)
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeState.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}
...
}
继续流程,也就是SinkDisposer
进行dispose()
,会先做标记以及一堆判断:
fileprivate final class SinkDisposer: Cancelable {
...
private let _state = AtomicInt(0) //默认值
...
func dispose() {
let previousState = fetchOr(self._state, DisposeState.disposed.rawValue) //标记
if (previousState & DisposeState.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
guard let sink = self._sink else {
rxFatalError("Sink not set")
}
guard let subscription = self._subscription else {
rxFatalError("Subscription not set")
}
sink.dispose()
subscription.dispose()
//保证回调完成再=nil
self._sink = nil
self._subscription = nil
}
}
}
5-1-2. 最终由self.run(observer, cancel: disposer)
返回的sink
和subscription
进行dispose()
:
final private class AnonymousObservable<Element>: Producer<Element> {
...
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
AnonymousObservableSink
没有实现dispose()
,但是父类Sink
实现了:
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
...
}
class Sink<Observer: ObserverType> : Disposable {
...
func dispose() {
fetchOr(self._disposed, 1) //标记
self._cancel.dispose()
}
...
}
接着看到self._cancel.dispose()
,self._cancel
是初始化保存的SinkDisposer
,所以这里再一次调用了SinkDisposer
的dispose()
函数,这时候标记就发挥它的作用了,避免无限循环dispose()
。
5-1-3. 然后看下一个,subscription
是sink.run(self)
返回的:
final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
...
func run(_ parent: Parent) -> Disposable {
return parent._subscribeHandler(AnyObserver(self))
}
}
根据RxSwift核心逻辑,subscription
就是外面的序列闭包返回的:
let observable = Observable<Any>.create { (anyObserver) -> Disposable in
...
return Disposables.create {
print("销毁1")
}
}
5-1-4. Disposables.create { }
返回的是AnonymousDisposable
,AnonymousDisposable
继承了Cancelable
协议,而Cancelable
协议继承了Disposable
协议,从而拥有了销毁的能力;AnonymousDisposable
保存了外面的disposeAction
销毁闭包:
extension Disposables {
public static func create(with dispose: @escaping () -> Void) -> Cancelable {
return AnonymousDisposable(disposeAction: dispose) //执行回调
}
}
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
...
fileprivate init(_ disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
...
}
5-1-5. 所以也就是由AnonymousDisposable
进行dispose()
:
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
...
fileprivate func dispose() {
if fetchOr(self._isDisposed, 1) == 0 { //标记
if let action = self._disposeAction {
self._disposeAction = nil //因为action可能会遇到耗时操作,所以先=nil
action()
}
}
}
}
最后也是会先进行标记和判断,再调用action()
,就是外面的self._disposeAction
销毁闭包:
Disposables.create {
print("销毁1")
}
所以
self._disposable1?.dispose()
=>
SinkDisposer.dispose()
=>
AnonymousObservableSink.dispose()
+AnonymousDisposable.dispose()
=>
AnonymousObservableSink._cancel.dispose()
+AnonymousDisposable.dispose()
=>
SinkDisposer.dispose()
+AnonymousDisposable.dispose()
所以这里其实产生了循环引用,这是作者故意的,为的就是不让各种对象释放,等在适当的时候才进行解除循环引用自然释放。
5-2-1. 然后我们现在回头看self._disposable2?.dispose()
,self._disposable2
其实就是第二个AnonymousDisposable
,所以:
所以调用了onDisposed
,也就是外面第二个销毁闭包:
let dispose = observable.subscribe { (text) in
...
} onDisposed: {
print("销毁2")
}
- 到这里已经完成了销毁流程,但是并没有看到真正销毁对象的操作,只是调用销毁闭包以及把私有对象等于
nil
,因为销毁者要销毁的不是对象,而是响应关系。
总结
总结一下,创建出
BinaryDisposable
销毁者加入DisposeBag
垃圾袋,当DisposeBag
进入deinit
时,遍历添加的BinaryDisposable
进行dispose()
。
BinaryDisposable.dispose()
简单来说就是各种继承了Disposable
协议的对象进行dispose()
,最终就是调用两个AnonymousDisposable
保存的销毁闭包和把私有化的AnonymousObservableSink
和AnonymousDisposable
等于nil
。
所以销毁者实际并不是销毁对象,而是销毁响应关系;并且由于循环引用会走两次SinkDisposer.dispose()
函数;而真正的释放是通过等待系统回收(ARC)。
(二)主动销毁
以上是属于被动销毁,很多时候会需要主动销毁。而主动销毁可以这样做:
let observable = Observable<Any>.create { (anyObserver) -> Disposable in
anyObserver.onNext("发送响应")
return Disposables.create {
print("销毁1")
}
}
//销毁者
let dispose = observable.subscribe { (text) in
print("收到响应")
} onDisposed: {
print("销毁2")
}
//主动销毁
dispose.dispose() //=>BinaryDisposable.dispose()
(三)补充
发送完成或错误事件时会调用AnonymousDisposable.dispose()
:
let observable = Observable<Any>.create { (anyObserver) -> Disposable in
anyObserver.onNext("发送响应")
anyObserver.onCompleted() //发送完成
return Disposables.create {
print("销毁1")
}
}
//销毁者
let dispose = observable.subscribe { (text) in
print("收到响应")
} onDisposed: {
print("销毁2")
}
extension ObservableType {
...
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
...
let observer = AnonymousObserver<Element> { event in
...
switch event {
case .next(let value):
onNext?(value)
case .error(let error):
...
disposable.dispose() //销毁
case .completed:
onCompleted?()
disposable.dispose() //销毁
}
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
}
由于发送完成或错误事件会进行标记,所以AnonymousDisposable.dispose()
后就算没有释放对象也不能再次发送响应:
而且序列也可以主动销毁,通过自身调用dispose()
来进行标记断开响应:
let observable = Observable<Any>.create { (anyObserver) -> Disposable in
anyObserver.onNext("发送响应")
return Disposables.create {
print("销毁1")
}
}
observable.dispose()
网友评论