美文网首页ReactiveX
RxSwift(5)-Observable的特殊序列

RxSwift(5)-Observable的特殊序列

作者: xxxxxxxx_123 | 来源:发表于2020-04-21 19:18 被阅读0次

    为了方便我们我们日常的使用,RxSwiftObservable基础上封装了一些我们常用的特殊序列,如DriverSingleCompletableMaybe等,下面我们就来看看其使用和原理。

    Driver

    可以看作是驱动应用程序的可观察序列的构建器模式。主要是为了简化UI层的代码。

    下面我们来看一个例子,我们在输入框输入一行文字,开始去服务器获取数据,返回数据之后刷新页面mLabelmButton上面的文字:

    private func httpTest() {
        let res = mTextField.rx.text.skip(1)
            .flatMap { (input) -> Observable<String> in
                return self.fetchData(inputText: input ?? "")
        }
            
        res.bind(to: mLabel.rx.text)
        .disposed(by: disposeBag)
            
        res.bind(to: mButton.rx.title())
        .disposed(by: disposeBag)
    }
        
    private func fetchData(inputText: String) -> Observable<String> {
        print("---网络请求之前的线程--\(Thread.current)--")
        return Observable<String>.create { (observer) -> Disposable in
                
            // 模拟网络请求
            DispatchQueue.global().async {
                print("---网络请求中的线程--\(Thread.current)--")
                observer.onNext("请求回来的数据")
                observer.onCompleted()
            }
                
            return Disposables.create();
        }
    }
    

    运行结果如下:

    drive01.png

    可以看出上面的代码其实是有问题的:

    • 如果fetchData的序列产生了一个错误(网络请求失败),这个错误将取消所有绑定,当用户输入一个新的关键字时,是无法发起新的网络请求。
    • 返回的结果被绑定到两个UI元素上。那就意味着,每次用户输入一个新的关键字时,就会分别为两个UI元素发起HTTP请求。这就是控制台会输出两遍的原因。
    • 如果fetchData在后台返回序列,那么刷新页面也会在后台进行,这样就会出现异常崩溃。

    我们可以对上述代码进行优化:

    let res = mTextField.rx.text.skip(1)
        .flatMap { (input) -> Observable<String> in
            return self.fetchData(inputText: input ?? "")
                .observeOn(MainScheduler.instance)
                .catchErrorJustReturn("网络请求错误")
    }
    .share(replay: 1, scope: .whileConnected)
    
    • observeOn(MainScheduler.instance):在主线程返回结果
    • catchErrorJustReturn():处理网络错误异常
    • share():共享网络请求,防止多次调用

    我们可以直接使用Drive优化:

    private func driveTest() {
        let res = mTextField.rx.text.asDriver()
            .flatMap { input in
                self.fetchData(inputText: input ?? "")
                 .asDriver(onErrorJustReturn: "网络请求错误")
        }
    
        res.drive(mLabel.rx.text)
        .disposed(by: disposeBag)
            
        res.drive(mButton.rx.title())
        .disposed(by: disposeBag)
    }
    

    我们只需要把普通的观察序列转化为DriveUI绑定的时候也使用drive,并且加上一句容错处理即可。

    我们来看看源码:

    public typealias Driver<E> = SharedSequence<DriverSharingStrategy, E>
    

    Driver其实是SharedSequence这个序列的别名。它有以下特点:

    • 不会失败
    • 在主线程传递事件
    • 共享策略是share(replay: 1, scope: .whileConnected),也就是说所有的观察者都共享序列的计算资源

    上述例子中,我们调用asDriver()的时候就会调用进入下面方法:

    extension ControlProperty {
        public func asDriver() -> Driver<E> {
            return self.asDriver { _ -> Driver<E> in
                return Driver.empty()
            }
        }
    }
    

    继续跟进:

    public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<E>) -> Driver<E> {
        let source = self
            .asObservable()
            // 主线程
            .observeOn(DriverSharingStrategy.scheduler)
            .catchError { error in
                onErrorRecover(error).asObservable()
            }
        return Driver(source)
    }
    
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    
    // 主线程
    public private(set) static var make: () -> SchedulerType = { MainScheduler() }
    

    这里就可以看到.observeOn指定调度者的时候就会指定到主线程中。catchError是对错误信息的处理。最后会返回Driver(source)方法,DriverSharedSequence这个序列的别名,返回也就是SharedSequence的初始化方法。

    public struct SharedSequence<S: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
        public typealias E = Element
        public typealias SharingStrategy = S
    
        let _source: Observable<E>
    
        init(_ source: Observable<E>) {
            self._source = S.share(source)
        }
    
        .....
    
        public func asObservable() -> Observable<E> {
            return self._source
        }
    
        public func asSharedSequence() -> SharedSequence<SharingStrategy, E> {
            return self
        }
    }
    

    其初始化方法会调用DriverSharingStrategyshare方法:

    public struct DriverSharingStrategy: SharingStrategyProtocol {
        
        public static var scheduler: SchedulerType { return SharingScheduler.make() }
        
        public static func share<E>(_ source: Observable<E>) -> Observable<E> {
            return source.share(replay: 1, scope: .whileConnected)
        }
    }
    

    可以看到最终还是调用了share(replay: 1, scope: .whileConnected)共享策略。也就是说调用asDriver()方法就确定了是在主线程订阅观察而且确定了共享策略。

    我们再来看看Driver<E>也就是SharedSequence调用drive方法做了什么。

    public func drive<O: ObserverType>(_ observer: O) -> Disposable where O.E == E? {
        // 判断是否在主线程
        MainScheduler.ensureRunningOnMainThread(errorMessage: errorMessage)
        return self.asSharedSequence().asObservable().map { $0 as E? }.subscribe(observer)
    }
    
    public class func ensureRunningOnMainThread(errorMessage: String? = nil) {
        guard Thread.isMainThread else {
            rxFatalError(errorMessage ?? "Running on background thread.")
        }
    }
    

    首先判断是否是在主线程操作,不是的话就报错。然后使用map函数进行映射调整数据类型,接着调用订阅方法subscribe()。其实也就是做了一层封装。

    Single

    只包含一个元素的序列,要么发出一个元素,要么发出错误事件。不能共享资源。

    public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>
    

    SinglePrimitiveSequence的别名。它定义了两个事件的枚举:

    public enum SingleEvent<Element> {
        /// 只发出一个元素 默认发出`.next(Element)`和`.completed`)
        case success(Element)
        
        /// 只发出错误事件`.error(Error)`)
        case error(Swift.Error)
    }
    

    我们可以使用如下方法创建一个Single序列:

    func createSingleTest() {
        Single<String>.create { (sob) -> Disposable in
            if showSuccess {
                sob(.success("这是一个成功的Single事件"))
            } else {
                sob(.error(NSError.init(domain: "错误的signle事件", code: 10000, userInfo: nil)))
            }
            return Disposables.create()
        }.subscribe { (element) in
            print("==\(element)==")
        }.disposed(by: disposeBag)
    }
    

    控制台输出:

    // 成功信号
    ==success("这是一个成功的Single事件")==
    // 错误信号
    ==error(Error Domain=错误的signle事件 Code=10000 "(null)")==
    

    源码如下:

    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<Element> {
       let source = Observable<Element>.create { observer in
           return subscribe { event in
               switch event {
               case .success(let element):
                   observer.on(.next(element))
                   observer.on(.completed)
               case .error(let error):
                   observer.on(.error(error))
               }
           }
       }
       
       return PrimitiveSequence(raw: source)
    }
    

    可以看出来Single其实是对Observable的封装,响应只有两种,successerrorsuccess响应之后会正常发出事件,并且发出完成事件,error就是发出错误事件。

    我们再来看看Single的订阅方法:

    public func subscribe(_ observer: @escaping (SingleEvent<Element>) -> Void) -> Disposable {
        var stopped = false
        return self.primitiveSequence.asObservable().subscribe { event in
            if stopped { return }
            stopped = true
            
            switch event {
            case .next(let element):
                observer(.success(element))
            case .error(let error):
                observer(.error(error))
            case .completed:
                rxFatalErrorInDebug("Singles can't emit a completion event")
            }
        }
    }
    

    还是调用的Observablesubscribe方法,只是对结果做了一层处理,如果是next事件,转换成success发送;如果是error事件就正常发送,不错处理;如果是completed事件直接报错。也就是Single是没有单独的completed事件。

    另外,我们也可以使用如下方法创建Single序列

    func asSingleTest() {
        Observable.of("1").asSingle()
            .subscribe(onSuccess: { (element) in
                print("==\(element)==")
            }).disposed(by: disposeBag)
    }
    

    控制台输出:

    ==1==
    

    如果把元素改为多个,控制台就会报错:

    Unhandled error happened: Sequence contains more than one element.
     subscription called from:
    

    我们再来看看源码:

    public func asSingle() -> Single<Element> {
        return PrimitiveSequence(raw: AsSingle(source: self.asObservable()))
    }
    
    final class AsSingle<Element>: Producer<Element> {
        fileprivate let _source: Observable<Element>
    
        init(source: Observable<Element>) {
            self._source = source
        }
    
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = AsSingleSink(observer: observer, cancel: cancel)
            let subscription = self._source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    }
    
    
    fileprivate final class AsSingleSink<Observer: ObserverType> : Sink<Observer>, ObserverType { 
        typealias Element = Observer.Element
    
        private var _element: Event<Element>?
    
        func on(_ event: Event<Element>) {
            switch event {
            case .next:
                // 第一次发送事件的是_element==nil 如果不等于就说明观察序列中有多个元素
                if self._element != nil {
                    self.forwardOn(.error(RxError.moreThanOneElement))
                    self.dispose()
                }
    
                self._element = event
            case .error:
                self.forwardOn(event)
                self.dispose()
            case .completed:
                // 第二次进来就是completed事件了
                if let element = self._element {
                    self.forwardOn(element)
                    self.forwardOn(.completed)
                }
                else {
                    self.forwardOn(.error(RxError.noElements))
                }
                self.dispose()
            }
        }
    }
    

    其实Single就是对Observable做了一层封装,封装成只能是单个元素或者错误事件的序列。

    Completable

    Completable是一种不包含任何元素的序列,也就是包含的元素个数为0,所以它要么只能产生一个completed事件,要么产生一个error事件。不具备共享资源的能力。

    定义如下:

    public typealias Completable = PrimitiveSequence<CompletableTrait, Swift.Never>
    

    它也是PrimitiveSequence的别名,不过传入的参数不同。Completable定了两种类型事件:

    public enum CompletableEvent {
        /// 错误事件 .error(Error)
        case error(Swift.Error)
        
        /// 完成事件
        case completed
    }
    

    我们可以使用如下方法创建Completable序列:

    func createCompletableTest() {
        var showCompleted = true
        Completable.create { (cob) -> Disposable in
            if showCompleted {
                cob(.completed)
            } else {
                cob(.error(NSError.init(domain: "错误的signle信号", code: 10000, userInfo: nil)))
            }
            return Disposables.create()
        }.subscribe(onCompleted: {
            print("==完成==")
        }) { (error) in
            print("==\(error)==")
        }.disposed(by: disposeBag)
    }
    

    控制台输出:

    // .completed 
    ==完成==
    // .error
    ==Error Domain=错误信号 Code=10000 "(null)"==
    
    

    源码如下:

    public static func create(subscribe: @escaping (@escaping CompletableObserver) -> Disposable) -> PrimitiveSequence<Trait, Element> {
        let source = Observable<Element>.create { observer in
            return subscribe { event in
                switch event {
                case .error(let error):
                    observer.on(.error(error))
                case .completed:
                    observer.on(.completed)
                }
            }
        }
            
        return PrimitiveSequence(raw: source)
    }
    

    可以看出来CompletableSingle类似,响应只有两种,completederrorcompleted即完成事件,error就是错误事件。

    我们再来看看Completablesubscribe方法:

    public func subscribe(onCompleted: (() -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {
        return self.primitiveSequence.subscribe { event in
            switch event {
            case .error(let error):
                if let onError = onError {
                    onError(error)
                }
            case .completed:
                onCompleted?()
            }
        }
    }
    

    如果是.completed直接就回调完成的闭包,如果是.error,就执行onError发送error事件。

    我们也可以使用asCompletable()方法:

    func asCompletableTest() {
        Observable<Never>.empty()
            .asCompletable()
            .subscribe(onCompleted: {
                print("==完成==")
            }) { (error) in
                print("==\(error)==")
            }.disposed(by: disposeBag)
    }
    

    控制台输出:

    ==完成==
    

    源码如下:

    public func asCompletable()
        -> Completable {
        return PrimitiveSequence(raw: self.asObservable())
    }
    

    Maybe

    Maybe序列只能做一件事情,也就是只能发出一个事件,它要么只能发出一个元素,要么产生一个completed事件,要么产生一个error事件。

    定义如下:

    public typealias Maybe<Element> = PrimitiveSequence<MaybeTrait, Element>
    

    它也是PrimitiveSequence的别名,不过传入的参数不同。Maybe定了三种类型事件:

    public enum MaybeEvent<Element> {
        /// 发送一个元素且发送完成事件 .next(Element)、.completed
        case success(Element)
        
        /// 错误事件 .error(Error)
        case error(Swift.Error)
        
        /// 完成事件
        case completed
    }
    

    我们可以使用如下方法创建Maybe序列:

    func createMaybeTest() {
        Maybe<String>.create(subscribe: { (mob) -> Disposable in
        mob(.success("成功事件"))
            mob(.completed)
            mob(.error(NSError.init(domain: "错误事件", code: 10000, userInfo: nil)))
            return Disposables.create()
        }).subscribe({ (element) in
            print("==\(element)==")
        }).disposed(by: disposeBag)
    }
    

    控制台输出:

    ==success("成功事件")==
    

    源码如下:

    public static func create(subscribe: @escaping (@escaping MaybeObserver) -> Disposable) -> PrimitiveSequence<Trait, Element> {
        let source = Observable<Element>.create { observer in
            return subscribe { event in
                switch event {
                case .success(let element):
                    observer.on(.next(element))
                    observer.on(.completed)
                case .error(let error):
                    observer.on(.error(error))
                case .completed:
                    observer.on(.completed)
                }
            }
        }
        
        return PrimitiveSequence(raw: source)
    }
    

    可以看出来,Maybe也和和Single类似,不过它有三种响应,successcompletederrorsuccess即发送一个元素而且发送完成事件,completed即完成事件,error就是错误事件。

    我们再来看看Maybesubscribe方法:

    public func subscribe(_ observer: @escaping (MaybeEvent<Element>) -> Void) -> Disposable {
        var stopped = false
        return self.primitiveSequence.asObservable().subscribe { event in
            if stopped { return }
            stopped = true
            
            switch event {
            case .next(let element):
                observer(.success(element))
            case .error(let error):
                observer(.error(error))
            case .completed:
                observer(.completed)
            }
        }
    }
    

    这里有一个stopped的变量,专门用来控制Maybe的事件只能响应一个。

    我们也可以使用asMaybe()方法:

    func asMaybeTest() {
        Observable<String>.of("A")
        .asMaybe()
            .subscribe(onSuccess: { (element) in
                print("==\(element)==")
            }, onError: { (error) in
                print("==错误==")
            }) {
                print("==完成==")
        }.disposed(by: disposeBag)
    }
    

    控制台输出:

    ==A==
    

    源码如下:

    public func asMaybe() -> Maybe<Element> {
        return PrimitiveSequence(raw: AsMaybe(source: self.asObservable()))
    }
    
    final class AsMaybe<Element>: Producer<Element> {
        fileprivate let _source: Observable<Element>
    
        init(source: Observable<Element>) {
            self._source = source
        }
    
        override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
            let sink = AsMaybeSink(observer: observer, cancel: cancel)
            let subscription = self._source.subscribe(sink)
            return (sink: sink, subscription: subscription)
        }
    }
    
    fileprivate final class AsMaybeSink<Observer: ObserverType> : Sink<Observer>, ObserverType {
        typealias Element = Observer.Element
    
        private var _element: Event<Element>?
    
        func on(_ event: Event<Element>) {
            switch event {
            case .next:
                if self._element != nil {
                    // 多个元素会报错
                    self.forwardOn(.error(RxError.moreThanOneElement))
                    self.dispose()
                }
    
                self._element = event
            case .error:
                self.forwardOn(event)
                self.dispose()
            case .completed:
                if let element = self._element {
                    self.forwardOn(element)
                }
                self.forwardOn(.completed)
                self.dispose()
            }
        }
    }
    

    可以看出来asMaybe()asSingle()的处理很类似。asMaybe只是多加入了完成事件的处理。

    Maybe可以看做是SingleCompletable的结合体。需要注意的是,Maybe序列如果存在多个元素也是会抛出异常的。

    总结

    • Drive:订阅操作都是在主线程的,并且会共享资源操作,一般用于对UI的绑定操作。
    • Single:要么发出单个元素并且完成,要么是错误事件。
    • Completable:没有元素的序列,要么是完成事件、要么是错误事件。
    • Maybe:要么发出单个元素并且完成,要么是错误事件,要么是完成事件。

    相关文章

      网友评论

        本文标题:RxSwift(5)-Observable的特殊序列

        本文链接:https://www.haomeiwen.com/subject/mwipihtx.html