美文网首页
scan运算

scan运算

作者: 幸运的小强本人 | 来源:发表于2016-03-14 18:44 被阅读25次

    /**
    Applies an accumulator function over an observable sequence and returns
    each intermediate result. The specified seed value is used as the initial
    accumulator value.

    For aggregation behavior with no intermediate results, see `reduce`.

    See also: http://reactivex.io/documentation/operators/scan.html
    */
    

    图片描述如下:

    scan
    extension ObservableType {
        /// Builds a `Scan` producer over this sequence.
        ///
        /// - parameter seed: Initial accumulator value handed to the `Scan` producer.
        /// - parameter accumulator: Function combining the current accumulator value
        ///   with a source element; it may throw.
        /// - returns: An observable of accumulator values, backed by `Scan`.
        public func scan<A>(seed: A, accumulator: (A, E) throws -> A) -> Observable<A> {
            let upstream = self.asObservable()
            let producer = Scan(source: upstream, seed: seed, accumulator: accumulator)
            return producer
        }
    }
    
    
    /// Producer that holds the source sequence, the seed, and the accumulator
    /// function, and wires a `ScanSink` between the source and each observer.
    class Scan<Element, Accumulate>: Producer<Accumulate> {
        /// Combines the current accumulated value with the next source element.
        /// The first parameter is the accumulated value (`Accumulate`); the
        /// alias must not refer to itself.
        typealias Accumulator = (Accumulate, Element) throws -> Accumulate

        private let _source: Observable<Element>
        private let _seed: Accumulate
        private let _accumulator: Accumulator

        /// - parameter source: Sequence whose elements are accumulated.
        /// - parameter seed: Initial accumulator value.
        /// - parameter accumulator: Function applied to each element; may throw.
        init(source: Observable<Element>, seed: Accumulate, accumulator: Accumulator) {
            _source = source
            _seed = seed
            _accumulator = accumulator
        }

        /// Creates a `ScanSink` for `observer`, subscribes it to the source,
        /// stores that subscription on the sink, and returns the sink as the
        /// disposable for this run.
        override func run<O: ObserverType where O.E == Accumulate>(observer: O) -> Disposable {
            let sink = ScanSink(parent: self, observer: observer)
            sink.disposable = _source.subscribe(sink)
            return sink
        }
    }
    
    /// Observer placed between the source sequence and the downstream observer.
    /// It keeps the running accumulator value, starting from the parent's seed,
    /// and forwards the updated value for every source element.
    class ScanSink<ElementType, Accumulate, O: ObserverType where O.E == Accumulate>: Sink<O>, ObserverType {
        typealias Parent = Scan<ElementType, Accumulate>
        typealias E = ElementType

        private let _parent: Parent
        // Current accumulated value; replaced on every `.Next` event.
        private var _accumulate: Accumulate

        /// - parameter parent: The `Scan` producer supplying the seed and accumulator.
        /// - parameter observer: Downstream observer that receives accumulated values.
        init(parent: Parent, observer: O) {
            _parent = parent
            _accumulate = parent._seed
            super.init(observer: observer)
        }

        /// Handles one event from the source sequence.
        ///
        /// - `.Next`: applies the accumulator to the current value and the
        ///   element, stores the result, and forwards it. If the accumulator
        ///   throws, the error is forwarded and the sink is disposed.
        /// - `.Error` / `.Completed`: forwarded unchanged, then the sink is disposed.
        func on(event: Event<ElementType>) {
            switch event {
            case .Next(let element):
                do {
                    _accumulate = try _parent._accumulator(_accumulate, element)
                    forwardOn(.Next(_accumulate))
                } catch let error {
                    forwardOn(.Error(error))
                    dispose()
                }
            case .Error(let error):
                forwardOn(.Error(error))
                dispose()
            case .Completed:
                forwardOn(.Completed)
                dispose()
            }
        }
    }

    相关文章

      网友评论

          本文标题:scan运算

          本文链接:https://www.haomeiwen.com/subject/qgvflttx.html