RxSwift_v1.0笔记——11: Time Based Operators
Getting started
Display the Assistant Editor
This will display a live view of the sequences you build in code.
This is where the real action will happen! To

make sure Automatically Run is selected

Buffering operators
Replaying past elements
When a sequence emits items, you'll often need to make sure that a future subscriber receives some or all of the past items. This is the purpose of the replay(_:) and replayAll() operators
let elementsPerSecond = 1
let maxElements = 5
let replayedElements = 1
let replayDelay: TimeInterval = 3
//创建一个observable以elementsPerSecond的频率发射元素。你也将捕获已经发射
//元素的总数,并且控制多少元素回调(played back)给新的订阅者。使用
//Observable<Int>.create函数构建observable
//DispatchSource.timer函数是在playground Sources folder中定义的对
//DispatchSource的扩展,它简化了重复定时器的创建。
let sourceObservable = Observable<Int>.create { observer in
var value = 1
let timer = DispatchSource.timer(interval: 1.0 /
Double(elementsPerSecond), queue: .main) {
if value <= maxElements {
observer.onNext(value)
value = value + 1
}
}
return Disposables.create {
timer.suspend()
}
}
.replay(replayedElements)
//为了可视化replay(_:),创建一对TimelineView视图。TimelineView类定义在
//playground page的底部。它为observable发射事件提供了一个动画
let sourceTimeline = TimelineView<Int>.make()
let replayedTimeline = TimelineView<Int>.make()
//为了方便,我们使用UIStackView。它将通过直接的订阅者和随后的订阅者来显示源
//observable,如下代码,它简单的创建了一些vertiaclly stacked
//views.UIStackView.makeVertical(_:)和UILabel.make(_:)是在playground中
//的便利扩展。
let stack = UIStackView.makeVertical([
UILabel.makeTitle("replay"),
UILabel.make("Emit \(elementsPerSecond) per second:"),
sourceTimeline,
UILabel.make("Replay \(replayedElements) after \(replayDelay) sec:"),
replayedTimeline])
//准备一个直接的订阅者,在top timeline显示它接收的
//TimelineView实现了ObserverType协议,因此你能够订阅它。每次有新的事件发生,
//将会显示在timeline上。发射显示为绿色,完成为黑色,错误为红色。
_ = sourceObservable.subscribe(sourceTimeline)
//随后的订阅者延时订阅
DispatchQueue.main.asyncAfter(deadline: .now() + replayDelay) {
_ = sourceObservable.subscribe(replayedTimeline)
}
//因为replay(_:)创建了一个connectable observable,它需要调用connect()才
//能开始接收
_ = sourceObservable.connect()
//最后,在stack view中建立host view并显示
let hostView = setupHostView()
hostView.addSubview(stack)
hostView
connectable observable——they won't start emitting items until you call their connect() method. These operators are:
-
replay(_:)
-
replayAll()
-
multicast(_:)
-
publish()

Unlimited replay
replayAll()——仅仅当你知道缓存元素将保持的总数是合理的情况才去使用。换句话说,在一个序列中使用它可能不会终止而且有可能产生大量的数据阻塞你的内存。
//替换.replay(replayedElements)
.replayAll()

Controlled buffering
buffer(timeSpan:count:scheduler:)
let bufferTimeSpan: RxTimeInterval = 4
let bufferMaxCount = 2
let sourceObservable = PublishSubject<String>()
let sourceTimeline = TimelineView<String>.make()
let bufferedTimeline = TimelineView<Int>.make()
let stack = UIStackView.makeVertical([
UILabel.makeTitle("buffer"),
UILabel.make("Emitted elements:"),
sourceTimeline,
UILabel.make("Buffered elements (at most \(bufferMaxCount) every \(bufferTimeSpan) seconds):"),
bufferedTimeline])
_ = sourceObservable.subscribe(sourceTimeline)
//你希望接收来自源observable的元素的数组
//每个数组能够保持至多bufferMaxCount个元素。
//如果在bufferTimeSpan到期前接收了许多元素(>=bufferMaxCount),这个操作将发射
//缓存元素并复位它的定时器。(会发射多次直到<bufferMaxCount)
//在bufferTimeSpan延时后,buffer将发射一个数组发射剩余的元素。在接下来时间段
//如果没有元素被接收到,这个数组将是空。
sourceObservable
.buffer(timeSpan: bufferTimeSpan, count: bufferMaxCount, scheduler: MainScheduler.instance)
.map { $0.count }
.subscribe(bufferedTimeline)
let hostView = setupHostView()
hostView.addSubview(stack)
hostView
DispatchQueue.main.asyncAfter(deadline: .now() + 5) {
sourceObservable.onNext("🐱")
sourceObservable.onNext("🐱")
sourceObservable.onNext("🐱")
}
//let elementsPerSecond = 0.7
//let timer = DispatchSource.timer(interval: 1.0 / Double(elementsPerSecond), queue: .main) {
// sourceObservable.onNext("🐱")
//}
// Support code -- DO NOT REMOVE
class TimelineView<E>: TimelineViewBase, ObserverType where E: CustomStringConvertible {
static func make() -> TimelineView<E> {
let view = TimelineView(frame: CGRect(x: 0, y: 0, width: 400, height: 100))
view.setup()
return view
}
public func on(_ event: Event<E>) {
switch event {
case .next(let value):
add(.Next(String(describing: value)))
case .completed:
add(.Completed())
case .error(_):
add(.Error())
}
}
}

