美文网首页
RxSwift-Subject

RxSwift-Subject

作者: lmfei | 来源:发表于2020-03-28 21:23 被阅读0次

    Subject是Observable,也是Observer,所以它可以在有新值时发送消息,也可以订阅这些消息。

    Subject的子类

    Subject子类有PublishSubject、BehaviorSubject、ReplaySubject、AsyncSubject

    • PublishSubject - 只会打印订阅后发送的消息
      使用
    let publishSub = PublishSubject<Int>()
    publishSub.onNext(1)
    _ = publishSub
        .subscribe{ print("订阅到了:\($0)")}
        .disposed(by: disposeBag)
    publishSub.onNext(2)
    publishSub.onNext(3)
    

    打印

    订阅到了:next(2)
    订阅到了:next(3)
    
    • BehaviorSubject - 会打印出来订阅前最后的一个消息,以及订阅后发送的消息
      使用
    let behaviorSub = BehaviorSubject<Int>(value: 0)
    behaviorSub.onNext(1)
    behaviorSub.onNext(2)
    behaviorSub.subscribe(onNext: {
        print("订阅了:\($0)")
    }).disposed(by: disposeBag)
    behaviorSub.onNext(3)
    behaviorSub.onNext(4)
    behaviorSub.onNext(5)
    

    打印

    订阅了:2
    订阅了:3
    订阅了:4
    订阅了:5
    
    • ReplaySubject - 会打印出来订阅前bufferSize个消息,以及订阅后发送的消息
      使用
    let replaySub = ReplaySubject<Int>.create(bufferSize: 2)
    replaySub.onNext(0)
    replaySub.onNext(1)
    replaySub.onNext(2)
    replaySub.subscribe(onNext: {
        print("ReplaySubject:订阅了\($0)")
    }).disposed(by: disposeBag)
    replaySub.onNext(4)
    replaySub.onNext(5)
    

    打印

    ReplaySubject:订阅了1
    ReplaySubject:订阅了2
    ReplaySubject:订阅了4
    ReplaySubject:订阅了5
    
    • AsyncSubject - 在收到completed消息后,会打印最后一个消息,在收到error消息后,会移除所有观察者,并返回错误
      使用
    let asyncSub = AsyncSubject<Int>()
    asyncSub.onNext(0)
    asyncSub.onNext(1)
    asyncSub.subscribe(onNext: {
        print("AsyncSubject:订阅了\($0)")
    }, onError:{
        print("AsyncSubject:订阅error:\($0)")
    }).disposed(by: disposeBag)
    asyncSub.onNext(2)
    asyncSub.onNext(3)
    asyncSub.onCompleted()
    //asyncSub.onError(NSError.init(domain: "haha", code: -4, userInfo: nil))
    

    打印

    AsyncSubject:订阅了3
    

    其他的既是Observable又是Observer的类

    Variable、BehaviorRelay也有同样的性质

    • Variable - 已废弃
      代码
    let variable = Variable<Int>.init(0)
    variable.value = 1
    variable.asObservable().subscribe(onNext: {
        print("Variable订阅到\($0)")
    })
    variable.value = 2
    variable.value = 3
    

    打印

    Variable订阅到1
    Variable订阅到2
    Variable订阅到3
    

    代码

    let behaviorRelay = BehaviorRelay<Int>.init(value: 0)
    behaviorRelay.accept(1)
    print("BehaviorRelay.value:\(behaviorRelay.value)")
    behaviorRelay.subscribe(onNext: {
        print("BehaviorRelay 订阅到\($0)")
    }).disposed(by: disposeBag)
    behaviorRelay.accept(2)
    print("BehaviorRelay.value:\(behaviorRelay.value)")
    behaviorRelay.accept(3)
    

    打印

    BehaviorRelay.value:1
    BehaviorRelay 订阅到1
    BehaviorRelay 订阅到2
    BehaviorRelay.value:2
    BehaviorRelay 订阅到3
    

    下面针对PublishSubject进行源码解析

    PublishSubject源码解析

    public final class PublishSubject<Element>
        : Observable<Element>
        , SubjectType
        , Cancelable
        , ObserverType
        , SynchronizedUnsubscribeType {
        public typealias SubjectObserverType = PublishSubject<Element>
    
        typealias Observers = AnyObserver<Element>.s
        typealias DisposeKey = Observers.KeyType
        ...    
        // state
        private var _isDisposed = false
        private var _observers = Observers()
        private var _stopped = false
        private var _stoppedEvent = nil as Event<Element>? 
        ...
        /// Notifies all subscribed observers about next event.
        ///
        /// - parameter event: Event to send to the observers.
        public func on(_ event: Event<Element>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            dispatch(self._synchronized_on(event), event)
        }
    
        func _synchronized_on(_ event: Event<Element>) -> Observers {
            self._lock.lock(); defer { self._lock.unlock() }
            switch event {
            case .next:
                if self._isDisposed || self._stopped {
                    return Observers()
                }
                
                return self._observers
            case .completed, .error:
                if self._stoppedEvent == nil {
                    self._stoppedEvent = event
                    self._stopped = true
                    let observers = self._observers
                    self._observers.removeAll()
                    return observers
                }
    
                return Observers()
            }
        }
        
        /**
        Subscribes an observer to the subject.
        
        - parameter observer: Observer to subscribe to the subject.
        - returns: Disposable object that can be used to unsubscribe the observer from the subject.
        */
        public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            self._lock.lock()
            let subscription = self._synchronized_subscribe(observer)
            self._lock.unlock()
            return subscription
        }
    
        func _synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            if let stoppedEvent = self._stoppedEvent {
                observer.on(stoppedEvent)
                return Disposables.create()
            }
            
            if self._isDisposed {
                observer.on(.error(RxError.disposed(object: self)))
                return Disposables.create()
            }
            
            let key = self._observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    
       
        /// Returns observer interface for subject.
        public func asObserver() -> PublishSubject<Element> {
            return self
        }
    
    }
    

    我们在使用的时候,会先创建一个PublishSubject对象。
    通过上面代码,可以看到PublishSubject遵循了Observable,即它是观察者,同时它实现了subscribe方法;也遵循了ObserverType,即也是监听者,实现了on方法。
    在我们订阅PublishSubject既调用subscribe方法时,会执行Observable的subscribe方法,从而调用子类PublishSubject的subscribe,进而会执行_observers.insert(observer.on)方法,将observer.on插入_observers中。
    当我们发送消息onNext时,会执行ObserverType的on,进而执行PublishSubject实现的on方法,这个方法会执行dispatch(self._synchronized_on(event), event)方法,其中_synchronized_on会返回_observers,在通过dispatch依次去执行observer去解析.next方法,进而执行subscribe的block,完成打印。

    思维导图
    PublishSubject

    相关文章

      网友评论

          本文标题:RxSwift-Subject

          本文链接:https://www.haomeiwen.com/subject/hnnsuhtx.html