美文网首页
ReactiveSwift框架分析2 — Observer,Si

ReactiveSwift框架分析2 — Observer,Si

作者: 沈枫_ShenF | 来源:发表于2019-06-01 12:56 被阅读0次

    在ReactiveSwift框架分析系列文章1中,我们知道需要三个简单的步骤来创建一个信号并观察它:

    1. 创建一个pipe管道,同时创建了一个输入端Observer,一个输出端Signal
    2. 创建观察者,订阅Signal。
    3. 通过输入端Observer发送值给Signal,这将触发所有订阅了该信号的观察者执行与观察者关联的闭包。

    那这篇文章我们就来通过分析源码看看它究竟是怎样实现的。

    Observer

    An Observer is a simple wrapper around a function which can receive Events.

    上篇文章中我们知道,Observer封装了Event的处理逻辑,以下是其源码:

    public final class Observer {
            public typealias Action = (Event) -> Void
            private let _send: Action
    
                    // Whether the observer should send an `interrupted` event as it deinitializes.
            private let interruptsOnDeinit: Bool
    
            internal init(action: @escaping Action, interruptsOnDeinit: Bool) {
                self._send = action
                self.interruptsOnDeinit = interruptsOnDeinit
            }
    
        public init(_ action: @escaping Action) {
                self._send = action
                self.interruptsOnDeinit = false
        }
    
            public func send(_ event: Event) {
                _send(event)
            }
            public func send(value: Value) {
                _send(.value(value))
            }
           public func send(error: Error) {
                _send(.failed(error))
            }
           public func sendInterrupted() {
                _send(.interrupted)
            }
            
    }
    
    

    从源码可以看出,Observer内部有一个 (Event) -> Void的闭包_send,在初始化Observer的时候就是给这个闭包赋值,调用Observer一系列send方法就是在执行这个闭包,很简单!

    Signal

    Signal是事件流,event是其基本单元,它会主动将事件event向外发送,是热信号,不会等到有人订阅后才开始发送,Signal中包含一个核心属性:Core属性,Signal其实只是包装Core的一个壳而已,所以我们主要还是来分析Core的源码:

    private let core: Core
    
    private final class Core {
            private let disposable: CompositeDisposable
            private let stateLock: Lock 
            private let sendLock: Lock 
            private var state: State 
    
                                   ......
    }
    

    Core里面的State属性,确切来说是一个枚举:

    private enum State {
            case alive(Bag<Observer>, hasDeinitialized: Bool)
            case terminating(Bag<Observer>, TerminationKind)
            case terminated
            ......
    }
    

    其中,除了alive,terminating关联一个Observer数组外,状态terminated是不带关联数组的,这意味着当信号切换到terminated状态时,那么那些被保存的Observer对象也就跟着释放了,所以当不再需要使用信号时,应该向信号发送一个非Value事件确保资源释放。

    State的作用有两个:

    • 指示信号的状态。
    • alive,terminating会保存信号订阅者添加进来的Observer对象。

    Observer对象是怎么通过observe方法被添加进来的呢?先看下该方法具体实现:

    fileprivate func observe(_ observer: Observer) -> Disposable? {
                var token: Bag<Observer>.Token?
                stateLock.lock()
                           
                if case let .alive(observers, hasDeinitialized) = state {
                                 //1.信号处于state.alive状态,将新的observer对象添加到  state.alive的数组中
                    var newObservers = observers
                    token = newObservers.insert(observer)
                    self.state = .alive(newObservers, hasDeinitialized: hasDeinitialized)
                }
    
                stateLock.unlock()
                            
                            //2. 如果1顺利执行,token会被赋值(即信号处于alive状态) 返回一个Disposable对象 
                if let token = token {
                    return AnyDisposable { [weak self] in
                        self?.removeObserver(with: token)
                    }
                } else { //如果1顺利执行,直接向observer对象发送interrupted事件
                    observer.sendInterrupted()
                    return nil
                }
            }
    
    

    从源码我们可以看出,通过observe方法添加并保存了Observer,也就是保存了Event的处理逻辑。

    那数据如何发送的,如何处理Observer内部的处理逻辑的呢?
    再来分析Core的send方法的源码:

    private func send(_ event: Event) {
                if event.isTerminating { // 收到非Value的Event,信号状态从.alive切换到.terminating
    
                    self.stateLock.lock()
    
                                    //  将.alive中的Observer数组移到.terminating中,遍历. terminating的数组observers,调用send函数
                    if case let .alive(observers, _) = state {
                        self.state = .terminating(observers, .init(event))
                        self.stateLock.unlock()
                    } else {
                        self.stateLock.unlock()
                    }
    
                    tryToCommitTermination()
                } else { // 收到Value的Event
                    self.sendLock.lock()
                    self.stateLock.lock()
    
                                    //遍历.alive的数组observers,调用send函数
                    if case let .alive(observers, _) = self.state {
                        self.stateLock.unlock()
    
                        for observer in observers {
                            observer.send(event)
                        }
                    } else {
                        self.stateLock.unlock()
                    }
    
                    self.sendLock.unlock()
    
                    stateLock.lock()
    
                                    //遍历. terminating的数组observers,调用send函数
                    if case .terminating = state {
                        stateLock.unlock()
                        tryToCommitTermination()
                    } else {
                        stateLock.unlock()
                    }
                }
    }
    
    private func tryToCommitTermination() {
        ......
        if case let .terminating(observers, terminationKind) = state {
    
            //切换状态到.terminated,遍历. terminating的数组observers,调用send函数
            state = .terminated
            if let event = terminationKind.materialize() {
                for observer in observers {
                    observer.send(event)
                }
            }
        }
    }
    
    

    可以看出,配合着切换状态,最终send方法中都是遍历Observers数组,让其中各Observer调用send方法。

    我们分析了如何将Observer添加Signal的数组中,Observer的send方法,接下来再看看创建管道的pipe方法。

    public static func pipe(disposable: Disposable? = nil) -> (output: Signal, input: Observer) {
        var observer: Observer!
        
        let signal = self.init { innerObserver, lifetime in
            observer = innerObserver
            lifetime += disposable
        }
        return (signal, observer)
    }
    
    //Signal.init()
    public init(_ generator: (Observer, Lifetime) -> Void) {
         core = Core(generator)
    }
    
    //Core.init()
    fileprivate init(_ generator: (Observer, Lifetime) -> Void) {
        
        //1. 设置信号初始状态为alive 同时初始化alive中的数组
        state = .alive(Bag(), hasDeinitialized: false)
        ......
        //2. 创建一个Observer对象,并将Core.send赋值给该对象的_send闭包
        generator(Observer(action: self.send, interruptsOnDeinit: true), Lifetime(disposable))
    }
    

    从源码可以看出, pipe()函数中包含Signal的初始化创建,而Signal的初始化创建中有包含Core的初始化创建,Core的初始化中又会真正执行generator: (Observer, Lifetime)闭包,这个闭包传入一个Observer对象,所以其实pipe()函数是通过generator闭包捕获了Core.init()中的Observer对象的,而这个Observer对象的_send闭包可以看出其实是Core.send,最后pipe()将返回Signal和Observer。

    所以,pipe().output(即Signal)的作用是管理信号状态并保存由订阅者提供的Observer对象, 而pipe().input(即InnerObserver)的作用则是在接收到Event后依次执行这些被保存的Observer._send

    实例展示,加深印象!!

    上面讲过,通过Signal.pipe()函数会返回一个元组,元组的第一个值是output(类型为Signal),第二个值是input(类型为Observer)。
    创建如下:

    let signalTuple = Signal<Int, NoError>.pipe()
    let (signal, observer) = Signal<Int, NoError>.pipe()
    

    我们在viewDidLoad中创建一个热信号,并发送数据:

    typealias NSignal<T> = Signal<T, NoError>
    override func viewDidLoad() {
            super.viewDidLoad()
           
            let (signal, innerObserver) = NSignal<Int>.pipe()
            
           //create Observer and add it into Signal
            signal.observeValues { (value) in  
                print("did received value: \(value)")
            }
       
            //create Observer and add it into Signal again
            signal.observeValues { (value) in   
                print("did received value: \(value)")
            }
            
            innerObserver.send(value: 1) 
            innerObserver.sendCompleted() 
    }
    //输出结果:  did received value: 1
                         did received value: 1
    

    Signal.observeValues,是Signal.observe的一个便利函数, 作用是创建一个只处理Value事件的Observer并添加到Signal中, 类似的还有只处理Failed事件的Signal.observeFailed和所有事件都能处理的Signal.observeResult。

    再来看一个ViewModel中的Demo:

    typealias NSignal<T> = Signal<T, NoError>
    //ViewModel.swift
    class ViewModel {
        let signal: NSignal<Int>
        let innerObserver: NSignal<Int>.Observer
        
        init() {
                 (signal, innerObserver) = NSignal<Int>.pipe() 
        }
    }
    
    //View1.swift
    class View1 {
        func bind(viewModel: ViewModel) {
            viewModel.signal.observeValues { (value) in
                print("View1 received value: \(value)")
            }
        }
    }
    
    //View2.swift
    class View2 {
        func bind(viewModel: ViewModel) {
            viewModel.signal.observeValues { (value) in
                print("View2 received value: \(value)")
            }
        }
    }
    
    //View3.swift
    class View3 {
        func bind(viewModel: ViewModel) {
            viewModel.signal.observeValues { (value) in
                print("View3 received value: \(value)")
            }
            viewModel.signal.observeInterrupted {
                print("View3 received interrupted")
            }
        }
    }
    
    override func viewDidLoad() {
            super.viewDidLoad()
            
            let view1 = View1()
            let view2 = View2()
            let view3 = View3()
            let viewModel = ViewModel()
    
            view1.bind(viewModel: viewModel)
            viewModel.innerObserver.send(value: 1)
    
            view2.bind(viewModel: viewModel)
            viewModel.innerObserver.send(value: 2)
            viewModel.innerObserver.sendCompleted()//发送一个非Value事件 信号无效
            
            view3.bind(viewModel: viewModel)//信号无效后才订阅
            viewModel.innerObserver.send(value: 3)//信号无效后发送事件
        }
    输出: View1 received value: 1
          View1 received value: 2
          View2 received value: 2
          View3 received interrupted
    

    view2的订阅时间晚于value1的发送时间,所以view2收不到value1对应的事件,而之后发送一个非Value的事件后,信号便无效了。所以虽然view1和view2的订阅都早于value3的发送时间,但因为value3在信号发送前先发送了completed,结果就是view1和view2都不会收到value3事件,当然view3也不会收到value3事件,它只会收到一个interrupted。

    小结

    我们通过pipe()方法,创建一个‘管道’,这个‘管道’其实是个元组,元素output代表输出端,类型为Signal <Int,NoError> , 元素input代表输入端,类型为Observer <Int,NoError>,作为input的Observer发送数据,将事件注入output的Signal,同时触发绑定到Signal中的所有Observer调用send函数。

    相关文章

      网友评论

          本文标题:ReactiveSwift框架分析2 — Observer,Si

          本文链接:https://www.haomeiwen.com/subject/ddegtctx.html