每个盒子中显示了每次发射数组的元素个数:
-
首先发送一个空数组
-
然后再源observable上推送三个元素
-
buffered timeline立即得到两个元素的数组,因为这是你规定的最大缓存数
-
过4秒后,一个元素的数组被发射。这是剩下的第三个元素
再试下不同的缓存脚本,替换DispatchQueue
let elementsPerSecond = 0.7
let timer = DispatchSource.timer(interval: 1.0 / Double(elementsPerSecond), queue: .main) {
sourceObservable.onNext("🐱")
}

Windows of buffered observables
window(timeSpan:count:scheduler:)与 buffer(timeSpan:count:scheduler:)非常接近。唯一的不同是它发射一个Observable的items而不是数组。
let elementsPerSecond = 3
let windowTimeSpan: RxTimeInterval = 4
let windowMaxCount = 10
let sourceObservable = PublishSubject<String>()
let sourceTimeline = TimelineView<String>.make()
let stack = UIStackView.makeVertical([
UILabel.makeTitle("window"),
UILabel.make("Emitted elements (\(elementsPerSecond) per sec.):"),
sourceTimeline,
UILabel.make("Windowed observables (at most \(windowMaxCount) every \(windowTimeSpan) sec):")])
//增加一个定时器推送元素到源observable
let timer = DispatchSource.timer(interval: 1.0 / Double(elementsPerSecond), queue: .main) {
sourceObservable.onNext("🐱")
}
_ = sourceObservable.subscribe(sourceTimeline)
_ = sourceObservable
.window(timeSpan: windowTimeSpan, count: windowMaxCount, scheduler: MainScheduler.instance)
//每次flatMap获得一个新的observable,你插入一个新的timeline view
//然后映射items的observable到一个元组的observable。这样做是为了传输显示它的值和时间轴。
//一旦这个内部的observable完成,你concat(_:)一个元组以便你能够标记是进行为完成。
//你用flatMap(_ :)将元组的可观察结果序列发送到单个元组序列
//您在收到元组时,订阅了所得到的observable并注入时间线。
.flatMap { windowedObservable -> Observable<(TimelineView<Int>, String?)> in
let timeline = TimelineView<Int>.make()
stack.insert(timeline, at: 4)
stack.keep(atMost: 8)
return windowedObservable
.map { value in (timeline, value) }
.concat(Observable.just((timeline, nil)))
}
//最后,你需要在每个实践性上订阅和显示元素
.subscribe(onNext: { tuple in
let (timeline, value) = tuple
if let value = value {
timeline.add(.Next(value))
} else {
timeline.add(.Completed(true))
}
})
let hostView = setupHostView()
hostView.addSubview(stack)
hostView

