RxSwift之disposable
我们知道 RxSwift 中有四大天王,今天我们来介绍下四大天王之一,掌管 RxSwift 世界中序列生死簿的阎罗王----disposable。
demo
let ob = Observable<Any>.create { (observer) -> Disposable in
observer.onNext("123")
return Disposables.create {print("sinkDisposer销毁了")}
}
let disposable = ob.subscribe(onNext: { (str) in
print("订阅到:\(str)")
}, onError: { (error) in
print(error)
}, onCompleted: {
print("结束了")
}) {
print("disposable销毁了")
}
disposable.dispose();
输出:
订阅到:123
sinkDisposer销毁了
disposable销毁了
这里打印的内容:sinkDisposer、disposable 是这个销毁操作的执行者,不要在意这么一点剧透。
demo 中明显跟销毁者有关的有三点:
-
create闭包
中Disposables
的创建 -
subscribe
函数最后一个参数onDisposed闭包
-
disposable.dispose()
执行销毁操作
前两个都是被回调时才触发的,只有最后一个是主动调用。
这么几句代码让我想起了功夫巨星李小龙的最后一个电影《死亡游戏》,影片讲述了一位绝顶高手,为了夺取传说中的稀世珍宝而独闯七层宝塔,经过连场激战后到达塔顶,岂枓藏宝匣内没有稀世珍宝,只有一张纸,上面写着:“生命是一段等待死亡的历程”。
create闭包
的回调执行会返回Disposables
创建的销毁者,刚刚出生就开始了走向死亡的旅程,由生而死,因果循环。
AnonymousDisposable
在探究销毁流程前,最好先了解下相关的类和协议。Disposables
是个单纯的结构体:
public struct Disposables {
private init() {}
}
extension Disposables {
public static func create(with dispose: @escaping () -> Void) -> Cancelable {
return AnonymousDisposable(disposeAction: dispose)
}
}
从create
函数中看出,create闭包
返回的实际上是一个AnonymousDisposable
对象。
fileprivate final class AnonymousDisposable : DisposeBase, Cancelable {
public typealias DisposeAction = () -> Void
private let _isDisposed = AtomicInt(0)
private var _disposeAction: DisposeAction?
fileprivate init(disposeAction: @escaping DisposeAction) {
self._disposeAction = disposeAction
super.init()
}
fileprivate func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
if let action = self._disposeAction {
self._disposeAction = nil
action()
}
}
}
}
AnonymousDisposable
是DisposeBase
的子类,也遵守了Cancelable
协议和它的父协议Disposable
。
public protocol Disposable {
func dispose()
}
public protocol Cancelable : Disposable {
var isDisposed: Bool { get }
}
AnonymousDisposable
除了初始化时保存一个DisposeAction
闭包外,就是dispose
函数了。这里用了AtomicInt
的fetchOr
做标记,保证if
语句中的代码只执行一次。
func fetchOr(_ this: AtomicInt, _ mask: Int32) -> Int32 {
this.lock()
let oldValue = this.value
this.value |= mask
this.unlock()
return oldValue
}
fetchOr
的实现中,用按位或|
运算符来操作AtomicInt
的value
,位运算操作的是补码。value 的初值是0:_isDisposed = AtomicInt(0)
。初次调用后 this.value == (0 | 1) == 1,但是 return 的是oldValue
,也就是位运算之前的值。之后再调用的话,结果就不可能为0了。
解析
前面我们说了「刚刚出生就开始了走向死亡的旅程」,那我们就看看这个销毁者是怎么伴随序列的一生的?就像之前RxSwift核心逻辑中一样,先看看序列的产生。
销毁者的伴生
序列创建时,只是创建了一个保存create闭包
的AnonymousObservable
。但是create闭包
里面就是销毁者的种子(返回值)。当后面开始回调create闭包
时就会产生一个销毁者。
之前分析RxSwift核心逻辑中的订阅部分时,代码中省略了销毁者部分,现在省略观察者那部分代码后再来看看:
public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
-> Disposable {
let disposable: Disposable
if let disposed = onDisposed {
disposable = Disposables.create(with: disposed)
} else {
disposable = Disposables.create()
}
......
let observer = AnonymousObserver<Element> { event in
......
}
return Disposables.create(
self.asObservable().subscribe(observer),
disposable
)
}
在 return 之前,主要是把onDisposed闭包
通过Disposables.create
封装成AnonymousDisposable
。那么这里 return 的是什么呢?从 demo 中看起来就是个Disposable
,但我们知道很可能也是些内部类,点进去:
extension Disposables {
public static func create(_ disposable1: Disposable, _ disposable2: Disposable) -> Cancelable {
return BinaryDisposable(disposable1, disposable2)
}
}
是个二元销毁者BinaryDisposable
,里面装了两个Disposable
!这两个大宝贝真的是Disposable
么?最起码二号大宝贝我们是知道是AnonymousDisposable
。那一号大宝贝呢?看看序列真正的订阅函数:
override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
if !CurrentThreadScheduler.isScheduleRequired {
......
} else {
return CurrentThreadScheduler.instance.schedule(()) { _ in
let disposer = SinkDisposer()
let sinkAndSubscription = self.run(observer, cancel: disposer)
disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
return disposer
}
}
}
通过RxSwift之scheduler的学习,我们知道此时并没有做线程相关的操作,所以这里需要用当前线程调度者去调用schedule
,初次调用会回调action闭包
,返回的也是这个action闭包
的返回值,就不进去看了。
很明显的,一号大宝贝就是SinkDisposer
。那它跟着序列去run
函数里面干啥了?
final private class AnonymousObservable<Element>: Producer<Element> {
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
let subscription = sink.run(self)
return (sink: sink, subscription: subscription)
}
}
RxSwift核心逻辑中管道是很重要的,现在看来它创建的时候就伴随着销毁者。下一句管道在run
的时候就是在回调create闭包
。在 demo 中create闭包
执行完会返回一个Disposable
。前面我们知道create闭包
的返回值是AnonymousDisposable
,那这个subscription
就是AnonymousDisposable
。
然后就把AnonymousObservableSink
和AnonymousDisposable
打包进元组给返回了。
在回到序列的订阅函数subscribe(observer:)
,一号大宝贝SinkDisposer
对这个元组做了一些设置:
fileprivate final class SinkDisposer: Cancelable {
fileprivate enum DisposeState: Int32 {
case disposed = 1 //已销毁
case sinkAndSubscriptionSet = 2 //设置
}
private let _state = AtomicInt(0) //默认状态0
private var _sink: Disposable?
private var _subscription: Disposable?
......
func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
self._sink = sink
self._subscription = subscription
let previousState = fetchOr(self._state, DisposeState.sinkAndSubscriptionSet.rawValue)
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
rxFatalError("Sink and subscription were already set")
}
if (previousState & DisposeState.disposed.rawValue) != 0 {
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}
}
setSinkAndSubscription
中,一号大宝贝除了保存_sink
和_subscription
,也没做什么。
fetchOr
函数前面介绍过了,初值为0的情况下,和sinkAndSubscriptionSet = 2
状态2按位或后结果是2,但返回的是旧值0。不满足条件,下面都不走。
到现在的流程:
- 序列订阅
- 回调序列的
create闭包
- 发出
.next
元素,然后打印“123”
销毁
最后一句是执行销毁,输出「sinkDisposer销毁,disposable销毁」。我们看一下二元销毁者的实现就知道了:
private final class BinaryDisposable : DisposeBase, Cancelable {
func dispose() {
if fetchOr(self._isDisposed, 1) == 0 {
self._disposable1?.dispose()
self._disposable2?.dispose()
self._disposable1 = nil
self._disposable2 = nil
}
}
}
看到了,先让一号大宝贝去销毁,然后才是二号大宝贝。
一号大宝贝SinkDisposer
的dispose
fileprivate final class SinkDisposer: Cancelable {
fileprivate enum DisposeState: Int32 {
case disposed = 1 //已销毁
case sinkAndSubscriptionSet = 2 //设置
}
private let _state = AtomicInt(0) //默认状态0
......
func dispose() {
let previousState = fetchOr(self._state, DisposeState.disposed.rawValue)
if (previousState & DisposeState.disposed.rawValue) != 0 {
return
}
if (previousState & DisposeState.sinkAndSubscriptionSet.rawValue) != 0 {
......
sink.dispose()
subscription.dispose()
self._sink = nil
self._subscription = nil
}
}
}
之前一号大宝贝设置了_sink
和_subscription
,其实就是保存了AnonymousObservableSink
和 create 闭包返回值AnonymousDisposable
。经过上次的设置,它的_state
中的value
已经是2了。
现在调用dispose
。fetchOr
的返回的旧值还是2,但里面的value
== (2 | 1) == 3。
后面的判断语句都是&
,这就是那种可以随意组合枚举值后,用按位与来判断某个枚举值。
第一个条件:2 & 1 == 0,不满足,跳过继续下一步。
第二个条件成立:2 & 2 == 2,然后让_sink
和_subscription
分别去dispose
。
_sink
的dispose
:
class Sink<Observer: ObserverType> : Disposable {
func dispose() {
fetchOr(self._disposed, 1)
self._cancel.dispose()
}
}
_sink
用fetchOr
做了个标记后,又把球踢回去,让SinkDisposer
调用dispose
。然后又回来分析上一个代码。上一次value
== 3了,那它与上任何一个枚举值都是true
,所以第一个条件满足,直接return
。
继续_subscription
的dispose
,AnonymousDisposable
的销毁,第二部分介绍这个类的时候已经看过了,就是回调一下闭包。此时此刻就打印了「sinkDisposer销毁」。然后_sink
和_subscription
都置nil
。
最后就是二号大宝贝了,都是AnonymousDisposable
类型,销毁时回调下闭包就哦了,打印「disposable销毁」。
在这源码阅读的过程中,经常会看到调用
dispose
函数的,拿到序列就可以调用,发出.completed
,.error
时也会调用,装进DisposeBag
,在 Bag 销毁时也会把里面的序列全销毁掉。dispose
函数都可以保证只执行一次操作。
整个销毁流程,除了保证只回调一次闭包外就是干掉了Sink
。Sink
是 RxSwift 响应式编程的核心,它知道的太多了,销毁它整个响应通道就断了。
网友评论