美文网首页
windowTimeCount运算

windowTimeCount运算

作者: 幸运的小强本人 | 来源:发表于2016-03-12 21:27 被阅读34次

    windowTimeCount运算的含义如下图所示:

    屏幕快照 2016-03-12 下午9.14.26.png
    extension: ObservableType {
        public func window(timeSpa timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)->Observable<Observable<E>> {
            return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
        }
    }
    
    class WindowTimeCount<Element>: Producer<Observable<Element>> {
        private let _timeSpan: RxTimeInterval
        private let _count: Int
        private let _scheduler: SchedulerType
        private let _source: Observable<Element>
    
        init(source: Observable<Element>, timeSpan:RxTimeInterval, count: Int, scheduler: SchedulerType) {
            _source = source
            _timeSpan = timeSpan
            _count = count
            _scheduler = scheduler
        }
    
        override func run<O: ObserverType where O.E == Observable<Element>(observer: O)->Disposable {
            let sink = WindowTimeCountSink(parent: self, observer: observer)
            sink.disposable = sink.run()
            return sink
        }
    }
    
    class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Element>>: Sink<O>, ObserverType, LockOwnerType, SynchronizedOnType {
        typealias Parent = WindowTimeCount<Element>
        typealias E = Element
    
        private let _parent: Parent
    
        let _lock = NSREcursiveLock()
    
        private var _subject = PublishSubject<Element>()
        private var _count = 0
        private var _windowId = 0
    
        private let _timerD = SerialDisposable()
        private let _refCountDisposable: RefCountDisposable
        private let _groupDisposable = CompositeDisposable()
    
        init(parent: parent, observer: O) {
            _parent = parent
            _groupDisposable.addDisposable(_timerD)
            _refCountDisposable = RefCountDisposable(disposable: _groupDisposable)
            super.init(observer: observer)
        }
    
        func run() -> Disposable {
            forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
            createTimer(_windowId)
        
            _groupDisposable.addDisposable(_parent._source.subscribeSafe(self))
            return _refCountDisposable
        }
    
        func startNewWindowAndCompleteCurrentOne() {
            _subject.on(.Completed)
            _subject = PublishSubject<Element>()
        
            forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
        }
    
        func on(event: Event<E>) {
            synchronizedOn(event)
        }
    
        func _synchronized_on(event: Event<E>) {
            var newWindow = false
            var newId = 0
        
            switch event {
            case .Next(let element):
                _subject.on(.Next(element))
            
                do {
                    try incrementChecked(&_count)
                } catch (let e) {
                    _subject.on(.Error(e as ErrorType))
                    dispose()
                }
            
                if (_count == _parent._count) {
                    newWindow = true
                    _count = 0
                    _windowId += 1
                    newId = _windowId
                    self.startNewWindowAndCompleteCurrentOne()
                }
            
            case .Error(let error):
                _subject.on(.Error(error))
                forwardOn(.Error(error))
                dispose()
            case .Completed:
                _subject.on(.Completed)
                forwardOn(.Completed)
                dispose()
            }
    
            if newWindow {
                createTimer(newId)
            }
        }
    
        func createTimer(windowId: Int) {
            if _timerD.disposed {
                return
            }
        
            if _windowId != windowId {
                return
            }
    
            let nextTimer = SingleAssignmentDisposable()
    
            _timerD.disposable = nextTimer
    
            nextTimer.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in
            
            var newId = 0
            
            self._lock.performLocked {
                if previousWindowId != self._windowId {
                    return
                }
                
                self._count = 0
                self._windowId = self._windowId &+ 1
                newId = self._windowId
                self.startNewWindowAndCompleteCurrentOne()
            }
            
            self.createTimer(newId)
            
            return NopDisposable.instance
        }
    }
    }
    

    相关文章

      网友评论

          本文标题:windowTimeCount运算

          本文链接:https://www.haomeiwen.com/subject/kzbxlttx.html