美文网首页
RxSwift 源码解析

RxSwift 源码解析

作者: 时光啊混蛋_97boy | 来源:发表于2021-01-15 10:25 被阅读0次

原创:知识点总结性文章
创作不易,请珍惜,之后会持续更新,不断完善
个人比较喜欢做笔记和写总结,毕竟好记性不如烂笔头哈哈,这些文章记录了我的IOS成长历程,希望能与大家一起进步
温馨提示:由于简书不支持目录跳转,大家可通过command + F 输入目录标题后迅速寻找到你所需要的内容

郑重警告:垃圾玩意儿,浪费了我一天的宝贵时间,只想送给这个框架的设计者一个妈勒戈壁,泯灭人性的设计思路,比小猫玩乱的毛线团还乱。

目录

  • 一、RxSwift核心逻辑源码分析
    • 1、RxSwift核心流程
    • 2、创建序列源码分析
    • 3、订阅信号源码分析
    • 4、发送信号源码分析
  • 二、RxSwift功能模块源码分析
    • 1、Timer计时器的实现
    • 2、文本框UITextFiled与UITextView的实现
    • 3、map映射
    • 4、Scheduler调度者
    • 5、既是序列也是观察者的Subject
    • 6、Dispose销毁者

一、RxSwift核心逻辑源码分析

1、RxSwift核心流程

a、创建序列
// 1.创建序列
let obserber = Observable<Any>.create
{ (obserber) -> Disposable in
    // 3.发送信号
    ......
}
b、订阅信号
// 2.订阅信号
let _ = obserber.subscribe(
    onNext: { text in print("订阅公众号:\(text)")},
    onError: { (error) in print("订阅过程发生未知错误:\(error)")},
    onCompleted: { print("订阅完成") },
    onDisposed: { print("销毁观察者") })
c、发送信号
obserber.onNext("漫游在云海的鲸鱼")
obserber.onCompleted()
obserber.onError(NSError.init(domain: "unknowError", code: 1997, userInfo: nil))

return Disposables.create()
d、输出结果
成功订阅
订阅公众号:漫游在云海的鲸鱼
订阅完成
销毁观察者
订阅发生错误
订阅公众号:漫游在云海的鲸鱼
订阅过程发生未知错误:Error Domain=unknowError Code=1997 "(null)"
销毁观察者
e、代码分析
  • 1.创建序列的代码 Create 后面的闭包A里面有3.发送信号,如果要执行发送信号,必然要来到这个闭包A
  • 我们执行 2.订阅序列 创建了闭包B
  • 通过结果我们显然知道,先执行闭包A漫游在云海的鲸鱼字符串传给了闭包B
  • 猜测代码里面嵌套了闭包的执行调用!猜测是否准确需要解读源码来验证

2、创建序列源码分析

❶ create

进入create方法里开始探索源码。

let obserber = Observable<Any>.create
❷ ObservableType

可以很清晰看到可观察序列的创建是利用协议拓展功能的create方法实现的,里面创建了一个 AnonymousObservable(匿名可观察序列) 。

extension ObservableType 
{
    public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.Element>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.Element>
}
❸ AnonymousObservable

AnonymousObservable的命名体现了作者的思维 :这个类就是一个内部类,具备一些通用特性(具有自己功能的类才会命名) 。点进AnonymousObservable,发现其在初始化方法中保存了外界传入的闭包。

final private class AnonymousObservable<Element>: Producer<Element>
{
    typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    let subscribeHandler: SubscribeHandler

    init(_ subscribeHandler: @escaping SubscribeHandler)
    {
        self.subscribeHandler = subscribeHandler
    }
    ...
}
❹ Producer

AnonymousObservable继承自Producer,再来看看它是什么。我们发现Producer重写了来自 Observable 类中非常重要的 subscribe 订阅方法。

class Producer<Element>: Observable<Element> 
{
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
    {
        ......
    }
}
❺ Observable

再追根溯源看下Producer的父类Observable长什么样子。我们看到 Observable 这个观察者类拥有非常重要的订阅方法,但是没有进行实现,而是提供给子类重写。

public class Observable<Element> : ObservableType
{
    public func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
    {
        rxAbstractMethod()
    }
    ...
}

Observable是所有RXSwift序列的基类,在这个基类里面做了三件事情,第一件事上面提到的为观察者对象提供了订阅方法。第二件是在ObservableConvertibleType中解释的asObservable方法。

public func asObservable() -> Observable<E>
{
    return self
}

