美文网首页RxSwiftRx
02. RxSwift源码解读:dispose流程

02. RxSwift源码解读:dispose流程

作者: Oceanj | 来源:发表于2021-05-20 16:36 被阅读0次

上一篇文章解读了创建Observable和订阅Observable的流程,今天继续解读dispose源码。

dispose即订阅取消,取消后相关内存资源释放,包括被观察者,观察者,之后无法再发送序列,也无法接收序列。
我们可以对dispose事件进行订阅,这样订阅取消后会调用对应闭包。我们还是拿上一篇文章的代码举例:

       let observable = Observable<Int>.create { (anyObserver) -> Disposable in
            anyObserver.onNext(1)
            return Disposables.create { print("disposed 1")}
        }
        let dispose = observable.subscribe(onNext: { ele in
            print(ele)
        }, onDisposed:  {
            print("dispose2")
        })
        dispose.dispose()

当我们创建Observable时,在闭包中返回一个Disposables.create { print("disposed 1")} 或者Disposables.create(), 如果订阅取消,会打印disposed 1,我们同时通过onDisposed订阅了订阅取消的事件,这样还会打印disposed2,那先打印哪个呢,经过测试先打印disposed1 再打印disposed 2; 关于dispose,涉及到一下几个结构体或协议。

  • 协议:DisposableCancelable,在上篇文章已经对两个协议做了讲解。
  • 结构体:Disposables,可以把它理解为一个工厂,创建各种类型的Disposable,它有如下几个工厂方法
    // Create AnonymousDisposable
    public static func create(with dispose: @escaping () -> Void) -> Cancelable {
        return AnonymousDisposable(disposeAction: dispose)
    }
    // Create BinaryDisposable
    public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
        return BinaryDisposable(disposable1, disposable2)
    }
    /// Creates a disposable with the given disposables.
    public static func create(_ disposable1: Disposable, _ disposable2: Disposable, _ disposable3: Disposable) -> Cancelable {
        return CompositeDisposable(disposable1, disposable2, disposable3)
    }
    
    /// Creates a disposable with the given disposables.
    public static func create(_ disposable1: Disposable, _ disposable2: Disposable, _ disposable3: Disposable, _ disposables: Disposable ...) -> Cancelable {
        var disposables = disposables
        disposables.append(disposable1)
        disposables.append(disposable2)
        disposables.append(disposable3)
        return CompositeDisposable(disposables: disposables)
    }
    
    /// Creates a disposable with the given disposables.
    public static func create(_ disposables: [Disposable]) -> Cancelable {
        switch disposables.count {
        case 2:
            return Disposables.create(disposables[0], disposables[1])
        default:
            return CompositeDisposable(disposables: disposables)
        }
    }
    static public func create() -> Disposable {
        return NopDisposable.noOp
    }

分别创建了AnonymousDisposable BinaryDisposable CompositeDisposable NopDisposable,这些都直接或间接遵循了Disposeable协议,所以都实现了自己的dispose方法。AnonymousDisposable BinaryDisposable CompositeDisposable 都继承了DisposeBase, DisposeBase 代码很简单,用来实现rxswift自身的引用计数:初始化时调用Resources.incrementTotal() 增加引用计数,释放时调用Resources.decrementTotal()

  • AnonymousDisposable 遵循了Cancelable, 保存一个闭包,当调用dispose时会调用这个闭包,我们例子中的代码Disposables.create { print("disposed 1")} 创建的就是一个AnonymousDisposable,我们看看这个dispose代码:
    private let _isDisposed = AtomicInt(0)
    /// - returns: Was resource disposed.
    public var isDisposed: Bool {
        return isFlagSet(self._isDisposed, 1)
    }
    /// Calls the disposal action if and only if the current instance hasn't been disposed yet.
    ///
    /// After invoking disposal action, disposal action will be dereferenced.
    fileprivate func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            if let action = self._disposeAction {
                self._disposeAction = nil
                action()
            }
        }
    }

