美文网首页
RxSwift源码解析

RxSwift源码解析

作者: 纯情_小火鸡 | 来源:发表于2019-06-19 15:53 被阅读0次

    1. ReactiveCompatible

    比如当我们输入label.rx时,实际上是因为NSObject遵从了ReactiveCompatible协议添加了命名空间:

    extension NSObject: ReactiveCompatible {}
    
    public protocol ReactiveCompatible {
        associatedtype CompatibleType
    
        /// 元类型,此处为Reactive泛型并且用关联类型进行约束
        static var rx: Reactive<CompatibleType>.Type { get set }
        var rx: Reactive<CompatibleType> { get set }
    }
    
    extension ReactiveCompatible {
        //接口中使用的类型就是实现这个接口本身的类型的话,需要使用 Self 进行指代
      public static var rx: Reactive<Self>.Type {
            get {
                return Reactive<Self>.self
            }
            set {
                // this enables using Reactive to "mutate" base type
            }
        }
    
        /// Reactive extensions.
        public var rx: Reactive<Self> {
            get {
                return Reactive(self)
            }
            set {
                // this enables using Reactive to "mutate" base object
            }
        }
    }
    
    //上述调用即label会传入base
    public struct Reactive<Base> {
        public let base: Base
        public init(_ base: Base) {
            self.base = base
        }
    }
    

    2. ObservableConvertibleType

    定义了一个ObservableConvertibleType协议,表示可以转换为可观察序列类型。其中E表示序列元素的别名,asObservable方法是将self转换为Observable 序列。

    public protocol ObservableConvertibleType {
            //声明关联类型
        associatedtype E        
        func asObservable() -> Observable<E>
    }
    

    说明:在定义协议时,可以用associatedtype声明一个或多个类型作为协议定义的一部分,叫关联类型。关联类型为协议中的某个类型提供了一个占位名(或者说别名),其代表的实际类型在协议被采纳时才会被指定。

    例如:

    protocol TableViewCell {
        associatedtype T
        func updateCell(_ data: T)
    }
    
    class MyTableViewCell: UITableViewCell, TableViewCell {
        typealias T = Model
        func updateCell(_ data: Model) {
            // do something ...
        }
    }
    

    3. Observable

    Observable为遵从ObservableType协议的类,它表示一个push样式序列,可以让订阅“观察者”接收此序列的事件。而在ObservableType协议中,默认实现了asObservable()方法,以及定义了subscribe方法。该方法中参数需要是遵从ObserverType协议类型。

     public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
            rxAbstractMethod()
        }
    // MARK: 该subscribe方法的形参observer必须遵从ObserverType协议,返回类型是满足Disposable协议类型,并且需要满足参数.E 类型和当前Observable类中的E泛型为同一类型
    
    //Disposable也是声明的一个协议,用来表示资源释放
    public protocol Disposable {
        func dispose()
    }
    

    4. ObserverType

    ObserverType协议的作用是支持可观察序列上的推式迭代。我理解的意思是可以按顺序观察。

    public protocol ObserverType {
        /// 观察者可以观察到的按顺序排列的元素的类型。
        associatedtype E
            // 将序列事件通知给观察者,Event是枚举类型
        func on(_ event: Event<E>)
    }
    

    其中定义了观察元素以及观察方法,并且默认实现了三种事件方法:onNext,onCompleted,onError。

    5. Event

    Event表示序列事件,是一个泛型枚举。并且遵从了系统的CustomDebugStringConvertible协议,重写了debugDescription属性让其可以在debug模式下打印。

    public enum Event<Element> {
        /// Next element is produced.
        case next(Element)
        /// Sequence terminated with an error.
        case error(Swift.Error)
        /// Sequence completed successfully.
        case completed
    }
    

    其在extension中还新增了很多属性方便获取状态和值,比如:

            //是否是`completed` 或 `error`事件
            public var isStopEvent: Bool {
            switch self {
            case .next: return false
            case .error, .completed: return true
            }
        }
    
        /// 返回`next`事件的元素
        public var element: Element? {
            if case .next(let value) = self {
                return value
            }
            return nil
        }
    

    6. ObservableType

    ObservableType是继承自ObservableConvertibleType协议,它在extension中实现了从可观察序列订阅事件处理方法:

        public func subscribe(_ on: @escaping (Event<E>) -> Void)
            -> Disposable {
                let observer = AnonymousObserver { e in
                    on(e)
                }
                return self.asObservable().subscribe(observer)
        }
    

    其在另一个subscribe方法中有使用enum Hooks的customCaptureSubscriptionCallstack扩展方法来获取自定义callstack信息。

    extension Hooks {
        public typealias CustomCaptureSubscriptionCallstack = () -> [String]
        fileprivate static let _lock = RecursiveLock()
    
        //此处重写了属性的set get方法用到 NSRecursiveLock 递归锁🔐 搭配defer保证线程安全
        public static var customCaptureSubscriptionCallstack: CustomCaptureSubscriptionCallstack {
            get {
                _lock.lock(); defer { _lock.unlock() }
                return _customCaptureSubscriptionCallstack
            }
            set {
                _lock.lock(); defer { _lock.unlock() }
                _customCaptureSubscriptionCallstack = newValue
            }
        }
    }
    

    在Rx中,打开TRACE_RESOURCES调试模式,可以使用public struct Resources查看内部Rx资源分配(可观察对象、观察者、可处理对象等),他提供了一种在开发过程中检测泄漏的简单方法。

    /// 继承自NSLock保证原子性
    final class AtomicInt: NSLock {
        fileprivate var value: Int32
        public init(_ value: Int32 = 0) {
            self.value = value
        }
    }
    
    /// 忽略返回值警告
    @discardableResult
    @inline(__always)
    func add(_ this: AtomicInt, _ value: Int32) -> Int32 {
        this.lock()
        let oldValue = this.value
        this.value += value
        this.unlock()
        return oldValue
    }
    
    /// 函数内联是一种编译器优化技术,它通过使用方法的内容替换直接调用该方法,就相当于假装该方法并不存在一样,这种做法在很大程度上优化了性能
    @inline(__always)
    func load(_ this: AtomicInt) -> Int32 {
        this.lock()
        let oldValue = this.value
        this.unlock()
        return oldValue
    }
    

    7. SchedulerType

    表示调度工作单元的对象,继承自ImmediateSchedulerType,内部包含立即执行的调度和周期调用的调度。

        ///使用递归调度模拟周期性任务
         public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
            let schedule = SchedulePeriodicRecursive(scheduler: self, startAfter: startAfter, period: period, action: action, state: state)
                
            return schedule.start()
        }
        
        ///SchedulePeriodicRecursive内部会调用到如下方法
           func scheduleRecursive<State>(_ state: State, dueTime: RxTimeInterval, action: @escaping (State, AnyRecursiveScheduler<State>) -> Void) -> Disposable {
            let scheduler = AnyRecursiveScheduler(scheduler: self, action: action)
             
            scheduler.schedule(state, dueTime: dueTime)
                
            return Disposables.create(with: scheduler.dispose)
        }
    

    8. DisposeBag

    线程安全袋:在deinit方法中将添加的disposes释放。其内部持有一个Disposable数组,当调用disposed(by bag: DisposeBag)添加到一个bag中,实际上是insert( disposable: Disposable)到该数组中,对应的在deinit方法中会remove元素。

    public final class DisposeBag: DisposeBase {
        private var _lock = SpinLock()
       fileprivate var _disposables = [Disposable]()
        fileprivate var _isDisposed = false
        
            private func _insert(_ disposable: Disposable) -> Disposable? {
            self._lock.lock(); defer { self._lock.unlock() }
            if self._isDisposed {
                return disposable
            }
            self._disposables.append(disposable)
            return nil
        }
    }
    

    此处以PublishSubject为例:

    当其subscribe时,会调用self._synchronized_subscribe(observer)方法,该方法会往bag中insert该AnyObserver<Element>对象并生成一个对应的BagKey,BagKey实际上内部持有一个UInt64类型的rawValue作为唯一标识。拿到key之后和当前PublishSubject对象用于创建一个SubscriptionDisposable对象,这样 _key 和 _owner就绑定了。

    紧接着是释放,系统会调用了DisposeBag的deinit方法,除了释放数组元素,self._owner?.synchronizedUnsubscribe(self._key) 会根据key移除相应的观察者:

    func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
            _ = self._observers.removeKey(disposeKey)
        }
    

    8. PrimitiveSequence

    其为遵从ObservableConvertibleType和PrimitiveSequenceType协议的泛型结构体,内部持有一个Observable<Element>属性,作为基类内部在extension中实现了deferred,delay,observeOn等方法。

    public struct PrimitiveSequence<Trait, Element> {
        let source: Observable<Element>
    
        init(raw: Observable<Element>) {
            self.source = raw
        }
    }
    extension PrimitiveSequence: ObservableConvertibleType {
            //asObservable就是返回source
        public func asObservable() -> Observable<E> {
            return self.source
        }
    }
    

    9. Single

    SingleObservable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。

    public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
    
    //对应到SingleEvent中
    public enum SingleEvent<Element> {
    ///只生成一个序列元素。(底层可观察序列发出:' .next(Element) ', ' .completed ')    case success(Element)
    ///序列以错误结束。(底层可观察序列发出:' .error(Error) ')  
        case error(Swift.Error)
    }
    
    //从指定的订阅方法实现创建可观察序列
    //(SingleEvent<ElementType>) -> Void作为SingleObserver参数,实际上Maybe和Single以及Completable在此处的区别就是处理不同的case
    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<ElementType> {
            let source = Observable<ElementType>.create { observer in
                return subscribe { event in
                    switch event {
                    case .success(let element):
                        observer.on(.next(element))
                        observer.on(.completed)
                    case .error(let error):
                        observer.on(.error(error))
                    }
                }
            }
            return PrimitiveSequence(raw: source)
        }
    
    //订阅“观察者”接收此序列的事件
    public func subscribe(_ observer: @escaping (SingleEvent<ElementType>) -> Void) -> Disposable {
            var stopped = false
                //拿到source中当前的SingleEvent
            return self.primitiveSequence.asObservable().subscribe { event in
                if stopped { return }
                stopped = true
                
                switch event {
                case .next(let element):
                    observer(.success(element))
                case .error(let error):
                    observer(.error(error))
                case .completed:
                    rxFatalErrorInDebug("Singles can't emit a completion event")
                }
            }
        }
    

    10. BehaviorSubject

    既是可观察序列又是观察者的对象,每项通知会广播予所有已订阅的观察者。

    Observable 先创建一个 AnonymousObserver,将事件处理方法设置给它的 eventHandler 属性。所有的 Observable 订阅,都会进行这样的方法。在 create 中,由于继承关系调用的是 Producersubscribe;而 BehaviorSubject 中也实现了自己的 subscribe 方法,BehaviorSubject 的订阅私有部分做的是将刚创建的 AnonymousObserver 保存起来,然后以当前 value 值作为事件值,发出一个事件。

    public final class PublishSubject<Element>
        //发送
        public func on(_ event: Event<Element>) {
            dispatch(self._synchronized_on(event), event)
        }
    
        func _synchronized_on(_ event: Event<E>) -> Observers {
            self._lock.lock(); defer { self._lock.unlock() }
            switch event {
            case .next:
                if self._isDisposed || self._stopped {
                    return Observers()
                }
                
                return self._observers
            case .completed, .error:
                if self._stoppedEvent == nil {
                    self._stoppedEvent = event
                    self._stopped = true
                    let observers = self._observers
                    self._observers.removeAll()
                    return observers
                }
    
                return Observers()
            }
        }
    
            //订阅
            public override func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            self._lock.lock()
            let subscription = self._synchronized_subscribe(observer)
            self._lock.unlock()
            return subscription
        }
    
        func _synchronized_subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
            if let stoppedEvent = self._stoppedEvent {
                observer.on(stoppedEvent)
                return Disposables.create()
            }
            if self._isDisposed {
                observer.on(.error(RxError.disposed(object: self)))
                return Disposables.create()
            }
            //如果保存成功,则将该观察者以及对应的BagKey生成SubscriptionDisposable以便后续释放
            let key = self._observers.insert(observer.on)
            return SubscriptionDisposable(owner: self, key: key)
        }
    
            //调用DisposeBag的deinit
        func synchronizedUnsubscribe(_ disposeKey: DisposeKey) {
            self._lock.lock()
            self._synchronized_unsubscribe(disposeKey)
            self._lock.unlock()
        }
    
        func _synchronized_unsubscribe(_ disposeKey: DisposeKey) {
            _ = self._observers.removeKey(disposeKey)
        }
    }
    

    11. MainScheduler

    MainScheduler 代表主线程。如果你需要执行一些和 UI 相关的任务,就需要切换到该 Scheduler 运行。

    其继承自SerialDispatchQueueScheduler,SerialDispatchQueueScheduler 抽象了串行 DispatchQueue。如果你需要执行一些串行任务,可以切换到这个 Scheduler 运行。

    public class SerialDispatchQueueScheduler : SchedulerType {
                //包含DispatchQueue和DispatchTimeInterval延时
            let configuration: DispatchQueueConfiguration
        
            init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
            self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
        }
            //调用self.queue.async {cancel.setDisposable(action(state))}  
          func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
            return self.configuration.schedule(state, action: action)
        }
    }
    
    public final class MainScheduler : SerialDispatchQueueScheduler {
            public static let instance = MainScheduler()
    
                //调度要立即执行的操作
            override func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
                //原子属性+1 相当于信号量
            let previousNumberEnqueued = increment(self.numberEnqueued)
                    
                    //DispatchQueue.getSpecific(key: token) != nil 设置标识判断是否在主线程
            if DispatchQueue.isMain && previousNumberEnqueued == 0 {
                let disposable = action(state)
                //原子属性-1
                decrement(self.numberEnqueued)
                return disposable
            }
    
            let cancel = SingleAssignmentDisposable()
    
            self._mainQueue.async {
                if !cancel.isDisposed {
                    _ = action(state)
                }
    
                decrement(self.numberEnqueued)
            }
    
            return cancel
        }
    }
    

    12. Amb

    在多个源 Observables 中, 取第一个发出元素或产生事件的 Observable,然后只发出它的元素

    //比如如下序列
    Observable<Int>.amb([a, b]).subscribe{}.disposed(by: disposeBag)
    
    public static func amb<S: Sequence>(_ sequence: S) -> Observable<E>
        where S.Iterator.Element == Observable<E> {
        //使用reduce作用一个不终止的可观察序列,可用于表示无限持续时间
            return sequence.reduce(Observable<S.Iterator.Element.E>.never()) { a, o in
                return a.amb(o.asObservable())
            }
    }
    
    //该方法会生成一个Amb<Element>: Producer<Element>类,内部持有当前这两个Observable,当调用subscribe方法时调用run<O : ObserverType>(_ observer: O, cancel: Cancelable)
    public func amb<O2: ObservableType>
        (_ right: O2)
        -> Observable<E> where O2.E == E {
        return Amb(left: self.asObservable(), right: right.asObservable())
    }
    
    //该方法生成AmbSink<O: ObserverType>: Sink<O>对象并执行run方法,内部在状态为self._choice == me时执行self.forwardOn(event)
    override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
        let sink = AmbSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
    }
    

    13. buffer

    buffer 操作符将缓存 Observable 中发出的新元素,当元素达到某个数量,或者经过了特定的时间,它就会将这个元素集合发送出来。

      let subject = PublishSubject<String>()
      subject
      .buffer(timeSpan: 1, count: 3, scheduler: MainScheduler.instance)
      .subscribe(onNext: { print($0) })
      .disposed(by: disposeBag)
    
      subject.onNext("a")
      subject.onNext("b")
      subject.onNext("c")
      subject.onNext("d")
    
    
      //调用buffer方法会生成一个BufferTimeCount对象,把对应的缓存时间,缓存个数,调度以及当前Observable保存;当subscribe时,调用run方法生成BufferTimeCountSink
      final private class BufferTimeCount<Element>: Producer<[Element]> {
    
        fileprivate let _timeSpan: RxTimeInterval
        fileprivate let _count: Int
        fileprivate let _scheduler: SchedulerType
        fileprivate let _source: Observable<Element>
    
      override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == [Element] {
        let sink = BufferTimeCountSink(parent: self, observer: observer, cancel: cancel)
        let subscription = sink.run()
        return (sink: sink, subscription: subscription)
        }
      }
    
      //当调用onNext发送元素时调用
      func on(_ event: Event<E>) {
        self.synchronizedOn(event)
      }
    
      func _synchronized_on(_ event: Event<E>) {
        switch event {
        case .next(let element):
            //元素添加到buffer数组中,并且当满足缓存个数时,发送
            self._buffer.append(element)
            if self._buffer.count == self._parent._count {
                self.startNewWindowAndSendCurrentOne()
            }
    
        case .error(let error):
            self._buffer = []
            self.forwardOn(.error(error))
            self.dispose()
        case .completed:
            self.forwardOn(.next(self._buffer))
            self.forwardOn(.completed)
            self.dispose()
        }
      }
    
      //当执行run方法时调用
      func createTimer(_ windowID: Int) {
        //DisposeBase子类
        let nextTimer = SingleAssignmentDisposable()        
        self._timerD.disposable = nextTimer
        //调度之后定时器执行
        let disposable = self._parent._scheduler.scheduleRelative(windowID, dueTime: self._parent._timeSpan) { previousWindowID in
            self._lock.performLocked {
                //当前窗口与回调的滑动窗口id不同则返回
                if previousWindowID != self._windowID {
                    return
                }
                //窗口id+1,调用self.forwardOn(.next(buffer))
                self.startNewWindowAndSendCurrentOne()
            }
    
            return Disposables.create()
        }
    
        nextTimer.setDisposable(disposable)
      }
    

    14. RxCocoa — UINavigationController+Rx

    extension Reactive where Base: UINavigationController {
        public typealias ShowEvent = (viewController: UIViewController, animated: Bool)
    
        /// Reactive wrapper for `delegate`.
        ///
        /// For more information take a look at `DelegateProxyType` protocol documentation.
        public var delegate: DelegateProxy<UINavigationController, UINavigationControllerDelegate> {
            //调用DelegateProxyType的proxy(for object: ParentObject)方法,如下
            return RxNavigationControllerDelegateProxy.proxy(for: base)
        }
    
        /// Reactive wrapper for delegate method `navigationController(:willShow:animated:)`.
        public var willShow: ControlEvent<ShowEvent> {
            let source: Observable<ShowEvent> = delegate
                .methodInvoked(#selector(UINavigationControllerDelegate.navigationController(_:willShow:animated:)))
                .map { arg in
                    //arg为[Any]
                    let viewController = try castOrThrow(UIViewController.self, arg[1])
                    let animated = try castOrThrow(Bool.self, arg[2])
                    return (viewController, animated)
            }
            return ControlEvent(events: source)
        }
    }
    
    public static func proxy(for object: ParentObject) -> Self {
        //是否Thread.isMainThread
        MainScheduler.ensureRunningOnMainThread()
            //通过objc_getAssociatedObject获取指定控件(object)的 DelegateProxy 的实例
        let maybeProxy = self.assignedProxy(for: object)
    
        let proxy: AnyObject
        if let existingProxy = maybeProxy {
            proxy = existingProxy
        }
        else {
                //不存在即创建
            proxy = castOrFatalError(self.createProxy(for: object))
            //通过objc_setAssociatedObject绑定
            self.assignProxy(proxy, toObject: object)
            assert(self.assignedProxy(for: object) === proxy)
        }
        //拿到自身的object.delegate
        let currentDelegate = self._currentDelegate(for: object)
        let delegateProxy: Self = castOrFatalError(proxy)
            //是否已经设置
        if currentDelegate !== delegateProxy {
            //设置接收所有转发消息的delegate的引用
            delegateProxy._setForwardToDelegate(currentDelegate, retainDelegate: false)
            assert(delegateProxy._forwardToDelegate() === currentDelegate)
            //将 proxy 设置为代理对象
            self._setCurrentDelegate(proxy, to: object)
            assert(self._currentDelegate(for: object) === proxy)
            assert(delegateProxy._forwardToDelegate() === currentDelegate)
        }
    
        return delegateProxy
    }
    
    //返回调用的delegate方法的可观察序列。方法被调用后,元素才被发送。
    open func methodInvoked(_ selector: Selector) -> Observable<[Any]> {
        MainScheduler.ensureRunningOnMainThread()
            //这个属性是一个字典,以selector为键,value为MessageDispatcher对象,其内部持有PublishSubject和Observable
        let subject = self._methodInvokedForSelector[selector]
            //如果存在 subject 就说明,已经创建了 selector 对应的 Observable 对象了,直接返回即可
        if let subject = subject {
            return subject.asObservable()
        }
        else {
            //如果没有,那么就创建一个 PublishSubject,并且将其存入 methodInvokedForSelecotr 字典中去,最后返回这个 subject
            let subject = MessageDispatcher(selector: selector, delegateProxy: self)
            self._methodInvokedForSelector[selector] = subject
            return subject.asObservable()
        }
    }
    
    //_RXDelegateProxy.m
    //我们将 DelegateProxy 设置为代理类,但是实现代理方法。所以系统会执行消息转发的方法
    -(void)forwardInvocation:(NSInvocation *)anInvocation {
        //判断当前方法是否有返回值,因为订阅 Observable 的处理方法不会有返回值
        BOOL isVoid = RX_is_method_signature_void(anInvocation.methodSignature);
        NSArray *arguments = nil;
        if (isVoid) {
            //真正的代理方法执行前调用_sentMessageForSelector[selector]
            arguments = RX_extract_arguments(anInvocation);
            [self _sentMessage:anInvocation.selector withArguments:arguments];
        }
        //检查原本的代理对象有没有实现这个 selector,如果有,那么执行,保证原本代理方法的执行
        if (self._forwardToDelegate && [self._forwardToDelegate respondsToSelector:anInvocation.selector]) {
            [anInvocation invokeWithTarget:self._forwardToDelegate];
        }
    
        if (isVoid) {
            //真正的代理方法执行后调用_sentMessageForSelector[selector]
            [self _methodInvoked:anInvocation.selector withArguments:arguments];
        }
    }
    

    相关文章

      网友评论

          本文标题:RxSwift源码解析

          本文链接:https://www.haomeiwen.com/subject/fjukqctx.html