美文网首页
RxSwift之disposable

RxSwift之disposable

作者: silasjs | 来源:发表于2019-08-06 16:11 被阅读0次

RxSwift之disposable

我们知道 RxSwift 中有四大天王,今天我们来介绍下四大天王之一,掌管 RxSwift 世界中序列生死簿的阎罗王----disposable。

demo

let ob = Observable<Any>.create { (observer) -> Disposable in
    observer.onNext("123")
    return Disposables.create {print("sinkDisposer销毁了")}
}
let disposable = ob.subscribe(onNext: { (str) in
    print("订阅到:\(str)")
}, onError: { (error) in
    print(error)
}, onCompleted: {
    print("结束了")
}) {
    print("disposable销毁了")
}

disposable.dispose();

输出:
订阅到:123
sinkDisposer销毁了
disposable销毁了

这里打印的内容:sinkDisposer、disposable 是这个销毁操作的执行者,不要在意这么一点剧透。

demo 中明显跟销毁者有关的有三点:

  1. create闭包Disposables的创建
  2. subscribe函数最后一个参数onDisposed闭包
  3. disposable.dispose()执行销毁操作

前两个都是被回调时才触发的,只有最后一个是主动调用。

这么几句代码让我想起了功夫巨星李小龙的最后一个电影《死亡游戏》,影片讲述了一位绝顶高手,为了夺取传说中的稀世珍宝而独闯七层宝塔,经过连场激战后到达塔顶,岂枓藏宝匣内没有稀世珍宝,只有一张纸,上面写着:“生命是一段等待死亡的历程”。

create闭包的回调执行会返回Disposables创建的销毁者,刚刚出生就开始了走向死亡的旅程,由生而死,因果循环。

AnonymousDisposable

在探究销毁流程前,最好先了解下相关的类和协议。Disposables是个单纯的结构体:

public struct Disposables {
    private init() {}
}
extension Disposables {
    public static func create(with dispose: @escaping () -> Void) -> Cancelable {
        return AnonymousDisposable(disposeAction: dispose)
    }
}

create函数中看出,create闭包返回的实际上是一个AnonymousDisposable对象。

fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
    public typealias DisposeAction = () -> Void

    private let _isDisposed = AtomicInt(0)
    private var _disposeAction: DisposeAction?

    fileprivate init(disposeAction: @escaping DisposeAction) {
        self._disposeAction = disposeAction
        super.init()
    }
    
    fileprivate func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            if let action = self._disposeAction {
                self._disposeAction = nil
                action()
            }
        }
    }
}

AnonymousDisposableDisposeBase的子类,也遵守了Cancelable协议和它的父协议Disposable

public protocol Disposable {
    func dispose()
}
public protocol Cancelable : Disposable {
    var isDisposed: Bool { get }
}

AnonymousDisposable除了初始化时保存一个DisposeAction闭包外,就是dispose函数了。这里用了AtomicIntfetchOr做标记,保证if语句中的代码只执行一次。

func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
    this.lock()
    let oldValue = this.value
    this.value |= mask
    this.unlock()
    return oldValue
}

fetchOr的实现中,用按位或|运算符来操作AtomicIntvalue位运算操作的是补码。value 的初值是0:_isDisposed = AtomicInt(0)。初次调用后 this.value == (0 | 1) == 1,但是 return 的是oldValue,也就是位运算之前的值。之后再调用的话,结果就不可能为0了。

解析

前面我们说了「刚刚出生就开始了走向死亡的旅程」,那我们就看看这个销毁者是怎么伴随序列的一生的?就像之前RxSwift核心逻辑中一样,先看看序列的产生。

销毁者的伴生

序列创建时,只是创建了一个保存create闭包AnonymousObservable。但是create闭包里面就是销毁者的种子(返回值)。当后面开始回调create闭包时就会产生一个销毁者。

之前分析RxSwift核心逻辑中的订阅部分时,代码中省略了销毁者部分,现在省略观察者那部分代码后再来看看:

public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
    -> Disposable {
    let disposable: Disposable
        
    if let disposed = onDisposed {
        disposable = Disposables.create(with: disposed)
    } else {
        disposable = Disposables.create()
    }
    ...... 
    let observer = AnonymousObserver<Element> { event in
        ......
    }
    return Disposables.create(
        self.asObservable().subscribe(observer),
        disposable
    )
}

在 return 之前,主要是把onDisposed闭包通过Disposables.create封装成AnonymousDisposable。那么这里 return 的是什么呢?从 demo 中看起来就是个Disposable,但我们知道很可能也是些内部类,点进去:

extension Disposables {
    public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
        return BinaryDisposable(disposable1, disposable2)
    }
}

是个二元销毁者BinaryDisposable,里面装了两个Disposable!这两个大宝贝真的是Disposable么?最起码二号大宝贝我们是知道是AnonymousDisposable。那一号大宝贝呢?看看序列真正的订阅函数:

override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
    if !CurrentThreadScheduler.isScheduleRequired {
        ......
    } else {
        return CurrentThreadScheduler.instance.schedule(()) { _ in
            let disposer = SinkDisposer()
            let sinkAndSubscription = self.run(observer, cancel: disposer)
            disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
            
            return disposer
        }
    }
}