Time-shifting operators
有时(every now and again)你需要时间旅行。当RxSwift不能帮助你修复你投递的相关错误时,它有能力冻结时间让你等待直到自我克隆是有效的。
这节你将窥视两种时间相关的操作。
Delayed subscriptions
let elementsPerSecond = 1
let delayInSeconds = 1.5
let sourceObservable = PublishSubject<Int>()
let sourceTimeline = TimelineView<Int>.make()
let delayedTimeline = TimelineView<Int>.make()
let stack = UIStackView.makeVertical([
UILabel.makeTitle("timer"),
UILabel.make("Emitted elements (\(elementsPerSecond) per sec.):"),
sourceTimeline,
UILabel.make("Delayed elements (with a \(delayInSeconds)s delay):"),
delayedTimeline])
var current = 1
let timer = DispatchSource.timer(interval: 1.0 / Double(elementsPerSecond), queue: .main) {
sourceObservable.onNext(current)
current = current + 1
}
_ = sourceObservable.subscribe(sourceTimeline)
// Setup the delayed subscription
// ADD CODE HERE
_ = sourceObservable
.delaySubscription(RxTimeInterval(delayInSeconds), scheduler:
MainScheduler.instance)
.subscribe(delayedTimeline)
//_ = sourceObservable
// .delay(RxTimeInterval(delayInSeconds), scheduler:
// MainScheduler.instance)
// .subscribe(delayedTimeline)
//_ = Observable<Int>
// .timer(3, scheduler: MainScheduler.instance)
// .flatMap { _ in
// sourceObservable.delay(RxTimeInterval(delayInSeconds), scheduler: MainScheduler.instance)
// }
// .subscribe(delayedTimeline)
let hostView = setupHostView()
hostView.addSubview(stack)
hostView

Note:在Rx中,observables分为“cold”或“hot”。
cold observables:当你订阅他们时开始发射元素。
hot observables:发射元素不依赖与订阅。
Delayed elements
在delay playground中,替换delayed subscription
_ = sourceObservable
.delay(RxTimeInterval(delayInSeconds), scheduler:
MainScheduler.instance)
.subscribe(delayedTimeline)
You just replaced delaySubscription(_:scheduler:) with delay(_:scheduler:).

Timer operators
RsSwift为一次性和重复的定时器提供了一个简单和高效的解决方案。它完美的与序列集成并提供了与其他序列进行拆解和组合的能力。
Intervals
返回replay playground page,Delete this code, starting at let sourceObservable = Observable<Int>.create {… and up to (and including) replayAll(); and then insert instead:
let sourceObservable = Observable<Int>
.interval(RxTimeInterval(elementsPerSecond), scheduler:
MainScheduler.instance)
.replay(replayedElements)

One-shot or repeating timers
Observable.timer(_:period:scheduler:)与 Observable.interval(_:scheduler:)非常相似,仅仅增加了一下特性:
- 你能够指定一个”due date“当做从订阅到第一次发射所消逝的时间
- 重复的时间是可选的。如果你不指定,定时器observable将只发射一次,然后完成。
打开delay页面,Locate the place where you used the delay(_:scheduler:) operator. Replace the whole block of code with:
_ = Observable<Int>
.timer(3, scheduler: MainScheduler.instance)
.flatMap { _ in
sourceObservable.delay(RxTimeInterval(delayInSeconds), scheduler:
MainScheduler.instance)
}
.subscribe(delayedTimeline)

一个定时器触发了另一个定时器,这比使用Dispatch有一些好处:
- 整个链式结构更易读(more “Rx-y”).
- 因为订阅返回了disposable,你能在第一个或第二个定时器触发前,用一个单一的observable取消订阅
- 使用 flatMap(_:)操作,你不需要使用 Dispatch asynchronous闭包跳转就能生成定时器序列
Timeouts
Open the timeout playground page.
//Create a simple button
let button = UIButton(type: .system)
button.setTitle("Press me now!", for: .normal)
button.sizeToFit()
//Prepare the timeline view and stack it up with the button:
let tapsTimeline = TimelineView<String>.make()
let stack = UIStackView.makeVertical([
button,
UILabel.make("Taps on button above"),
tapsTimeline])
//Setup the observable and connect it to the timeline view:
let _ = button
.rx.tap
.map { _ in "•" }
.timeout(5, other: Observable.just("X"), scheduler: MainScheduler.instance)
.subscribe(tapsTimeline)
//And as usual, add the stack to the host view to kick off the animation:
let hostView = setupHostView()
hostView.addSubview(stack)
hostView
如果在5秒内你点击按钮,你将在时间线上看到你的点击。在5秒之后,超时开火,时间线将由于错误而停止。

另一个版本的 timeout(_:scheduler:)是当超时时,产生另一个obsesvable代替发射一个错误。
To try this, change the timeout(_:scheduler:) call in the playground to:
.timeout(5, other: Observable.just("X"), scheduler:
MainScheduler.instance)

网友评论