美文网首页RxSwiftRx
02. RxSwift源码解读:dispose流程

02. RxSwift源码解读:dispose流程

作者: Oceanj | 来源:发表于2021-05-20 16:36 被阅读0次

    上一篇文章解读了创建Observable和订阅Observable的流程,今天继续解读dispose源码。

    dispose即订阅取消,取消后相关内存资源释放,包括被观察者,观察者,之后无法再发送序列,也无法接收序列。
    我们可以对dispose事件进行订阅,这样订阅取消后会调用对应闭包。我们还是拿上一篇文章的代码举例:

           let observable = Observable<Int>.create { (anyObserver) -> Disposable in
                anyObserver.onNext(1)
                return Disposables.create { print("disposed 1")}
            }
            let dispose = observable.subscribe(onNext: { ele in
                print(ele)
            }, onDisposed:  {
                print("dispose2")
            })
            dispose.dispose()
    

    当我们创建Observable时,在闭包中返回一个Disposables.create { print("disposed 1")} 或者Disposables.create(), 如果订阅取消,会打印disposed 1,我们同时通过onDisposed订阅了订阅取消的事件,这样还会打印disposed2,那先打印哪个呢,经过测试先打印disposed1 再打印disposed 2; 关于dispose,涉及到一下几个结构体或协议。

    • 协议:DisposableCancelable,在上篇文章已经对两个协议做了讲解。
    • 结构体:Disposables,可以把它理解为一个工厂,创建各种类型的Disposable,它有如下几个工厂方法
        // Create AnonymousDisposable
        public static func create(with dispose: @escaping () -> Void) -> Cancelable {
            return AnonymousDisposable(disposeAction: dispose)
        }
        // Create BinaryDisposable
        public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
            return BinaryDisposable(disposable1, disposable2)
        }
        /// Creates a disposable with the given disposables.
        public static func create(_ disposable1: Disposable, _ disposable2: Disposable, _ disposable3: Disposable) -> Cancelable {
            return CompositeDisposable(disposable1, disposable2, disposable3)
        }
        
        /// Creates a disposable with the given disposables.
        public static func create(_ disposable1: Disposable, _ disposable2: Disposable, _ disposable3: Disposable, _ disposables: Disposable ...) -> Cancelable {
            var disposables = disposables
            disposables.append(disposable1)
            disposables.append(disposable2)
            disposables.append(disposable3)
            return CompositeDisposable(disposables: disposables)
        }
        
        /// Creates a disposable with the given disposables.
        public static func create(_ disposables: [Disposable]) -> Cancelable {
            switch disposables.count {
            case 2:
                return Disposables.create(disposables[0], disposables[1])
            default:
                return CompositeDisposable(disposables: disposables)
            }
        }
        static public func create() -> Disposable {
            return NopDisposable.noOp
        }
    

    分别创建了AnonymousDisposable BinaryDisposable CompositeDisposable NopDisposable,这些都直接或间接遵循了Disposeable协议,所以都实现了自己的dispose方法。AnonymousDisposable BinaryDisposable CompositeDisposable 都继承了DisposeBase, DisposeBase 代码很简单,用来实现rxswift自身的引用计数:初始化时调用Resources.incrementTotal() 增加引用计数,释放时调用Resources.decrementTotal()

    • AnonymousDisposable 遵循了Cancelable, 保存一个闭包,当调用dispose时会调用这个闭包,我们例子中的代码Disposables.create { print("disposed 1")} 创建的就是一个AnonymousDisposable,我们看看这个dispose代码:
        private let _isDisposed = AtomicInt(0)
        /// - returns: Was resource disposed.
        public var isDisposed: Bool {
            return isFlagSet(self._isDisposed, 1)
        }
        /// Calls the disposal action if and only if the current instance hasn't been disposed yet.
        ///
        /// After invoking disposal action, disposal action will be dereferenced.
        fileprivate func dispose() {
            if fetchOr(self._isDisposed, 1) == 0 {
                if let action = self._disposeAction {
                    self._disposeAction = nil
                    action()
                }
            }
        }
    

    _isDisposed 是一个AtomicInt,它是一个Int整型数据的封装,对Int操作是线程安全的,有兴趣自己看源码。
    首先调用fetchOr(self._isDisposed, 1) 将_isDisposed的value 和 1进行或运算操作,返回原value,并写回value。如果原value == 0则返回true,这时将value修改为1,并执行action, 这样做的目的是为了防止action被多次调用,一般只调用一次。所以上面的 isDisposed方法实现中如果value==1 则返回true,表示已经释放资源,否则返回false。

    • BinaryDisposable 同样遵循了Cancelable,包含两个Disposable,dispose时会调用这两个Disposable的dispose,代码如下:
       func dispose() {
            if fetchOr(self._isDisposed, 1) == 0 {
                self._disposable1?.dispose()
                self._disposable2?.dispose()
                self._disposable1 = nil
                self._disposable2 = nil
            }
        }
    

    所以我们可以将两个AnonymousDisposable或者任意Disposable 包装到BinaryDisposable里。

    • CompositeDisposable 可以包含任意多的Disposable,使用Bag这个结构体去存放所有Disposable对象,Bag使用ContiguousArray和 Dictionary保存所有Disposable,Bag的具体实现细节以后有机会再说吧!
    • NopDisposable 这个diposeable不做任何操作,它的dispose的实现是空的,当我们不需要释放资源时可以用这个类。
      刚刚我们说到Bag,我们还有个类叫DisposeBag这个在开发中用的最多。开发中一般把disposable放入DispoaseBag,当DisposeBag被释放时,里面的所有disposable会调用dispose。我们把例子中的代码改下:
      // 在当前类中定义bag,当当前对象被释放时bag被释放,此时dispose被释放。
      let bag = DisposeBag() 
     // 将dispose放入bag
      dispose.disposed(by: bag)
    

    我们看看DisposeBag的源码,它也继承了DisposeBase,没有遵循Cancelable,包含一个数组_disposables = [Disposable](), 保存了所有disposable,实现了insert dispose等方法。在deinit方法中调用dispose方法,dispose方法中遍历数组释放所有disposeable。

    下面开始讲解dispose的流程,当我们调用订阅onDisposed时,我们进入subscribe方法看看:

    public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                let disposable: Disposable
                
                if let disposed = onDisposed {
                    disposable = Disposables.create(with: disposed)
                }
                else {
                    disposable = Disposables.create()
                } 
                /// 此处代码省略。。。
                
                let observer = /// 此处代码省略。。。
                return Disposables.create(
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    
    1. 如果外面实现了onDisposed方法,则创建Disposables.create(with: disposed) 并把onDisposed传入,它创建AnonymousDisposable(disposeAction: dispose),否则Disposables.create(),这个创建NopDisposable.noOp,上面已经对AnonymousDisposableNopDisposable解释过了,AnonymousDisposable包含一个闭包,dispose时会调用这个闭包,NopDisposable不做任何事情。将这个disposable传给 Disposables.create(disposable1, disposable2), 另一个参数是self.asObservable().subscribe(observer)的返回值。我们进入这个subscribe看代码:
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            // 代码省略。。。
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
        }
    
    1. 创建了一个let disposer = SinkDisposer() 并返回,SinkDisposer是一个类,遵循了Cancelable,保存两个Disposable,一个是sink一个是subscription,通过调用disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription),给这两个成员赋值。而sinksubscription又是通过self.run(observer, cancel: disposer)返回, 我们看看这个run方法:
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    

    又调用了sink.run(self)

    func run(_ parent: Parent) -> Disposable {
            parent.subscribeHandler(AnyObserver(self))
     }
    

    subscriptionsubscribeHandler闭包的返回值,即文章开头的例子代码中的Disposables.create { print("disposed 1")}sink就是创建的AnonymousObservableSink对象,AnonymousObservableSink的父类Sink遵循了Disposable,所以它有一个dispose方法。
    这样我们大概能搞明白这些Disposable之间的关联关系了,我画了一个关系图;如下:

    image.png
    按照这个关系图依次调用每个类的dispose方法, 从BinaryDisposeable开始:
    1. 最外面的就是BinaryDisposable,当调用它的dispose时,先调用disposeable1dispose,这时会依次调用sinkdisposesubscriptiondispose,调用sinkdispose时会调用它保存的cancel对象的dispose,这个cancel实际上是let disposer = SinkDisposer()

    注意调用cancel的dispose时,因为cancel又回到了SinkDisposer的dispose,它会判断这个类是否已调用过dispose,如果是,则停止。否则会形成一个调用死循环。我们可以看看这块的代码:

    func dispose() {
            let previousState = fetchOr(self.state, DisposeState.disposed.rawValue)
    
            if (previousState & DisposeState.disposed.rawValue) != 0 {
                return
            }
    
            if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
                guard let sink = self.sink else {
                    rxFatalError("Sink not set")
                }
                guard let subscription = self.subscription else {
                    rxFatalError("Subscription not set")
                }
    
                sink.dispose()
                subscription.dispose()
    
                self.sink = nil
                self.subscription = nil
            }
        }
    

    if (previousState & DisposeState.disposed.rawValue) != 0 这句判断状态是否已经dispose 是就return。

    1. 接着我们调用subscriptiondispose时会调用它保存的闭包,打印disposed 1。
      3 这条线完成后,调用disposeable2dispose,它会打印dispose2,所以打印顺序就是先打印dispose1 再打印dispose2.

    说完这种情况,我们再讨论另外一种dispose方式,我们在通过调用onCompleted 释放资源,在onNext后面加句onCompleted,代码如下:

             
          let observable = Observable<Int>.create { (anyObserver) -> Disposable in
                anyObserver.onNext(1)
                anyObserver.onCompleted()
                return Disposables.create() {
                    print("disposed1")
                }
            }
            let dispose = observable.subscribe(onNext: { ele in
                print(ele)
            }, onDisposed:  {
                print("disposed2")
            })
    

    测试结果:先打印disposed2 后打印disposed1
    分析下原因:

    1. 我们在调用observable.subscribe方法时,最终会调到anyObserver.onCompleted() 所在的闭包,这个过程上篇文章已经分析过了,然后调用onCompleted的时候,流程跟onNext基本一致,最后会调到ObservableType + extension的subscribe
                if let disposed = onDisposed {
                    disposable = Disposables.create(with: disposed)
                }
                else {
                    disposable = Disposables.create()
               }
              // 中间省略...
              case .completed:
                     onCompleted?()
                     disposable.dispose()
              }
    
    1. 这里不再啰嗦,注意调完onCompleted后会调用disposable.dispose(),这里明显会调到onDisposed回调,所以会先打印disposed2,这里的disposable就是图中的disposeable2。
    2. 之后有一个关键调用是在AnonymousObservableSink 的on方法:
    func on(_ event: Event<Element>) {
            #if DEBUG
                self.synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self.synchronizationTracker.unregister() }
            #endif
            switch event {
            case .next:
                if load(self.isStopped) == 1 {
                    return
                }
                self.forwardOn(event)
            case .error, .completed:
                if fetchOr(self.isStopped, 1) == 0 {
                    self.forwardOn(event)
                    self.dispose()
                }
            }
        }
    

    但我们调用完forwardOn后,接着调用self.dispose 将当前sink dispose, 这时根据上面的关系图,会调用sink内的cancel的dispose. 然后调用SinkDisposer的dispose,这个时候会将state改成1,除此之外不会做其他事情,因为原 state == 0,

    1. 接下去整个run方法执行完了,在执行run后面的方法:
      disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
            self.sink = sink
            self.subscription = subscription
    
            let previousState = fetchOr(self.state, DisposeState.sinkAndSubscriptionSet.rawValue)
            if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
                rxFatalError("Sink and subscription were already set")
            }
    
            if (previousState & DisposeState.disposed.rawValue) != 0 {
                sink.dispose()
                subscription.dispose()
                self.sink = nil
                self.subscription = nil
            }
        }
    
    1. 在这个方法中因为第2步把state改成了1,所以执行sink.dispose subscription.dispose(), 然后按图中的箭头顺序往下调。执行subscription.dispose时打印dispose1,这样disposable1执行完dispose了,结束!只剩下空壳BinaryDisposable. 已经无关紧要了。

    总结

    理顺各个dispose类的关系图,基本就没问题了。

    相关文章

      网友评论

        本文标题:02. RxSwift源码解读:dispose流程

        本文链接:https://www.haomeiwen.com/subject/ddhpjltx.html