美文网首页
框架学习RxSwift2.0 Observable创建

框架学习RxSwift2.0 Observable创建

作者: 数字d | 来源:发表于2019-07-24 23:53 被阅读0次

    1.常规操作导入RXSwift,参看https://www.jianshu.com/p/b73231a29949

    Rx 是 ReactiveX 的缩写,简单来说就是基于异步 Event(事件)序列的响应式编程。
    Rx 可以简化异步编程方法,并提供更优雅的数据绑定。让我们可以时刻响应新的数据同时顺序地处理它们。

    函数响应式编程 = 函数式编程 + 响应式编程

    函数式:

    (Functional Programming) 把运算过程尽量写成一系列嵌套的函数调用

    响应式编程:

    简称RP(Reactive Programming),响应式编程是一种面向数据流和变化传播的编程范式。

    在命令式编程环境中,a:=b+c表示将表达式的结果赋给a,而之后改变b或c的值不会影响a。但在响应式编程中,a的值会随着b或c的更新而更新。

    在iOS开发中我们经常会响应一些事件button、tap、textField、textView、notifaction、KVO、NSTimer等等这些,都需要做响应监听,响应后都需要在对应的响应事件中去做处理,而原生开发中,触发对象与响应方法是分离的,如button的初始化和点击响应方法是分离的。

    override func viewDidLoad(){
        button = UIButton.init(frame: CGRect(x: 40, y: 100, width: width-80, height: 40))
        button.backgroundColor = .gray
        button.setTitle("按钮", for: .normal)
        self.view.addSubview(button)
        button.addTarget(self, action: #selector(clickBtn(button:)), for: .touchUpInside)
    }
    //button event
    @objc func clickBtn(button:UIButton) {
        print("点击")
    }
    

    RxSwift按钮点击事件的实现

    
    ...
    let bag = DisposeBag()
    ....
     
    self.button.rx.tap    //序列
                .subscribe(onNext: { () in   //订阅
                    print("Button clicked!")
                }, onError: { (error) in
                    print("错误信息")
                }, onCompleted: {
                    print("订阅完成")
                })
                .disposed(by: bag)    //销毁
    
    

    为什么按钮可以.rx呢?
    在RxSwift项目中搜索Reactive.swift文件,协议ReactiveCompatible
    有rx属性,extension NSObject: ReactiveCompatible表示NSObject类及其子类实现协议ReactiveCompatible,所以NSObject类及其子类都可以.rx

    //  Reactive.swift
    
    ...
    public protocol ReactiveCompatible {
        /// Extended type
        associatedtype CompatibleType
    
        /// Reactive extensions.
        static var rx: Reactive<CompatibleType>.Type { get set }
    
        /// Reactive extensions.
        var rx: Reactive<CompatibleType> { get set }
    }
    
    import class Foundation.NSObject
    
    
    extension NSObject: ReactiveCompatible { }
    

    RxSwift

    全称ReactiveX for Swift,是一个简化异步编程的框架,实现了函数响应式编程,事件与对象紧密联系,业务流清晰,便于管理。在RxSwift中,所有异步操作(事件)和数据流均被抽象为可观察序列的概念。

    RXSwift核心:

    响应式编程的核心就是可监听序列的产生以及如何监听。

    名词解释:

    Observable:是一个可监听序列 作用是产生事件

    可以理解为观察者模式里的被观察者

    Observer: 序列的监听(订阅)者 作用是消费事件

    可以理解为观察者模式里的观察者

    Disposable: 可被清除的资源

    简单梳理下上面的流程

    可监听序列被订阅了,就会调用Producer的subscribe方法
    调用self的run方法,当前我们的self是Producer的子类AnonymousObservable

    创建AnonymousObservableSink并将订阅者observer传进去,接着调用AnonymousObservableSink实例的run,参数是AnonymousObservable实例。

    AnonymousObservableSink实例的run又调用AnonymousObservable的_subscribeHandler闭包,参数是AnyObserver,AnyObserver保存了事件的回调,而_subscribeHandler闭包也就是可监听序列创建时传的闭包。

    执行事件
    以next事件为例
    如:订阅onNext事件。

    代码:

    在creat尾随闭包中的onNext具体实现代码是

    public func onNext(_ element: E) {
            self.on(.next(element))
        }
    

    内部流程如下:

    调用observer的onNext方法,接着调用observer的on(_ event: Event<Element>)并将next事件传进去

    执行AnyObserver的observer的事件回调

    执行AnonymousObservableSink的func on(_ event: Event<Element>)方法,接着调用ObserverBase的func on(_ event: Event<Element>)让真正的Observer类去响应事件

    调用AnonymousObserver的onCore方法去执行真正的事件回调。

    序列例子:

    1.创建序列
         let ob = Observable<Any>.create { (obserber) -> Disposable in
                // 3:发送信号
                obserber.onNext("框架班级")
                obserber.onCompleted()
    //            obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
                return Disposables.create()
            }
    
    //2.订阅
      let _ = ob.subscribe(onNext: { (text) in
    //4.订阅到
                print("订阅到:\(text)")
            }, onError: { (error) in
                print("error: \(error)")
            }, onCompleted: {
                print("完成")
            }) {
                print("销毁")
            }
    
    

    代码的执行顺序不按照代码行数一行一行的执行,说明这里含有闭包。

    看具体创建序列的流程:
    creat方法

    extension ObservableType {
    
        /**
             Creates an observable sequence from a specified subscribe method implementation.
        
             - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
        
             - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
             - returns: The observable sequence with the specified implementation for the `subscribe` method.
             */
        public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> RxSwift.Disposable) -> RxSwift.Observable<Self.E>
    }
    

    Create.swift文件中的源码实现

    extension ObservableType {
        // MARK: create
    
        /**
         Creates an observable sequence from a specified subscribe method implementation.
    
         - seealso: [create operator on reactivex.io](http://reactivex.io/documentation/operators/create.html)
    
         - parameter subscribe: Implementation of the resulting observable sequence's `subscribe` method.
         - returns: The observable sequence with the specified implementation for the `subscribe` method.
         */
        public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
            return AnonymousObservable(subscribe)
        }
    }
    
    

    在Create.swift文件中实现的方法ObservableType.create,

    extension ObservableType {
        // MARK: create
    
        public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
            return AnonymousObservable(subscribe)
        }
    }
    
    

    上面代码可以看出creat方法返回了一个AnonymousObservable(匿名序列)

    接着看AnonymousObservable类

    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
    //初始化时候带的参数,这里保存了一个闭包,用于subscribe回调
            self._subscribeHandler = subscribeHandler
        }
    
        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    

    这里的AnonymousObservable类并没有subscribe方法,但是父类Producer有

    lass Producer<Element> : Observable<Element> {
        override init() {
            super.init()
        }
    
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    

    创建序列creat方法-生成AnonymousObservable序列-保存subscribeHandler
    到此整个创建序列的流程结束。

    接下来是订阅流程:

    ob.subscribe方法的实现

      extension ObservableType {
    
     
        public func subscribe(onNext: ((Self.E) -> Void)? = nil, onError: ((Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil) -> RxSwift.Disposable
    }
    
    

    而方法实现的源码在ObservableType+Extensions.swift文件中

       
        public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
          
    ...
                
    //创建observer ,event从AnonymousObserver()构建
                let observer = AnonymousObserver<E> { event in
                    
                    #if DEBUG
                        synchronizationTracker.register(synchronizationErrorMessage: .default)
                        defer { synchronizationTracker.unregister() }
                    #endif
                    
                    switch event {
    //对next,error,complated,闭包进行初始化
    //只要观察者observer调用了event的.next事件,这里就会调用订阅事件.onNext
                    case .next(let value):
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
                return Disposables.create(
                    self.asObservable().subscribe(observer),
    //回调了当前序列(ob)的_subscribeHandler闭包,由于AnonymousObserable类中没有subscribe回调,就使用其父类Produce的subscribe方法,调用子类的实现
                    disposable
                )
        }
    
    

    在AnonymousObserver.swift文件中的实现

    final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
        typealias Element = ElementType
        
        typealias EventHandler = (Event<Element>) -> Void
        
        private let _eventHandler : EventHandler
        
        init(_ eventHandler: @escaping EventHandler) {
    #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
    #endif
            self._eventHandler = eventHandler //保存了包含事件 的闭包
        }
    
        override func onCore(_ event: Event<Element>) {
            return self._eventHandler(event)
        }
        
    #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
    }
    
    

    实际流程参看下图

    3.png

    基本UI事件监听代码实现:https://gitee.com/xgkp/RX2.0login.git

    参考来源:
    1.https://www.jianshu.com/p/3617ab385060
    2.https://blog.csdn.net/qq_18951479/article/details/96832932
    3.https://www.jianshu.com/p/c9f854718933

    相关文章

      网友评论

          本文标题:框架学习RxSwift2.0 Observable创建

          本文链接:https://www.haomeiwen.com/subject/aqvqrctx.html