美文网首页RxSwiftRx
05. RxSwift源码解读:Connection

05. RxSwift源码解读:Connection

作者: Oceanj | 来源:发表于2021-06-10 13:44 被阅读0次

    今天介绍可连接序列和连接相关操作符

    可连接序列 Connection Observable, 不同于一般的序列,有订阅时不会立刻开始发送事件消息,只有当调用 connect()之后才会开始发送值。

    connect & publish 操作符

    我们看一个例子:

          let observable = Observable<Int>.create({ anyObserver in
                print("subscrition")
                anyObserver.onNext(1)
                anyObserver.onNext(2)
                anyObserver.onNext(3)
                return Disposables.create()
            })
            .publish()
            
            observable
                .subscribe {
                    print("订阅1", $0)
                }
                .disposed(by: bag)
            DispatchQueue.main.asyncAfter(deadline: .now() + 3) {
                _ = observable.connect()
            }
            
            observable.subscribe {
                print("订阅2", $0)
            }
    
    subscrition
    订阅1 next(1)
    订阅2 next(1)
    订阅1 next(2)
    订阅2 next(2)
    订阅1 next(3)
    订阅2 next(3)
    

    姑且把创建序列传入的闭包称之为subscribe handler

    3秒后才会打印信息,因为3秒后才连接。而且我们发现subscrition只打印了一次,说明subscribe handler只执行了一遍,如果去掉publish和connect,则会执行两遍,相当于每次订阅都会重新执行一遍;如果只去掉connect,则不会发送事件。

    对于上面的结果我们分析一下源码:
    对于publish操作符,不妨看看:

    public func publish() -> ConnectableObservable<Element> {
          self.multicast { PublishSubject() }
    }
    

    publish本质上调用了multicast操作符:

        public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
            -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
            ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
        }
    

    返回一个ConnectableObservableAdapter对象,它也是个Observable,保存了原始的Observable和 makeSubject闭包。

    • ConnectableObservableAdapter继承了ConnectableObservable,ConnectableObservable表示一个可连接的序列,ConnectableObservable遵循ConnectableObservableType协议声明一个connect()协议,表示连接操作。
    • SubjectType表示这是一个Subject,在上一篇文章已经讲述过,那么连接操作实际上将一个普通的Observable连接到一个Subject,而publish操作明显是接到PublishSubject,它是没有buffer的Subject。

    然后再看看subscribe代码在哪:

        public func subscribe(_ on: @escaping (Event<Element>) -> Void) -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.asObservable().subscribe(observer)
        }
    

    然后调用熟悉的subscribe(observer), 这里会进入ConnectableObservableAdapter的subscribe:

        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
            self.lazySubject.subscribe(observer)
        }
    

    这里的lazySubject是通过makeSubject闭包创建的PublishSubject();通过它继续调用subscribe,这里相当于做了一次变形,而我们在上一篇已经介绍过PublishSubject执行subscribe是不会发送事件消息的(先不考虑error和complete事件),它只是将观察者插入到observers中保存起来,这块的代码上一篇已经详细讲解过,这里不再讲解。所以就解释了为什么connect之前调用subscribe不会发送事件消息;那么connect又干了什么呢?我们继续看connect的代码:

        // at `ConnectableObservableAdapter` class
        override func connect() -> Disposable {
            return self.lock.performLocked {
                if let connection = self.connection {
                    return connection
                }
    
                let singleAssignmentDisposable = SingleAssignmentDisposable()
                let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self.lock, subscription: singleAssignmentDisposable)
                self.connection = connection
                let subscription = self.source.subscribe(connection)
                singleAssignmentDisposable.setDisposable(subscription)
                return connection
            }
        }
    

    这里创建了一个Connection,并且执行了原始序列的subscribe(connection), 以connection作为参数,原始序列是最初的AnonymousObservable, AnonymousObservable的subscribe方法实现已经比较熟悉了,不用再看。它最终会执行最初创建序列的闭包。但是因为传入的observer是connection,而且Connection类重写了on方法,所以在调用onNext时,会调用到这个on方法:

          func on(_ event: Event<Subject.Observer.Element>) {
            if isFlagSet(self.disposed, 1) {
                return
            }
            if event.isStopEvent {
                self.dispose()
            }
            self.subjectObserver.on(event)
        }
    

    然后执行PulishSubject的on方法:这里会向所有观察者发送消息,例子中的观察者有两个。意思是例子每次调用onNext会通知所有的观察者。这也解释了为什么“subscrition”只打印了一遍,onNext每个打印两遍。如果先connect再subscribe,则只会打印“subscrition”,因为connect时还没有观察者。

    share

    share操作符可接受两个参数,replay 和 scope, replay表示缓存的个数或者叫重放的次数,这个好理解,而scope可接受两个枚举值.whileConnected 和 .forever,这两个什么区别:

    • .whileConnected 只有在连接时才支持缓存和重放,意味着如果连接断开则不能重放,下次再订阅时只能重新执行subscribe handler,一般什么操作会断开连接?当发送error或complete事件或者回收资源时会断开连接。
    • .forever 永远支持缓存和重放,无论是否断开连接。
      我们看看官方给的例子:
    func testScope(){
            let xs = Observable.deferred { () -> Observable<TimeInterval> in
                    print("Performing work ...")
                    return Observable.just(Date().timeIntervalSince1970)
                }
                .share(replay: 1, scope: .forever)
    
            _ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
            _ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
            _ = xs.subscribe(onNext: { print("next \($0)") }, onCompleted: { print("completed\n") })
        }
    

    这里的just方法会发送一个单一的元素:当前时间戳,还会在发一个complete事件,scope设置的是forever
    打印结果:

    Performing work ...
    next 1622777422.7355828
    completed
    
    next 1622777422.7355828
    completed
    
    next 1622777422.7355828
    completed
    

    第一次订阅时,执行了subscribe handler,打印了Performing work ... next和complete,之后两次订阅时是对第一次的重放,打印的时间戳也一样。我们设置重放的个数为1,所以会重放一次next,同时也会重放complete;第一次执行complete其实以及断开连接了,但是依然能重放,说明就算连接断开了依然支持重放,这就是forever的意思。
    如果改成.willConnected
    则无法支持重放,打印结果:

    Performing work ...
    next 1622778439.6327991
    completed
    
    Performing work ...
    next 1622778439.6332831
    completed
    
    Performing work ...
    next 1622778439.633359
    completed
    

    这里相当于执行了三遍subscre handler,所以Performing work ...打印的三次,而且3次next打印的时间戳不一样;为什么呢,因为第一订阅之后就执行了complete事件了(just操作符会执行一次onNext+一次onComplete),这时候连接断开,再次订阅时需要重新建立连接,然又会重新执行subscribe handler。
    我们再看看share的源码:

          public func share(replay: Int = 0, scope: SubjectLifetimeScope = .whileConnected)
            -> Observable<Element> {
            switch scope {
            case .forever:
                switch replay {
                case 0: return self.multicast(PublishSubject()).refCount()
                default: return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()
                }
            case .whileConnected:
                switch replay {
                case 0: return ShareWhileConnected(source: self.asObservable())
                case 1: return ShareReplay1WhileConnected(source: self.asObservable())
                default: return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount()
                }
            }
        }
    

    当scope == forever时,调用的multicast, replay == 0 连接的subject是PublishSubject,这个跟publish的操作符是一样的,不一样的是后面接了一个refCount操作符。
    refCount返回一个RefCount对象,并将当前对象作为source

        public func refCount() -> Observable<Element> {
            RefCount(source: self)
        }
    

    RefCount是一个与源保持连接的可观察序列,只要该可观察序列至少有一次订阅。RefCount有一个对应的RefCountSink类,外面执行订阅时会调用RefCountSink的run方法:

     func run() -> Disposable {
            let subscription = self.parent.source.subscribe(self)
            self.parent.lock.lock(); defer { self.parent.lock.unlock() }
    
            self.connectionIdSnapshot = self.parent.connectionId
    
            if self.isDisposed {
                return Disposables.create()
            }
    
            if self.parent.count == 0 {
                self.parent.count = 1
                self.parent.connectableSubscription = self.parent.source.connect()
            }
            else {
                self.parent.count += 1
            }
            /// 省略。。。
    

    第一句执行self.parent.source.subscribe(self),这里的parent是RefCount对象,而它的source是ConnectableObservableAdapter对象,它的subscribe代码:

        override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
            self.lazySubject.subscribe(observer)
        }
    

    lazySubject是要连接的Subject,之后调用subject的subscribe,这个subscribe会执行重放和插入观察者操作,回到RefCountSink的run:
    后面有一个关键代码:

            if self.isDisposed {
                return Disposables.create()
            }
    

    因为第一订阅时,会执行complete,所以执行当前dispose,第二次订阅时self.isDisposed == true,直接返回了。所以第二次第三次订阅时不会执行subscribe handler,只做重放。
    如果share操作改成share(replay:2, .whileConnected), 执行的代码是return self.multicast(makeSubject: { ReplaySubject.create(bufferSize: replay) }).refCount(), 这个跟 forever分支内的 return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()代码很像,为什么一个是forever一个是whileConnected。关键在于return self.multicast(ReplaySubject.create(bufferSize: replay)).refCount()这里只会创建一个ReplaySubject对象,而 { ReplaySubject.create(bufferSize: replay) }是一个闭包,每次在执行闭包时都会创建一个新的subject。所以每次订阅时都会创建一个新的subject。新的subject不存在重放的buffer;上面的代码self.isDisposed 一直是false,这导致每次需要再次连接。而每次连接都会执行subscribe handler。

    还剩下下面两个case的代码没有解读:

     case 0: return ShareWhileConnected(source: self.asObservable())
     case 1: return ShareReplay1WhileConnected(source: self.asObservable())
    

    ShareWhileConnected 处理无buffer的情况,ShareReplay1WhileConnected处理一个buffer的情况。这两个类非常相似,这两个分支并没有连接Subject,而是用ShareReplay1WhileConnected和ShareWhileConnected代替了Subject的功能。在ShareReplay1WhileConnected和ShareWhileConnected中,保存了所有的观察者;在ShareReplay1WhileConnected中保存了最后一个元素,这是为了实现重放。
    它们都持有一个Connection,分别是ShareWhileConnectedConnection和ShareReplay1WhileConnectedConnection,这两个类实现了连接:

        final func connect() {
            self.subscription.setDisposable(self.parent.source.subscribe(self))
        }
    

    这里会去调用原始序列的subscribe,最终调用subscribe handler。
    同时实现了订阅和通知观察者的功能,代码类似于Subject。

        fileprivate var observers = Observers()
        private var element: Element?
    

    那这两个类是怎么实现whileConnected效果的? 看看ShareReplay1WhileConnected的subscribe的实现:

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element {
            self.lock.lock()
            let connection = self.synchronized_subscribe(observer)
            let count = connection.observers.count
    
            let disposable = connection.synchronized_subscribe(observer)
            self.lock.unlock()
            
            if count == 0 {
                connection.connect()
            }
    
            return disposable
        }
    
        @inline(__always)
        private func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Connection where Observer.Element == Element {
            let connection: Connection
    
            if let existingConnection = self.connection {
                connection = existingConnection
            }
            else {
                connection = ShareReplay1WhileConnectedConnection<Element>(
                    parent: self,
                    lock: self.lock)
                self.connection = connection
            }
    
            return connection
        }
    

    当count == 0 执行连接,如果发送complete事件断开连接后,此时再次订阅,则count依然是==0 需要重新连接,这样就会再执行一遍subscribe handler,如果没有发送complete时间,则只会连接一次,后面的订阅只会重放buffer。

    replay

    replay操作符实际上是通过self.multicast { ReplaySubject.create(bufferSize: bufferSize) }实现,效果类似于publish,只不过publish不带buffer,replay可以指定buffer个数。

    总结

    普通可观察序列都可以通过以上操作符转变成可连接的序列,可连接序列只有在连接之后才能发送事件元素,而whileConnected和forever区别在于:

    • whileConnected:每个连接将有它自己Subject实例来存储重放事件。
    • forever:一个Suject将存储到source的所有连接的重放事件。

    相关文章

      网友评论

        本文标题:05. RxSwift源码解读:Connection

        本文链接:https://www.haomeiwen.com/subject/qqpzsltx.html