_isDisposed 是一个AtomicInt,它是一个Int整型数据的封装,对Int操作是线程安全的,有兴趣自己看源码。
首先调用fetchOr(self._isDisposed, 1) 将_isDisposed的value 和 1进行或运算操作,返回原value,并写回value。如果原value == 0则返回true,这时将value修改为1,并执行action, 这样做的目的是为了防止action被多次调用,一般只调用一次。所以上面的 isDisposed方法实现中如果value==1 则返回true,表示已经释放资源,否则返回false。

  • BinaryDisposable 同样遵循了Cancelable,包含两个Disposable,dispose时会调用这两个Disposable的dispose,代码如下:
   func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            self._disposable1?.dispose()
            self._disposable2?.dispose()
            self._disposable1 = nil
            self._disposable2 = nil
        }
    }

所以我们可以将两个AnonymousDisposable或者任意Disposable 包装到BinaryDisposable里。

  • CompositeDisposable 可以包含任意多的Disposable,使用Bag这个结构体去存放所有Disposable对象,Bag使用ContiguousArray和 Dictionary保存所有Disposable,Bag的具体实现细节以后有机会再说吧!
  • NopDisposable 这个diposeable不做任何操作,它的dispose的实现是空的,当我们不需要释放资源时可以用这个类。
    刚刚我们说到Bag,我们还有个类叫DisposeBag这个在开发中用的最多。开发中一般把disposable放入DispoaseBag,当DisposeBag被释放时,里面的所有disposable会调用dispose。我们把例子中的代码改下:
  // 在当前类中定义bag,当当前对象被释放时bag被释放,此时dispose被释放。
  let bag = DisposeBag() 
 // 将dispose放入bag
  dispose.disposed(by: bag)

我们看看DisposeBag的源码,它也继承了DisposeBase,没有遵循Cancelable,包含一个数组_disposables = [Disposable](), 保存了所有disposable,实现了insert dispose等方法。在deinit方法中调用dispose方法,dispose方法中遍历数组释放所有disposeable。

下面开始讲解dispose的流程,当我们调用订阅onDisposed时,我们进入subscribe方法看看:

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable {
            let disposable: Disposable
            
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
            } 
            /// 此处代码省略。。。
            
            let observer = /// 此处代码省略。。。
            return Disposables.create(
                self.asObservable().subscribe(observer),
                disposable
            )
    }
  1. 如果外面实现了onDisposed方法,则创建Disposables.create(with: disposed) 并把onDisposed传入,它创建AnonymousDisposable(disposeAction: dispose),否则Disposables.create(),这个创建NopDisposable.noOp,上面已经对AnonymousDisposableNopDisposable解释过了,AnonymousDisposable包含一个闭包,dispose时会调用这个闭包,NopDisposable不做任何事情。将这个disposable传给 Disposables.create(disposable1, disposable2), 另一个参数是self.asObservable().subscribe(observer)的返回值。我们进入这个subscribe看代码:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
        // 代码省略。。。
            return CurrentThreadScheduler.instance.schedule(()) { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)

                return disposer
            }
    }
  1. 创建了一个let disposer = SinkDisposer() 并返回,SinkDisposer是一个类,遵循了Cancelable,保存两个Disposable,一个是sink一个是subscription,通过调用disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription),给这两个成员赋值。而sinksubscription又是通过self.run(observer, cancel: disposer)返回, 我们看看这个run方法:
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }

又调用了sink.run(self)

func run(_ parent: Parent) -> Disposable {
        parent.subscribeHandler(AnyObserver(self))
 }

subscriptionsubscribeHandler闭包的返回值,即文章开头的例子代码中的Disposables.create { print("disposed 1")}sink就是创建的AnonymousObservableSink对象,AnonymousObservableSink的父类Sink遵循了Disposable,所以它有一个dispose方法。
这样我们大概能搞明白这些Disposable之间的关联关系了,我画了一个关系图;如下:

