美文网首页
windowTimeCount运算

windowTimeCount运算

作者: 幸运的小强本人 | 来源:发表于2016-03-12 21:27 被阅读34次

windowTimeCount运算的含义如下图所示:

屏幕快照 2016-03-12 下午9.14.26.png
extension: ObservableType {
    public func window(timeSpa timeSpan: RxTimeInterval, count: Int, scheduler: SchedulerType)->Observable<Observable<E>> {
        return WindowTimeCount(source: self.asObservable(), timeSpan: timeSpan, count: count, scheduler: scheduler)
    }
}

class WindowTimeCount<Element>: Producer<Observable<Element>> {
    private let _timeSpan: RxTimeInterval
    private let _count: Int
    private let _scheduler: SchedulerType
    private let _source: Observable<Element>

    init(source: Observable<Element>, timeSpan:RxTimeInterval, count: Int, scheduler: SchedulerType) {
        _source = source
        _timeSpan = timeSpan
        _count = count
        _scheduler = scheduler
    }

    override func run<O: ObserverType where O.E == Observable<Element>(observer: O)->Disposable {
        let sink = WindowTimeCountSink(parent: self, observer: observer)
        sink.disposable = sink.run()
        return sink
    }
}

class WindowTimeCountSink<Element, O: ObserverType where O.E == Observable<Element>>: Sink<O>, ObserverType, LockOwnerType, SynchronizedOnType {
    typealias Parent = WindowTimeCount<Element>
    typealias E = Element

    private let _parent: Parent

    let _lock = NSREcursiveLock()

    private var _subject = PublishSubject<Element>()
    private var _count = 0
    private var _windowId = 0

    private let _timerD = SerialDisposable()
    private let _refCountDisposable: RefCountDisposable
    private let _groupDisposable = CompositeDisposable()

    init(parent: parent, observer: O) {
        _parent = parent
        _groupDisposable.addDisposable(_timerD)
        _refCountDisposable = RefCountDisposable(disposable: _groupDisposable)
        super.init(observer: observer)
    }

    func run() -> Disposable {
        forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
        createTimer(_windowId)
    
        _groupDisposable.addDisposable(_parent._source.subscribeSafe(self))
        return _refCountDisposable
    }

    func startNewWindowAndCompleteCurrentOne() {
        _subject.on(.Completed)
        _subject = PublishSubject<Element>()
    
        forwardOn(.Next(AddRef(source: _subject, refCount: _refCountDisposable).asObservable()))
    }

    func on(event: Event<E>) {
        synchronizedOn(event)
    }

    func _synchronized_on(event: Event<E>) {
        var newWindow = false
        var newId = 0
    
        switch event {
        case .Next(let element):
            _subject.on(.Next(element))
        
            do {
                try incrementChecked(&_count)
            } catch (let e) {
                _subject.on(.Error(e as ErrorType))
                dispose()
            }
        
            if (_count == _parent._count) {
                newWindow = true
                _count = 0
                _windowId += 1
                newId = _windowId
                self.startNewWindowAndCompleteCurrentOne()
            }
        
        case .Error(let error):
            _subject.on(.Error(error))
            forwardOn(.Error(error))
            dispose()
        case .Completed:
            _subject.on(.Completed)
            forwardOn(.Completed)
            dispose()
        }

        if newWindow {
            createTimer(newId)
        }
    }

    func createTimer(windowId: Int) {
        if _timerD.disposed {
            return
        }
    
        if _windowId != windowId {
            return
        }

        let nextTimer = SingleAssignmentDisposable()

        _timerD.disposable = nextTimer

        nextTimer.disposable = _parent._scheduler.scheduleRelative(windowId, dueTime: _parent._timeSpan) { previousWindowId in
        
        var newId = 0
        
        self._lock.performLocked {
            if previousWindowId != self._windowId {
                return
            }
            
            self._count = 0
            self._windowId = self._windowId &+ 1
            newId = self._windowId
            self.startNewWindowAndCompleteCurrentOne()
        }
        
        self.createTimer(newId)
        
        return NopDisposable.instance
    }
}
}

相关文章

  • windowTimeCount运算

    windowTimeCount运算的含义如下图所示:

  • 2、Swift 基础运算

    赋值运算 算数运算 余数运算 一元加减法运算 混合赋值运算 比较运算 三元运算 空合运算符 范围运算 逻辑运算

  • Python 入门之常用运算符

    Python中的运算按种类可分为算数运算、比较运算、逻辑运算、赋值运算、成员运算、身份运算、位运算 1、常用运算符...

  • 黑猴子的家:Python 数据运算

    1、算术运算 2、比较运算 3、赋值运算 4、逻辑运算 5、成员运算 6、身份运算 7、位运算 code 8、运算...

  • 运算 & 运算符

    运算 & 运算符 算术运算 比较运算 比较运算的结果为bool值 赋值运算 逻辑运算 位运算 二进制的运算 身份检...

  • 2019-07-23

    R中的基本运算包括:算术运算、关系运算、逻辑运算、赋值运算以及其他运算。 算术运算:四则运算(+,-,*, /),...

  • JS 加、减、乘、除运算避免浮点数

    加法运算 减法运算 乘法运算 除法运算

  • 算术运算符

    算术运算符对数字(文字或变量)执行算术运算。 加法运算 减法运算 乘法运算 除法运算 余数运算 自增自减运算 指数...

  • JavaScript快速入门03-运算符

    JS的运算符 算数运算符 算数运算符用于对数值进行算数运算 运算符描述说明+加法运算-减法运算*乘法运算/除法运算...

  • 技术问 - 运算符有哪些

    算术运算符 赋值运算符 比较运算符 逻辑运算符 位运运算 三目运算符 算术运算符 赋值运算符 比较运算符(关系运算...

网友评论

      本文标题:windowTimeCount运算

      本文链接:https://www.haomeiwen.com/subject/kzbxlttx.html