美文网首页
RxSwift特征序列

RxSwift特征序列

作者: yahibo | 来源:发表于2019-07-30 23:13 被阅读0次

一、概述
二、Single
三、Completable
四、Maybe
五、Driver
六、Signal
七、ControlEvent

ReactiveX.png

一、概述

任何序列都可以用Observable描述,创建序列 -> 订阅序列 -> 信号发送 -> 信号接收。

Observable<Any>.create { (observer) -> Disposable in
    observer.onNext("信号1")
    return Disposables.create()
}.subscribe(onNext: { (val) in
    print("信号接收区:\(val)")
}).disposed(by: disposeBag)

Observable是通用序列的描述符,调用.onNext.onErroronCompleted来发送信号,通用性强,但针对特殊需求可能会觉得繁琐,因此RxSwift还提供了一组特征序列,是Observable序列的变种,它能够帮助我们更准确的描述序列。即SingleCompletableMaybeDriverSignalControlEvent

二、Single

1、定义

单元素序列,信号只发送一次,响应信号或错误信号。

Single<Any>.create { (single) -> Disposable in
            single(.success("假装我是一个正儿八经的数据"))
            //single(.error(NSError.init(domain: "网络出现错误", code: 101, userInfo:["name":"hibo"])))
            return Disposables.create()
        }.subscribe(onSuccess: { (val) in
            print("Single:\(val)")
        }) { (error) in
            print("SingleError:\(error)")
        }.disposed(by: disposeBag)
  • sinngle(.success(data)) -> onSuccess 发送响应元素到成功观察者
  • sinngle(.error(error)) -> error 发送错误元素到错误观察者

响应元素和错误元素分开处理,此时我们可以联想到应用中的网络请求,成功数据用来渲染,错误数则据弹出提示框。

2、源码探索

2.1、Single定义

/// Sequence containing exactly 1 element
public enum SingleTrait { }
/// Represents a push style sequence containing 1 element.
public typealias Single<Element> = PrimitiveSequence<SingleTrait, Element>

public enum SingleEvent<Element> {
    /// One and only sequence element is produced. (underlying observable sequence emits: `.next(Element)`, `.completed`)
    case success(Element)
    
    /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)
    case error(Swift.Error)
}

定位到Single.swift文件,首先能看到SinglePrimitiveSequence结构体类型的别名,SingleEvent是事件枚举,有successerror两个成员变量。

2.2、create创建序列。代码如下(此处代码标记为1️⃣):

extension PrimitiveSequenceType where TraitType == SingleTrait {
    public typealias SingleObserver = (SingleEvent<ElementType>) -> Void
          //代码省略若干行
    public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<ElementType> {
        let source = Observable<ElementType>.create { observer in
            return subscribe { event in
                switch event {
                case .success(let element):
                    observer.on(.next(element))
                    observer.on(.completed)
                case .error(let error):
                    observer.on(.error(error))
                }
            }
        }
        
        return PrimitiveSequence(raw: source)
    }
}

首先看参数是一个带Disposable类型返回值的闭包,交由外部(业务层)实现,内部调用向外传入一个SingleObserver闭包,以上写法不太好理解,我们可以换一种写法:

public static func create(subscribe: @escaping (@escaping SingleObserver) -> Disposable) -> Single<ElementType> {
    let source = Observable<ElementType>.create { observer in
        // 1、内部实现一个闭包,用来接收外界传入的SingleEvent信号,接着做进一步的信号发送
        let block = { (event:SingleEvent<ElementType>) -> Void in
            switch event {
            case .success(let element):
                observer.on(.next(element))
                observer.on(.completed)
            case .error(let error):
                observer.on(.error(error))
            }
        }
        // 2、调用外部实现的闭包方法,向外部发送内部实现的闭包方法做连接作用
        let disposable = subscribe(block)//3、返回值Disposable对象 
        return disposable
    }
    return PrimitiveSequence(raw: source)//4、创建PrimitiveSequence对象并保存Observable序列对象
}
  • 内部实现一个闭包block,用来接收外界传入的SingleEvent信号,接着做进一步的信号发送
  • 调用外部实现的闭包方法,将内部实现的闭包block发送出去,起连接作用
  • 创建PrimitiveSequence对象并保存Observable序列对象source,返回PrimitiveSequence对象

