美文网首页RAC和RxSwift
RxSwift中的publish和connect函数

RxSwift中的publish和connect函数

作者: 简_爱SimpleLove | 来源:发表于2019-08-14 17:27 被阅读0次

    publish一般都和connect连用。一般用于网络请求序列,有多次订阅的时候,只是进行一次网络请求。
    如下:

            let netOB = Observable<Any>.create { (observer) -> Disposable in
                sleep(2)// 模拟网络延迟
                print("我开始请求网络了")
                observer.onNext("请求到的网络数据")
                observer.onNext("请求到的本地")
                observer.onCompleted()
                return Disposables.create {
                    print("销毁回调了")
                    }
                }
                .publish()
            
            // 走的协议ObservableType的subscribe方法
            netOB.subscribe(onNext: { (anything) in
                print("订阅1:",anything)
            })
            .disposed(by: disposeBag)
            
            netOB.subscribe(onNext: { (anything) in
                print("订阅2:",anything)
            })
            .disposed(by: disposeBag)
            _ = netOB.connect()
    

    打印结果:

    我开始请求网络了
    订阅1: 请求到的网络数据
    订阅2: 请求到的网络数据
    订阅1: 请求到的本地
    订阅2: 请求到的本地
    销毁回调了
    

    如果注释掉publishconnect,打印结果如下:

    我开始请求网络了
    订阅1: 请求到的网络数据
    订阅1: 请求到的本地
    销毁回调了
    我开始请求网络了
    订阅2: 请求到的网络数据
    订阅2: 请求到的本地
    销毁回调了
    

    分析

    从上面打印可以看出来:

    • 当注释掉publishconnect的时候,会有两次网络请求,并且当一个订阅1都响应了两个事件的结束过后,订阅2才开始请求网络并响应了两个事件。

    • 如果只是注释掉connect就没有打印(后面分析知道是源序列没有发送事件)。

    • 当都不注释publishconnect的时候,只会进行一次网络请求,并且发送的事件会依次响应给订阅1订阅2,然后下一个事件再依次响应给订阅1订阅2

    我们继续还是通过代码的流程来进行分析:

    1、肯定首先来到源序列的创建create方法中,初始化一个可观察序列AnonymousObservable,并保存闭包。

        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler
        }
    

    2、来到publish方法中

        /**
        - returns: A connectable observable sequence that shares a single subscription to the underlying sequence.
        */
     public func publish() -> ConnectableObservable<Element> {
            return self.multicast { PublishSubject() }
        }
    

    传了一个参数PublishSubject()过去,并返回了一个multicast可连接可观察序列(共享对基础序列的单个订阅。分析后面知道,它的意思是对源序列订阅一次,其响应结果可被传递到后面的多个订阅)

    3、来到multicast的实现

        public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
            -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
            return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
        }
    

    传进去两个参数:source即源序列,makeSubjectPublishSubject()
    返回一个ConnectableObservableAdapter序列

    4、来到ConnectableObservableAdapter的初始化方法

    final private class ConnectableObservableAdapter<Subject: SubjectType>
        : ConnectableObservable<Subject.Element> {
        init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
            self._source = source             // 保存源序列,即调用publish方法的那个序列
            self._makeSubject = makeSubject   // 保存了初始化的时候,传进来的PublishSubject()
            self._subject = nil               // 用于保存subject,初始化为nil
            self._connection = nil            // 用于保存connection,初始化为nil
        }
    }
    

    保存了传进来的两个参数sourcemakeSubject。注意ConnectableObservableAdapter作为中间层,并没有直接或者间接继承自Producer

    5、初始化完成过后,来的最外层的订阅:订阅1

            // 走的协议ObservableType的subscribe方法
            netOB.subscribe(onNext: { (anything) in
                print("订阅1:",anything)
            })
            .disposed(by: disposeBag)
    
    • 这里的netOB的实际类型其实是ConnectableObservableAdapter
    • 订阅就来到ObservableType协议subscribe的实现,在这里面创建了一个观察者AnonymousObserver,并保存了事件处理闭包eventHandler
    • 最后来到self.asObservable().subscribe(observer)这句,因为ConnectableObservableAdapter没有继承Producer,所以就只有走自己实现的subscribe方法中去。

    这里的subscribe并不像以前一样走到Producersubscribe方法,然后走到sinkrun方法,然后再走到源序列初始化的闭包_subscribeHandler里面,走到发送事件流程。

    6、来到ConnectableObservableAdapter自己实现的subscribe方法

        fileprivate var lazySubject: Subject {
            if let subject = self._subject {  // 以后永远都是同一个subject,即lazySubject
                return subject
            }
    
            let subject = self._makeSubject()  // 第一次进来,进行赋值
            self._subject = subject
            return subject
        }
    
        // 没有间接继承Producer, 就只有自己实现subscribe
        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
            // 每次都是lazySubject订阅的观察者,即外面有多少subscribe就会来这里多少次
            // 这里的观察者,是保存外面订阅处理事件闭包eventHandler的AnonymousObserver
            return self.lazySubject.subscribe(observer)
        }
    
    • 这里的订阅方法中有一个self.lazySubject,看上面代码可以知道这是一个懒加载,而且被赋值一次过后,以后再进来不会再被赋值,也就是说始终都是:makeSubjectPublishSubject()
    • ConnectableObservableAdaptersubscribe,即是PublishSubject()subscribe方法
    • 这里的观察者,是保存外面订阅处理事件闭包eventHandlerAnonymousObserver

    7、来到PublishSubject()subscribe方法

    public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            self._lock.lock()
            let subscription = self._synchronized_subscribe(observer)
            self._lock.unlock()
            return subscription
        }
    

    加了锁,保证了订阅观察者的顺序,来看_synchronized_subscribe方法里面的核心代码:

            let key = self._observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
    
    • 保存了observer.on方法,这里的observer就是外面传进来的AnonymousObserver,即每次订阅的事件处理方法
    • insert进去看,知道,里面是用字典存储的
    • 这里直接返回了SubscriptionDisposable,没有进行到发送事件,即没有走到源序列创建是闭包中的onNext方法,一般情况下这里都是到发送事件
    • 所以整个订阅流程只是将订阅创建的AnonymousObserver.on保存在PublishSubject()的字典_observers中,并没有走到事件发送。

    8、接下来来到最外层的第二个订阅:订阅2

            netOB.subscribe(onNext: { (anything) in
                print("订阅2:",anything)
            })
            .disposed(by: disposeBag)
    

    订阅1一样,最后也只是保存订阅2中创建的AnonymousObserver.on保存在PublishSubject()的字典_observers中,还并没有走到事件发送。

    这时我们会好奇,那到底什么时候发生事件呢?这时就来到下面这重要的一步

    9、来到最外层的connect方法

            _ = netOB.connect()
    

    ConnectableObservableAdapterconnect方法:

    override func connect() -> Disposable {
            return self._lock.calculateLocked {
                // 避免外界多次调用connect方法的时候,返回不同的connection
                if let connection = self._connection {
                    return connection
                }
    
                let singleAssignmentDisposable = SingleAssignmentDisposable()
                let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
                self._connection = connection
                // connection是观察者,始终都是同一个观察者
                // 源序列订阅connection,最后走到connection.on方法
                let subscription = self._source.subscribe(connection)
                singleAssignmentDisposable.setDisposable(subscription)
                return connection
            }
        }
    
    • 和之前保存的_connection判断,没有就创建,保证就算多次调用connect,也始终返回同一个connection
    • 这里先走connection的初始化,里面保存了PublishSubject()
        init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
            self._parent = parent                // ConnectableObservableAdapter
            self._subscription = subscription
            self._lock = lock
            self._subjectObserver = subjectObserver  // self.lazySubject.asObserver() 即 PublishSubject()
        }
    
    • 然后又来到最关键的一步let subscription = self._source.subscribe(connection),即源序列订阅connection。和前面文章中源序列订阅中间层MapSink一样。

    因为源序列AnonymousObservable继承自Producer,所以后面的流程就是:
    Producersubscribe方法 -->Producerrun方法--> AnonymousObservablerun方法-->AnonymousObservableSinkrun方法-->AnonymousObservable_subscribeHandler-->发送事件onNext方法-->观察者observeron方法-->connectionon方法

    10、connectionon方法

        func on(_ event: Event<Subject.Observer.Element>) {
            if isFlagSet(self._disposed, 1) {
                return
            }
            if event.isStopEvent {
                self.dispose()
            }
            self._subjectObserver.on(event)
        }
    

    上面知道_subjectObserver就是PublishSubject,所以来到PublishSubjecton方法。

    11、PublishSubjecton方法中关键代码

        func _synchronized_on(_ event: Event<Element>) -> Observers {
            self._lock.lock(); defer { self._lock.unlock() }
            switch event {
            case .next:
                if self._isDisposed || self._stopped {
                    return Observers()
                }
                return self._observers   // 返回前面保存的所有订阅的事件处理
        }
    

    return self._observers这句非常关键,意思是返回之前PublishSubject保存的_observers, 即AnonymousObserver.on方法,也就是说依次响应前面的订阅处理事件(订阅1订阅2中的打印)。

    订阅1: 请求到的网络数据
    订阅2: 请求到的网络数据
    

    12、同理当第二个onNext发送事件的时候,继续走第10和第11步,依次响应前面保存的AnonymousObserver.on方法。

    总结:

    • 使用中间层ConnectableObservableAdapter将所有订阅事件AnonymousObserver.on保存在PublishSubject的一个字典中
    • 直到调用connect的时候,才让源序列订阅一个中间类connection,然后响应connectionon方法,然后走PublishSubjecton方法,依次响应原来保存的AnonymousObserver.on
    • 所以当我们前面把connect方法注释过后,没有打印,因为注释过后,都没有走源序列的闭包,也就没有走发送事件
    • 核心思想就是:用一个类将所有的订阅事件保存起来,然后让源序列去订阅这个类,然后将响应结果依次分给所以的订阅事件。

    相关文章

      网友评论

        本文标题:RxSwift中的publish和connect函数

        本文链接:https://www.haomeiwen.com/subject/qtbwjctx.html