美文网首页程序员
【领略RxSwift源码】- 主题类族(Subject)

【领略RxSwift源码】- 主题类族(Subject)

作者: Maru | 来源:发表于2018-01-15 01:47 被阅读294次

(一)前言

前两篇文章分析了RxSwift的整个基础的订阅流程以及变换操作(Operators)的概念实现,有兴趣的读者可以点击以下链接。

【领略RxSwift源码】- 订阅的工作流(Subscribing)
【领略RxSwift源码】- 变换操作(Operators)

本篇文章将阐述Subject的概念以及在RxSwift当中的具体实现,在分析源码的过程中,我们或许会发现一个不一样的世界,或许我们会看到平时看不到的风景。

(二)SubjectType

ReactiveX的世界中,一共定义了4种不同的Subject,分别是AsyncSubjectBehaviorSubjectPublishSubjectReplaySubject。无一例外,这四种Subject都实现了SubjectType协议,当然这也是非常朴素的面向协议了😂。

我们来看看SubjectType协议:

/// Represents an object that is both an observable sequence as well as an observer.
public protocol SubjectType : ObservableType {
    /// The type of the observer that represents this subject.
    ///
    /// Usually this type is type of subject itself, but it doesn't have to be.
    associatedtype SubjectObserverType : ObserverType

    /// Returns observer interface for subject.
    ///
    /// - returns: Observer interface for subject.
    func asObserver() -> SubjectObserverType
    
}

我们可以看到,在SubjectType中定义了一个ObserverType类型的associatedtype以及一个func asObserver() -> SubjectObserverType的方法。于此同时,它也是继承自Observable。也就是说,一个SubjectType既是一个观察者Observer,又是一个可观察序列(Observable)。

(三)Subject的实现细节

AsyncSubject

问:AsyncSubject 是一个具有什么样特性的Subject?

答:简单的来说,当AsyncSubject被订阅的时候,如果AsyncSubject已经发送过了.complete事件,那么订阅者只能得到最后一个序列的值(如果没有发送过序列那么不触发任何订阅)。如果没有发送.complete事件,那么订阅者一直都不会订阅到值,直到AsyncSubject发送了.complete事件。

我们可以先来看一看AsyncSubject的继承和协议:

public final class AsyncSubject<Element>
    : Observable<Element>
    , SubjectType
    , ObserverType
    , SynchronizedUnsubscribeType {
...

SubjectType刚刚我们已经看过它的定义了,而ObserverType也在之前的文章中有过认识,那么只剩下SynchronizedUnsubscribeType是还没有见到过的一个协议,看它的名字貌似是“同步取消订阅者”的一个协议,我们来看一下具体的定义:

protocol SynchronizedUnsubscribeType : class {
    associatedtype DisposeKey

    func synchronizedUnsubscribe(_ disposeKey: DisposeKey)
}

emmm...看样子是定义一个DisposeKey,然后可以通过这个DisposeKey来同步取消订阅。而这个DisposeKey其实就是一个BagKey的结构体,这个结构体只有一个UInt64类型的存储属性rawValue,如下:

struct BagKey {
    fileprivate let rawValue: UInt64
}

那么既然是移除所有的订阅者,那么这些订阅者被存储在哪里呢?

在RxSwift中定义了一个数据结构叫做Bag,它是一个用来存储少量元素的高效容器,它的插入删除时间复杂度为O(n)。

struct Bag<T> : CustomDebugStringConvertible { ... }

值得一提的是,在Bag的内部,真正存储元素的容器并不是我们常用的Array类型,而是使用了ContiguousArray。我们可以看一下ContiguousArray的官方解释:

/// If your array's `Element` type is a class or `@objc` protocol and you do
/// not need to bridge the array to `NSArray` or pass the array to Objective-C
/// APIs, using `ContiguousArray` may be more efficient and have more
/// predictable performance than `Array`. If the array's `Element` type is a
/// struct or enumeration, `Array` and `ContiguousArray` should have similar
/// efficiency.

显然,使用ContiguousArray这是因为ContiguousArray在处理class或者@objc修饰的类型的时候更加的高效,而在处理Swift基础类型的时候效率就和Array差不多了。

注:还有值得一提的是&=操作符,这是一个日常开发中很少使用到的一个操作符。与普通的加法操作符(+)的区别在于,当加法操作完成之后的结果类型溢出之后,任然可以安全的使用不至于奔溃;

例如:

let val: Int8 = 64
val + 64 // output: error
val &+ 64 // output: -128

OK~ 我们认识到了在AsyncSubject中使用Box来存储,那么具体的实现AsyncSubject的逻辑是怎么样的呢?我们先来看一下AsyncSubject有着那些属性:

    typealias Observers = AnyObserver<Element>.s
    typealias DisposeKey = Observers.KeyType

    /// Indicates whether the subject has any observers
    public var hasObservers: Bool {
        _lock.lock(); defer { _lock.unlock() }
        return _observers.count > 0
    }

    let _lock = RecursiveLock()

    // state
    private var _observers = Observers()
    private var _isStopped = false
    private var _stoppedEvent = nil as Event<Element>? {
        didSet {
            _isStopped = _stoppedEvent != nil
        }
    }
    private var _lastElement: Element?

    #if DEBUG
        fileprivate let _synchronizationTracker = SynchronizationTracker()
    #endif

我们可以看到,重点的实现相关逻辑的属性都被标注成了private_observers是一个存储元素类型为Event<Element>) -> ()Box_isStopped是一个Bool类型的flag,一旦发送了complete或者error时间,那么这个flag就会置为true。而_stoppedEvent则是一个可选的Event<Element>类型,它永远是最新发送的一个事件,如果从来没有发送next事件,那么这个属性就永远为空。