create方法内部实际上实现了一个Observable序列,由此可见Single序列是对Observable序列的封装,Disposable对象通过闭包交由业务层创建,Single序列在实现上,方式方法与Observable保持一致,此处可称一绝。当前我们只探索Single的信号是如何完成传递,Observable序列的信号传递流程在《Swift核心源码探索》中有详细介绍。

2.3、订阅序列

也是在同PrimitiveSequenceType扩展中定义,代码如下(此处代码标记为2️⃣):

public func subscribe(onSuccess: ((ElementType) -> Void)? = nil, onError: ((Swift.Error) -> Void)? = nil) -> Disposable {
    #if DEBUG
         let callStack = Hooks.recordCallStackOnError ? Thread.callStackSymbols : []
    #else
        let callStack = [String]()
    #endif

    return self.primitiveSequence.subscribe { event in
        switch event {
        case .success(let element):
            onSuccess?(element)
        case .error(let error):
            if let onError = onError {
                onError(error)
            } else {
                Hooks.defaultErrorHandler(callStack, error)
            }
        }
    }
}

方法中先调用了self.primitiveSequence方法,返回了self,方法是在遵循PrimitiveSequenceType协议的PrimitiveSequence的扩展中,为了保证协议的一致性。代码如下:

extension PrimitiveSequence: PrimitiveSequenceType {
    /// Additional constraints
    public typealias TraitType = Trait
    /// Sequence element type
    public typealias ElementType = Element

    // Converts `self` to primitive sequence.
    ///
    /// - returns: Observable sequence that represents `self`.
    public var primitiveSequence: PrimitiveSequence<TraitType, ElementType> {
        return self
    }
}

紧接着调用另一个subscribe方法,代码如下(此处代码标记为3️⃣):

public func subscribe(_ observer: @escaping (SingleEvent<ElementType>) -> Void) -> Disposable {
    var stopped = false
    return self.primitiveSequence.asObservable().subscribe { event in
        if stopped { return }
        stopped = true
        
        switch event {
        case .next(let element):
            observer(.success(element))
        case .error(let error):
            observer(.error(error))
        case .completed:
            rxFatalErrorInDebug("Singles can't emit a completion event")
        }
    }
}
  • self.primitiveSequence -> asObservable() -> subscribe
  • 此处截断了completed信号的向上传递,因此Single序列只能收到响应信号和错误信号

该段代码也调用了self.primitiveSequence方法,接着调用asObservable()方法,查看代码发现此处是为了获取source对象,即Observable可观察序列。

再查看subscribe的方法(此处标记为代码4️⃣):

public func subscribe(_ on: @escaping (Event<E>) -> Void)
    -> Disposable {
        let observer = AnonymousObserver { e in
            on(e)
        }
        return self.asObservable().subscribe(observer)
}
  • 代码创建了一个观察者,当前观察者将会收到发送过来的消息,并由此通过闭包一层层传到业务层。 4️⃣ -> 3️⃣ -> 2️⃣ -> 1️⃣ ->业务层

  • 当前self指向的是1️⃣处创建并保存的Observable类型的source对象,因此该处subscribe所调用的即是Produce类中的subscribe方法,在方法内部创建了sink对象,来触发创建序列时实现的闭包,即代码1️⃣处所create后的闭包

  • 此时就到了业务层,通过create内部实现的闭包single向内部发送消息,再有observer调用on来向观察者发送信号

  • 信号发送不做赘述,最终会到达4️⃣处代码的观察者,此时再由闭包一层层向上传递,直到业务层的监听闭包

总结:

序列的产生,订阅,发送,接收还是由Observable来实现的,Single只是对Observable做了封装,去除了onCompleted的消息监听及消息发送。

具体的Observable序列产生到观察流程见《Swift核心源码探索》

三、Completable

只能产生completed事件和error事件,没有序列元素值产生。

Completable.create { (completable) -> Disposable in
    completable(.completed)
    //completable(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))
    return Disposables.create()
}.subscribe(onCompleted: {
    print("Completable")
}) { (error) in
    print(error)
}.disposed(by: disposeBag)
  • 应用场景,只关心任务是否完成,不关心不需要结果
  • Competable.swift下,在PrimitiveSequenceType扩展中实现了序列的创建,订阅,即信号转发

定义如下:

/// Sequence containing 0 elements
public enum CompletableTrait { }
/// Represents a push style sequence containing 0 elements.
public typealias Completable = PrimitiveSequence<CompletableTrait, Swift.Never>

