/**
Applies an accumulator function over an
observable sequence and returns each
intermediate result. The specified seed value
is used as the initial accumulator value.
For aggregation behavior with no
intermediate results, see 'reduce'
see also: 'http://reactivex.io/documentation/operators/scan.html'
*/
图片描述如下:
scanextension ObservableType {
public func scan<A>(seed: A, accumulator: (A, E) throws -> A)-> Observable<A> {
return Scan(source: self.asObservable(), seed: seed, accumulator: accumulator)
}
}
class Scan<Element, Accumulate>: Producer<Accumulate> {
typealias Accumulator = (Accumulator, Element) throws -> Accumulate
private let _source: Observable<Element>
private let _seed: Accumulate
private let _accumulator: Accumulator
init(source: Observable<Element>, seed: Accumulate, accumulator: Accumulator) {
_source = source
_seed = seed
_accumulator = accumulator
}
override func run<O: ObserverType where O.E == Accumulate>(observer: O)->Disposable {
let sink = ScanSink(parent: self, observer: observer)
sink.disposable = _source.subscribe(sink)
return sink
}
}
class ScanSink<ElementType, Accumulate, O: ObserverType where O.E == Accumulate>: Sink<)>, ObserverType {
typealias Parent = Scan<ElementType, Accumulate>
typealias E = ElementType
private let _parent: Parent
private var _accumulate: Accumulate
init(parent: Parent, observer: O) {
_parent = parent
_accumulate = parent._seed
super.init(observer: observer)
}
func on(event: Event<ElementType>) {
switch event {
case .Next(let element):
do {
_accumulate = try _parent._accumulator(_accumulate, element)
forwardOn(.Next(_accumulate))
}catch let error {
forwardOn(.Error(error))
dispose()
}
case .Error(let error):
forwardOn(.Error(error))
dispose()
case .Completed:
forwardOn(.Completed)
dispose()
}
}
}
网友评论