美文网首页
RxSwift初探(1)

RxSwift初探(1)

作者: 理想的不俗人 | 来源:发表于2019-10-16 14:20 被阅读0次

    一、前提:函数响应式编程思想

    简单来说 函数响应式编程 = 函数式编程 + 响应式编程

    (1)函数式

    函数式编程是种编程方式,它将电脑运算视为函数的计算。函数编程语言最重要的基础是λ演算(lambda calculus),而且λ演算的函数可以接受函数当作输入(参数)和输出(返回值)。

    y = f(x)
    x: 参数  f: 函数  y: 返回值
    //把 x  作为一个函数传入  --> x = f(x)  例如 x 值是2,2 = 1+1 = 0 + 2
    y =  f(f(x))//增加了灵活性
    
    let  array = [1,2,3,4,5,6,7]
    //筛选数组(1.获取 > 3的数字;2. 获取到的数字 + 1;3. 获取偶数; 4.)
    //正常操作--可读性底,清晰度差
    for num in array{
        if num > 3{
          let  number = num + 1
          if (number % 2  == 0){
              print(number)
          }
        }
    }
    
    //RxSwift函数式
    array.filter{ $0 > 3}//拿到第一个数大于3
            .filter{($0 + 1) % 2 == 0}// +1 获取偶数
            .forEach{ print($0)}//输出
    
    
    //如果需求变化,函数式则直接注释某项filter,完全不影响,正常操作中则需要大改动
    

    (2)响应式

    对象对某一数据流变化做出响应的这种编码方式

    //对象A和对象B,A和B有一种“说不清”的关系,A要时刻监控B的行为,对B的变化也做出相应的变化。
    例如,A是10岁 ,B比A大10岁,20年后,A是30岁, B则是40岁 。---->类似于KVO 
    

    二、RxSwift 简单使用

    创建序列 -> 订阅序列 -> 发送信号 -> 信号接收

    (1)Button点击事件

    //修改事件类型 button.rx.controlEvent(.touchUpOutside)
    button.rx.tap.subscribe(onNext: { () in
        print("被点击了")
        //处理事件 ……
    }).disposed(by: disposeBag)
    

    (2)手势点击事件

    let tap = UITapGestureRecognizer.init()
    self.view.addGestureRecognizer(tap)
    tap.rx.event.subscribe(onNext: { (tap) in
        print(tap.view as Any)
        //处理事件 
    }).disposed(by: disposeBag)
    

    (3)输入框监听

    //监听输入变化
    textfield.rx.text.orEmpty.changed.subscribe(onNext: { (text) in
        print(text)
        //处理事件 ……
    }).disposed(by: disposeBag)
    //绑定,数据传递
    textfield.rx.text.bind(to: label.rx.text)
    

    (4)KVO事件监听

    self.person.rx.observeWeakly(String.self, "name").subscribe(onNext: { (value) in
        print(value as Any)
        //处理事件 ……
    }).disposed(by: disposeBag)
    

    (5)滑动事件监听

    scrollview.rx.contentOffset.subscribe(onNext: { [weak self](content) in
        let y = content.y
        print(content.y)
        //处理事件 ……
    }).disposed(by: disposeBag)
    

    (6)定时器

    let timer = Observable<Int>
        .interval(1, scheduler: MainScheduler.instance)
    timer.subscribe(onNext: { (time) in
            print(time)
            //处理事件 ……
        }).disposed(by: disposeBag)
    

    (7)通知事件

    NotificationCenter.default.rx
    .notification(UIResponder.keyboardWillShowNotification)
        .subscribe(onNext: { (notification) in
            //获取值
            let during = notification.userInfo?["UIKeyboardAnimationDurationUserInfoKey"] as? Float
            print(during!)
            //处理事件 ……
        }).disposed(by: disposeBag)
    

    (8)网络请求

    let url = URL.init(string: "https://www.baidu.com")
    URLSession.shared.dataTask(with: url!) { (data, response, error) in
        print("network")
        print(String.init(data: data!, encoding: .utf8))
    }.resume()
    URLSession.shared.rx.response(request: URLRequest.init(url: url!))
        .subscribe(onNext: { (data) in
            print(data)
        }, onError: { (error) in
            print(error)
        }, onCompleted: {
            print("请求完成")
        }).disposed(by: disposeBag)
    

    三、RxSwift核心逻辑

    (1)逻辑流程:创建序列 -->订阅序列-->发送信号-->接受信号

    //1、创建序列
    let ob = Observable<Any>.create { (obserber) -> Disposable in
                // 3:发送信号
                obserber.onNext("发送信号")
                obserber.onCompleted()
    //            obserber.onError(NSError.init(domain: "coocieeror", code: 10087, userInfo: nil))
                return Disposables.create()
            }
    //2、订阅序列
    let _ = ob.subscribe(onNext: { (text) in//5、返回销毁者(_)
              //4、接受信号
                print("订阅到:\(text)")
            }, onError: { (error) in
              //4、接受信号
                print("error: \(error)")
            }, onCompleted: {
              //4、接受信号
                print("完成")
            }) {
                print("销毁")
            }
    

    (2)源码分析

    1.创建序列之Create

    Create源码

    //创建返回一个匿名内部类--->AnonymousObservable
    public static func create(_ subscribe: @escaping (AnyObserver<E>) -> Disposable) -> Observable<E> {
            return AnonymousObservable(subscribe)
        }
    

    AnonymousObservable内部类源码

    //继承Producer,初始化并保存了一个_subscribeHandler的回调记录
    final private class AnonymousObservable<Element>: Producer<Element> {
        // 闭包
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler//保存闭包-->函数式编程思想
        }
    
        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)
            return (sink: sink, subscription: subscription)
        }
    }
    

    通过源码分析Observable<Any>.create这一步主要做了:
    ^创建一个内部类(AnonymousObservable -->继承Producer)
    ^保存闭包(_subscribeHandler回调)

    2.订阅序列之subscribe(onNext

    subscribe(onNext源码

    /**
    创建一个观察者-->AnonymousObserver-->把所有event事件保存到observer里面
    返回一个销毁者-->Disposables
    */
    public func subscribe(onNext: ((E) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil, onCompleted: (() -> Void)? = nil, onDisposed: (() -> Void)? = nil)
            -> Disposable {
                let disposable: Disposable
                
                if let disposed = onDisposed {
                    disposable = Disposables.create(with: disposed)
                }
                else {
                    disposable = Disposables.create()
                }
                
                #if DEBUG
                    let synchronizationTracker = SynchronizationTracker()
                #endif
                
                let callStack = Hooks.recordCallStackOnError ? Hooks.customCaptureSubscriptionCallstack() : []
                
                //创建AnonymousObserver观察者,并且把所有event事件保存到observer里面
                let observer = AnonymousObserver<E> { event in
                    
                    #if DEBUG
                        synchronizationTracker.register(synchronizationErrorMessage: .default)
                        defer { synchronizationTracker.unregister() }
                    #endif
                    
                    switch event {//判断当前event
                    case .next(let value)://调用event.next信息,则会调用onNext()
                        onNext?(value)
                    case .error(let error):
                        if let onError = onError {
                            onError(error)
                        }
                        else {
                            Hooks.defaultErrorHandler(callStack, error)
                        }
                        disposable.dispose()
                    case .completed:
                        onCompleted?()
                        disposable.dispose()
                    }
                }
                return Disposables.create(//创建一个销毁者返回给subscribe
                // self.asObservable()---> 就是创建序列的ob ,subscribe --->调用父类Producer的subscribe
                    self.asObservable().subscribe(observer),
                    disposable
                )
        }
    

    AnonymousObserver观察者源码

    /**
    保存回调事件闭包(_eventHandler -->所有onNext事件&onError事件&onCompleted事件)
    */
    final class AnonymousObserver<ElementType> : ObserverBase<ElementType> {
        typealias Element = ElementType
        
        typealias EventHandler = (Event<Element>) -> Void
        
        private let _eventHandler : EventHandler
        
        init(_ eventHandler: @escaping EventHandler) {
    #if TRACE_RESOURCES
            _ = Resources.incrementTotal()
    #endif
            self._eventHandler = eventHandler
        }
    
        override func onCore(_ event: Event<Element>) {
            return self._eventHandler(event)
        }
        
    #if TRACE_RESOURCES
        deinit {
            _ = Resources.decrementTotal()
        }
    #endif
    }
    
    //进入Producer中的subscribe方法
    class Producer<Element> : Observable<Element> {
        override init() {
            super.init()
        }
    
        override func subscribe<O : ObserverType>(_ observer: O) -> Disposable where O.E == Element {
            if !CurrentThreadScheduler.isScheduleRequired {
                // The returned disposable needs to release all references once it was disposed.
                let disposer = SinkDisposer()
                let sinkAndSubscription = self.run(observer, cancel: disposer)//调用AnonymousObservable的run方法
                disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                return disposer
            }
            else {
                return CurrentThreadScheduler.instance.schedule(()) { _ in
                    let disposer = SinkDisposer()
                    let sinkAndSubscription = self.run(observer, cancel: disposer)
                    disposer.setSinkAndSubscription(sink: sinkAndSubscription.sink, subscription: sinkAndSubscription.subscription)
    
                    return disposer
                }
            }
        }
    
        func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            rxAbstractMethod()
        }
    }
    
    //进入AnonymousObservable中的run方法
    final private class AnonymousObservable<Element>: Producer<Element> {
        typealias SubscribeHandler = (AnyObserver<Element>) -> Disposable
    
        let _subscribeHandler: SubscribeHandler
    
        init(_ subscribeHandler: @escaping SubscribeHandler) {
            self._subscribeHandler = subscribeHandler
        }
    
        override func run<O : ObserverType>(_ observer: O, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where O.E == Element {
            let sink = AnonymousObservableSink(observer: observer, cancel: cancel)
            let subscription = sink.run(self)//调用了AnonymousObservableSink中的run方法
            return (sink: sink, subscription: subscription)
        }
    }
    
    //进入AnonymousObservableSink中调用run方法
    final private class AnonymousObservableSink<O: ObserverType>: Sink<O>, ObserverType {
        typealias E = O.E
        typealias Parent = AnonymousObservable<E>
    
        // state
        private let _isStopped = AtomicInt(0)
    
        #if DEBUG
            fileprivate let _synchronizationTracker = SynchronizationTracker()
        #endif
    
        override init(observer: O, cancel: Cancelable) {
            super.init(observer: observer, cancel: cancel)
        }
    
        func on(_ event: Event<E>) {
            #if DEBUG
                self._synchronizationTracker.register(synchronizationErrorMessage: .default)
                defer { self._synchronizationTracker.unregister() }
            #endif
            switch event {
            case .next:
                if load(self._isStopped) == 1 {
                    return
                }
                self.forwardOn(event)
            case .error, .completed:
                if fetchOr(self._isStopped, 1) == 0 {
                    self.forwardOn(event)
                    self.dispose()
                }
            }
        }
    
        func run(_ parent: Parent) -> Disposable {
            return parent._subscribeHandler(AnyObserver(self))//run 方法中的_subscribeHandler,是不是很熟悉,是不是在《创建序列》创建的_subscribeHandler 闭包, 调用了_subscribeHandler,传入了AnyObserver(self)-->observer
        }
    }
    
    

    是不是有点蒙,来来来, 咱们总结下:
    1.创建一个观察者
    -->AnonymousObserver(let observer = AnonymousObserver<E> { event in)🤚保存闭包回调_eventHandler
    -->把所有event事件保存到observer里面

    2.返回一个销毁者
    -->Disposables (return Disposables.create)
    -->self.asObservable().subscribe(observer), self.asObservable==创建序列的ob ,subscribe --->调用父类Producer的subscribe
    -->进入父类Producer的subscribe方法调用self.run(observer, cancel: disposer)
    -->进入AnonymousObservable中的run方法调用sink.run(self)
    -->进入AnonymousObservableSink中调用run方法,return parent._subscribeHandler(AnyObserver(self))
    -->调用了\color{#FF0000}{《创建序列》}中保存的闭包回调_subscribeHandler,传入了AnyObserver(self),也就是传入了🤚这个行代码创建的observer
    -->直到这个时候,\color{#FF0000}{《创建序列》} let ob = Observable<Any>.create { (obserber) -> Disposable in}中的obserber就被传入进来
    -->当obserber调用了,onNext()、onError()、onCompleted(),则通过_eventHandler闭包回调到\color{#FF0000}{《订阅序列》}中的onNext、onError、onCompleted

    3.销毁

    下面引用下别人的总结流程图
    细细品一品 再结合这张图品一品

    \color{#DA7860}{如果不对之处还敬请指教😝}

    相关文章

      网友评论

          本文标题:RxSwift初探(1)

          本文链接:https://www.haomeiwen.com/subject/jlobpctx.html