public enum CompletableEvent {
    /// Sequence terminated with an error. (underlying observable sequence emits: `.error(Error)`)
    case error(Swift.Error)
    
    /// Sequence completed successfully.
    case completed
}

同样Completable类也是PrimitiveSequence的别名,并声明一个枚举包含,errorcompleted成员变量,限定了事件产生类型。都是对Observable序列的封装,源码此处不做探索说明,和Single一致,只是在订阅阶段对.next事件做了拦截。

四、Maybe

Single序列相似,发出一个元素或一个completed事件或error事件。

Maybe<Any>.create { (maybe) -> Disposable in
        maybe(.success("element"))
        //maybe(.completed)
        //maybe(.error(NSError.init(domain: "出现异常", code: 101, userInfo: nil)))
        return Disposables.create()
    }.subscribe(onSuccess: { (val) in
        print(val)
    }, onError: { (error) in
        print("error:\(error)")
    }) {
        print("completed")
    }.disposed(by: disposeBag)

在开发中,如果一个业务有时候需要一个元素,有时候只需要知道处理完成的时候,可以使用该Maybe,解决不确定需求问题。源码探索略,同上。

五、Driver

  • Driver序列不会产生error事件
  • 在主线程中监听,会向新订阅者发送上次发送过的元素,简化UI层的代码
  • 共享序列

下面看一下为什么会扩展一个Driver序列。

有一个需求:

  • 搜索框中每次输入一个文本,获取一次网络请求,成功后渲染UI,多个控件显示

先实现一个简单的UI

let tf = UITextField.init(frame: CGRect(x: 100, y: 100, width: 200, height: 40))
tf.borderStyle = .roundedRect
tf.placeholder = "请输入"
self.view.addSubview(tf)

let label1 = UILabel.init(frame: CGRect(x: 100, y: 160, width: 200, height: 40))
label1.backgroundColor = .groupTableViewBackground
label1.textAlignment = .center
self.view.addSubview(label1)

let label2 = UILabel.init(frame: CGRect(x: 100, y: 210, width: 200, height: 40))
label2.backgroundColor = .groupTableViewBackground
label2.textAlignment = .center
self.view.addSubview(label2)

创建了一个textfield,两个label用来展示。下面在来实现一个网络请求,返回一个Single序列:

func network(text:String) -> Single<Any> {
    return Single<Any>.create(subscribe: { (single) -> Disposable in
        if text == "1234"{
            single(.error(NSError.init(domain: "出现错误", code: 101, userInfo: nil)))
        }
        DispatchQueue.global().async {
            print("请求网络")
            single(.success(text))
        }
        return Disposables.create()
    })
}

网络请求为耗时操作,因此我们在异步中来完成,直接发送序列,假装我们请求了一次网络。

1、普通方法实现textfield输入序列的监听,并调取网络请求方法:

let result = tf.rx.text.orEmpty.skip(1)
                .flatMap{
                    return self.network(text: $0)
                        .observeOn(MainScheduler.instance)
                        .catchErrorJustReturn("网络请求失败")
            }.share(replay: 1, scope: .whileConnected)
//网络请求将发送多次请求
result.subscribe(onNext: { (val) in
    print("订阅一:\(val) 线程:\(Thread.current)")
}).disposed(by: disposeBag)

result.subscribe(onNext: { (val) in
    print("订阅二:\(val) 线程:\(Thread.current)")
}).disposed(by: disposeBag)

result.map{"\(($0 as! String).count)"}.bind(to: label1.rx.text).disposed(by: disposeBag)
result.map{"\($0)"}.bind(to: label2.rx.text).disposed(by: disposeBag)
  • flatMap将原序列转换为Observables,将这些Observables的元素合并之后发出
  • observeOn选择在哪个线程执行
  • catchErrorJustReturn错误处理,将onError事件转为onNext事件
  • share为多个观察者共享资源,网络请求只发送呢一次,否则多个订阅将会触发多个请求

2、Driver实现:

let result = tf.rx.text.orEmpty
    .asDriver()
    .flatMap {
        return self.network(text: $0).asDriver(onErrorJustReturn: "网络请求失败")
    }
result.map{"长度:\(($0 as! String).count)"}
        .drive(label1.rx.text).disposed(by: disposeBag)
