美文网首页swift原理篇程序员
【领略RxSwift源码】- 订阅的工作流(Subscribin

【领略RxSwift源码】- 订阅的工作流(Subscribin

作者: Maru | 来源:发表于2017-04-04 18:57 被阅读1311次
    RxSwift Logo

    开篇

    一直觉得自己似乎越来越浮躁了,可能当代的大多数年轻人都活在恐慌里,问题已经从小时候的不思进取变成了“太思进取”,似乎总是想不管什么投入都能立竿见影。但是很多时候总是事与愿违,所以这次我想试着静下心来,读一读RxSwift的源码,希望不要Give up halfway吧。

    关于RxSwift,作为ReactiveX的成员框架之一,它有着血脉相承的语法,语言和哲学上的一致性使得即便之后转向其他的平台我们也能很快的上手Rx。其次,通过阅读源码,我们也可以看到,其实有的时候大神的代码也没有那么的CleanPrefect ,我们也可以在一些地方看到妥协和失误。

    本篇目标

    本篇的目标就是了解下面这段代码(来自Rx.playgroud)的实现:

    example("just") {
        let disposeBag = DisposeBag()
        
        Observable.just("🔴")
            .subscribe { event in
                print(event)
            }
            .disposed(by: disposeBag)
    }
    

    这寥寥数行代码,我想但凡RxSwift入门的同学都知道它的用处:创建一个单值的可观察序列,并且打印出它的所有序列。恩...没毛病,但是本篇想要知道的是:

    1. 序列是如何创建的?它的结构是怎么样的?
    2. 序列是如何被观察者订阅的?

    so...let's go !

    众所周知,自Swift诞生以来,苹果爸爸就一直在推崇面向协议编程(POP) ,而RxSwift也是同样的,遵循了从一个协议开始,而不是从一个类开始。但是我并不想从协议讲起,因为虽然从协议讲起最具逻辑性,但是从文章的角度来说并不好理解和阅读。所以本文将以示例代码为切入点,自上而下的阅读,以求简单清晰易懂。

    0x01 - Observable

    -> Observable

    在开篇的示例代码中,首先映入我们眼帘的是ObservableObservable调用了just方法。其实Observable是一个遵守ObservableType的类,实现代码如下:

    
    public class Observable<Element> : ObservableType {
        /// Type of elements in sequence.
        public typealias E = Element
        
        init() {
    #if TRACE_RESOURCES
            let _ = Resources.incrementTotal()
    #endif
        }
        
        public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
            rxAbstractMethod()
        }
        
        public func asObservable() -> Observable<E> {
            return self
        }
        
        deinit {
    #if TRACE_RESOURCES
            let _ = Resources.decrementTotal()
    #endif
        }
    
        // this is kind of ugly I know :(
        // Swift compiler reports "Not supported yet" when trying to override protocol extensions, so ¯\_(ツ)_/¯
    
        /// Optimizations for map operator
        internal func composeMap<R>(_ selector: @escaping (Element) throws -> R) -> Observable<R> {
            return Map(source: self, transform: selector)
        }
    }
    
    
    

    首先这是一个用Public修饰的抽象类,它是直接面向RxSwift使用者的。在这个类当中,使用了E的别名来充当序列值的泛型类型。在Init方法中我们可以看到一个Resource的结构体,顺便提一句,这是一个用来“追踪计数”RxSwift引用资源的,每当init一个资源计数就+1,deinit的时候就总数-1,以此来追踪全局的资源使用。

    除此之外,我们还可以看到有两个方法:

        public func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E 
        public func asObservable() -> Observable<E>
    
    

    这两个方法都在ObservableType这个协议中有所定义,前者定义了Observable能够被订阅的行为,后者则是定义了可以将自身“转换”成“Observable实体”的功能。这里可能会有一些Confuse,其实前面已经提到过,单纯的Observable类并不能作为序列被直接订阅使用,只有Observable的实体子类才可以被实例化使用。

    所以,我们也可以看到,subscribe函数的实现也只是简单的fatalError,并没有实际的逻辑操作:

    /// Swift does not implement abstract methods. This method is used as a runtime check to ensure that methods which intended to be abstract (i.e., they should be implemented in subclasses) are not called directly on the superclass.
    func rxAbstractMethod(file: StaticString = #file, line: UInt = #line) -> Swift.Never {
        rxFatalError("Abstract method", file: file, line: line)
    }
    

    -> Observable - > ObservableType

    现在我们再来看一下ObservableType:

    public protocol ObservableType : ObservableConvertibleType {
        /// Type of elements in sequence.
        associatedtype E
        func subscribe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E
    }
    
    extension ObservableType {
        
        /// Default implementation of converting `ObservableType` to `Observable`.
        public func asObservable() -> Observable<E> {
            // temporary workaround
            //return Observable.create(subscribe: self.subscribe)
            return Observable.create { o in
                return self.subscribe(o)
            }
        }
    }
    
    

    这两个方法的作用前面已经讲过,在这里我们可以看到通过Protocol ExtensionRxSwift为ObservableType提供了默认的asObservable的实现,那么ObservableConvertibleType是一个什么协议呢,是不是有它从根源上定义了asObservable方法呢?我们来看一下ObservableConvertibleType的定义:

    /// Type that can be converted to observable sequence (`Observer<E>`).
    public protocol ObservableConvertibleType {
        /// Type of elements in sequence.
        associatedtype E
    
        /// Converts `self` to `Observable` sequence.
        ///
        /// - returns: Observable sequence that represents `self`.
        func asObservable() -> Observable<E>
    }
    

    果然如此,那么至此为止,Observable之前的协议继承体系我们已经明了,画成图大概是这样的:

    Observable Protocol

    很遗憾,至此为止我们并没有看到太多的实现逻辑,但是我们看到了一系列ObservableProtocol根基。那么具体的Observable实体应该是怎么样的呢?我们从just身上来找到答案。

    Observable+Creation.swift文件中,我们找到了关于just的定义:

    Observable+Creation.swift是一个Observable的一个拓展(extension),文件中我们可以看到很多关于构建Observable实体的方法,诸如create, empty, never等等,本篇以just作为切入点,其实其他的公开的Creation函数也是类似的逻辑,所以不会一一介绍了。

        public static func just(_ element: E) -> Observable<E> {
            return Just(element: element)
        }
    
    

    我们可以看到,这是一个public修饰的暴露在外的Observable的静态方法,返回的也是Observable类型。那么这个Just是什么呢?

    final class Just<Element> : Producer<Element> {
        private let _element: Element
        
        init(element: Element) {
            _element = element
        }
        
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            observer.on(.next(_element))
            observer.on(.completed)
            return Disposables.create()
        }
    }
    

    我们可以看到,这是一个继承自Producer的一个类,OK,我们先不去管这个Just,先去看看Producer是一个什么东西:

    
    class Producer<Element> : Observable<Element> {
        override init() {
            super.init()
        }
        
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                let disposer = SinkDisposer()
                let sinkAndSubscription = run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
        
        func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            rxAbstractMethod()
        }
    }
    
    

    看到这里我们清楚了,Producer是继承自Observable的一个抽象类,结合前面的Just,于是我们的图可以画成这样了:

    Paste_Image.png

    Observable的子类Producer我们可以看到该基类实现subscribe的基础方法,这里用到了RxSwift当中的另外一个概念--Scheduler,但是它不是本文的重点,我们将在接下来的文章里面去集中讨论它,这里只是做一些简单的解读(感觉给自己埋了坑)。

    Producer中,subscribe主要做了以下几件事情:

    1. 创建一个SinkDisposer
    2. 判断是否需要Scheduler来进行切换线程的调用,如果需要那么就在指定的线程中操作。
    3. 调用run方法,将observer和刚刚创建的SinkDisposer作为入参,得到一个SinkSubscription的一个元组。这里的SinkSubscription都是遵守Disposable的类。
      4.SinkDisposer对传入之前的SinkSubscription执行setSinkAndSubscription方法。
      5.将执行完setSinkAndSubscription方法的disposer作为返回值返回。

    这里的相关操作其实都容易理解,首先看看这个run:

        func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            rxAbstractMethod()
        }
    

    好,既然这是一个抽象方法,那么我们暂且先不去管它,今晚不是它的轮次(最近🐺杀玩多了)。除去这个方法,最让人疑惑的就是这个setSinkAndSubscription方法,那么它的作用是什么呢?

    我们先来谈一谈SinkDisposer类,但是再谈它之前我们需要先知道它所遵守的协议。SinkDisposer是一个遵守Cancelable协议的类,那么这个Cancelable是何方神圣呢?

    这一切都要从Disposable说起。

    0X02-Dispose

    Disposable

    Disposable只是一个简单的协议,其中只有一个dispose方法,定义了释放资源的统一行为。

    
    /// Respresents a disposable resource.
    public protocol Disposable {
        /// Dispose resource.
        func dispose()
    }
    
    

    OK,这个很简单Cancelable呢?

    /// Represents disposable resource with state tracking.
    public protocol Cancelable : Disposable {
        /// Was resource disposed.
        var isDisposed: Bool { get }
    }
    
    

    没错,Cancelable只是一个继承自Disposable的一个协议,其中定义了一个Bool类型的isDisposed标识,用来标识是否该序列已经被释放。

    SinkDisposer

    OK,现在我们终于来到了SinkDisposer类,先上源码:

    
    fileprivate final class SinkDisposer: Cancelable {
        fileprivate enum DisposeState: UInt32 {
            case disposed = 1
            case sinkAndSubscriptionSet = 2
        }
    
        // Jeej, swift API consistency rules
        fileprivate enum DisposeStateInt32: Int32 {
            case disposed = 1
            case sinkAndSubscriptionSet = 2
        }
        
        private var _state: AtomicInt = 0
        private var _sink: Disposable? = nil
        private var _subscription: Disposable? = nil
    
        var isDisposed: Bool {
            return AtomicFlagSet(DisposeState.disposed.rawValue, &_state)
        }
    
        func setSinkAndSubscription(sink: Disposable, subscription: Disposable) {
            _sink = sink
            _subscription = subscription
    
            let previousState = AtomicOr(DisposeState.sinkAndSubscriptionSet.rawValue, &_state)
            if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
                rxFatalError("Sink and subscription were already set")
            }
    
            if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
                sink.dispose()
                subscription.dispose()
                _sink = nil
                _subscription = nil
            }
        }
        
        func dispose() {
            let previousState = AtomicOr(DisposeState.disposed.rawValue, &_state)
    
            if (previousState & DisposeStateInt32.disposed.rawValue) != 0 {
                return
            }
    
            if (previousState & DisposeStateInt32.sinkAndSubscriptionSet.rawValue) != 0 {
                guard let sink = _sink else {
                    rxFatalError("Sink not set")
                }
                guard let subscription = _subscription else {
                    rxFatalError("Subscription not set")
                }
    
                sink.dispose()
                subscription.dispose()
    
                _sink = nil
                _subscription = nil
            }
        }
    }
    
    
    

    在这里我们看到的一些以Atomic开头的方法都是在OSAtomic.h中所定义的自动读取和更新指定值方法,详细的使用方法可以点这里的官方文档。这里使用的Atomic方法是为了区分以下几种可能性:

    case1 - 第一次执行

    1. sinksubscription赋值给自身的私有变量。
    2. 通过Atmoic方法(也就是OSAtomicOr32OrigBarrier)方法,将_state的值更新为2,并且返回值previousState为0。
    3. previousStateDisposeStateInt32. sinkAndSubscriptionSet.rawValue做逻辑运算,得值为0所以不执行if里面的逻辑。
    4. previousStateDisposeStateInt32.disposed.rawValue 做逻辑运算,的值为0,所以也不执行if里面的逻辑。
    5. 结束。

    case2 - 再次执行

    1.由于没有执行过dispose方法,所以自从第一次执行setSinkAndSubscription之后,_state的值一直为2。当执行previousStateDisposeStateInt32.disposed.rawValue的时候,的值为2,所以执行xFatalError("Sink and subscription were already set")程序中止运行。

    case3 - 先执行过dispose,然后第一次执行

    1. 由于执行过dispose 方法,所以_state的值为1。
    2. 通过Atmoic方法(也就是OSAtomicOr32OrigBarrier)方法,将_state的值更新为2,并且返回值previousState为1。
    3. previousStateDisposeStateInt32. sinkAndSubscriptionSet.rawValue做逻辑运算,得值为0所以不执行if里面的逻辑。
    4. previousStateDisposeStateInt32.disposed.rawValue 做逻辑运算,的值为1,所以执行if内的操作,将sinksubscription分别执行dispose操作,并且将两个私有变量置nil,打破引用环。

    我们可以看到,通过一个_stateOSAtomic的方法,RxSwift非常优雅的解决了上述三种场景,非常值得借鉴。而本类中的dispose方法其实也是类似的处理方法,来保证只有一次有效的dispose操作,本文就不再赘述。

    0X03 - Observer

    接下来我们来讲讲RxSwift中的另外一个角色,Observer(观察者),这次我们从观察者的基类ObserverBase谈起:

    ObserverBase是一个遵守了DisposableObserverType协议的一个抽象类,实现了ondispose。值得注意的是,在ObserverBase中有一个私有变量:

        private var _isStopped: AtomicInt = 0
    

    _isStopped是一个哨兵,用来标记所观察的序列是否已经停止了,那么什么时候需要标记为Stop呢?我们来看这段代码:

        func on(_ event: Event<E>) {
            switch event {
            case .next:
                if _isStopped == 0 {
                    onCore(event)
                }
            case .error, .completed:
                if !AtomicCompareAndSwap(0, 1, &_isStopped) {
                    return
                }
                onCore(event)
            }
        }
    

    这段代码做的事情很简单:

    1. 只要_isStopped为0,那么就允许“发射”.next事件,也就是执行onCore方法。
    2. 当第一次“发射”.error或者.completed时,执行一次onCore,并且将_isStopped设为1。

    因为所有的Observer类在事件发射的逻辑上面都相同,所以统一在ObserverBase中作了处理,这也是典型的OOP思想。老铁,没毛病~

    值得一提的是,我们可以看到这里使用了一个AtomicCompareAndSwap的方法,这个方法是做什么的呢?在Platform.Darwin.swift中,我们可以看到关于这个方法的定义:

    
        typealias AtomicInt = Int32
    
        let AtomicCompareAndSwap = OSAtomicCompareAndSwap32Barrier
        let AtomicIncrement = OSAtomicIncrement32Barrier
        let AtomicDecrement = OSAtomicDecrement32Barrier
    
    

    我们可以看到,AtomicCompareAndSwap其实就是OSAtomic库中所定义的一个全局方法:

    /*! @abstract Compare and swap for 32-bit values with barrier.
        @discussion
        This function compares the value in <code>__oldValue</code> to the value
        in the memory location referenced by <code>__theValue</code>.  If the values
        match, this function stores the value from <code>__newValue</code> into
        that memory location atomically.
    
        This function is equivalent to {@link OSAtomicCompareAndSwap32}
        except that it also introduces a barrier.
        @result Returns TRUE on a match, FALSE otherwise.
     */
    @available(iOS 2.0, *)
    @available(iOS, deprecated: 10.0, message: "Use atomic_compare_exchange_strong() from <stdatomic.h> instead")
    public func OSAtomicCompareAndSwap32Barrier(_ __oldValue: Int32, _ __newValue: Int32, _ __theValue: UnsafeMutablePointer<Int32>!) -> Bool
    

    简单的来说,该方法传入三个参数:__oldValue__newValue__theValue,前两个参数都是Int32类型的,后一个是UnsafeMutablePointer<Int32>的可变指针。当__oldValue的值和指针所指向的内存地址的变量的值相等时,返回true否则为false,于此同时,如果__newValue和当前的值不相等,那么就赋值,使得__theValue的值为新值。伪代码如下:

    f (*pointer == oldvalue) {  
        *pointer = newvalue;  
        return 1;  
    } else {  
        return 0;  
    }  
    

    番外 - Memery barrier

    为了达到最佳性能,编译器通常会对汇编基本的指令进行重新排序来尽可能保持处理器的指令流水线。作为优化的一部分,编译器有可能对访问主内存的指令,如果它认为这有可能产生不正确的数据时,将会对指令进行重新排序。不幸的是,靠编译器检测到所有可能内存依赖的操作几乎总是不太可能的。如果看似独立的变量实际上是相互影响,那么编译器优化有可能把这些变量更新位错误的顺序,导致潜在不不正确结果。

    内存屏障(memory barrier)是一个使用来确保内存操作按照正确的顺序工作的非阻塞的同步工具。内存屏障的作用就像一个栅栏,迫使处理器来完成位于障碍前面的任何加载和存储操作,才允许它执行位于屏障之后的加载和存储操作。内存屏障同样使用来确保一个线程(但对另外一个线程可见)的内存操作总是按照预定的顺序完成。如果在这些地方缺少内存屏障有可能让其他线程看到看似不可能的结果。为了使用一个内存屏障,你只要在你代码里面需要的地方简单的调用OSMemoryBarrier函数。

    匿名观察者

    看完了ObserverBase现在我们来看一下AnonymousObserver:

    final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
        typealias Element = ElementType
        
        typealias EventHandler = (Event<Element>) -> Void
        
        private let _eventHandler : EventHandler
        
        init(_ eventHandler: @escaping EventHandler) {
    #if TRACE_RESOURCES
            let _ = Resources.incrementTotal()
    #endif
            _eventHandler = eventHandler
        }
    
        override func onCore(_ event: Event<Element>) {
            return _eventHandler(event)
        }
        
    #if TRACE_RESOURCES
        deinit {
            let _ = Resources.decrementTotal()
        }
    #endif
    }
    
    

    我们可以看到,在这个匿名观察者中,它主要做的事情就是将基类ObserverBase所没有实现的onCore方法实现了,将观察者构造方法时传入的EventHandleronCore方法中执行。这也就是观察者受到序列事件的动作。

    订阅过程

    在我们对ObservableObserverDisposeable有了一定的认知之后,我们可以来认识一下最为关键的一步,subscribe也就是订阅。

    ObservableType+Extensions.swift中我们可以看到相关的实现:

    
        /**
        Subscribes an event handler to an observable sequence.
    
        - parameter on: Action to invoke for each event in the observable sequence.
        - returns: Subscription object used to unsubscribe from the observable sequence.
        */
        public func subscribe(_ on: @escaping (Event<E>) -> Void)
            -> Disposable {
            let observer = AnonymousObserver { e in
                on(e)
            }
            return self.subscribeSafe(observer)
        }
    
    

    所谓的subscribe其实只是做了两个事情。首先是构造了一个匿名观察者,将on也就是(Event<E>) -> Void类型的闭包作为参数,每次在匿名观察者有新的事件的时候调用,这里也用到了尾随闭包的语法糖,提高阅读性。其次,将刚刚构造的匿名观察者,通过subscribeSafe函数来完成订阅。那么subscribeSafe究竟做了一些什么事情呢?

    extension ObservableType {
        func subscribeSafe<O: ObserverType>(_ observer: O) -> Disposable where O.E == E {
            return self.asObservable().subscribe(observer)
        }
    }
    
    

    subscribeSafe是一个内部的方法,所有内部的订阅操作全部通过该方法来完成,一般最后都是通过subscribe方法的多态性来完成最终的订阅,那么回想一下之前�Justsubscribe方法我们就可以知道,一旦调用subscribe方法,Just立刻给匿名观察者发送一个包裹了初始值的.next事件和一个.completed事件,最后返回一个NopDisposable类型的“存根”,NopDisposable是一个在执行dispose操作时不进行任何操作的存根。然后整个订阅过程就结束了。

    DisposeBag

    对于开头的代码,我们现在唯一还没有讲到的就是addDisposableTo这个方法,我们都知道,当一个序列执行subscribe之后我们会得到一个遵守Disposable的存根。那么根据方法名,我们也可以猜到这个一个将存根添加到一个地方的方法,那么它是要将存根添加到哪里呢?

    没错!就是我们天天在写的DisposeBag。在DisposeBag.swift中我们可以找到该方法的定义:

    extension Disposable {
        /// Adds `self` to `bag`.
        ///
        /// - parameter bag: `DisposeBag` to add `self` to.
        public func addDisposableTo(_ bag: DisposeBag) {
            bag.insert(self)
        }
    }
    

    那么DisposeBag到底是个什么东西呢?talk is cheap,我们直接来看源码:

    public final class DisposeBag: DisposeBase {
        
        private var _lock = SpinLock()
        
        // state
        private var _disposables = [Disposable]()
        private var _isDisposed = false
        
        /// Constructs new empty dispose bag.
        public override init() {
            super.init()
        }
        
        /// Adds `disposable` to be disposed when dispose bag is being deinited.
        ///
        /// - parameter disposable: Disposable to add.
        public func insert(_ disposable: Disposable) {
            _insert(disposable)?.dispose()
        }
        
        private func _insert(_ disposable: Disposable) -> Disposable? {
            _lock.lock(); defer { _lock.unlock() }
            if _isDisposed {
                return disposable
            }
    
            _disposables.append(disposable)
    
            return nil
        }
    
        /// This is internal on purpose, take a look at `CompositeDisposable` instead.
        private func dispose() {
            let oldDisposables = _dispose()
    
            for disposable in oldDisposables {
                disposable.dispose()
            }
        }
    
        private func _dispose() -> [Disposable] {
            _lock.lock(); defer { _lock.unlock() }
    
            let disposables = _disposables
            
            _disposables.removeAll(keepingCapacity: false)
            _isDisposed = true
            
            return disposables
        }
        
        deinit {
            dispose()
        }
    }
    
    

    其实DisposeBag这个类设计的还是非常的简单明了的,暴露给外部的只有一个insert方法,将需要被管理的Dispose交给这个Bag,当该Bag执行deinit方法的时候执行dispose,将所持有的所有Disposable遍历一遍,同时挨个dispose,值得注意的是,该类内部使用了一个锁:

        private var _lock = SpinLock()
    
    

    这个SpinLock其实就是一个NSRecursiveLock的递归锁,该🔐的作用就是为了保证_disposables的数组线程安全,之所以用递归锁是因为有可能会出现在相同的线程多次调用insert的而引发死锁。

    正常情况下,执行insert方法,首先会执行加锁操作,然后Bag会将该Disposable加入到_disposables这个数组中,最后解锁。但是还有一种情况,那就是当执行insert操作的时候,该Bag已经被析构了,那么我们就不需要再将其加入数组,直接将该Disposable释放掉就可以了。

    QA

    分析了上述的源码之后,我想开篇的两个问题我们也可以大致回答了:

    1.序列是如何创建的?它的结构是怎么样的?

    其实无论是通过just方法构建的序列还是createempty或者of方法构建的序列,最终得到的都是一个Producer的子类,只是不同的方法所构建的序列行为不一样,所以要通过子类的方法重写来实现多态,这是典型的OOP思想这里也就不多解释了。至于它的结构也是取决该序列的特性,比如本文中的Just,由于它是一个单一序列,只会有第一次的初始值,所以在Just类中直接定义了一个私有的存储类型的变量来存储初始值,当完成订阅操作的时候,直接将该值通过.next事件发送出去,然后再将.completed事件发送,完成整个序列的生命周期。再比如通过never构建的EmptyProducer类,由于该序列需要做的只是永远不发送.next事件,所以EmptyProducer没有任何私有变量必要,他要做的只是在完成订阅的时候发送一个.completed事件。由于Producer的子类太多了,篇幅有限就不在这里一一列举。

    2.序列是如何被观察者订阅的?

    当一个序列构造完毕的之后,调用subscribe方法会进行SubscribeHandler,也就是进行订阅的相关操作。具体来说,对于Just这个序列,SubscribeHandler指的是就是发送一个.next(element)事件和一个.completed事件;对于NeverProducer这个序列,SubscribeHandler指的是单单只发送一个.completed事件;所以对于不同的SubscribeHandler会有不同的订阅操作,总的来说是根据序列的特性来发送给观察者不同的事件流。

    值得一提的是在RxSwift中还有一个很重要的概念Sink,关于它的解释可以参考一下这个issueSink相当与一个加工者,可以将源事件流转换成一个新的事件流,如果讲事件流比作水流,事件的传递过程比作水管,那么Sink就相当于水管中的一个转换头。关于Sink我们会在之后的文章中详细的讲述。

    相关文章

      网友评论

      • 西山薄凉:只要_isStopped不为0,那么就允许“发射”.next事件,也就是执行onCore方法。
        这里看代码逻辑应该是为0吧:smile:
        Maru:@西山薄凉 哈哈哈哈,都是喜欢狗的人呐~
        西山薄凉:@Maru 写的蛮好,继续加油啊
        PS:我微信头像跟你头像一样。。有种看到自己的感觉:joy:
        Maru:对对对,谢谢指出来,已经修改:stuck_out_tongue_closed_eyes:
      • da27c260cc85:大佬,UML用啥工具画的了?
        Maru:@ArthurChi processon
      • 1c7530ba374e:SinkDisposer 好像跟 Sink 没什么关系啊。
        还有一个问题,observer 到底保存在哪里。被绕晕了,感觉像是保存在了 Disposable 对象内,放 disposeBag 的 _disposables 里。
        如果可以的话,请指点一下。
        Maru:有关系啊,Sink是一个基类,它派生出去了很多XXXSink,Sink就是用来接收流然后处理流,比如CombineLatest这些操作就是通过Sink派生出去的CombineLatestCollectionTypeSink类来完成的。
        至于Observer是在被订阅的时候在内部创建的,根据不同的Observable创建不同的Observer...
      • sharpdev:你好, 在 SinkDisposer 中调用setSinkAndSubscription后 _state 变为2, 再调用dispose()_state 变为3, 如果再次调用setSinkAndSubscription应该是 fatal error ,应该是这样,对不对?
        7c22e8c9d167:对的,如果对一个已经dispose过的sink再次setSinkAndSubscription, _state更新为3,并且以后无论调用过几次setSinkAndSubscription, state都会保持为3.
        并且新设置的sink subscription也会被置为nil
      • 能量熊:写的非常好,期待更多rxswift源码分析以及实例应用
        Maru:@能量熊 :kissing_heart:

      本文标题:【领略RxSwift源码】- 订阅的工作流(Subscribin

      本文链接:https://www.haomeiwen.com/subject/cwyinttx.html