在ReactiveSwift框架分析系列文章1中,我们知道需要三个简单的步骤来创建一个信号并观察它:
- 创建一个pipe管道,同时创建了一个输入端Observer,一个输出端Signal
- 创建观察者,订阅Signal。
- 通过输入端Observer发送值给Signal,这将触发所有订阅了该信号的观察者执行与观察者关联的闭包。
那这篇文章我们就来通过分析源码看看它究竟是怎样实现的。
Observer
An Observer is a simple wrapper around a function which can receive Events.
上篇文章中我们知道,Observer
封装了Event的处理逻辑,以下是其源码:
public final class Observer {
public typealias Action = (Event) -> Void
private let _send: Action
// Whether the observer should send an `interrupted` event as it deinitializes.
private let interruptsOnDeinit: Bool
internal init(action: @escaping Action, interruptsOnDeinit: Bool) {
self._send = action
self.interruptsOnDeinit = interruptsOnDeinit
}
public init(_ action: @escaping Action) {
self._send = action
self.interruptsOnDeinit = false
}
public func send(_ event: Event) {
_send(event)
}
public func send(value: Value) {
_send(.value(value))
}
public func send(error: Error) {
_send(.failed(error))
}
public func sendInterrupted() {
_send(.interrupted)
}
}
从源码可以看出,Observer内部有一个 (Event) -> Void的闭包_send
,在初始化Observer
的时候就是给这个闭包赋值,调用Observer一系列send方法就是在执行这个闭包,很简单!
Signal
Signal是事件流,event是其基本单元,它会主动将事件event向外发送,是热信号,不会等到有人订阅后才开始发送,Signal中包含一个核心属性:Core
属性,Signal其实只是包装Core的一个壳而已,所以我们主要还是来分析Core
的源码:
private let core: Core
private final class Core {
private let disposable: CompositeDisposable
private let stateLock: Lock
private let sendLock: Lock
private var state: State
......
}
Core
里面的State
属性,确切来说是一个枚举:
private enum State {
case alive(Bag<Observer>, hasDeinitialized: Bool)
case terminating(Bag<Observer>, TerminationKind)
case terminated
......
}
其中,除了alive,terminating关联一个Observer数组外,状态terminated是不带关联数组的,这意味着当信号切换到terminated状态时,那么那些被保存的Observer对象也就跟着释放了,所以当不再需要使用信号时,应该向信号发送一个非Value事件确保资源释放。
State
的作用有两个:
- 指示信号的状态。
- alive,terminating会保存信号订阅者添加进来的Observer对象。
那Observer
对象是怎么通过observe方法被添加进来的呢?先看下该方法具体实现:
fileprivate func observe(_ observer: Observer) -> Disposable? {
var token: Bag<Observer>.Token?
stateLock.lock()
if case let .alive(observers, hasDeinitialized) = state {
//1.信号处于state.alive状态,将新的observer对象添加到 state.alive的数组中
var newObservers = observers
token = newObservers.insert(observer)
self.state = .alive(newObservers, hasDeinitialized: hasDeinitialized)
}
stateLock.unlock()
//2. 如果1顺利执行,token会被赋值(即信号处于alive状态) 返回一个Disposable对象
if let token = token {
return AnyDisposable { [weak self] in
self?.removeObserver(with: token)
}
} else { //如果1顺利执行,直接向observer对象发送interrupted事件
observer.sendInterrupted()
return nil
}
}
从源码我们可以看出,通过observe
方法添加并保存了Observer,也就是保存了Event的处理逻辑。
那数据如何发送的,如何处理Observer内部的处理逻辑的呢?
再来分析Core的send方法的源码:
private func send(_ event: Event) {
if event.isTerminating { // 收到非Value的Event,信号状态从.alive切换到.terminating
self.stateLock.lock()
// 将.alive中的Observer数组移到.terminating中,遍历. terminating的数组observers,调用send函数
if case let .alive(observers, _) = state {
self.state = .terminating(observers, .init(event))
self.stateLock.unlock()
} else {
self.stateLock.unlock()
}
tryToCommitTermination()
} else { // 收到Value的Event
self.sendLock.lock()
self.stateLock.lock()
//遍历.alive的数组observers,调用send函数
if case let .alive(observers, _) = self.state {
self.stateLock.unlock()
for observer in observers {
observer.send(event)
}
} else {
self.stateLock.unlock()
}
self.sendLock.unlock()
stateLock.lock()
//遍历. terminating的数组observers,调用send函数
if case .terminating = state {
stateLock.unlock()
tryToCommitTermination()
} else {
stateLock.unlock()
}
}
}
private func tryToCommitTermination() {
......
if case let .terminating(observers, terminationKind) = state {
//切换状态到.terminated,遍历. terminating的数组observers,调用send函数
state = .terminated
if let event = terminationKind.materialize() {
for observer in observers {
observer.send(event)
}
}
}
}
可以看出,配合着切换状态,最终send方法中都是遍历Observers数组,让其中各Observer调用send方法。
我们分析了如何将Observer添加Signal的数组中,Observer的send方法,接下来再看看创建管道的pipe方法。
public static func pipe(disposable: Disposable? = nil) -> (output: Signal, input: Observer) {
var observer: Observer!
let signal = self.init { innerObserver, lifetime in
observer = innerObserver
lifetime += disposable
}
return (signal, observer)
}
//Signal.init()
public init(_ generator: (Observer, Lifetime) -> Void) {
core = Core(generator)
}
//Core.init()
fileprivate init(_ generator: (Observer, Lifetime) -> Void) {
//1. 设置信号初始状态为alive 同时初始化alive中的数组
state = .alive(Bag(), hasDeinitialized: false)
......
//2. 创建一个Observer对象,并将Core.send赋值给该对象的_send闭包
generator(Observer(action: self.send, interruptsOnDeinit: true), Lifetime(disposable))
}
从源码可以看出, pipe()
函数中包含Signal的初始化创建,而Signal的初始化创建中有包含Core的初始化创建,Core的初始化中又会真正执行generator: (Observer, Lifetime)
闭包,这个闭包传入一个Observer对象,所以其实pipe()
函数是通过generator闭包捕获了Core.init()
中的Observer
对象的,而这个Observer
对象的_send
闭包可以看出其实是Core.send
,最后pipe()
将返回Signal和Observer。
所以,pipe().output
(即Signal)的作用是管理信号状态并保存由订阅者提供的Observer对象, 而pipe().input
(即InnerObserver)的作用则是在接收到Event后依次执行这些被保存的Observer._send
。
实例展示,加深印象!!
上面讲过,通过Signal.pipe()
函数会返回一个元组,元组的第一个值是output(类型为Signal),第二个值是input(类型为Observer)。
创建如下:
let signalTuple = Signal<Int, NoError>.pipe()
let (signal, observer) = Signal<Int, NoError>.pipe()
我们在viewDidLoad
中创建一个热信号,并发送数据:
typealias NSignal<T> = Signal<T, NoError>
override func viewDidLoad() {
super.viewDidLoad()
let (signal, innerObserver) = NSignal<Int>.pipe()
//create Observer and add it into Signal
signal.observeValues { (value) in
print("did received value: \(value)")
}
//create Observer and add it into Signal again
signal.observeValues { (value) in
print("did received value: \(value)")
}
innerObserver.send(value: 1)
innerObserver.sendCompleted()
}
//输出结果: did received value: 1
did received value: 1
Signal.observeValues
,是Signal.observe
的一个便利函数, 作用是创建一个只处理Value事件的Observer并添加到Signal中, 类似的还有只处理Failed事件的Signal.observeFailed和所有事件都能处理的Signal.observeResult。
再来看一个ViewModel中的Demo:
typealias NSignal<T> = Signal<T, NoError>
//ViewModel.swift
class ViewModel {
let signal: NSignal<Int>
let innerObserver: NSignal<Int>.Observer
init() {
(signal, innerObserver) = NSignal<Int>.pipe()
}
}
//View1.swift
class View1 {
func bind(viewModel: ViewModel) {
viewModel.signal.observeValues { (value) in
print("View1 received value: \(value)")
}
}
}
//View2.swift
class View2 {
func bind(viewModel: ViewModel) {
viewModel.signal.observeValues { (value) in
print("View2 received value: \(value)")
}
}
}
//View3.swift
class View3 {
func bind(viewModel: ViewModel) {
viewModel.signal.observeValues { (value) in
print("View3 received value: \(value)")
}
viewModel.signal.observeInterrupted {
print("View3 received interrupted")
}
}
}
override func viewDidLoad() {
super.viewDidLoad()
let view1 = View1()
let view2 = View2()
let view3 = View3()
let viewModel = ViewModel()
view1.bind(viewModel: viewModel)
viewModel.innerObserver.send(value: 1)
view2.bind(viewModel: viewModel)
viewModel.innerObserver.send(value: 2)
viewModel.innerObserver.sendCompleted()//发送一个非Value事件 信号无效
view3.bind(viewModel: viewModel)//信号无效后才订阅
viewModel.innerObserver.send(value: 3)//信号无效后发送事件
}
输出: View1 received value: 1
View1 received value: 2
View2 received value: 2
View3 received interrupted
view2的订阅时间晚于value1的发送时间,所以view2收不到value1对应的事件,而之后发送一个非Value的事件后,信号便无效了。所以虽然view1和view2的订阅都早于value3的发送时间,但因为value3在信号发送前先发送了completed,结果就是view1和view2都不会收到value3事件,当然view3也不会收到value3事件,它只会收到一个interrupted。
小结
我们通过pipe()
方法,创建一个‘管道’,这个‘管道’其实是个元组,元素output代表输出端,类型为Signal <Int,NoError> , 元素input代表输入端,类型为Observer <Int,NoError>,作为input的Observer发送数据,将事件注入output的Signal,同时触发绑定到Signal中的所有Observer调用send函数。
网友评论