一个信号,由
Signal
类型表现,是可以被持续监视的一系列事件(events
)。
信号一般用来表示“正在进行中”的事件流,比如通知,用户输入等。随着操作完成或者收到数据,事件在信号上发送,信号将他们推向所有的监听器。所有的监听器会在同时看到事件。
用户必须监听(
observe
)一个信号来访问它的事件。监听信号不会产生任何副作用(side effects
)。换句话说,信号完全是生成器驱动和基于推送的,而且在其生命周期内消费者(监听器)不能对它产生任何影响。监听一个信号的时候,用户只能按照事件在信号上发送的顺序处理事件。没有方法可以随机访问信号的值。
可以通过在信号上施加原函数(primitives)来操作信号。典型的操作信号的原函数有
filter
,map
,reduce
,以及一些同时操作多个信号的原函数(比如zip
)。原函数仅在信号的Next
事件上施加操作。
信号的生命周期由任意多个
Next
事件,和一个紧随其后的终结事件组成,终结事件可能是Failed
,Completed
,或者Interrupted
(但不会是他们的组合)。终结事件不属于信号的值,他们必须被特殊处理。
1. 信号什么样
看看信号的定义,可以发现信号异常简单,它仅仅持有了一个对监听器集合的引用而已:
public final class Signal<Value, Error: ErrorType> {
public typealias Observer = ReactiveCocoa.Observer<Value, Error>
private let atomicObservers: Atomic<Bag<Observer>?> = Atomic(Bag())
......
}
Bag
是一个在ReactiveCocoa定义的数据结构,认为它是一个数组就可以了。Atomic
也是ReactiveCocoa自定的数据结构,给Atomic
一个值以后,如果对这个值进行操作,Atomic
会保证线程安全。这些都是信号的内部实现细节,对信号的使用者来讲是透明的。
关键在于信号的私有成员atomicObservers
,它是正在监听这个信号的所有监听器的集合。通过typealias
,信号的泛型定义决定了这些监听器能够接受的事件的类型。也就是说,如果一个信号发出带有String
类型数据的Next
事件,或者带有SomeError
类型的Failed
事件,那么只有能够处理这两种事件的监听器才能监听它。
于是信号看起来就是这样子的:
单纯的Signal是这样子的那么信号是怎么发送事件的?ReactiveCocoa的信号仅仅是一个媒介,它不管事件如何发生,只管把这些事件发送给监听自己的监听器们。打个比方,信号就是公路,而事件是公路上跑的车。事件怎么产生,需要用户(也就是我们)在初始化信号时来告诉信号,下面就来看看信号是如何初始化的。
2. 创建信号
创建一个信号,可以通过信号的初始化方法init(_ generator: Observer -> Disposable?)
。这个方法的外部参数名被取消了,不过从内部参数名可以看出它的作用——事件源(generator)。刚才提到过,信号�除了谁在监听自己以外一无所知,其实事件源也不知道信号的存在,那么就必须有一个中间人将事件从事件源移动到信号上。这个中间人,就是事件源接受的参数——一个Observer。他们之间的联系如何建立,就要看看信号初始化方法的执行过程:
- 先创建一个空的串行存根(
SerialDisposable
) - 再创建一个监听器(
Observer
),这个监听器持有上一步中创建的存根的引用,和对信号(Signal
)本身的引用 - 将上一步创建的监听器交给事件源,让事件源开始工作
- 将上一步返回的存根�(
Disposable
)交给第一步的串行存根
上面第二步中的监听器可能是ReactiveCocoa中最重要的对象了(其实,�把它叫做监听器让人困惑,我觉得叫做事件分发器更合适,只不过使用Observer
类来实现了而已),这个对象把ReactiveCocoa和ReactiveCocoa之外的世界(我们想做的App)联系了起来。所以必须说明一下它的作用(就是上面图中黄色圆角的action在做什么):
- 如果它从事件源收到任何事件,就在信号的监听器集合中循环迭代,将此事件原封不动地�分发给每一个监听器。
- 如果它从事件源收到的事件是一个终结事件,除了分发这个事件外,它还会废弃自己持有的存根对象。
事件源一开始发生事件,分发器就把这个事件分发给信号的所有监听器。一旦事件源发出了终结事件,分发器就废弃自己持有的串行存根,这会进而废弃事件源返回的存根,释放事件源占用的系统资源,事件源不再工作,信号就终结了。
值得注意的是:
- 其实在我们使用者看来,事件源,分发器,信号三者并没有区分看待的必要,将他们整体看做信号就可以了。
- 信号一经初始化,事件源就立即开始工作,发生事件(也就是所谓的“热”信号)。
该看看我们的职责了——提供事件源。事件源是一个回调函数,接受一个Observer
参数(就是那个很重要的分发器),可以选择性地返回一个Disposable
。我们可以做任何想做的事,只要把想告知信号另一端的监听器的值用sendNext(value:)
方法交给分发器就可以了;如果想要告诉对方我们做的事情失败了,就用调用分发器的sendFailed(error:)
方法;如果我们的操作正常结束就调用sendCompleted()
;如果我们被打断了,就调用sendInterrupted()
。
另外,如果我们的事件源要做一些很重的操作,需要占用系统资源要到操作完成才能释放的话,我们可以把释放资源的工作包装到一个Disposable
对象中,把它作为返回值传回去。分发器会在收到我们的终结事件时帮我们调用这些清理和释放的工作。当然,要是没有这个必要的话返回nil就可以了。
code somple
信号我们有了,那么如何监听信号呢?
3. 监听信号
相信你已经有了答案,要监听一个信号,只要将一个类型正确的监听器加入到信号的监听器集合里就行了。为此,ReavtiveCocoa框架在Signal类中定义了observe(observer: Observer) -> Disposable?
实例方法,把我们的监听器作为参数传入就可以了。
signal.observe(Signal.Observer { event in
switch event {
case let .Next(next):
print("Next: \(next)")
case let .Failed(error):
print("Failed: \(error)")
case .Completed:
print("Completed")
case .Interrupted:
print("Interrupted")
}
})
值得一提的是这个方法的返回值,一个存根会交到我们手中,我们可以废弃这个存根,这样做仅仅会使我们的监听器被从信号的监听器集合中移除,从而停止接收信号发出的事件,但是对信号本身而言没有任何影响。
监听信号在swift 2中,协议的定义中可以提供方法的默认实现。所有声明要实现�该协议的对象,如果没有提供自己的对于这些方法的实现,都可以使用这些默认实现。ReactiveCocoa里定义了一个SignalType
协议,规定了一个对象能够被称为信号所需要满足的接口。同时,它还定义了一些便利的帮助方法:
extension SignalType {
public func observe(action: Signal<Value, Error>.Observer.Action) -> Disposable? {
return observe(Observer(action))
}
public func observeNext(next: Value -> ()) -> Disposable? {
return observe(Observer(next: next))
}
public func observeCompleted(completed: () -> ()) -> Disposable? {
return observe(Observer(completed: completed))
}
public func observeFailed(error: Error -> ()) -> Disposable? {
return observe(Observer(failed: error))
}
public func observeInterrupted(interrupted: () -> ()) -> Disposable? {
return observe(Observer(interrupted: interrupted))
}
......
}
Signal
类实现了SignalType
协议,继承了这些默认方法,所以就不必显示调用监听器的初始化函数了,只要针对我们感兴趣的事件提供处理方法,作为参数传入就可以了:
signal.observeNext { next in
print("Next: \(next)")
}
signal.observeFailed { error in
print("Failed: \(error)")
}
signal.observeCompleted {
print("Completed")
}
signal.observeInterrupted {
print("Interrupted")
}
4. 管道(Pipes)
一个管道,由
Signal.pipe()
方法创建,是一个可以手动控制的信号(signal
)。
这个方法返回一个信号(
signal
)和一个监听器(observer
)。可以通过向监听器发送事件来控制信号。这在将非RAC的代码桥接到信号的世界时非常有用。
比如,不在回调中处理应用程序逻辑,�而是在这个回调中简单的向监听器发送事件。同时,信号可以被返回,隐藏了回调的实现细节。
pipe
是定义在Signal
类上的一个类方法,是另一种创建信号的方法。和信号的初始化方法不同,它不需要我们提供事件源,而是在返回值的元组中把事件分发器的引用交给我们,如何发送事件和何时发送时间完全由我们的后续处理而定:
/// Creates a Signal that will be controlled by sending events to the given
/// observer.
///
/// The Signal will remain alive until a terminating event is sent to the
/// observer.
public static func pipe() -> (Signal, Observer) {
var observer: Observer!
let signal = self.init { innerObserver in
observer = innerObserver
return nil
}
return (signal, observer)
}
pipe
方法调用了信号的初始化方法,作为参数的事件源中没有任何产生事件的处理,而是将事件分发器(上面代码中的innerObserver
)直接赋值到闭包外面的变量中,最后用元组的形式将创建好的信号和事件分发器返回。我们可以操作并监听返回的信号,或者在分发器上手动发送事件:
let (signal, observer) = Signal<String, NoError>.pipe()
signal
.map { string in string.uppercaseString }
.observeNext { next in print(next) }
observer.sendNext("a") // Prints A
observer.sendNext("b") // Prints B
observer.sendNext("c") // Prints C
事件可以产生了,信号把它们传递到了我们的监听器里,我们的监听器把事件中关联的值拿来做了我们要做的事。现在轮到ReactiveCocoa中最强大的部分登场了。
5. 信号的变形
假定有一只正在发出白光的手电筒,我们从它那里得到了白色的光。如果把它放到一块蓝色的玻璃后面,我们得到的光就变成了蓝色——信号发生了变形。
�用ReactiveCocoa的概念做个类比,信号就是手电筒,事件就是发出的光,监听器就是我们的眼睛。如果需要在事件发送到我们的监听器之前发生对它们做一些改变,就必须要有一个办法把我们的蓝色玻璃插入到信号和监听器之间,而且还应该可以插入任意多个任意颜色的玻璃。上面提到的SignalType
协议就提供这些办法。
SignalType
协议里有三个信号变形方法的默认实现,这三个方法(尤其是map
)是其他信号变形的基础:
map<U>(transform: Value -> U) -> Signal<U, Error>
mapError<F>(transform: Error -> F) -> Signal<Value, F>
filter(predicate: Value -> Bool) -> Signal<Value, Error>
正如这三个方法一样,所有关于信号变形操作的返回值依然是一个信号,也就是说可以进一步对这个新信号再次施加变形操作,从而形成一个变形操作的链条。除了map
,mapError
,filter
以外,ReactiveCocoa提供了许多其他的变形操作(后述),将这些操作排列组合,可以让信号发生无穷无尽的变化。如果ReactiveCocoa提供的变形操作不够用,我们可以扩展SignalType
协议(使用extension
)加入自定义的变形方法。
code sample
下面我们来分别看看它们在做什么:
1. 映射(map和mapError)
顾名思义,映射就是事件一对一的变形,我们来决定变形的具体过程,将这个过程作为参数传递给map
方法即可。
extension SignalType {
......
/// Maps each value in the signal to a new value.
@warn_unused_result(message="Did you forget to call `observe` on the signal?")
public func map<U>(transform: Value -> U) -> Signal<U, Error> {
return Signal { observer in
return self.observe { event in
observer.action(event.map(transform))
}
}
}
/// Maps errors in the signal to a new error.
@warn_unused_result(message="Did you forget to call `observe` on the signal?")
public func mapError<F>(transform: Error -> F) -> Signal<Value, F> {
return Signal { observer in
return self.observe { event in
observer.action(event.mapError(transform))
}
}
}
......
}
map
方法寥寥数语,但是所做事情比较复杂,有必要慢慢分解一下的话:
- 首先创建一个新的信号,这个过程和前面提到的信号初始化相同,一个事件分发器被传递到事件源中。
- 新信号的事件源使用得到的分发器创建一个监听器,这个监听对我们作为参数传入的变形方法有一个引用,它对每一个收到的事件实施这个变形方法,然后交给新信号的分发器。
- 新信号的�事件源不发生任何事件,仅仅把�第二步创建的监听器用
observe
方法加入到当前信号的监听器集合中。 - 因为用了
observe
方法,一个ActionDisposable
类型的存根会返回,交给新信号的串行存根。 - 将新的信号返回。
简而言之,映射操作就是使用当前的信号作为事件源制造了一个新的信号。沿用我们的类比,就是把手电筒和蓝色的玻璃绑在一起,当成一个新的手电筒。上面的过程中第二步中创建的监听器十分关键,它起到了连接新旧两个信号的作用,我们定义的变形方法(也就是我们制造的一个有颜色的玻璃)�被包装在这个监听器中。第三步,这个监听器加入到了当前信号的监听器集合中(跟手电筒绑在一起),一旦当前的信号有事件发生,这个监听器就会收到并立即调用变形方法,然后将新的事件交给新信号的分发器,于是新的信号的监听器们(我们的眼睛)就收到了变形后的事件(蓝色的光)。就像这样:
map后的事件发送.gif上面第四步返回的存根,和之前提到的监听信号时得到的存根一样,可以用来将负责事件变形的监听器从当前的信号上移除,而信号本身不会受任何影响(相当于把蓝色的玻璃拿掉,而手电筒不会有什么变化。)。
2. 过滤(filter)
有了上面个关于映射的讨论,再来看过滤的话就不困难了。过滤不会改变信号上事件流的值或类型,而是把不满足一定条件的事件拦截掉。拦截的方法,就是在连接新旧信号的监听器中规定,如果事件不满足条件,就不要把该事件传递给新信号的分发器。
extension SignalType {
......
/// Preserves only the values of the signal that pass the given predicate.
@warn_unused_result(message="Did you forget to call `observe` on the signal?")
public func filter(predicate: Value -> Bool) -> Signal<Value, Error> {
return Signal { observer in
return self.observe { (event: Event<Value, Error>) -> () in
if case let .Next(value) = event {
if predicate(value) {
observer.sendNext(value)
}
} else {
observer.action(event)
}
}
}
}
}
filter.gif
3. 聚合(reduce和collect)
6. 信号的组合
1. 组合(combine)
2. 打包(zip)
7. 信号的扁平化(Flatten)
1. 混合
2. 连接
3. 最新
8. 其他种类的变形
1. ignoreNil
2. take
take.gif3. collect
collect.gif4. observeOn
observerOn.gif5. combineLatestWith
combineLatestWith.gif6. delay
delay.gif7. skip
skip.gif8. materialize
materialize.gif9. dematerialize
dematerialize.gif10. sampleOn
sampleOn.gif11. takeUntil
takeUtil.gif12. skipUntil
skipUntil.gif13. combinePrevious
combinePrevious.gif14. reduce
reduce.gif15. scan
scan(initial:, combine:)
将信号包装为一个新信号,每当源信号发出事件时,事件的值都会被累积,然后再转发给新信号。具体的累积方法,由scan
方法的第二个参数规定,累积的结果的类型可以和源信号的值得类型不同。scan
的第一个参数是累积用的初始值,它的类型必须和累积的结果类型一致。
scan
方法在原信号的监听器集合中加入一个监听器,当信号发出第一个事件后,事件的值会和initial
的值累积后转发给新信号,累积的结果会保存在新信号的一个变量中。之后源信号发出的每一个事件的值都会和前一次累积的结果再次累积,然后转发给新信号。
16. skipRepeats
skipRepeats.gif17. skipWhile
skipWhile.gif18. takeUntilReplacement
takeUntilReplacement.gif19. takeLast
takeLast(count:)
操作将信号包装为一个新信号,在源信号发出完成事件时,将源信号的最后count
个事件发送出来,之后紧随一个完成事件。在源信号发出完成事件之前,新信号不发出任何事件。
takeLast
方法在源信号的监听器集合中加入一个带有缓冲的监听器,这个缓冲是一个原信号值类型的数组,数组长度由参count
数而定。当源信号发出Next
事件时,这个监听器并不将事件转发给新信号的事件分发器,而是将事件存储在缓冲的数组中。如果事件的数量超过了缓冲的容量,就将最早的事件从缓冲中移除以腾出空间。当源信号发出Complete
事件时,这个监听器就循环迭代缓冲数组,将其中所有的事件发送出去,之后再发出一个Complete
事件。
如果源信号发出了Failed
或Interrupted
事件,缓冲机制不会执行,而是直接转发给新信号。
网友评论