通过RxSwift之scheduler的学习,我们知道此时并没有做线程相关的操作,所以这里需要用当前线程调度者去调用schedule,初次调用会回调action闭包,返回的也是这个action闭包的返回值,就不进去看了。

很明显的,一号大宝贝就是SinkDisposer。那它跟着序列去run函数里面干啥了?

final private class AnonymousObservable<Element>: Producer<Element> {
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

RxSwift核心逻辑中管道是很重要的,现在看来它创建的时候就伴随着销毁者。下一句管道在run的时候就是在回调create闭包。在 demo 中create闭包执行完会返回一个Disposable。前面我们知道create闭包的返回值是AnonymousDisposable,那这个subscription就是AnonymousDisposable

然后就把AnonymousObservableSinkAnonymousDisposable打包进元组给返回了。

在回到序列的订阅函数subscribe(observer:),一号大宝贝SinkDisposer对这个元组做了一些设置:

fileprivate final class SinkDisposer: Cancelable {
    fileprivate enum DisposeState: Int32 {
        case disposed = 1                //已销毁
        case sinkAndSubscriptionSet = 2  //设置
    }

    private let _state = AtomicInt(0)    //默认状态0
    private var _sink: Disposable?
    private var _subscription: Disposable?
    ......
    func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
        self._sink = sink
        self._subscription = subscription

        let previousState = fetchOr(self._state, DisposeState.sinkAndSubscriptionSet.rawValue)
        if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
            rxFatalError("Sink and subscription were already set")
        }
        
        if (previousState & DisposeState.disposed.rawValue) != 0 {
            sink.dispose()
            subscription.dispose()
            self._sink = nil
            self._subscription = nil
        }
    }
}

setSinkAndSubscription中,一号大宝贝除了保存_sink_subscription,也没做什么。

fetchOr函数前面介绍过了,初值为0的情况下,和sinkAndSubscriptionSet = 2状态2按位或后结果是2,但返回的是旧值0。不满足条件,下面都不走。

到现在的流程

  1. 序列订阅
  2. 回调序列的create闭包
  3. 发出.next元素,然后打印“123”

销毁

最后一句是执行销毁,输出「sinkDisposer销毁,disposable销毁」。我们看一下二元销毁者的实现就知道了:

private final class BinaryDisposable : DisposeBase, Cancelable {
    func dispose() {
        if fetchOr(self._isDisposed, 1) == 0 {
            self._disposable1?.dispose()
            self._disposable2?.dispose()
            self._disposable1 = nil
            self._disposable2 = nil
        }
    }
}

看到了,先让一号大宝贝去销毁,然后才是二号大宝贝。

一号大宝贝SinkDisposerdispose

fileprivate final class SinkDisposer: Cancelable {
    fileprivate enum DisposeState: Int32 {
        case disposed = 1                //已销毁
        case sinkAndSubscriptionSet = 2  //设置
    }

    private let _state = AtomicInt(0)    //默认状态0
    ......

    func dispose() {
        let previousState = fetchOr(self._state, DisposeState.disposed.rawValue)

        if (previousState & DisposeState.disposed.rawValue) != 0 {
            return
        }

        if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
            ......
            sink.dispose()
            subscription.dispose()

            self._sink = nil
            self._subscription = nil
        }
    }
}

之前一号大宝贝设置了_sink_subscription,其实就是保存了AnonymousObservableSink和 create 闭包返回值AnonymousDisposable。经过上次的设置,它的_state中的value已经是2了。

现在调用disposefetchOr的返回的旧值还是2,但里面的value == (2 | 1) == 3。

后面的判断语句都是&,这就是那种可以随意组合枚举值后,用按位与来判断某个枚举值。

第一个条件:2 & 1 == 0,不满足,跳过继续下一步。

第二个条件成立:2 & 2 == 2,然后让_sink_subscription分别去dispose

_sinkdispose

class Sink<Observer: ObserverType> : Disposable {
    func dispose() {
        fetchOr(self._disposed, 1)
        self._cancel.dispose()
    }
}

_sinkfetchOr做了个标记后,又把球踢回去,让SinkDisposer调用dispose。然后又回来分析上一个代码。上一次value == 3了,那它与上任何一个枚举值都是true,所以第一个条件满足,直接return

继续_subscriptiondisposeAnonymousDisposable的销毁,第二部分介绍这个类的时候已经看过了,就是回调一下闭包。此时此刻就打印了「sinkDisposer销毁」。然后_sink_subscription都置nil

最后就是二号大宝贝了,都是AnonymousDisposable类型,销毁时回调下闭包就哦了,打印「disposable销毁」。

在这源码阅读的过程中,经常会看到调用dispose函数的,拿到序列就可以调用,发出.completed.error时也会调用,装进DisposeBag,在 Bag 销毁时也会把里面的序列全销毁掉。dispose函数都可以保证只执行一次操作。

整个销毁流程,除了保证只回调一次闭包外就是干掉了SinkSink是 RxSwift 响应式编程的核心,它知道的太多了,销毁它整个响应通道就断了。

相关文章

网友评论

      本文标题:RxSwift之disposable

      本文链接:https://www.haomeiwen.com/subject/pfrwdctx.html