第三件事是RXSwift框架自己制作的防治内存泄露类似MRC的引用计数器,在初始化的时候引用计数+1,在销毁的时候引用计数-1。

init()
{
#if TRACE_RESOURCES
    _ = Resources.incrementTotal()
#endif
}

deinit
{
#if TRACE_RESOURCES
    _ = Resources.decrementTotal()
#endif
}
❻ ObservableType

至于ObservableType是观察者实现的协议,在这个协议中拥有订阅方法,观察者实现了该协议方法所以能够进行订阅。

public protocol ObservableType: ObservableConvertibleType 
{
    func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
}
❼ ObservableConvertibleType

ObservableType协议还有一个父协议ObservableConvertibleType,该协议表示可以转换为可观察序列的类型(observable<Element>)。

public protocol ObservableConvertibleType
{
    // 序列中元素的类型
    associatedtype Element

    // 将 self 转换为 Observable 序列
    func asObservable() -> Observable<Element>
}

因为有些对象并不是序列,比如下面的value,这样就打破了RXSwift万物皆为序列的宗旨,所以我们想要将这些非序列的类型也转化为序列,这时候就可以用到asObservable()方法。

var switchControl = UISwitch()
switchControl.rx.value.asObservable()
// value非序列
public var value: ControlProperty<Bool>
public struct ControlProperty<PropertyType> : ControlPropertyType
public protocol ControlPropertyType : ObservableType, ObserverType
// asObservable将其转化为序列
public func asObservable() -> Observable<E>
{
    return self._values
}
let _values: Observable<PropertyType>
self._values = values.subscribeOn(ConcurrentMainScheduler.instance)
❽ 继承图

从以上的流程发现存在多层嵌套,通过下图来进行表示。


3、订阅信号源码分析

❶ subscribe

接下来探索订阅信号,唉,费时间呀。注意这里说明的这个订阅方法 subscribe 和我们上面所说的Producer中的subscribe不是同一个方法。

let _ = obserber.subscribe(
    onNext: { text in print("订阅公众号:\(text)")},
    onError: { (error) in print("订阅过程发生未知错误:\(error)")},
    onCompleted: { print("订阅完成") },
    onDisposed: { print("销毁观察者") })
❷ ObservableType

订阅序列来自于对 ObservableType 的拓展功能。