由于Subject既有Observer的特性又有Observable的特性,所以我们一个一个看它如何实现这些特性。我们先来看看Observer

    /// Notifies all subscribed observers about next event.
    ///
    /// - parameter event: Event to send to the observers.
    public func on(_ event: Event<E>) {
        #if DEBUG
            _synchronizationTracker.register(synchronizationErrorMessage: .default)
            defer { _synchronizationTracker.unregister() }
        #endif
        let (observers, event) = _synchronized_on(event)
        switch event {
        case .next:
            dispatch(observers, event)
            dispatch(observers, .completed)
        case .completed:
            dispatch(observers, event)
        case .error:
            dispatch(observers, event)
        }
    }

在这里有两个比较关键的函数,一个是_synchronized_on,另一个是dispatch_synchronized_on是实现AsyncSubject的关键函数,我们可以待会了解了其他细节之后再看。

dispatch是一个在Bag+Rx.swift中定义的一个内联函数,它的主要作用是给bag内的所有(Event<E>) -> ()闭包对象派发执行一个指定的事件(Event)。源码如下:

@inline(__always)
func dispatch<E>(_ bag: Bag<(Event<E>) -> ()>, _ event: Event<E>) {
    if bag._onlyFastPath {
        bag._value0?(event)
        return
    }

    let value0 = bag._value0
    let dictionary = bag._dictionary

    if let value0 = value0 {
        value0(event)
    }

    let pairs = bag._pairs
    for i in 0 ..< pairs.count {
        pairs[i].value(event)
    }

    if let dictionary = dictionary {
        for element in dictionary.values {
            element(event)
        }
    }
}

注:在Swift中我们可以通过@ inline关键字来标识一个函数是内联函数。简单的来说,在Swift中我们有三种"内联策略": sometimes, nerver, always。
sometimes: 当我们申明一个函数的时候,默认这个函数的内联策略就是sometimes的。这个时候,swift的编译器会自动的为它所认为足够短小的函数增添上内联特性,而对于相对而言比较庞大的函数不使用内联特性,以此达到代码执行优化的目的。
always: 当我们需要某个函数强制内联的时候,我们只需要在函数之前加上@inline(__always)关键字。当编译器检测到该关键字的时候,编译器就知道在这里永远都需要内联展开,就不会执行自己的那一套默认的内链优化策略。
nerver: 当我们需要某一个函数永远都不要进行内联的时候,我们只需要在函数之前加上@inline(never)。那么,当编译器检测到该关键字的时候,编译器就知道在这里永远都需要内联展开。

正如我们看到的,dsipatch方法是基于Bag<(Event<E>) -> ()>类型的容器来实现的,之所以之前有一堆复杂的判断逻辑,就是因为优化代码执行效率。当bag中只有一个元素的时候,_onlyFastPathtrue,那么我们只需要取出那一个执行操作就可以了。然而当我们超过一个,小于三十个的时候,我们会将元素存储在ContiguousArray中,通过下标的方式来获取元素进行操作。而当容器内元素超过30这个阈值的时候,我们就要将元素插入到字典中,需要使用的时候再用键值对取出使用。

那么,_synchronized_on到底是如何配合dispatch来实现AsyncSubject的特性的呢?

    func _synchronized_on(_ event: Event<E>) -> (Observers, Event<E>) {
        _lock.lock(); defer { _lock.unlock() }
        if _isStopped {
            return (Observers(), .completed)
        }

        switch event {
        case .next(let element):
            _lastElement = element
            return (Observers(), .completed)
        case .error:
            _stoppedEvent = event

            let observers = _observers
            _observers.removeAll()

            return (observers, event)
        case .completed:

            let observers = _observers
            _observers.removeAll()

            if let lastElement = _lastElement {
                _stoppedEvent = .next(lastElement)
                return (observers, .next(lastElement))
            }
            else {
                _stoppedEvent = event
                return (observers, .completed)
            }
        }
    }

