响应式编程&链式编程
公司的几个共享项目, 有较多后台定时的网络请求,定位和蓝牙操作的组合.
原实现方案是直接把不同操作通过闭包嵌套起来, 如此一来有些比较头疼的问题:
-
异步操作组合出现"回调地狱", 每个组合操作的业务上有变动需要做大修.
-
任务管理不便,无法获取或取消上一次的请求/操作.
-
异步响应不及时可能造成之前的请求后至, 让数据出错. 或在页面退出之后仍然在进行未完成的请求.
网上较推荐的解决方案,就是使用响应式编程框架.
大体分为RxSwift和ReactiveCocoa / ReactiveSwift.
ReactiveCocoa是从OC时代就开始有的, 在Swift时代迁移过来, 开始的时候只是单纯的调用OC源码混编, 到后来衍生出了ReactiveSwift.
而RxSwift则是"原生"Swift诞生的.
刚好公司的项目里面本来已经使用了Swagger来自动生成网络请求业务的代码,
自带Moya框架并选择的是RxSwift.
于是就理所当然用了RxSwift.
串行异步请求处理
举个例子需要更新坐标然后请求附近的设备列表, 需要考虑定位超时或者定位出错的情况(例如没有开启权限), 以及提示用户当前正在更新数据.
分解之后需要实现以下函数:
// 更新坐标
func updateLocation(success: (CLLocation) -> Void, failure: (Error) -> Void)
// 请求设备
func fetchDevices(near location: CLLocation, success: ([Device]) -> Void, failure: (Error) -> Void)
// 更新数据以及界面
func update(with devices: [Device])
// 处理错误
func handlError(_ error: Error)
原来的传统闭包实现方式
self.updateLocation(success: { [weak self] location in
self?.fetchDevices(near: location,
success: { res in
self?.update(with: res)
}
failure: { err in
self?.handlError(err)
})},
failure: { [weak self] err in
self?.handlError(err)
})
类似这样的业务在共享系列项目经常出现, 如:
1.首页地图上轮询用户坐标附近的设备
2.设备列表界面
先讨论第2种情况, 这个时候上面的代码应该在下拉刷新中调用, 就是实际上项目写出的是这种嵌套的代码:
self.tableView.pullToRefresh(action: { [unowned self] in
self.updateLocation(success: { [weak self] location in
self?.fetchDevices(near: location,
success: { res in
self?.update(with: res)
}
failure: { err in
self?.handlError(err)
})},
failure: { [weak self] err in
self?.handlError(err)
})
})
一开始我尝试把更新定位和数据这一段操作封装为一个函数func updateData(), 然后在tableView的下拉刷新闭包里面调用.
如此类推也可以把updateData
里面的操作继续拆分, 视觉上就不会面对这么难看的代码块.
可是即使这样, 下拉刷新之后一系列的请求动作还是没有很直观的呈现出来, 代码也会因此变得相对零散, 最后甚至更加难看出原来的业务逻辑.
这个时候搬出RxSwift. 在接入之后, 代码就可以改成以下这个样子:
self.tableView.rx_pullToRefresh
.flatMap({[unowned self] in self.rx_updateLocation()})
.flatMap({[unowned self] in self.rx_fetchDevices(near: $0)})
.subscribe(onNext: {[weak self] in self?.update(with: $0)},
onError: {[weak self] in self?.handleError($0)})
.disposed(by: self.disposeBag)
然后逐步解析这段代码, 关键点主要有:
-
链式
把原来通过闭包回调的函数都改造成了返回
Observable
的函数, 改造后回调的处理就通过链式调用Observable
的Operator结合函数式编程来传递. -
函数式
把处理回调的函数作为参数传递给
flatMap
和subscribe
函数. -
Error聚合处理
原来每个异步操作都有一个Error的返回, 用闭包回调的方式需要在每次调用函数的时候传入.
而使用RxSwift的话, 其实所有Observable是被聚合成了1个, 就是在subscribe调用的那个, 而之前聚合起来的Observable的Error都可以在subscribe的onError
参数里传递处理.
这段代码仍不够完善, 优化方式后面会提到. 但是显而易见, 这一系列异步操作的条理顺序已经出来了, 而如果中间某一环需要修改, 也并不难操作.
并行异步请求处理
照样举例, 下载多张图片然后更新数据库:
分解之后有以下几个函数:
// 下载图片
func downloadImage(withURL: URL) -> (imageData: Data?, error: Error?)
// 处理下载好的图片
func handleDownloadImage(imageData: Data)
func handleError(error: Error)
// 写入数据库
func updateDatabase()
原来的实现:
let group = DispatchGroup()
DispatchQueue.global().async(group: group, execute: { [unowned self] in
let res = self.downloadImage(withURL: url1)
if let data = res.imageData {
self.handleDownloadImage(imageData: data)
}else if let error = res.error {
self.handleError(error: error)
}
})
DispatchQueue.global().async(group: group, execute: { [unowned self] in
let res = self.downloadImage(withURL: url2)
if let data = res.imageData {
self.handleDownloadImage(imageData: data)
}else if let error = res.error {
self.handleError(error: error)
}
})
DispatchQueue.global().async(group: group, execute: { [unowned self] in
let res = self.downloadImage(withURL: url2)
if let data = res.imageData {
self.handleDownloadImage(imageData: data)
}else if let error = res.error {
self.handleError(error: error)
}
})
group.notify(queue: DispatchQueue.main) { [unowned self] in
self.updateDatabase()
}
RxSwift的做法, 利用Observable
的zip
函数把多个下载任务聚合:
Observable.zip(self.rx_downloadImage(withURL: url1), self.rx_downloadImage(withURL: url2), self.rx_downloadImage(withURL: url3))
.subscribe(onNext: {[unowned self] res in self.updateDatabase()}
onError: {[unowned self] err in self.handleError(error: error)})
.disposed(by: self.disposeBag)
改造耗时的同步函数
ReactiveX框架确实很适合用来处理异步场景,只要使用者习惯了用return Observable来代替block, 并且要在Observable中传递处理结果.
RxSwift提供了一个比较简单的方式创建Observable, 就是Observable的静态函数create
:
Creates an observable sequence from a specified subscribe method implementation.
public static func create(_ subscribe: @escaping (RxSwift.AnyObserver<Self.E>) -> Disposable) -> RxSwift.Observable<Self.E>
这个函数只有一个闭包参数需要传入, 而这个闭包会给我们提供一个Observer
的实例, 我们的操作都会在这个闭包里面执行, 而结果就通过Observer
来派发, 比方上面的下载图片函数:
func rx_downloadImage(withURL url: URL) -> Observable<Data> {
return Observable.create({ [unowned self] observer -> Disposable in
DispatchQueue.global().async({
let res = self.downloadImage(withURL: url)
if let data = res.data {
observer.onNext(data)
observer.onCompleted()
}else if let error = res.error {
observer.onError(error)
}
})
return Disposables.create()
})
}
这样就封装了一个在内部子线程调用旧函数downloadImage
的函数. 对于子线程调用这个需求, 简单地使用了global
队列来异步执行下载.
队列的切换
以上的做法实现了在函数内异步下载并回调的需求, 但是这个做法会让调用者无法控制下载任务在哪个队列执行.
如果我们有必要手动去控制这类函数执行的队列, 可以通过Rx提供的解决方案实现, 就是以下两个函数observeOn
和subscribeOn
:
Wraps the source sequence in order to run its observer callbacks on the specified scheduler.
public func observeOn(_ scheduler: ImmediateSchedulerType) -> PrimitiveSequence<Trait, Element>
Wraps the source sequence in order to run its subscription and unsubscription logic on the specified
public func subscribeOn(_ scheduler: ImmediateSchedulerType)
其中subscribeOn
是指定了Observable
是在什么队列被订阅的, 这个队列同时也会是被订阅的Observable任务执行所在的队列.
而observeOn
则是指定了处理结果所在的队列, 就是我们最后调用subscribe(onNext, onError,...)的时候, 闭包里的任务的执行所在队列.
这两个函数接收的参数ImmediateSchedulerType
是RxSwift定义的协议, 实际上已经帮我们实现了几个结构体可以直接使用, 比较常用的是SerialDispatchQueueScheduler
, ConcurrentDispatchQueueScheduler
, ConcurrentMainScheduler
和MainScheduler
.
我们可以在SerialDispatchQueueScheduler
和ConcurrentDispatchQueueScheduler
的init函数中传入对应的队列.
或者直接指定qos, 类似DispatchQueue的global函数.
通过这2个函数, 可以便捷地把原来同步执行的函数放到子队列里面执行, 然后回到主线程处理结果, 比方:
func rx_downloadImage(withURL url: URL) -> Observable<Data> {
return Observable.create({ [unowned self] observer -> Disposable in
let res = self.downloadImage(withURL: url)
if let data = res.data {
observer.onNext(data)
observer.onCompleted()
}else if let error = res.error {
observer.onError(error)
}
return Disposables.create()
})
}
Observable.zip(self.rx_downloadImage(withURL: url1), self.rx_downloadImage(withURL: url2), self.rx_downloadImage(withURL: url3))
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: DispatchQoS.default))
.observeOn(ConcurrentMainScheduler.asyncInstance)
.subscribe(onNext: {[unowned self] res in self.updateDatabase()}
onError: {[unowned self] err in self.handleError(error: error)})
Observable对于Error报错处理的"缺陷"
在串行异步请求处理里面提到了这样的处理方式并不完善, 因为Observable
有一个特点就是抛出Error之后就会自行销毁订阅.
而这段代码里面, 下拉刷新的Observable是与另外两个异步操作聚合
在一起的, 就是说如果网络请求或者定位的操作抛出Error, 那么用户下一次下拉刷新也是不会被处理的.
所以, 为了避免这个问题, 解决方式有两种:
1.不要把下拉刷新的Observable
与另外两个异步操作的Observable
聚合;
2.拦截另外两个Observable
可能抛出的Error
第1个方案的解决代码:
self.tableView.rx_pullToRefresh
.subscribe(onNext: { [unowned self] in
_ = self.rx_updateLocation()
.flatMap({self.rx_fetchDevices(near: $0)})
.subscribe(onNext: {self.update(with: $0)},
onError: {self.handleError($0)})
})
.disposed(by: self.disposeBag)
这样就可以保证rx_pullToRefresh
会一直被监听, 但是这个处理方式会让代码可读性又下降了.
所幸RxSwift还给我们提供了另外的选择, 利用Observable
的catchError
函数或者catchErrorJustReturn
函数, 我们就可以把以上2个异步操作的错误拦截.
Continues an observable sequence that is terminated by an error with the observable sequence produced by the handler.
public func catchError(_ handler: @escaping (Error) throws -> RxSwift.Observable<Self.E>) -> RxSwift.Observable<Self.E>
Continues an observable sequence that is terminated by an error with a single element.
public func catchErrorJustReturn(_ element: Self.E) -> RxSwift.Observable<Self.E>
其中catchErrorJustReturn
函数可以让我们产生一个默认值来继续事件链,
而catchError
则接收一个闭包参数, 可以通过我们产生的Error来制定一个值给后续事件链或者直接在闭包里面处理Error.
显然这个场景我们是需要处理Error的, 所以选用catchError
. 而这样的话, subscribe
函数的onError
就永远都不会执行了, 等于是把Error的处理提前到了catchError
:
// 处理Error并且返回默认值
func rx_handleError(_ error: Error) -> Observable<[Device]> {
// 处理Error, 生成默认参数
...
return Observable.of(__defaultValue__)
}
// 请求列表的数据(聚合2个异步操作的Observable)
func rx_fetchTableViewData() -> Observable<[Device]> {
return self.rx_updateLocation().flatMap({self.rx_fetchDevices(near: $0)})
}
self.tableView.rx_pullToRefresh
.flatMap({ [unowned self] in
self.rx_fetchTableViewData()
.catchError({self.rx_handleError($0)})
})
.subscribe(onNext: {[weak self] in self?.update(with: $0)})
.disposed(by: self.disposeBag)
以上这样操作算是把Error处理了, 但是还是存在问题, 我们还需要再改进一下, 不要让handleResponseValue
和handleError
的代码分散.
配合enum以及泛型更舒服地处理Error
Swift里面的枚举类型是可以带参数的, 我们可以把我们要的结果抽象出来, 定义一个枚举:
enum Result {
case value([Device])
case error(Error)
}
然后改动一下我们的fetch以及handle方式:
// 改进后的fetch函数把[Device]和Error转换成Result
func rx_fetchTableViewData() -> Observable<Result> {
return self.rx_updateLocation()
.flatMap({self.rx_fetchDevices(near: $0)})
.map({Result.value($0)})
.catchError({Observable.of(Result.error($0))})
}
// 统一处理Result
func handleResult(_ result: Result) {
switch result {
case .value(let value):
self.update(with: value)
case .error(let error):
self.handleError(error)
}
}
最后我们这个下拉刷新->更新定位坐标->获取附近设备->更新table的代码就可以变成:
self.tableView.rx_pullToRefresh
.flatMap({[unowned self] in self.rx_fetchTableViewData()})
.subscribe(onNext: {[weak self] in self?.handleResult($0)})
.disposed(by: self.disposeBag)
但是, 实际上有这种场景的不止是设备列表, 还有比如附近的设备中心(Station), 这个Result
显然可以承担更多的任务.
利用Swift的泛型改进, 让Result
适用于通用的场景:
enum Result<T> {
case value(T)
case error(Error)
}
func rx_fetchDeviceTableViewData() -> Observable<Result<[Device]>> {
return ...
}
func handleDeviceResult(_ result: Result<[Device]>) {
...
}
func rx_fetchStationTableViewData() -> Observable<Result<[Station]>> {
return ...
}
func handleStationResult(_ result: Result<[Station]>) {
...
}
改造原来的GCD异步函数
如果在在项目中途接入Rx, 原来项目中已经存在大量通过GCD回调的函数了.
这个时候把全部函数都改造是很高成本的, 而且部分函数可能在项目中被调用了很多次, 涉及的模块可能比较多, 但是不一定每个调用了这个函数的模块都有必要接入Rx.
在这种情况下通用可以使用Observable的create
函数去封装原来的函数.
比如有一个加载本地数据的函数:
function loadDataFromLocal(filePath: URL, success: (Data)->Void, failure: (Error)->Void) {
...
...
...
}
在不改动原函数的情况下, 增加一个新的函数:
function rx_loadDataFromLocal(filePath: URL) -> Observable<Result<Data>> {
return Observable.create({ observer in
loadDataFromLocal(filePath: filePath,
success: {data in observer.onNext(.value(data))},
failure: {error in observer.onNext(.error(error))})
return Disposables.create()
})
}
这样的函数改造和同步函数的改造一样, 有一个类似的缺陷, 就是不可以再改变loadDataFromLocal
函数在哪个队列执行.
相对的, 这个改造方式可以比较简便地复用现有的函数.
Delegate回调
除了GCD, delegate也是异步回调的一种常用方式, 而delegate回调也是有可能出现在异步函数嵌套组合中.比如: 一个弹出的选择框.
用传统Delegate的做法, 需要在调用端实现delegate的函数.
class ModalViewController: UIViewController {
weak var delegate: ModalViewControllerDelegate? = nil
...
private func internalSelect(at index: Int) {
self.delegate?.modalViewController(didSelectAt: index)
}
}
protocol ModalViewController: NSObjectProtocol {
func modalViewController(didSelectAt index: Int)
}
// 调用端
class ViewController: UIViewController, ModalViewControllerDelegate {
func performSelect() {
let modalViewController = ModalViewController()
modalViewController.delegate = self
self.present(viewController: modalViewController, animated: true, completion: nil)
}
func modalViewController(didSelectAt index: Int) {
// 处理回调
self.handleSelect(at: index)
}
func handleSelect(at index: Int) {
...
}
}
这种做法有一个明显的弊端: 当ViewController
是一个业务量大的页面的时候, 它一般会有很多个这样的回调, 如此一来ViewController
需要在定义的时候就声明实现很多个delegate, 并且会有很多个回调函数. 这样会对ViewController造成很大的代码量和增大了维护难度.
如果运用RxSwift去处理这种回调, 我们可以压缩ViewController
的代码:
设想我们需要ModalViewController
提供一个Observbable
给外部观察选择事件. 那调用的时候就可以直接通过这个Observable知道选中值.如此一来, ViewController
中回调对应的代码就可以控制在performSelect
函数中.
// 调用端
class ViewController: UIViewController {
func performSelect() {
let modalViewController = ModalViewController()
self.present(viewController: modalViewController, animated: true, completion: nil)
modalViewController.rx_didSelect
.subscribe(onNext: { [weak self] index in
// 处理回调
...
})
}
}
为了实现这个目的, 我们只需如下改造ModalViewController
:
class ModalViewController: UIViewController {
// PublishSubject不适合对外公开, 避免外部调用入口函数.
var rx_didSelect: Obsevable<Int> {
return self.rx_internal_didSelect.asObservable()
}
private let rx_internal_didSelect: PublishSubject<Int> = PublishSubject()
...
private func internalSelect(at index: Int) {
self.rx_internal_didSelect.onNext(index)
}
}
取消任务
这个比较容易操作, 使用Observable
的disposed(by:)
函数, 在想要取消这个任务的时候把传入的DisposeBag
实例销毁.
比方说让ViewController
持有一个DisposeBag
, 在ViewController
调用deinit
的时候, 绑定在这个DisposeBag
上的Observable
就都不会继续处理了.
小结
以上是Thinker.vc项目中接入RxSwift对异步/并行操作的优化改进经历.概括来说, 就是:
1.把多层的闭包嵌套降维成链式单层闭包
2.把散落的Error处理整合起来
3.更灵活方便地切换队列
4.更明确展现异步操作之间的关系
5.更方便顺应业务改变异步操作组合
6.可以取消任务响应
持续更新...
网友评论