学习资料:
https://beeth0ven.github.io/RxSwift-Chinese-Documentation/
https://juejin.im/post/5d52ebf36fb9a06b2d77c9d5
1、RxSwift 功能概览
功能概览RxSwift 是 Swift 函数响应式编程的一个开源库。它的目的是让数据/事件流和异步任务能够更方便的序列化处理,可使 Swift 进行响应式编程。其本质是观察者模式。
1、创建流程
1:创建序列
2:订阅序列
3:发送信号
// 1: 创建序列
_ = Observable<String>.create { (obserber) -> Disposable in
// 3:发送信号
obserber.onNext("Cooci - 框架班级")
return Disposables.create() // 这个销毁不影响我们这次的解读
// 2: 订阅序列
}.subscribe(onNext: { (text) in
print("订阅到:\(text)")
})
// “订阅到:Cooci - 框架班级”
RxSwift 把我们程序中每一个操作都看成一个事件,如:TextField 文本改变、按钮点击、网络请求等。每一个事件源就可以看成一个管道(sequence),当状态发生改变时就会有事件 sequence 中流出,通过监听这个 sequence,可以做出相应的处理。
一个 Observable 序列被创建出来后它并不会被激活发出 Event,而是要等到它被某个人订阅了才会激活它。而 Observable 序列激活之后要一直等到它发出了.error 或者 .completed 的 event 后它才被终结。
2、核心
核心内容Observable -- 产生事件
Observer -- 响应事件
Operator -- 创建变化组合事件
Disposable -- 管理绑定(订阅)的生命周期
Schedulers -- 线程队列调配
示例:
// Observable<String>
let text = usernameOutlet.rx.text.orEmpty.asObservable()
// Observable<Bool>
let passwordValid = text
// Operator
.map { $0.count >= minimalUsernameLength }
// Observer<Bool>
let observer = passwordValidOutlet.rx.isHidden
// Disposable
let disposable = passwordValid
// Scheduler 用于控制任务在那个线程队列运行
.subscribeOn(MainScheduler.instance)
.observeOn(MainScheduler.instance)
.bind(to: observer)
...
// 取消绑定,你可以在退出页面时取消绑定
disposable.dispose()
3、RxSwift 和 RxCocoa 的区别:
RxSwift:它只是基于 Swift 语言的 Rx 标准实现接口库,所以 RxSwift 里不包含任何 Cocoa 或者 UI 方面的类。
RxCocoa:是基于 RxSwift 针对于 iOS 开发的一个库,它通过 Extension 的方法给原生的比如 UI 控件添加了 Rx 的特性,使得我们更容易订阅和响应这些控件的事件。
2、Observable -- 可观察序列
1、Observable<T>
Observable<T> 可观察序列,这个类就是 Rx 框架的基础。它的作用就是可以异步地产生一系列的 Event(事件),即一个 Observable<T> 对象会随着时间推移不定期地发出 event(element : T) ,而且这些 Event 还可以携带数据,它的泛型 <T> 就是用来指定这个 Event 携带的数据的类型。有了可观察序列,我们还需要有一个 Observer(订阅者)来订阅它,这样这个订阅者才能收到 Observable<T> 不时发出的 Event。
2、Event
Event 是一个 enum
public enum Event<Element> {
/// Next element is produced.
case next(Element)
/// Sequence terminated with an error.
case error(Swift.Error)
/// Sequence completed successfully.
case completed
}
next:是携带数据 <T> 的事件,可以说它就是一个“最正常”的事件。
error:错误事件。它可以携带具体的错误内容,一旦 Observable 发出了 error event,则这个 Observable 就等于终止了,以后它再也不会发出 event 事件了。
completed:正常结束事件。一旦 Observable 发出了 completed event,则这个 Observable 就等于终止了,以后它再也不会发出 event 事件了。
3、Observable 与 Sequence 比较
SequenceType 是同步的循环,而 Observable 是异步的。
Observable 对象会在有任何 Event 时候,自动将 Event 作为一个参数通过 ObservableType.subscribe(_:) 发出,并不需要使用 next 方法。
4、Observable 的特征序列
特征序列可以帮助我们更准确的描述序列,还可以给我们提供语法糖,让我们能够用更加优雅的方式书写代码,他们是:Single,Completable,Maybe,Driver,Signal,ControlEvent。
Single:Single 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能发出一个元素,要么产生一个 error 事件。
特点:发出一个元素或一个 error 事件;不会共享附加作用。
示例:
//创建
func getRepo(_ repo: String) -> Single<[String: Any]> {
return Single<[String: Any]>.create { single in
let url = URL(string: "https://api.github.com/repos/\(repo)")!
let task = URLSession.shared.dataTask(with: url) {
data, _, error in
if let error = error {
single(.error(error))
return
}
guard let data = data,
let json = try? JSONSerialization.jsonObject(with: data, options: .mutableLeaves),
let result = json as? [String: Any] else {
single(.error(DataError.cantParseJSON))
return
}
single(.success(result))
}
task.resume()
return Disposables.create { task.cancel() }
}
}
//使用:
getRepo("ReactiveX/RxSwift")
.subscribe(onSuccess: { json in
print("JSON: ", json)
}, onError: { error in
print("Error: ", error)
})
.disposed(by: disposeBag)
注:可以对 Observable 调用 .asSingle() 方法,将它转换为 Single。
Completable:Completable 是 Observable 的另外一个版本。不像 Observable 可以发出多个元素,它要么只能产生一个 completed 事件,要么产生一个 error 事件。
特点:发出零个元素;发出一个 completed
事件或者一个 error
事件;不会共享附加作用。
示例
//创建:
func cacheLocally() -> Completable {
return Completable.create { completable in
// Store some data locally
guard success else {
completable(.error(CacheError.failedCaching))
return Disposables.create {}
}
completable(.completed)
return Disposables.create {}
}
}
//使用:
cacheLocally()
.subscribe(onCompleted: {
print("Completed with no error")
}, onError: { error in
print("Completed with an error: \(error.localizedDescription)")
})
.disposed(by: disposeBag)
Maybe:是 Observable
的另外一个版本。它介于 Single 和 Completable 之间,它要么只能发出一个元素,要么产生一个 completed
事件,要么产生一个 error
事件。
特点:发出一个元素或者一个 completed
事件或者一个 error
事件;不会共享附加作用。
示例
//创建
func generateString() -> Maybe<String> {
return Maybe<String>.create { maybe in
maybe(.success("RxSwift"))
// OR
maybe(.completed)
// OR
maybe(.error(error))
return Disposables.create {}
}
}
//使用:
generateString()
.subscribe(onSuccess: { element in
print("Completed with element \(element)")
}, onError: { error in
print("Completed with an error \(error.localizedDescription)")
}, onCompleted: {
print("Completed with no element")
})
.disposed(by: disposeBag)
注:可以对 Observable 调用 .asMaybe() 方法,将它转换为 Maybe。
Driver:Driver 是一个精心准备的特征序列。它主要是为了简化 UI 层的代码。
任何可监听序列都可以被转换为 Driver,只要他满足 3 个条件:不会产生 error
事件;一定在 MainScheduler
监听(主线程监听);共享附加作用。
示例
let results = query.rx.text.asDriver() // 将普通序列转换为 Driver
.throttle(0.3, scheduler: MainScheduler.instance)
.flatMapLatest { query in
fetchAutoCompleteItems(query)
.asDriver(onErrorJustReturn: []) // 仅仅提供发生错误时的备选返回值
}
results
.map { "\($0.count)" }
.drive(resultCount.rx.text) // 这里改用 `drive` 而不是 `bindTo`
.disposed(by: disposeBag) // 这样可以确保必备条件都已经满足了
results
.drive(resultsTableView.rx.items(cellIdentifier: "Cell")) {
(_, result, cell) in
cell.textLabel?.text = "\(result)"
}
.disposed(by: disposeBag)
注:drive 方法只能被 Driver 调用。这意味着,如果你发现代码存在 drive,那么这个序列不会产生错误事件并且一定在主线程监听。这样你可以安全的绑定 UI 元素。
Signal:Signal 和 Driver 相似,唯一的区别是,Driver会对新观察者回放(重新发送)上一个元素,而 Signal 不会对新观察者回放上一个元素。
特点:不会产生 error
事件;一定在 MainScheduler
监听(主线程监听);共享附加作用。
经验:一般情况下状态序列我们会选用 Driver 这个类型,事件序列我们会选用 Signal 这个类型。
ControlEvent:ControlEvent 专门用于描述 UI 控件所产生的事件,它具有以下特征:不会产生 error
事件;一定在 MainScheduler
订阅(主线程订阅);一定在 MainScheduler
监听(主线程监听);共享附加作用。
3、订阅
1、subscribe
创建观察者最直接的方法就是在 Observable 的 subscribe 方法后面描述当事件发生时需要如何做出响应。
let observable = Observable.of("A", "B", "C")
observable.subscribe(onNext: { element in
print(element)
}, onError: { error in
print(error)
}, onCompleted: {
print("completed")
})
2、bind
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind { [weak self](text) in
//收到发出的索引数后显示到label上
self?.label.text = text
}
.disposed(by: disposeBag)
}
}
3、AnyObserver:可以用来描叙任意一种观察者
配合 subscribe 使用:
//观察者
let observer: AnyObserver<String> = AnyObserver { (event) in
switch event {
case .next(let data):
print(data)
case .error(let error):
print(error)
case .completed:
print("completed")
}
}
let observable = Observable.of("A", "B", "C")
observable.subscribe(observer)
/*
A
B
C
completed
*/
配合 bindTo 使用:
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
//观察者
let observer: AnyObserver<String> = AnyObserver { [weak self] (event) in
switch event {
case .next(let text):
//收到发出的索引数后显示到label上
self?.label.text = text
default:
break
}
}
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind(to: observer)
.disposed(by: disposeBag)
}
}
4、Binder
不会处理错误事件,一旦产生错误事件,在调试环境下将执行 fatalError,在发布环境下将打印错误信息。
特点:不会处理错误事件;确保绑定都是在给定 Scheduler 上执行(默认 MainScheduler)。
import UIKit
import RxSwift
import RxCocoa
class ViewController: UIViewController {
@IBOutlet weak var label: UILabel!
let disposeBag = DisposeBag()
override func viewDidLoad() {
//观察者
let observer: Binder<String> = Binder(label) { (view, text) in
//收到发出的索引数后显示到label上
view.text = text
}
//Observable序列(每隔1秒钟发出一个索引数)
let observable = Observable<Int>.interval(1, scheduler: MainScheduler.instance)
observable
.map { "当前索引数:\($0 )"}
.bind(to: observer)
.disposed(by: disposeBag)
}
}
5、Observable & Observer 既是可监听序列也是观察者
// 作为可监听序列
let observable = textField.rx.text
observable.subscribe(onNext: { text in show(text: text) })
// 作为观察者
let observer = textField.rx.text
let text: Observable<String?> = ...
text.bind(to: observer)
常见控件有:textField 的当前文本,switch 的开关状态,segmentedControl 的选中索引号,datePicker 的选中日期等等。
框架里面定义了一些辅助类型,它们既是可监听序列也是观察者。如:AsyncSubject
,PublishSubject,ReplaySubject,BehaviorSubject,ControlProperty。
AsyncSubject:AsyncSubject 将在源 Observable 产生完成事件后,发出最后一个元素(仅仅只有最后一个元素),如果源 Observable 没有发出任何元素,只有一个完成事件。那 AsyncSubject 也只有一个完成事件。如果源 Observable 因为产生了一个 error 事件而中止, AsyncSubject 就不会发出任何元素,而是将这个 error 事件发送出来。
示例
let disposeBag = DisposeBag()
let subject = AsyncSubject<String>()
subject
.subscribe { print("Subscription: 1 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject.onNext("🐹")
subject.onCompleted()
//结果
Subscription: 1 Event: next(🐹)
Subscription: 1 Event: completed
PublishSubject:PublishSubject 将对观察者发送订阅后产生的元素,而在订阅前发出的元素将不会发送给观察者。如果你希望观察者接收到所有的元素,你可以通过使用 Observable
的 create
方法来创建 Observable
,或者使用 ReplaySubject。如果源 Observable 因为产生了一个 error 事件而中止, PublishSubject 就不会发出任何元素,而是将这个 error 事件发送出来。
let disposeBag = DisposeBag()
let subject = PublishSubject<String>()
subject
.subscribe { print("Subscription: 1 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject
.subscribe { print("Subscription: 2 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")
//结果
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
ReplaySubject:ReplaySubject 将对观察者发送全部的元素,无论观察者是何时进行订阅的。
这里存在多个版本的 ReplaySubject,有的只会将最新的 n 个元素发送给观察者,有的只会将限制时间段内最新的元素发送给观察者。
如果把 ReplaySubject 当作观察者来使用,注意不要在多个线程调用 onNext, onError 或 onCompleted。这样会导致无序调用,将造成意想不到的结果。
let disposeBag = DisposeBag()
let subject = ReplaySubject<String>.create(bufferSize: 1)
subject
.subscribe { print("Subscription: 1 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject
.subscribe { print("Subscription: 2 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🅰️")
//结果
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
BehaviorSubject:当观察者对 BehaviorSubject 进行订阅时,它会将源 Observable 中最新的元素发送出来(如果不存在最新的元素,就发出默认元素)。然后将随后产生的元素发送出来。如果源 Observable 因为产生了一个 error 事件而中止, BehaviorSubject 就不会发出任何元素,而是将这个 error 事件发送出来。
let disposeBag = DisposeBag()
let subject = BehaviorSubject(value: "🔴")
subject
.subscribe { print("Subscription: 1 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🐶")
subject.onNext("🐱")
subject
.subscribe { print("Subscription: 2 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🅰️")
subject.onNext("🅱️")
subject
.subscribe { print("Subscription: 3 Event:", $0) }
.disposed(by: disposeBag)
subject.onNext("🍐")
subject.onNext("🍊")
//结果
Subscription: 1 Event: next(🔴)
Subscription: 1 Event: next(🐶)
Subscription: 1 Event: next(🐱)
Subscription: 2 Event: next(🐱)
Subscription: 1 Event: next(🅰️)
Subscription: 2 Event: next(🅰️)
Subscription: 1 Event: next(🅱️)
Subscription: 2 Event: next(🅱️)
Subscription: 3 Event: next(🅱️)
Subscription: 1 Event: next(🍐)
Subscription: 2 Event: next(🍐)
Subscription: 3 Event: next(🍐)
Subscription: 1 Event: next(🍊)
Subscription: 2 Event: next(🍊)
Subscription: 3 Event: next(🍊)
ControlProperty:ControlProperty 专门用于描述 UI 控件属性的,它具有以下特征:不会产生 error
事件;一定在 MainScheduler
订阅(主线程订阅);一定在 MainScheduler
监听(主线程监听);共享附加作用。
4、销毁(Dispose)
通常来说,一个序列如果发出了 error 或者 completed 事件,那么所有内部资源都会被释放。如果你需要提前释放这些资源或取消订阅的话,那么你可以对返回的可被清除的资源(Disposable) 调用 dispose 方法。
1、dispose()
通过调用 dispose() 方法把这个订阅给销毁掉,防止内存泄漏。当一个订阅行为被 dispose 了,那么之后 observable 如果再发出 event,这个已经 dispose 的订阅就收不到消息了。
let observable = Observable.of("A", "B", "C")
//使用subscription常量存储这个订阅方法
let subscription = observable.subscribe { event in
print(event)
}
//调用这个订阅的dispose()方法
subscription.dispose()
2、DisposeBag
管理多个订阅行为的销毁。我们可以把一个 DisposeBag 对象看成一个垃圾袋,把用过的订阅行为都放进去。而这个 DisposeBag 就会在自己快要 dealloc 的时候,对它里面的所有订阅行为都调用 dispose()方法。
let disposeBag = DisposeBag()
//第1个Observable,及其订阅
let observable1 = Observable.of("A", "B", "C")
observable1.subscribe { event in
print(event)
}.disposed(by: disposeBag)
//第2个Observable,及其订阅
let observable2 = Observable.of(1, 2, 3)
observable2.subscribe { event in
print(event)
}.disposed(by: disposeBag)
注:在 退出页面的时候(viewWillDisappear / viewDidDisappear)调用以下方法
3、takeUntil
override func viewDidLoad() {
super.viewDidLoad()
_ = usernameValid
.takeUntil(self.rx.deallocated)
.bind(to: usernameValidOutlet.rx.isHidden)
_ = everythingValid
.takeUntil(self.rx.deallocated)
.bind(to: doSomethingOutlet.rx.isEnabled)
_ = doSomethingOutlet.rx.tap
.takeUntil(self.rx.deallocated)
.subscribe(onNext: { [weak self] in self?.showAlert() })
}
5、调度者(Scheduler)
调度器(Schedulers)是 RxSwift 实现多线程的核心模块,它主要用于控制任务在哪个线程或队列运行。
调度器
示例
// 后台取得数据,主线程处理结果
DispatchQueue.global(qos: .userInitiated).async {
let data = try? Data(contentsOf: url)
DispatchQueue.main.async {
self.data = data
}
}
//RxSwift 实现
let rxData: Observable<Data> = ...
rxData
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self] data in
self?.data = data
})
.disposed(by: disposeBag)
subscribeOn:来决定数据序列的构建函数在哪个 Scheduler 上运行。耗时操作要放到后台 Scheduler。
observeOn:来决定在哪个 Scheduler 监听这个数据序列。
MainScheduler:代表主线程。
SerialDispatchQueueScheduler:抽象了串行 DispatchQueue。如果你需要执行一些串行任务,可以切换到这个 Scheduler 运行。
ConcurrentDispatchQueueScheduler:抽象了并行 DispatchQueue。如果你需要执行一些并发任务,可以切换到这个 Scheduler 运行。
OperationQueueScheduler:抽象了 NSOperationQueue。它具备 NSOperationQueue 的一些特点,例如,你可以通过设置 maxConcurrentOperationCount,来控制同时执行并发任务的最大数量。
6、错误处理
一旦序列里面产出了一个 error
事件,整个序列将被终止。RxSwift 主要有两种错误处理机制:retry - 重试;catch - 恢复。
retry - 重试:可以让序列在发生错误后重试。
// 请求 JSON 失败时,立即重试,
// 重试 3 次后仍然失败,就将错误抛出
let rxJson: Observable<JSON> = ...
rxJson
.retry(3)
.subscribe(onNext: { json in
print("取得 JSON 成功: \(json)")
}, onError: { error in
print("取得 JSON 失败: \(error)")
})
.disposed(by: disposeBag)
retryWhen:在发生错误时,经过一段延时后重试。
// 请求 JSON 失败时,等待 5 秒后重试,
let retryDelay: Double = 5 // 重试延时 5 秒
rxJson
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
return Observable.timer(retryDelay, scheduler: MainScheduler.instance)
}
.subscribe(onNext: { json in
print("取得 JSON 成功: \(json)")
}, onError: { error in
print("取得 JSON 失败: \(error)")
})
.disposed(by: disposeBag)
// 请求 JSON 失败时,等待 5 秒后重试,
// 重试 4 次后仍然失败,就将错误抛出
let maxRetryCount = 4 // 最多重试 4 次
let retryDelay: Double = 5 // 重试延时 5 秒
rxJson
.retryWhen { (rxError: Observable<Error>) -> Observable<Int> in
return rxError.flatMapWithIndex { (error, index) -> Observable<Int> in
guard index < maxRetryCount else {
return Observable.error(error)
}
return Observable<Int>.timer(retryDelay, scheduler: MainScheduler.instance)
}
}
.subscribe(onNext: { json in
print("取得 JSON 成功: \(json)")
}, onError: { error in
print("取得 JSON 失败: \(error)")
})
.disposed(by: disposeBag)
catch - 恢复:可以在错误产生时,用一个备用元素或者一组备用元素将错误替换掉。
示例
searchBar.rx.text.orEmpty
...
.flatMapLatest { query -> Observable<[Repository]> in
...
return searchGitHub(query)
.catchErrorJustReturn([])
}
...
.bind(to: ...)
.disposed(by: disposeBag)
注:用到了catchErrorJustReturn,当错误产生时,就返回一个空数组。
// 先从网络获取数据,如果获取失败了,就从本地缓存获取数据
let rxData: Observable<Data> = ... // 网络请求的数据
let cahcedData: Observable<Data> = ... // 之前本地缓存的数据
rxData
.catchError { _ in cahcedData }
.subscribe(onNext: { date in
print("获取数据成功: \(date.count)")
})
.disposed(by: disposeBag)
Result:
public enum Result<Success, Failure> where Failure : Error {
case success(Success)
case failure(Failure)
}
updateUserInfoButton.rx.tap
.withLatestFrom(rxUserInfo)
.flatMapLatest { userInfo -> Observable<Result<Void, Error>> in
return update(userInfo)
.map(Result.success) // 转换成 Result
.catchError { error in Observable.just(Result.failure(error)) }
}
.observeOn(MainScheduler.instance)
.subscribe(onNext: { result in
switch result { // 处理 Result
case .success:
print("用户信息更新成功")
case .failure(let error):
print("用户信息更新失败: \(error.localizedDescription)")
}
})
.disposed(by: disposeBag)
分析:
将错误事件包装成 Result.failure(Error) 元素,就不会终止整个序列。
即便网络请求失败了,整个订阅依然存在。如果用户再次点击更新按钮,也是能够发起网络请求进行更新操作的。
网友评论