当源序列发送next事件的时候,AsyncSubject仅仅使用内部的_lastElement属性来记录下当前的next事件,然后构造一个空的Bag来执行completed事件(相当于没做什么事情)。

当源序列发送error事件的时候,使用_stoppedEvent来记录最后的最后的事件,然后构造一个observers常量,将自身所有的观察者拷贝到observers常量中,将自身所有的观察者移除,最后把observers和该error事件返回。

(四)Subject的意义

当然,除了AsyncSubject之外,我们还有还有以下几种Subject:

  • PublishSubject: 标准的热信号,订阅者只会接收到订阅操作之后的事件。
  • ReplaySubject:订阅者会接受到订阅之前的事件以及订阅之后的事件,类似于冷信号。
  • BehaviorSubject:订阅之后首先会接收到最近一次发送的事件(如果最近没有发送,那么发送一个初始的事件)。
  • Variable: 基于BehaviorSubject的封装,会将初始值或者最近的值发送给订阅者。

然而写到这里,我并不想一一详细的分析剩下四种的实现细节。因为,与刚刚分析完成的AsyncSubject相比,其余的Subject实现的方法都没有太大的区别。所以笔者也不想在这里流水账似的浪费时间。

不如做一些更有意思的事情:为什么我们需要Subject?

现在我们先不妨设想一个这样的场景:

我们需要追踪用户在iPhone上的每一次点击,当用户点击一次系统就会调用一次screenDidTapped(on point: CGPoint)方法。

那么在ReactiveX中,我们自然可以想到类似这样的做法:

var observer: AnyObserver<CGPoint>!

let tapped = Observable<CGPoint>.create { (observer) -> Disposable in
    observer = observer
    return Disposables.create()
}

func screenDidTapped(on point: CGPoint) {
    observer.on(point)
}

然而这样的实现确实存在一些问题:

  • 一对一的限制

由于Observables的特性限制,如果我们希望有多个观察者来订阅该点击事件,那么Observables是无法做到的。当你存在两个及以上的订阅的时候,只有最新的观察者可以接收到序列的事件信息。

  • 订阅前的行为

还是由于Observables的特性,create构造器的闭包只会在第一次被订阅的时候会调用。然而当点击屏幕的时候,我们并不能保证就一定有观察者订阅了这个序列。

也就是说,当你遇到类似上述的情况的时候,你需要使用热信号(hot observeable)。

Hot Observables VS Cold observables

虽然冷热信号已经是被讲烂的话题了,但是既然写到这里已经是不得不说的地步了。

Bnaya Eshet在他的博客中对"冷热信号"有过一个非常形象的比喻:

if a tree falls in a forest and no one is around to hear it, does it make a sound? if it do make a sound when nobody observed it, we should mark it as hot, otherwise it should be marked as cold.
倘若一颗沙漠中的枯树黯然倾倒而无人问津,是不是可以说它从未对这个世界发出声音。倘若无人关心而算作发出了声音,那么它就是热信号,反之,则是冷信号。

我们再来看看冷热信号的对比:

Hot Observables Cold observables
属于序列 属于序列
无论有或者没有被订阅,都会产生事件。 只有被订阅的时候才会产生事件。
变量、属性、点击操作、鼠标操作、UI的变化等 异步操作、HTTP连接、TCP连接等
通常包含N个Next事件 通常只有一个Next事件
数据源的变化能够作用到所有的订阅者。 数据源的变化只会作用到当前的订阅者。
它是有状态的 它是无状态的

在现实世界的编程中,我们总是面对着各种各样复杂的情景。绝大多数的情况之下,我们的信号流可以是纯函数的,不可变的,安全的。然而当我们面对诸如鼠标追踪,变量的流式表达的时候,不可避免的我们需要使用到热信号。

当我们需要使用到热信号的时候,我们再根据当前的环境选择最适合的Subject。比如我们希望对数据有“回看”功能,那么我们就可以选择使用ReplaySubject。如果我们只关心最后一个数据变化,那么我们可以使用AsyncSubject,诸如此类等等。

(五)结语

对于Rx的使用者来说,我们更加希望使用的是Cold observables。从函数式的角度来说,Cold observables是不可变的,而Hot observables是可变的。不可变的数据总是更加的符合人的心智模型,而更加易于维护和理解,同时也更加的安全。希望这篇文章可以加深读者对于Rx的理解。

参考

  1. https://developer.apple.com/documentation/swift/operator_declarations
  2. http://xgrommx.github.io/rx-book/content/getting_started_with_rxjs/subjects.html
  3. To Use Subject Or Not To Use Subject?
  4. https://github.com/ReactiveX/RxSwift/blob/master/Documentation/HotAndColdObservables.md

相关文章

网友评论

    本文标题:【领略RxSwift源码】- 主题类族(Subject)

    本文链接:https://www.haomeiwen.com/subject/sksarxtx.html