美文网首页Rx
RxSwift源码分析(14)——Disposable销毁者

RxSwift源码分析(14)——Disposable销毁者

作者: 无悔zero | 来源:发表于2020-10-24 03:01 被阅读0次

    在使用RxSwift框架的过程中有一个步骤很重要,那就是销毁。一般序列的使用流程是创建、订阅、发送和响应、销毁。所以销毁是最后一步,如果销毁没有完成就会占用内存,大家都是知道的。销毁涉及最重要的两个类分别是:DisposableDisposeBag,那我们来看看它们是怎么进行销毁的。

    (一)通过添加进DisposeBag进行销毁

    先把链式代码拆分更细一点,其他流程我们分析过,就跳过直接看销毁流程:

    let observable = Observable<Any>.create { (anyObserver) -> Disposable in
        anyObserver.onNext("发送响应")
        return Disposables.create {
            print("销毁1")
        }
    }
    //销毁者
    let dispose = observable.subscribe { (text) in
        print("收到响应")
    } onDisposed: {
       print("销毁2")         
    }
    dispose.disposed(by: disposbag)
    
    1. 首先创建了序列,然后进行订阅,在subscribe函数里创建了一个Disposable,最终返回时又创建了一个Disposable
    let observable = ...
    let dispose = observable.subscribe { (text) in
        print("收到响应")
    }
    
    extension ObservableType {
        ...
        public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                let disposable: Disposable  //1个
                
                if let disposed = onDisposed {
                    disposable = Disposables.create(with: disposed)  //返回的是AnonymousDisposable
                }
                else {
                    disposable = Disposables.create()  //返回的是NopDisposable,区别在于没有闭包
                }
                ...
                let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
                
                let observer = AnonymousObserver<Element> { event in
                    ...
                }
                //2个
                return Disposables.create(
                    self.asObservable().subscribe(observer), 
                    disposable
                )
        }
    }
    
    1. Disposables.create( , )返回的是一个BinaryDisposableBinaryDisposable继承了Cancelable协议,而Cancelable协议继承了Disposable协议,从而拥有了销毁的能力。BinaryDisposable保存了disposable1disposable2
    extension Disposables {
    
        public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
            return BinaryDisposable(disposable1, disposable2)
        }
    
    }
    
    private final class BinaryDisposable : DisposeBase, Cancelable {
        
        private let _isDisposed = AtomicInt(0)  //默认值
        ...
        init(_ disposable1: Disposable, _ disposable2: Disposable) {
            self._disposable1 = disposable1
            self._disposable2 = disposable2
            super.init()
        }
        ...
    }
    
    public protocol Cancelable : Disposable {
        /// Was resource disposed.
        var isDisposed: Bool { get }
    }
    
    public protocol Disposable {
        /// Dispose resource.
        func dispose()
    }
    
    1. 返回的BinaryDisposable添加到了DisposeBag垃圾袋里,内部实际是添加到了self._disposables数组里:
    dispose.disposed(by: disposbag)
    
    extension Disposable {
        public func disposed(by bag: DisposeBag) {
            bag.insert(self)
        }
    }
    
    public final class DisposeBag: DisposeBase {
        ...
        fileprivate var _disposables = [Disposable]()
        fileprivate var _isDisposed = false
        ...
        public func insert(_ disposable: Disposable) {
            self._insert(disposable)?.dispose()  //self._insert返回了nil,所以不会执行dispose()
        }
    
        private func _insert(_ disposable: Disposable) -> Disposable? {
            self._lock.lock(); defer { self._lock.unlock() }
            if self._isDisposed {
                return disposable
            }
    
            self._disposables.append(disposable)
    
            return nil
        }
        ...
    }
    

    disposbag的初始化是这样的,一般我们会把它作为全局属性保存:

    var disposeBag = DisposeBag() 
    
    1. 现在只是进行添加,那什么时候回进行销毁呢?就是页面进行销毁的时候。页面进行销毁时,全局属性disposeBag就会进行销毁调用deinit,从而进行销毁:
    public final class DisposeBag: DisposeBase {
        ...
        private func dispose() {
            let oldDisposables = self._dispose()
    
            for disposable in oldDisposables {
                disposable.dispose()
            }
        }
    
        private func _dispose() -> [Disposable] {
            self._lock.lock(); defer { self._lock.unlock() }
    
            let disposables = self._disposables
            
            self._disposables.removeAll(keepingCapacity: false)
            self._isDisposed = true
            
            return disposables
        }
        
        deinit {
            self.dispose()
        }
    }
    

    最终通过self._disposables函数获取所有添加的disposable,然后遍历进行dispose()

    1. dispose()具体由BinaryDisposable实现:
    private final class BinaryDisposable : DisposeBase, Cancelable {
        ...
        func dispose() {
            if fetchOr(self._isDisposed, 1) == 0 {
                self._disposable1?.dispose()  
                self._disposable2?.dispose() 
                self._disposable1 = nil
                self._disposable2 = nil
            }
        }
    }
    
    • 首先会先进行标记和判断fetchOr(self._isDisposed, 1) == 0,利用的是位运算(位运算的好处,大家可以自行百度),初始值为0,后面一直标记为1:
    final class AtomicInt: NSLock {
        fileprivate var value: Int32
        public init(_ value: Int32 = 0) {
            self.value = value
        }
    }
    
    ...
    
    @discardableResult
    @inline(__always)
    func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
        this.lock()
        let oldValue = this.value
        this.value |= mask
        this.unlock()
        return oldValue
    }
    
    fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
        ...
        private let _isDisposed = AtomicInt(0)  //默认值
        ...
    }
    
    1. 接着分别由保存的self._disposable1self._disposable2进行dispose()
      5-1-1. 首先self._disposable1self.asObservable().subscribe(observer)返回的SinkDisposer
    class Producer<Element> : Observable<Element> {
        ...
        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
        ...
    }
    
    • 这里注意一下,SinkDisposer是通过setSinkAndSubscription保存sinksubscription,并进行了一些标记和安全判断:
    fileprivate final class SinkDisposer: Cancelable {
        ...
        func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
            self._sink = sink 
            self._subscription = subscription
    
            let previousState = fetchOr(self._state, DisposeState.sinkAndSubscriptionSet.rawValue)
            if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
                rxFatalError("Sink and subscription were already set")
            }
    
            if (previousState & DisposeState.disposed.rawValue) != 0 {
                sink.dispose()
                subscription.dispose()
                self._sink = nil
                self._subscription = nil
            }
        }
        ...
    }
    

    继续流程,也就是SinkDisposer进行dispose(),会先做标记以及一堆判断:

    fileprivate final class SinkDisposer: Cancelable {
        ...
        private let _state = AtomicInt(0)  //默认值
        ...
        func dispose() {
            let previousState = fetchOr(self._state, DisposeState.disposed.rawValue)  //标记
    
            if (previousState & DisposeState.disposed.rawValue) != 0 {
                return
            }
    
            if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
                guard let sink = self._sink else {
                    rxFatalError("Sink not set")
                }
                guard let subscription = self._subscription else {
                    rxFatalError("Subscription not set")
                }
    
                sink.dispose()
                subscription.dispose()
                //保证回调完成再=nil
                self._sink = nil
                self._subscription = nil
            }
        }
    }
    

    5-1-2. 最终由self.run(observer, cancel: disposer)返回的sinksubscription进行dispose()

    final private class AnonymousObservable<Element>: Producer<Element> {
        ...
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    

    AnonymousObservableSink没有实现dispose(),但是父类Sink实现了:

    final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
        ...
    }
    
    class Sink<Observer: ObserverType> : Disposable {
        ...
        func dispose() {
            fetchOr(self._disposed, 1) //标记
            self._cancel.dispose()
        }
        ...
     }
    

    接着看到self._cancel.dispose()self._cancel是初始化保存的SinkDisposer,所以这里再一次调用了SinkDisposerdispose()函数,这时候标记就发挥它的作用了,避免无限循环dispose()
    5-1-3. 然后看下一个,subscriptionsink.run(self)返回的:

    final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
        ...
        func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))
        }
    }
    

    根据RxSwift核心逻辑subscription就是外面的序列闭包返回的:

    let observable = Observable<Any>.create { (anyObserver) -> Disposable in
        ...
        return Disposables.create {
             print("销毁1")
        }
    }
    

    5-1-4. Disposables.create { }返回的是AnonymousDisposableAnonymousDisposable继承了Cancelable协议,而Cancelable协议继承了Disposable协议,从而拥有了销毁的能力;AnonymousDisposable保存了外面的disposeAction销毁闭包:

    extension Disposables {
    
        public static func create(with dispose: @escaping () -> Void) -> Cancelable {
            return AnonymousDisposable(disposeAction: dispose) //执行回调
        }
    
    }
    
    fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
        ...
        fileprivate init(_ disposeAction: @escaping DisposeAction) {
            self._disposeAction = disposeAction
            super.init()
        }
        ...
    }
    

    5-1-5. 所以也就是由AnonymousDisposable进行dispose()

    fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
        ...
        fileprivate func dispose() {
            if fetchOr(self._isDisposed, 1) == 0 {  //标记
                if let action = self._disposeAction {
                    self._disposeAction = nil  //因为action可能会遇到耗时操作,所以先=nil
                    action() 
                }
            }
        }
    }
    

    最后也是会先进行标记和判断,再调用action(),就是外面的self._disposeAction销毁闭包:

    Disposables.create {
        print("销毁1")
    }
    

    所以self._disposable1?.dispose()=>
    SinkDisposer.dispose()=>
    AnonymousObservableSink.dispose()+AnonymousDisposable.dispose()=>
    AnonymousObservableSink._cancel.dispose()+AnonymousDisposable.dispose()=>
    SinkDisposer.dispose()+AnonymousDisposable.dispose()
    所以这里其实产生了循环引用,这是作者故意的,为的就是不让各种对象释放,等在适当的时候才进行解除循环引用自然释放。

    5-2-1. 然后我们现在回头看self._disposable2?.dispose()self._disposable2其实就是第二个AnonymousDisposable,所以:

    所以调用了onDisposed,也就是外面第二个销毁闭包:

    let dispose = observable.subscribe { (text) in
        ...
    } onDisposed: {
       print("销毁2")         
    }
    
    • 到这里已经完成了销毁流程,但是并没有看到真正销毁对象的操作,只是调用销毁闭包以及把私有对象等于nil,因为销毁者要销毁的不是对象,而是响应关系。
    总结

    总结一下,创建出BinaryDisposable销毁者加入DisposeBag垃圾袋,当DisposeBag进入deinit时,遍历添加的BinaryDisposable进行dispose()
    BinaryDisposable.dispose()简单来说就是各种继承了Disposable协议的对象进行dispose(),最终就是调用两个AnonymousDisposable保存的销毁闭包和把私有化的AnonymousObservableSinkAnonymousDisposable等于nil
    所以销毁者实际并不是销毁对象,而是销毁响应关系;并且由于循环引用会走两次SinkDisposer.dispose()函数;而真正的释放是通过等待系统回收(ARC)。

    (二)主动销毁

    以上是属于被动销毁,很多时候会需要主动销毁。而主动销毁可以这样做:

    let observable = Observable<Any>.create { (anyObserver) -> Disposable in
        anyObserver.onNext("发送响应")
        return Disposables.create {
            print("销毁1")
        }
    }
    //销毁者
    let dispose = observable.subscribe { (text) in
        print("收到响应")
    } onDisposed: {
       print("销毁2")         
    }
    //主动销毁
    dispose.dispose()  //=>BinaryDisposable.dispose()
    

    (三)补充

    发送完成或错误事件时会调用AnonymousDisposable.dispose()

    let observable = Observable<Any>.create { (anyObserver) -> Disposable in
        anyObserver.onNext("发送响应")
        anyObserver.onCompleted()  //发送完成
        return Disposables.create {
            print("销毁1")
        }
    }
    //销毁者
    let dispose = observable.subscribe { (text) in
        print("收到响应")
    } onDisposed: {
       print("销毁2")         
    }
    
    extension ObservableType {
        ...
        public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                let disposable: Disposable
                ...
                let observer = AnonymousObserver<Element> { event in
                    ...
                    switch event {
                    case .next(let value):  
                        onNext?(value)
                    case .error(let error):
                        ...
                        disposable.dispose()  //销毁
                    case .completed:
                        onCompleted?()
                        disposable.dispose()  //销毁
                    }
                }
                
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    }
    

    由于发送完成或错误事件会进行标记,所以AnonymousDisposable.dispose()后就算没有释放对象也不能再次发送响应:

    AnonymousObservableSink ObserverBase

    而且序列也可以主动销毁,通过自身调用dispose()来进行标记断开响应:

    let observable = Observable<Any>.create { (anyObserver) -> Disposable in
        anyObserver.onNext("发送响应")
        return Disposables.create {
            print("销毁1")
        }
    }
    observable.dispose()
    

    相关文章

      网友评论

        本文标题:RxSwift源码分析(14)——Disposable销毁者

        本文链接:https://www.haomeiwen.com/subject/pgtbmktx.html