RxSwift高阶函数skip解读
skip
skip
skip
的作用:跳过 Observable 中头 n 个元素,只关注后面的元素。
skip
的简单使用:
Observable.of(1, 2, 3, 4, 5, 6)
.skip(2)
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
跳过前n个,输出剩余的元素:3 4 5 6
虽然这里主要是研究skip
函数,但是调用者是of
函数的返回值,所以of
函数也不能省掉。
先看of
函数的实现:
public static func of(_ elements: Element ..., scheduler: ImmediateSchedulerType = CurrentThreadScheduler.instance) -> Observable<Element> {
return ObservableSequence(elements: elements, scheduler: scheduler)
}
这里只是传入了几个元素和默认的调度者,创建了一个ObservableSequence
序列的实例,它就是skip
的调用者。
skip
函数的实现:
public func skip(_ count: Int) -> Observable<Element> {
return SkipCount(source: self.asObservable(), count: count)
}
SkipCount
保存了调用者ObservableSequence
序列和需要跳过的count
。
final private class SkipCount<Element>: Producer<Element> {
let source: Observable<Element>
let count: Int
init(source: Observable<Element>, count: Int) {
self.source = source
self.count = count
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = SkipCountSink(parent: self, observer: observer, cancel: cancel)
let subscription = self.source.subscribe(sink)
return (sink: sink, subscription: subscription)
}
}
然后就开始了subscribe()
订阅信号。
此时,SkipCount
就会调用父类Producer
中的subscribe
函数,这个函数已经是老朋友了,它会根据所在的线程分别调用子类的run
,并创建一个销毁者SinkDisposer
,把订阅信号时创建的匿名观察者
和这个销毁者SinkDisposer
一起通过run
函数来传参过去。然后在子类SkipCount
的run
中让源序列(self.source
== ObservableSequence
)去订阅信号,并把
携带了匿名观察者
的SkipCountSink
传了过去。
下一步动动脚指头也知道,轮到ObservableSequence
去父类的subscribe
请安了,都是亲兄弟,也必然会像SkipCount
一样回到run
函数中:
final private class ObservableSequence<Sequence: Swift.Sequence>: Producer<Sequence.Element> {
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
let sink = ObservableSequenceSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
ObservableSequence
中的run
函数的实现就和RxSwift核心逻辑中的比较吻合了。都是创建了一个Sink
的子类ObservableSequenceSink
,然后调用run
。需要注意的是:ObservableSequenceSink
初始化用的observer
是SkipCountSink
。
ObservableSequenceSink
的run
:
final private class ObservableSequenceSink<Sequence: Swift.Sequence, Observer: ObserverType>: Sink<Observer> where Sequence.Element == Observer.Element {
typealias Parent = ObservableSequence<Sequence>
func run() -> Disposable {
return self._parent._scheduler.scheduleRecursive(self._parent._elements.makeIterator()) { iterator, recurse in
var mutableIterator = iterator
if let next = mutableIterator.next() {
self.forwardOn(.next(next))
recurse(mutableIterator)
}
else {
self.forwardOn(.completed)
self.dispose()
}
}
}
}
这里_parent
的调度者会带你去它家一顿唠嗑,最后还是会递归回调action闭包
。这闭包里的代码不难看出,会去父类Sink
的forwardOn
,然后就是Sink._observer.on(event)
,这个_observer
不就是我们刚刚重点加粗的那个SkipCountSink
么!
skip
的套路近在眼前:
final private class SkipCountSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = SkipCount<Element>
let parent: Parent
var remaining: Int
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self.parent = parent
self.remaining = parent.count
super.init(observer: observer, cancel: cancel)
}
func on(_ event: Event<Element>) {
switch event {
case .next(let value):
if self.remaining <= 0 {
self.forwardOn(.next(value))
}
else {
self.remaining -= 1
}
case .error:
self.forwardOn(event)
self.dispose()
case .completed:
self.forwardOn(event)
self.dispose()
}
}
}
soga,原来我们skip(n)
几次,这里就在else
中就放空几次。剩余的会正常self._observer.on(event)
,去回调subscribe
中的闭包。
skipWhile
skipWhile
skipWhile
的作用:跳过 Observable 中头几个元素,直到元素的判定为否
示例:
Observable.of(1, 2, 3, 4, 5, 6)
.skipWhile { $0 < 4 }
.subscribe(onNext: { print($0) })
.disposed(by: disposeBag)
跳过满足{$0 < 4}条件的元素,输出剩余的元素:4 5 6
同样的套路:
final private class SkipWhileSink<Observer: ObserverType>: Sink<Observer>, ObserverType {
typealias Element = Observer.Element
typealias Parent = SkipWhile<Element>
fileprivate let _parent: Parent
fileprivate var _running = false
func on(_ event: Event<Element>) {
switch event {
case .next(let value):
if !self._running {
do {
self._running = try !self._parent._predicate(value)
} catch let e {
self.forwardOn(.error(e))
self.dispose()
return
}
}
if self._running {
self.forwardOn(.next(value))
}
case .error, .completed:
self.forwardOn(event)
self.dispose()
}
}
}
关键部分还是on
函数中的实现,.next
下,用SkipWhile
保存的闭包_predicate
来判断当前元素是否满足条件。直到self._running
标记为true
后才会执行forwardOn
去响应订阅。
skipUntil
这部分加上就有些长,另外写了个RxSwift高阶函数skipUntil解读
网友评论