image.png
按照这个关系图依次调用每个类的dispose方法, 从BinaryDisposeable开始:
  1. 最外面的就是BinaryDisposable,当调用它的dispose时,先调用disposeable1dispose,这时会依次调用sinkdisposesubscriptiondispose,调用sinkdispose时会调用它保存的cancel对象的dispose,这个cancel实际上是let disposer = SinkDisposer()

注意调用cancel的dispose时,因为cancel又回到了SinkDisposer的dispose,它会判断这个类是否已调用过dispose,如果是,则停止。否则会形成一个调用死循环。我们可以看看这块的代码:

func dispose() {
        let previousState = fetchOr(self.state, DisposeState.disposed.rawValue)

        if (previousState & DisposeState.disposed.rawValue) != 0 {
            return
        }

        if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
            guard let sink = self.sink else {
                rxFatalError("Sink not set")
            }
            guard let subscription = self.subscription else {
                rxFatalError("Subscription not set")
            }

            sink.dispose()
            subscription.dispose()

            self.sink = nil
            self.subscription = nil
        }
    }

if (previousState & DisposeState.disposed.rawValue) != 0 这句判断状态是否已经dispose 是就return。

  1. 接着我们调用subscriptiondispose时会调用它保存的闭包,打印disposed 1。
    3 这条线完成后,调用disposeable2dispose,它会打印dispose2,所以打印顺序就是先打印dispose1 再打印dispose2.

说完这种情况,我们再讨论另外一种dispose方式,我们在通过调用onCompleted 释放资源,在onNext后面加句onCompleted,代码如下:

         
      let observable = Observable<Int>.create { (anyObserver) -> Disposable in
            anyObserver.onNext(1)
            anyObserver.onCompleted()
            return Disposables.create() {
                print("disposed1")
            }
        }
        let dispose = observable.subscribe(onNext: { ele in
            print(ele)
        }, onDisposed:  {
            print("disposed2")
        })

测试结果:先打印disposed2 后打印disposed1
分析下原因:

  1. 我们在调用observable.subscribe方法时,最终会调到anyObserver.onCompleted() 所在的闭包,这个过程上篇文章已经分析过了,然后调用onCompleted的时候,流程跟onNext基本一致,最后会调到ObservableType + extension的subscribe
            if let disposed = onDisposed {
                disposable = Disposables.create(with: disposed)
            }
            else {
                disposable = Disposables.create()
           }
          // 中间省略...
          case .completed:
                 onCompleted?()
                 disposable.dispose()
          }
  1. 这里不再啰嗦,注意调完onCompleted后会调用disposable.dispose(),这里明显会调到onDisposed回调,所以会先打印disposed2,这里的disposable就是图中的disposeable2。
  2. 之后有一个关键调用是在AnonymousObservableSink 的on方法:
func on(_ event: Event<Element>) {
        #if DEBUG
            self.synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { self.synchronizationTracker.unregister() }
        #endif
        switch event {
        case .next:
            if load(self.isStopped) == 1 {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self.isStopped, 1) == 0 {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }

但我们调用完forwardOn后,接着调用self.dispose 将当前sink dispose, 这时根据上面的关系图,会调用sink内的cancel的dispose. 然后调用SinkDisposer的dispose,这个时候会将state改成1,除此之外不会做其他事情,因为原 state == 0,

  1. 接下去整个run方法执行完了,在执行run后面的方法:
    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        self.sink = sink
        self.subscription = subscription

        let previousState = fetchOr(self.state, DisposeState.sinkAndSubscriptionSet.rawValue)
        if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }

        if (previousState & DisposeState.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            self.sink = nil
            self.subscription = nil
        }
    }
  1. 在这个方法中因为第2步把state改成了1,所以执行sink.dispose subscription.dispose(), 然后按图中的箭头顺序往下调。执行subscription.dispose时打印dispose1,这样disposable1执行完dispose了,结束!只剩下空壳BinaryDisposable. 已经无关紧要了。

总结

理顺各个dispose类的关系图,基本就没问题了。

相关文章

网友评论

    本文标题:02. RxSwift源码解读:dispose流程

    本文链接:https://www.haomeiwen.com/subject/ddhpjltx.html