result.map{"\($0)"}.drive(label2.rx.text).disposed(by: disposeBag)
  • asDriver()将序列转换为driver序列
  • map重新组合并生成新的序列
  • driver将元素在主线程中绑定到label1label2

相比非driver下的代码实现,driver序列省去了线程的设置,share数据共享设置。

Driver源码探索

断点查看asDriver()方法:

extension ControlProperty {
    /// Converts `ControlProperty` to `Driver` trait.
    ///
    /// `ControlProperty` already can't fail, so no special case needs to be handled.
    public func asDriver() -> Driver<E> {
        return self.asDriver { _ -> Driver<E> in
            #if DEBUG
                rxFatalError("Somehow driver received error from a source that shouldn't fail.")
            #else
                return Driver.empty()
            #endif
        }
    }
}

ControlProperty的扩展方法,返回了一个Driver<E>类,DriverSharedSequence的别名,用来描述不同类型的序列,最后又调用了asDriver方法,而该方法在ObservableConvertibleType的扩展中,一直追踪会发现很多类都是继承自ObservableConvertibleType下。

extension ObservableConvertibleType {
    public func asDriver(onErrorRecover: @escaping (_ error: Swift.Error) -> Driver<E>) -> Driver<E> {
        let source = self
            .asObservable()
            .observeOn(DriverSharingStrategy.scheduler)
            .catchError { error in
                onErrorRecover(error).asObservable()
            }
        return Driver(source)
    }
}

如上代码也设置了observerOn方法,来指定线程,继续深入能够发现DriverSharingStrategy.scheduler内部指定的就是主线程,印证了上面所说的Driver的执行是在主线程的。最后初始化一个Driver对象返回,看一下初始化过程,及对SharedSequence类的初始化,代码如下:

public struct SharedSequence<S: SharingStrategyProtocol, Element> : SharedSequenceConvertibleType {
    public typealias E = Element
    public typealias SharingStrategy = S

    let _source: Observable<E>

    init(_ source: Observable<E>) {
        self._source = S.share(source)
    }
}

此处调用了share并传入了可观察序列,感觉好像在哪见过,此处猜想它是用来共享序列的,使用lldb:po S.self查找share所在位置:RxCocoa.DriverSharingStrategycmd+点击进入,代码如下:

public typealias Driver<E> = SharedSequence<DriverSharingStrategy, E>

public struct DriverSharingStrategy: SharingStrategyProtocol {
    public static var scheduler: SchedulerType { return SharingScheduler.make() }
    public static func share<E>(_ source: Observable<E>) -> Observable<E> {
        return source.share(replay: 1, scope: .whileConnected)
    }
}

在此处传入的序列用来掉用share,使得当前序列作为共享序列,即driver序列为共享序列。

六、Signal

Driver相似,不会产生error事件,在主线程执行,但不会像Driver一样会给新观察者发送上一次发送的元素。

使用如下:

let event : Driver<Void> = button.rx.tap.asDriver()
event.drive(onNext: {
    print("yahibo")
    event.drive(onNext: {
        print("yahibo1")
    }).disposed(by: self.disposeBag)
}).disposed(by: disposeBag)

运行打印,发现在点击后重新订阅的观察者,会直接收到点击事件,这是我们业务不允许的。下面再看Signal序列:

let event : Signal<Void> = button.rx.tap.asSignal()
event.emit(onNext: {
    print("yahibo")
    event.emit(onNext: {
        print("yahibo1")
    }).disposed(by: self.disposeBag)
}).disposed(by: disposeBag)

运行结果,每一个新序列都会在点击后触发。

七、ControlEvent

专门用于描述UI控件所产生的事件,不会产生error事件,在主线程中监听。代码如下:

1、监听点击事件

let event : ControlEvent<Void> = button.rx.tap.asControlEvent()
event.bind(onNext: {
    print("controllerEvent")
}).disposed(by: disposeBag)

2、监听点击事件并绑定数据到其他UI

let event : ControlEvent<Void> = button.rx.tap.asControlEvent()
event.map{"yahibo"}.bind(to: label1.rx.text).disposed(by: disposeBag)

总结:

以上序列都是基于Observable的,是对其更高层的封装,针对不同应用场景设计,简化不同场景下序列的使用流程。

相关文章

网友评论

      本文标题:RxSwift特征序列

      本文链接:https://www.haomeiwen.com/subject/snaorctx.html