美文网首页
RxSwift-publish

RxSwift-publish

作者: lmfei | 来源:发表于2020-04-01 22:04 被阅读0次

    Publish的作用,它可以在序列被订阅后并不会收到消息,只有经过connect后,才会收到消息,并且在被多次订阅后subscribeHandler只会被执行一次,这个是非常实用的,比如在subscribeHandler中发送请求,拿到数据后,发送消息,如果不用publish处理,会发送多次请求,浪费带宽,设计也不合理,这时祭出Publish,万事解决,还有谁,下面详细了解下它的使用与源码解析

    使用

    let ob = Observable<String>.create { (observe) -> Disposable in
        print("我来了")
        observe.onNext("h")
        return Disposables.create{
            print("Observable Disposables")
        }
        }.publish()
    
    _ = ob.subscribe(onNext: {
        print("订阅1:\($0)")
    }).disposed(by: disposeBag)
    
    _ = ob.subscribe(onNext: {
        print("订阅2:\($0)")
    }).disposed(by: disposeBag)
    
    _ = ob.connect()
    

    打印

    我来了
    订阅1:h
    订阅2:h
    

    源码解析

    //Multicast.swift
    extension ObservableType {
        public func publish() -> ConnectableObservable<Element> {
            return self.multicast { PublishSubject() }
        }
    }
    

    调用multicast方法,并传入一个PublishSubject

    //Multicast.swift
    extension ObservableType {  
        public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
            -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
            return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
        }
    }
    

    创建内部类ConnectableObservableAdapter时,传入source-源序列和makeSubject-前面闭包中生成的PublishSuject

    final private class ConnectableObservableAdapter<Subject: SubjectType>
        : ConnectableObservable<Subject.Element> {
        typealias ConnectionType = Connection<Subject>
    
        fileprivate let _source: Observable<Subject.Observer.Element>
        fileprivate let _makeSubject: () -> Subject
    
        fileprivate let _lock = RecursiveLock()
        fileprivate var _subject: Subject?
    
        // state
        fileprivate var _connection: ConnectionType?
    
        init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
            self._source = source  //源序列
            self._makeSubject = makeSubject //前面用来创建PublishSubject的闭包
            self._subject = nil
            self._connection = nil
        }
    
        override func connect() -> Disposable {
            return self._lock.calculateLocked {
                if let connection = self._connection { //判断是否已经进行过订阅
                    return connection
                }
      
                let singleAssignmentDisposable = SingleAssignmentDisposable()
                let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
                self._connection = connection
                let subscription = self._source.subscribe(connection)
                singleAssignmentDisposable.setDisposable(subscription)
                return connection
            }
        }
    
        fileprivate var lazySubject: Subject {
            if let subject = self._subject {
                return subject
            }
    
            let subject = self._makeSubject()
            self._subject = subject
            return subject
        }
    
        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
            return self.lazySubject.subscribe(observer)
        }
    }
    

    ConnectableObservableAdapter中重要点

    • 声明了一个lazySubject,这个属性只会初始化一次,它的值就是_makeSubject闭包返回的PublishSubject
    • 实现了subscribe方法,调用lazySubject.subscribe(observer)
    • 实现了connect方法,方法内部创建了Connection对象,然后调用_source.subscribe(connection),也就是源序列的subscribe订阅方法,并传入connection
    //Multicast.swift
    final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
        typealias Element = Subject.Observer.Element
    
        private var _lock: RecursiveLock
        // state
        private var _parent: ConnectableObservableAdapter<Subject>?
        private var _subscription : Disposable?
        private var _subjectObserver: Subject.Observer
    
        private let _disposed = AtomicInt(0)
    
        init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
            self._parent = parent
            self._subscription = subscription
            self._lock = lock
            self._subjectObserver = subjectObserver
        }
    
        func on(_ event: Event<Subject.Observer.Element>) {
            if isFlagSet(self._disposed, 1) {
                return
            }
            if event.isStopEvent {
                self.dispose()
            }
            self._subjectObserver.on(event)
        }
        ...
    }
    

    在调用connect方法时,会创建内部类Connection,以备后用,这个类中主要实现了on方法,on方法调用makeSubject的on方法

    除PublishSubject和Observable序列处理逻辑外,剩下的关键代码已经罗列出来,剩下就来梳理下完整的流程

    1. 创建序列,生成内部类ConnectableObservableAdapter,保留源序列_source,通过muticast传入闭包生成序列_makeSubject,声明属性lazySubject,保证_makeSubject只创建一次,并实现subscribe、connect方法
    2. 订阅序列,调用1中subscribe,然后执行PublishSubject订阅流程
    3. 执行connect,创建内部类Connection,然后执行源序列_source的subscribe方法并传入内部类对象Connection。接下来会执行源序列的订阅流程,执行_subscribeHandler闭包,闭包中会执行onNext方法,接下来走到源序列的subscribe闭包中,也就是Connection中,接下来执行Connection的on方法,进而执行lazySubject的on方法,最终执行到lazySubject的subscribe闭包方法,结束一个完整流程
      接下来依旧来看下思维导图


      Publish

    生活如此美好,今天就点到为止。。。

    相关文章

      网友评论

          本文标题:RxSwift-publish

          本文链接:https://www.haomeiwen.com/subject/vluzyhtx.html