美文网首页
takeUntil运算

takeUntil运算

作者: 幸运的小强本人 | 来源:发表于2016-03-13 08:41 被阅读259次

    说明:
    Returns the elements from the source observable sequence until the other observable sequence produces an element.,含义说明如下图:

    屏幕快照 2016-03-13 上午8.20.34.png

    代码如下:

    extension ObservableType {
        public func takeUntil<O: ObservableType>(other: O)->Observable<E> {
            return TakeUntil(source: asObservable(), other: other.asObservable())
        }
    }
    
    class TakeUntil<Element, Other>: Producer<Element> {
        private let _source: Observable<Element>
        private let _other: Observable<Other>
    
        init(source: Observable<Element>, other: Observable<Other>) {
            _source = source
            _other = other
        }
    
        override func run<O: ObserverType where O.E == Element>(observer: O)->Disposable {
            let sink = TakeUntilSink(parent: self, observer: observer)
            sink.disposable = sink.run()
            return sink
        }
    }
    
    class TakeUntilSink<ElementType, other, O: ObserverType where O.E == ElementType>: Sink<O>, LockOwnerType, ObserverType, SynchronizedOnType {
        typealias E = ElementType
        typealias Parent = TakeUntil<E, Other>
    
        private let _parent: Parent
    
        let _lock = NSRecursiveLock()
    
        // state
        private var _open = false
    
        init(parent: Parent, observer: O) {
            _parent = parent
            super.init(observer: observer)
        }
    
        func on(event: Event<E>) {
            synchronizedOn(event)
        }
    
        func _synchronized_on(event: Event<E>) {
            switch event {
            case .Next:
                forwardOn(event)
            case .Error:
                forwardOn(event)
                dispose()
            case .Completed:
                forwardOn(event)
                dispose()
            }
        }
    
        func run()->Disposable {
            let otherObserver = TakeUntilSinkOther(parent: self)
            let otherSubscription = _parent._other.subscribe(otherObserver)
            otherObserver._subscription.disposable = otherSUbscription
            let sourceSubscription = _parent._source.subscribe(self)
    
            return StableCompositeDisposable.create(sourceSubscription, otherObserver._subscription)
        }
    }
    
    class TakeUntilSinkOther<ElementType, Other, O: ObserverType where O.E == ElementType>: ObserverType, LockOwnerType, SynchronizedOnType {
        typealias Parent = TakeUntilSink<ElementType, Other,O>
        typealias E = Other
        
        private let _parent: Parent
    
        var _lock: NSRecursiveLock {
            return _parent._lock
        }
    
        private let _subscription = SingleAssignmentDisposable()
        
        init(parent: Parent) {
            _parent = parent
        }
    
        func on(event: Event<E>) {
            synchronizedOn(event)
        }
    
        func _synchronized_on(event: Event<E>) {
            switch event {
            case .Next:
                _parent.forwardOn(.Completed)
                _parent.dispose()
            case .Error(let e):
                _parent.forwardOn(.Error(e))
                _parent.dispose()
            case .Completed:
                _parent._open = true
                _subscription.dispose()
            }
        }  
    
        deinit {
    
        }
    }
    

    相关文章

      网友评论

          本文标题:takeUntil运算

          本文链接:https://www.haomeiwen.com/subject/qznxlttx.html