extension ObservableType
{
    public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> Disposable
    {
        let observer = AnonymousObserver<Element>
        { event in
            switch event
            {
            case .next(let value):
                onNext?(value)
            case .error(let error):
                if let onError = onError
                {
                    onError(error)
                }
                else
                {
                    Hooks.defaultErrorHandler(callStack, error)
                }
                disposable.dispose()
            case .completed:
                onCompleted?()
                disposable.dispose()
            }
        }
        return Disposables.create(
            self.asObservable().subscribe(observer),
            disposable
        )
    }
}
❸ Element
public func subscribe(onNext: ((Element) -> Void)

Element 这里的意思是 Swift 的关联类型,如果仔细看过可观察序列的继承链源码应该不难得出这个 Element 就是我们的序列类型,其真实类型来自于我们在创建序列时候传入的类型let obserber = Observable<Any>.create,我们这里传入的类型是Any,其也可以为NSString等类型。

public protocol ObservableConvertibleType
{
    // 序列中元素的类型
    associatedtype Element

    // 将 self 转换为 Observable 序列
    func asObservable() -> Observable<Element>
}
❹ AnonymousObserver

外界传入了序列类型,这里是之前提到的Any

let observer = AnonymousObserver<Element>

创建了一个 AnonymousObserver (匿名内部观察者) 手法和我们的 AnonymousObservable 差不多。

final class AnonymousObserver<Element>: ObserverBase<Element>
{
}

它的初始化是闭包参数,保存了外界传入的 onNextonErroronCompletedonDisposed闭包。

typealias EventHandler = (Event<Element>) -> Void
private let eventHandler : EventHandler

init(_ eventHandler: @escaping EventHandler)
{
    self.eventHandler = eventHandler
}
❺ ObserverBase

这个是所有订阅者的基类,提供了3个基本方法。on方法处理传递进来的event,再调用本类中的onCore抽象方法进行处理。

func on(_ event: Event<Element>)
{
    switch event
    {
    case .next:
        if load(self._isStopped) == 0 {
            self.onCore(event)
        }
    case .error, .completed:
        if fetchOr(self._isStopped, 1) == 0 {
            self.onCore(event)
        }
    }
}

onCore方法由子类重载实现,在子类AnonymousObserver类中即对其进行了实现,调用了在创建序列中保存的闭包。

final class AnonymousObserver<ElementType> : ObserverBase<ElementType>
{

    typealias EventHandler = (Event<Element>) -> Void
    private let _eventHandler : EventHandler
    override func onCore(_ event: Event<Element>)
    {
        return self._eventHandler(event)
    }
}

最后一个方法是dispose,因为订阅者基类需要将订阅完成后的序列销毁掉。

func dispose()
{
    fetchOr(self._isStopped, 1)
}
❻ Producer
self.asObservable().subscribe(observer),

转化为可观察序列后,调用订阅方法。通过可观察序列的继承关系,我们可以非常快速的定位到 Producer 里面的订阅方法。Producer里面处理了订阅、线程调用CurrentThreadScheduler、销毁return disposer三件事情。

class Producer<Element>: Observable<Element>
{
    ...
    override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
    {
        if !CurrentThreadScheduler.isScheduleRequired
        {
            ...
        }
        else
        {
            return CurrentThreadScheduler.instance.schedule(())
            { _ in
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                ...

                return disposer
            }
        }
    }
}
❼ AnonymousObservable

将匿名观察者序列observer作为参数传递了进去。

let observer = AnonymousObserver<Element>
self.asObservable().subscribe(observer)

在生产者Producer中将前面传入的observer作为参数再次传递到了下一层。

let sinkAndSubscription = self.run(observer, cancel: disposer)

self.run 这个代码最终由我们生产者 Producer 延伸到我们具体的事务代码 AnonymousObservable类里面的run方法。通过传递进来的observer创建了AnonymousObservableSink这个用于连通序列和订阅者的管子,并调用了管子里面的run方法。

final private class AnonymousObservable<Element>: Producer<Element>
{
    ...
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element
    {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}
❽ AnonymousObservableSink

Producerrun方法只是一个抽象类,提供给子类重写实现。

class Producer<Element>: Observable<Element>
{
    let subscription = sink.run(self)

    func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element
    {
        rxAbstractMethod()
    }
}

在子类AnonymousObservable实现了run方法,又调用了AnonymousObservableSink中的run方法,将AnonymousObservable对象作为参数传入run方法,即这里的self就是AnonymousObservable对象。

final private class AnonymousObservable<Element>: Producer<Element>
{
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element
    {
        let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
        let subscription = sink.run(self)
        return (sink: sink, subscription: subscription)
    }
}

parent 就是上面通过sink.run(self)传过来的AnonymousObservable对象。我们非常兴奋的看到 AnonymousObservable._subscribeHandler,从这句代码我们解惑了为什么订阅信号的时候会执行创建序列时候的闭包,再去执行闭包里面的发送信号。

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType
{
    ...
    func run(_ parent: Parent) -> Disposable
    {
        parent.subscribeHandler(AnyObserver(self))
    }
}

在这个构造方法里面,我们创建了一个结构体AnyObserverAnyObserver(self)中的self指的就是AnonymousObservableSink。 在这个结构体的初始化方法中保存了一个AnonymousObservableSink .on 函数。self.observer这里不是观察者,而是一个函数。

public struct AnyObserver<Element> : ObserverType
{
    private let observer: EventHandler

    // 构造一个实例,其 on(event)调用 观察员(事件)
    // 参数:接收序列事件的观察器
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element
    {
        self.observer = observer.on
    }
}

4、发送信号源码分析

❶ ObserverType

从上面的分析,我们可以看出obserber.onNext("漫游在云海的鲸鱼")的本质是AnyObserver.onNext("漫游在云海的鲸鱼")。这时候发现我们的AnyObserver没有这个方法,这很正常,按照一般思路寻找父类或者协议。

extension ObserverType
{
    // 等效于on(.next(element:element))的便利方法
    // 参数:要发送给观察者的下一个元素
    public func onNext(_ element: Element)
    {
        self.on(.next(element))
    }
    
    // 等同于on(.completed)的便利方法
    public func onCompleted()
    {
        self.on(.completed)
    }
    
    // 相当于on(.error)的便利方法
    // 参数:将错误发送给观察者
    public func onError(_ error: Swift.Error)
    {
        self.on(.error(error))
    }
}
❷ AnyObserver

AnyObserver.onNext("漫游在云海的鲸鱼")变形为AnyObserver.on("漫游在云海的鲸鱼")。这个AnyObserver调用的 on 方法里面传的event.next函数,.next函数带有我们最终的参数"漫游在云海的鲸鱼"文本。

public struct AnyObserver<Element> : ObserverType
{
    // 构造一个实例,其 on(event)调用 观察员(事件)
    // 参数:接收序列事件的观察器
    public init<Observer: ObserverType>(_ observer: Observer) where Observer.Element == Element
    {
        self.observer = observer.on
    }

    // 将 event 发送给此观察者
    // 参数:事件实例
    public func on(_ event: Event<Element>)
    {
        self.observer(event)
    }
}
❸ AnonymousObservableSin

self.observer(event)会转化为AnonymousObservableSink .on(event) 其中 event = .next("漫游在云海的鲸鱼"),因为 AnonymousObservableSink 继承自 Sink ,最终我们的核心逻辑又回到了 sink 这个神奇的管子。

final private class AnonymousObservableSink<Observer: ObserverType>: Sink<Observer>, ObserverType
{
    func on(_ event: Event<Element>)
    {
        switch event
        {
        case .next:
            if load(self.isStopped) == 1
            {
                return
            }
            self.forwardOn(event)
        case .error, .completed:
            if fetchOr(self.isStopped, 1) == 0
            {
                self.forwardOn(event)
                self.dispose()
            }
        }
    }
}
❹ Sink

self.forwardOn(event) 这也是执行的核心代码。其中 self._observer 就是我们初始化时候保存的观察者AnonymousObserver。那么我们变形得出本质就是:AnonymousObserver.on(.next("漫游在云海的鲸鱼"))。 这里逻辑辗转回到了我们订阅信号时候创建的 AnonymousObserver 的参数闭包的调用!

所有的一切感觉是这样的啰嗦,她妈的,绕来绕去,浪费老子的分析时间,有病,都想爆锤框架开发者了,搞得我迷迷糊糊晕晕眩眩,分析这个鬼东西毫无意义嘛。

class Sink<Observer: ObserverType>: Disposable
{
    final func forwardOn(_ event: Event<Observer.Element>)
    {
        if isFlagSet(self.disposed, 1)
        {
            return
        }
        self.observer.on(event)
    }
}

二、RxSwift功能模块源码分析

1、Timer计时器的实现

a、通过RxSwift使用计时器的方式
timer = Observable<Int>.interval(RxTimeInterval.seconds(1), scheduler: MainScheduler.instance)
timer.subscribe(onNext: { num in print("下雪了❄️")})
    .disposed(by: disposeBag)
b、Timer 类

调用了来自生产者父类Producerrun方法,再将功能实现下沉到TimerSink管道,sink.run()返回处理完成的结果。

final private class Timer<Element: RxAbstractInteger>: Producer<Element>
{
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element
    {
        let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
}
c、TimerSink类

TimerSink继承自连通序列和观察者用来处理实际业务的管道基类,专门负责实现定时器的相关业务。Parent就是创建TimerSink(parent: self...)时传入的计时器TimerTimerSink里的实现的run方法调用了self.parent.scheduler.schedulePeriodic来处理定时器功能。

final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger
{
    typealias Parent = Timer<Observer.Element>
    private let parent: Parent
    private let lock = RecursiveLock()

    init(parent: Parent, observer: Observer, cancel: Cancelable)
    {
        self.parent = parent
        super.init(observer: observer, cancel: cancel)
    }

    func run() -> Disposable
    {
        return self.parent.scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self.parent.dueTime, period: self.parent.period!)
        { state in
            self.lock.performLocked
            {
                self.forwardOn(.next(state))
                return state &+ 1
            }
        }
    }
}
d、DispatchQueueConfiguration类
struct DispatchQueueConfiguration
{
    let queue: DispatchQueue
}

可以看到RXSwift底层使用的是GCD的方式实现了计时器,而不是使用的NSTimer或者CADisplayLink,难怪当我们拖动滚动视图的时候,计时器仍然可以继续工作。在timer.setEventHandler中我们设置了计时器的响应事件,通过action执行了timer.subscribe(onNext: { num in print("下雪了❄️")})里面打印"下雪了❄️"的事件,再将执行完成后的最新状态timerState不断发送出去,直到计时器停止销毁。

extension DispatchQueueConfiguration
{
    func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable
    {
        // 初始化为系统当前时间
        let initial = DispatchTime.now() + startAfter

        var timerState = state

        // 底层使用GCD的方式实现了计时器
        let timer = DispatchSource.makeTimerSource(queue: self.queue)
        timer.schedule(deadline: initial, repeating: period, leeway: self.leeway)

        // 设置响应事件
        timer.setEventHandler(handler: {
            // 计时器被销毁了
            if cancelTimer.isDisposed
            {
                return
            }
            // 不断发送最新状态
            timerState = action(timerState)
        })
        // 启动计时器
        timer.resume()
        
        return cancelTimer
    }
}
e、action方法

scheduleRelative方法中的参数action就是action(timerState)这个方法的来源

extension DispatchQueueConfiguration
{
    func scheduleRelative<StateType>(_ state: StateType, dueTime: RxTimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable
    {
    }
}

而这个入参是在TimerSink类中的run方法里面的返回值self.parent.scheduler.schedulePeriodic的尾随闭包,即从state in开始后面的一大段。在这个闭包里面,调用了forwardOn(.next(state)处理打印"下雪了❄️"的事件后再将state状态值+1返回回去。

final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger
{
    func run() -> Disposable
    {
        return self.parent.scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self.parent.dueTime, period: self.parent.period!)
        { state in
            self.lock.performLocked
            {
                self.forwardOn(.next(state))
                return state &+ 1
            }
        }
    }
}

管道基类中的forwardOn(.next(state)会调用到观察者序列的observeron方法来处理事件event

class Sink<Observer: ObserverType>: Disposable
{
    final func forwardOn(_ event: Event<Observer.Element>)
    {
        self.observer.on(event)
    }
}

observer.on(event)则会进入到下面所示创建observer的位置,执行subscribeonNext?(value)方法。最终执行了我们的timer.subscribe(onNext: { num in print("下雪了❄️")})语句,将下雪了❄️不断打印出来。

extension ObservableType
{
    public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable
    {
        let observer = AnonymousObserver<Element>
        { event in
            switch event
            {
            case .next(let value):
                onNext?(value)
            }
        }
    }
}

2、文本框UITextFiled与UITextView的实现

疑问一:为什么通过代码而不是键盘赋予新值却没调用
override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?)
{
    textFiled.text = "王牌"
}

点击时候并没有按照预期输出"王牌"。

textView.rx.text.subscribe(onNext: { (text) in
    print("textView:输入来了 \(text)")
})

override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?)
{
    textView.text = "哈利波特"
}

但是使用textView的话,通过代码而不是键盘赋予新值却调用了订阅者序列里面的回调函数,输出了结果。

textView:输入来了 Optional("哈利波特")

解答疑问一
TextField

RXSwift确认TextField的值是否发生了改变是通过控件基类里面的editingEvents,而editingEvents包括了[.allEditingEvents, .valueChanged]两种,valueChanged只针对UISwitch或者UISlider等才会起作用,对于TextField无效,至于allEditingEvents包括开始编辑、触摸等,与通过代码方式赋值矛盾,因为代码方式赋值不属于ControlEvent,所以不会生效,故此才会产生了通过代码而不是键盘赋予新值却没调用订阅者序列里面的回调函数打印"王牌"的问题。

extension Reactive where Base: UITextField
{
    public var value: ControlProperty<String?>
    {
        return base.rx.controlPropertyWithDefaultEvents
        (
            getter:
            { textField in
                textField.text
            },
            setter:
            { textField, value in
                if textField.text != value
                {
                    textField.text = value
                }
            }
        )
    }
}
extension Reactive where Base: UIControl
{
    internal func controlPropertyWithDefaultEvents<T>
    (
        editingEvents: UIControl.Event = [.allEditingEvents, .valueChanged],
    )
}
public static var editingDidBegin: UIControl.Event { get }
public static var editingChanged: UIControl.Event { get }
public static var touchDown: UIControl.Event { get }
...

可以通过添加sendActions方式解决上面提到的问题。

override func touchesBegan(_ touches: Set<UITouch>, with event: UIEvent?)
{
    textFiled.text = "王牌"
    textFiled.sendActions(for: .allEditingEvents)
}

这样就会调用到RXSwift里面的回调函数打印"王牌"。

输入来了 Optional("王牌")
TextView

RXSwift确认TextView的值是否发生了改变是通过通知机制来实现的,与TextField实现机制不同。

extension Reactive where Base: UITextView
{
    // This project uses text storage notifications because
    let textChanged = textView?.textStorage
        .rx.didProcessEditingRangeChangeInLength
}

疑问二:为什么多调用了2次
textFiled.rx.text.subscribe(onNext: { (text) in
    print("输入来了 \(text)")
})

输出结果为:

输入来了 Optional("")
输入来了 Optional("")
输入来了 Optional("1")
输入来了 Optional("11")
输入来了 Optional("111")

解答疑问二
extension Reactive where Base: UIControl
{
    internal func controlPropertyWithDefaultEvents<T>
    (
        editingEvents: UIControl.Event = [.allEditingEvents, .valueChanged],
    )
}

会发生提前调用两次的情况,第一次是因为我们通过textFiled.rx.text调用了Observable.create初始化了一个新序列,这个序列针对.subscribe(onNext: { }订阅后会回到创建序列的闭包中,即下图中Observable.create后面{}的部分,在这个部分中调用了observer.on(.next(getter(control))),其经过一系列的步骤最终又回到了.subscribe(onNext: { text in print("输入来了 \(text)") })订阅的闭包里进行打印,由于此时进行的是初始化序列并没有输入文本text,所以打印Optional("")

第二次是因为当我们点击到文本框开始准备输入的时候就触发了editingEvents事件,所以会打印输出,至于为什么会触发editingEvents事件,是因为beginEditing也属于allEditingEvents

通常我们让控件使用以下方式通过调用skip(1)来规避不必要的那次调用。为什么是1而不是2 ?那是因为初始化那次调用是不可避免的。

passionToLearnStepper.rx.value
    .skip(1)
    .subscribe(onNext: { (value) in
        ...
    })
    .disposed(by: disposeBag)

3、map映射

a、使用方式

源序列提供的是响应,映射后的序列才真正进行订阅。

let subject = PublishSubject<Any>()
subject
    // 源序列提供响应
    .map{ return $0 }
    // 映射后的序列进行订阅
    .subscribe(onNext: { item in print(item) })
b、init方法

因为Map继承自Producer,所以映射后产生的是新序列。

final private class Map<SourceType, ResultType>: Producer<ResultType>
{
    init(source: Observable<SourceType>, transform: @escaping Transform)
    {
        // 保存源序列
        self._source = source
        // 保存映射后的序列
        self._transform = transform
    }
}
c、run方法

MapSink中处理事务。在源序列中响应事务的处理。

final private class Map<SourceType, ResultType>: Producer<ResultType>
{
    override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == ResultType
    {
        // 在MapSink中处理事务
        let sink = MapSink(transform: self._transform, observer: observer, cancel: cancel)
        // 在源序列中响应事务的处理
        let subscription = self._source.subscribe(sink)
        return (sink: sink, subscription: subscription)
    }
}

4、Scheduler调度者

RxSwift 的世界如果看得简单一点,那么只有四个东西:可观察序列(Observable)、观察者 (Observer)、调度者(Scheduler)、销毁者(Dispose)。RxSwift 的狗屁研发者主要针对 GCD 进行了一套 scheduler 封装。

a、RXSwift内部处理了自动切换回主线程问题
❶ 使用时自动回到主队列
DispatchQueue.global().async
{
    print("请求数据")
    self.actionButton.rx.tap
        .subscribe(onNext: { () in
            print("点击了按钮,当前线程为:\(Thread.current)")
        })
        .disposed(by: self.disposeBag)
}

输出结果为:

请求数据
点击了按钮,当前线程为:<NSThread: 0x600002fe49c0>{number = 1, name = main}

可以看到我们虽然在子线程请求的数据,但是经过了RXSwift的处理,返回数据的时候又自动回到了主线程。

❷ 自动回到主队列的源码解析

subscribe进行订阅的时候自动回到主队列,因为按钮属于控件,其tap会来到控件事件的处理。在初始化方法中将事件的处理放在了主线程上。

public struct ControlEvent<PropertyType> : ControlEventType
{
    public init<Ev: ObservableType>(events: Ev) where Ev.Element == Element
    {
        self.events = events.subscribe(on: ConcurrentMainScheduler.instance)
    }
}

最终还是调用了系统的DispatchQueue.main方法。

public final class ConcurrentMainScheduler : SchedulerType
{
    private init(mainScheduler: MainScheduler)
    {
        self.mainScheduler = mainScheduler
    }
}
public final class MainScheduler : SerialDispatchQueueScheduler
{
    public init()
    {
        self.mainQueue = DispatchQueue.main
        super.init(serialQueue: self.mainQueue)
    }
    public static let instance = MainScheduler()
}

b、RXSwift中的当前线程 CurrentThreadScheduler
public class CurrentThreadScheduler : ImmediateSchedulerType
{
}
❶ 提供给外界获取,用于判断当前队列是否被关联
public static private(set) var isScheduleRequired: Bool
❷ 利用对队列的set和get方法的观察,将当前队列与静态字符串进行绑定
static var queue : ScheduleQueue?
{
    get
    {
        return Thread.getThreadLocalStorageValueForKey(CurrentThreadSchedulerQueueKey.instance)
    }
    set
    {
        Thread.setThreadLocalStorageValue(newValue, forKey: CurrentThreadSchedulerQueueKey.instance)
    }
}
❸ 在对线程的拓展中实现了队列与静态字符串进行绑定的方法
extension Thread
{
    static func setThreadLocalStorageValue<T: AnyObject>(_ value: T?, forKey key: NSCopying)
    {
        let currentThread = Thread.current
        let threadDictionary = currentThread.threadDictionary

        if let newValue = value
        {
            threadDictionary[key] = newValue
        }
        else
        {
            threadDictionary[key] = nil
        }
    }

    static func getThreadLocalStorageValueForKey<T>(_ key: NSCopying) -> T?
    {
        let currentThread = Thread.current
        let threadDictionary = currentThread.threadDictionary
        
        return threadDictionary[key] as? T
    }
}
c、RXSwift中的串行队列和并发队列
串行队列

封装了GCD的串行队列,如果我们需要执行串行任务就可以切换到这个调度者中执行。主队列就继承自串行队列。初始化掉用系统的API创建了一个串行队列,leeway指的是延迟计时器的时间。

public class SerialDispatchQueueScheduler : SchedulerType
{
    public convenience init(internalSerialQueueName: String, serialQueueConfiguration: ((DispatchQueue) -> Void)? = nil, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0))
    {
        let queue = DispatchQueue(label: internalSerialQueueName, attributes: [])
        serialQueueConfiguration?(queue)
        self.init(serialQueue: queue, leeway: leeway)
    }
}
并发队列

封装了GCD的并发队列,如果我们需要执行并发任务就可以切换到这个调度者中执行。

public class ConcurrentDispatchQueueScheduler: SchedulerType
{
    public convenience init(qos: DispatchQoS, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0))
    {
        self.init(queue: DispatchQueue(
            label: "rxswift.queue.\(qos)",
            qos: qos,
            attributes: [DispatchQueue.Attributes.concurrent],
            target: nil),
            leeway: leeway
        )
    }
}

5、既是序列也是观察者的Subject

a、Subject既是Observer,也是Observable

从源码中可以看到,SubjectType协议继承了ObservableType协议,因此具有序列特性,同时也关联到了观察者协议,具备这个类型的能力。

public final class PublishSubject<Element>: SubjectType
    
public protocol SubjectType : ObservableType
{
    associatedtype SubjectObserverType : ObserverType
}
b、订阅响应流程
public override func subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
{
    self.lock.performLocked { self.synchronized_subscribe(observer) }
}

通过一个集合添加进去所有的订阅事件,也就是创建observers保存我们所有的订阅者,这里的订阅者可以理解为闭包。在合适的地方再遍历observers让所有订阅者发出响应,即一次性将闭包全部执行。

func synchronized_subscribe<Observer: ObserverType>(_ observer: Observer) -> Disposable where Observer.Element == Element
{
    let key = self.observers.insert(observer.on)

    return SubscriptionDisposable(owner: self, key: key)
}
c、发送信号流程

调用了dispatch函数,传了两个参数:self.synchronized_on(event)event

public final class BehaviorSubject
{
    public func on(_ event: Event<Element>) 
    {
        dispatch(self.synchronized_on(event), event)
    }
}

拿到外界封装的闭包进行调用。

func dispatch<Element>(_ bag: Bag<(Event<Element>) -> Void>, _ event: Event<Element>)
{
    if let dictionary = bag._dictionary
    {
        for element in dictionary.values
        {
            element(event)
        }
    }
}

6、Dispose销毁者

a、Dispose的使用
❶ 使用Dispose停止计时器
func disposeIntervalObservable()
{
    self.intervalObservable = Observable<Int>.interval(.seconds(1), scheduler: MainScheduler.init())
    
    let dispose = self.intervalObservable.subscribe(onNext:
    { (num) in
        self.titleLabel.text = String(num)
    })
    
    _ = self.stopAction.rx.tap.subscribe(onNext: {
        print("停止计时")
        dispose.dispose()
    })
}

输出结果为:

停止计时
❷ 核心流程里手动调用dispose.dispose()
func disposeLimitObservable()
{
    // 创建序列
    let observable = Observable<Any>.create
    { (observer) -> Disposable in
        observer.onNext("谢佳培")
        return Disposables.create { print("销毁释放了")}
    }
    
    // 订阅信号
    let dispose = observable.subscribe(onNext: { (anything) in
        print("收到的内容为:\(anything)")
    }, onError: { (error) in
        print("错误信息:\(error)")
    }, onCompleted: {
        print("完成了")
    }) {
        print("销毁回调")
    }
    
    print("执行完毕")
    dispose.dispose()
}

输出结果为:

收到的内容为:谢佳培
执行完毕
销毁释放了
销毁回调
❸ 核心流程里发送完成或者错误信息自动调用dispose.dispose()
func disposeLimitObservable()
{
    // 创建序列
    let observable = Observable<Any>.create
    { (observer) -> Disposable in
        observer.onNext("谢佳培")
        observer.onCompleted()
        //observer.onError(...)
        // 在完成后就已经销毁了序列,不会再发送下面的信号了
        observer.onNext("王小清")
        return Disposables.create { print("销毁释放了")}
    }
    
    // 订阅信号
    let dispose = observable.subscribe(onNext: { (anything) in
        print("收到的内容为:\(anything)")
    }, onError: { (error) in
        print("错误信息:\(error)")
    }, onCompleted: {
        print("完成了")
    }) {
        print("销毁回调")
    }
    
    print("执行完毕")
    //dispose.dispose()
}

输出结果为:

收到的内容为:谢佳培
执行完毕
销毁释放了
销毁回调

b、Dispose的create方法
❶ 创建了一个匿名的可销毁者
extension Disposables
{
    public static func create(with dispose: @escaping () -> Void) -> Cancelable
    {
        AnonymousDisposable(disposeAction: dispose)
    }
}
❷ AnonymousDisposable
private final class AnonymousDisposable : DisposeBase, Cancelable
{
    private init(_ disposeAction: @escaping DisposeAction)
    {
        // 保存了disposeAction闭包,就是外界传入的{ print("销毁释放了")}
        self.disposeAction = disposeAction
    }

    // 实现了Disposable协议里最重要的方法dispose
    fileprivate func dispose()
    {
        // 判断是否已经销毁过(保证只会销毁一次)
        if fetchOr(self.disposed, 1) == 0
        {
            // 如果没有销毁过就执行销毁闭包{ print("销毁释放了")}
            if let action = self.disposeAction
            {
                self.disposeAction = nil
                action()
            }
        }
    }
}
❸ DisposeBase

AnonymousDisposable继承了DisposeBase,实现内存的引用计数。

public class DisposeBase
{
    init()
    {
#if TRACE_RESOURCES
    _ = Resources.incrementTotal()
#endif
    }
    
    deinit
    {
#if TRACE_RESOURCES
    _ = Resources.decrementTotal()
#endif
    }
}
❹ Disposable

AnonymousDisposable实现了Disposable协议里最重要的方法dispose

public protocol Cancelable : Disposable
{
    // 是否销毁了
    var isDisposed: Bool { get }
}

public protocol Disposable
{
    func dispose()
}

c、发送完成或者错误信息自动调用dispose.dispose()
extension ObservableType
{
    public func subscribe(onNext: ((Element) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
        -> Disposable
    {
        case .error(let error):
            if let onError = onError
            ....
            disposable.dispose()
        case .completed:
            onCompleted?()
            disposable.dispose()
        }
    }
}

d、销毁的内容
func dispose()
{
    private var sink: Disposable?
    private var subscription: Disposable?
    ...
    self.sink = nil
    self.subscription = nil
}

e、垃圾袋DisposeBag

垃圾袋就是个集合,将需要销毁的内容收集在一起,之后一次性进行销毁。所以如果我们新创建了一个DisposeBag(),那么就相当于丢弃了之前的垃圾袋,里面的内容会全部销毁,这就是为什么我们在一个类中只创建了一个垃圾袋let disposeBag = DisposeBag(),使用的时候是.disposed(by: self.disposeBag),而不是每次都新创建垃圾袋.disposed(by: DisposeBag())

public final class DisposeBag: DisposeBase
{
    // 数组
    private var disposables = [Disposable]()

    // 插入
    private func _insert(_ disposable: Disposable)
    {
        self.disposables.append(disposable)
    }

    // 移除
    private func dispose()
    {
        let oldDisposables = self._dispose()

        for disposable in oldDisposables
        {
            disposable.dispose()
        }
    }
}

Demo

Demo在我的Github上,欢迎下载。
SourceCodeAnalysisDemo

参考文献

相关文章

网友评论

      本文标题:RxSwift 源码解析

      本文链接:https://www.haomeiwen.com/subject/xqviaktx.html