美文网首页
RxSwift-publish

RxSwift-publish

作者: lmfei | 来源:发表于2020-04-01 22:04 被阅读0次

Publish的作用,它可以在序列被订阅后并不会收到消息,只有经过connect后,才会收到消息,并且在被多次订阅后subscribeHandler只会被执行一次,这个是非常实用的,比如在subscribeHandler中发送请求,拿到数据后,发送消息,如果不用publish处理,会发送多次请求,浪费带宽,设计也不合理,这时祭出Publish,万事解决,还有谁,下面详细了解下它的使用与源码解析

使用

let ob = Observable<String>.create { (observe) -> Disposable in
    print("我来了")
    observe.onNext("h")
    return Disposables.create{
        print("Observable Disposables")
    }
    }.publish()

_ = ob.subscribe(onNext: {
    print("订阅1:\($0)")
}).disposed(by: disposeBag)

_ = ob.subscribe(onNext: {
    print("订阅2:\($0)")
}).disposed(by: disposeBag)

_ = ob.connect()

打印

我来了
订阅1:h
订阅2:h

源码解析

//Multicast.swift
extension ObservableType {
    public func publish() -> ConnectableObservable<Element> {
        return self.multicast { PublishSubject() }
    }
}

调用multicast方法,并传入一个PublishSubject

//Multicast.swift
extension ObservableType {  
    public func multicast<Subject: SubjectType>(makeSubject: @escaping () -> Subject)
        -> ConnectableObservable<Subject.Element> where Subject.Observer.Element == Element {
        return ConnectableObservableAdapter(source: self.asObservable(), makeSubject: makeSubject)
    }
}

创建内部类ConnectableObservableAdapter时,传入source-源序列和makeSubject-前面闭包中生成的PublishSuject

final private class ConnectableObservableAdapter<Subject: SubjectType>
    : ConnectableObservable<Subject.Element> {
    typealias ConnectionType = Connection<Subject>

    fileprivate let _source: Observable<Subject.Observer.Element>
    fileprivate let _makeSubject: () -> Subject

    fileprivate let _lock = RecursiveLock()
    fileprivate var _subject: Subject?

    // state
    fileprivate var _connection: ConnectionType?

    init(source: Observable<Subject.Observer.Element>, makeSubject: @escaping () -> Subject) {
        self._source = source  //源序列
        self._makeSubject = makeSubject //前面用来创建PublishSubject的闭包
        self._subject = nil
        self._connection = nil
    }

    override func connect() -> Disposable {
        return self._lock.calculateLocked {
            if let connection = self._connection { //判断是否已经进行过订阅
                return connection
            }
  
            let singleAssignmentDisposable = SingleAssignmentDisposable()
            let connection = Connection(parent: self, subjectObserver: self.lazySubject.asObserver(), lock: self._lock, subscription: singleAssignmentDisposable)
            self._connection = connection
            let subscription = self._source.subscribe(connection)
            singleAssignmentDisposable.setDisposable(subscription)
            return connection
        }
    }

    fileprivate var lazySubject: Subject {
        if let subject = self._subject {
            return subject
        }

        let subject = self._makeSubject()
        self._subject = subject
        return subject
    }

    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Subject.Element {
        return self.lazySubject.subscribe(observer)
    }
}

ConnectableObservableAdapter中重要点

  • 声明了一个lazySubject,这个属性只会初始化一次,它的值就是_makeSubject闭包返回的PublishSubject
  • 实现了subscribe方法,调用lazySubject.subscribe(observer)
  • 实现了connect方法,方法内部创建了Connection对象,然后调用_source.subscribe(connection),也就是源序列的subscribe订阅方法,并传入connection
//Multicast.swift
final private class Connection<Subject: SubjectType>: ObserverType, Disposable {
    typealias Element = Subject.Observer.Element

    private var _lock: RecursiveLock
    // state
    private var _parent: ConnectableObservableAdapter<Subject>?
    private var _subscription : Disposable?
    private var _subjectObserver: Subject.Observer

    private let _disposed = AtomicInt(0)

    init(parent: ConnectableObservableAdapter<Subject>, subjectObserver: Subject.Observer, lock: RecursiveLock, subscription: Disposable) {
        self._parent = parent
        self._subscription = subscription
        self._lock = lock
        self._subjectObserver = subjectObserver
    }

    func on(_ event: Event<Subject.Observer.Element>) {
        if isFlagSet(self._disposed, 1) {
            return
        }
        if event.isStopEvent {
            self.dispose()
        }
        self._subjectObserver.on(event)
    }
    ...
}

在调用connect方法时,会创建内部类Connection,以备后用,这个类中主要实现了on方法,on方法调用makeSubject的on方法

除PublishSubject和Observable序列处理逻辑外,剩下的关键代码已经罗列出来,剩下就来梳理下完整的流程

  1. 创建序列,生成内部类ConnectableObservableAdapter,保留源序列_source,通过muticast传入闭包生成序列_makeSubject,声明属性lazySubject,保证_makeSubject只创建一次,并实现subscribe、connect方法
  2. 订阅序列,调用1中subscribe,然后执行PublishSubject订阅流程
  3. 执行connect,创建内部类Connection,然后执行源序列_source的subscribe方法并传入内部类对象Connection。接下来会执行源序列的订阅流程,执行_subscribeHandler闭包,闭包中会执行onNext方法,接下来走到源序列的subscribe闭包中,也就是Connection中,接下来执行Connection的on方法,进而执行lazySubject的on方法,最终执行到lazySubject的subscribe闭包方法,结束一个完整流程
    接下来依旧来看下思维导图


    Publish

生活如此美好,今天就点到为止。。。

相关文章

  • RxSwift-publish

    Publish的作用,它可以在序列被订阅后并不会收到消息,只有经过connect后,才会收到消息,并且在被多次订阅...

  • RxSwift-publish源码解析

    publish使用 我们看到网络只会请求一次。这种请求一次,订阅到多个不同的地方的场景很多。所以我们有必要了解一下...

网友评论

      本文标题:RxSwift-publish

      本文链接:https://www.haomeiwen.com/subject/vluzyhtx.html