Schedulers 是RxSwift实现多线程的核心模块。它主要用于控制任务在哪个线程或队列运行。
小伙伴在平时的开发过程中,肯定都使用过网络请求,网络请求是在后台执行的,获取到数据之后,再在主线程更新UI。
来一段网络请求伪代码。
DispatchQueue.global(qos: .userInitiated).async {
let data = try? Data(contentsOf: url)
DispatchQueue.main.async {
// 更新UI
}
}
如果用RxSwift来实现上面的网络请求,则大致是这样的:
let rxData: Observable<Data> = ...
rxData
.subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
.observeOn(MainScheduler.instance)
.subscribe(onNext: { [weak self](data) in
// 更新UI
})
.disposed(by: disposeBag)
说明:
- 我们用
subscribeOn
来决定数据序列的构建函数在哪个 Scheduler 上运行。
在上面的例子中,由于获取Data
需要花费很长的时间,所以用subsribeOn
切换到 后台Scheduler 来获取Data
。这样就可以避免阻塞主线程。 - 我们用
observeOn
来决定在哪个 Scheduler 监听这个数据序列。
在上面的例子中,通过observerOn
方法切换到主线程来监听并处理结果。
下面👇介绍一下RxSwift中的几种调度器
MainScheduler
MainScheduler 代表 主线程。如果需要执行和UI相关的任务,就需要切换到该 Scheduler 运行。
查看其源码:
![](https://img.haomeiwen.com/i1820747/6d0b20fae6c30ed0.jpg)
MainScheduler
对象的内部,绑定了主队列 DispatchQueue.main
。
SerialDispatchQueueScheduler
SerialDispatchQueueScheduler 抽象了 串行DispatchQueue。如果需要执行一些串行任务,可以切换到这个 Scheduler 执行。
查看其源码:
![](https://img.haomeiwen.com/i1820747/0ce40d02da125e28.jpg)
SerialDispatchQueueScheduler
对象时,需要传入一个 DispatchQueue
,保存在 self.configuration
结构体中。
ConcurrentDispatchQueueScheduler
ConcurrentDispatchQueueScheduler 抽象了 并行DispatchQueue。如果需要执行一些并发任务,可以切换到这个 Scheduler执行。
查看其源码:
![](https://img.haomeiwen.com/i1820747/534e69c7c0c05a90.jpg)
self.configuration
保存了传入的 DispatchQueue
对象。
OperationQueueScheduler
OperationQueueScheduler 抽象了 NSOperationQueue。它具备一些 NSOperationQueue 的特点。例如,可以通过设置
maxConcurrentOperationCount
来控制同时执行并发任务的最大数量。
查看其源码:
![](https://img.haomeiwen.com/i1820747/14ed72bed5485098.jpg)
OperationQueueScheduler
对象时,需要传入 OperationQueue
和 优先级queuePriority
,作为初始化参数。
Scheduler的调度执行
从上一小节的几种调度器的源码可以发现,所有的调度器 Scheduler
都继承自 ImmediateSchedulerType
协议。
![](https://img.haomeiwen.com/i1820747/a490b7b49cf95722.jpg)
schedule
方法,而通过注释可以知道,在调度器调度执行的时候都会调用这个 schedule
方法。
以 SerialDispatchQueueScheduler
调度器为例:
public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.scheduleInternal(state, action: action)
}
func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.schedule(state, action: action)
}
public final func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
}
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
以上四个方法,都是 SerialDispatchQueueScheduler
调度器中的调度方法。
分析以上方法会发现,最终都会调用 self.configuration
的某个方法。而且查看几种调度器的源码可以知道,Scheduler 中都有一个重要的属性 let configuration: DispatchQueueConfiguration
。其中保存了我们需要的队列和leeway信息。
那么,我们就来分析 DispatchQueueConfiguration
中的方法。
![](https://img.haomeiwen.com/i1820747/651f55c3f3ec67d0.jpg)
schedule
方法,虽然 schedule
方法中只有寥寥几句代码,但是也清晰的展示其 核心逻辑:就是在当前队列下面,异步调度执行了闭包 action(state)
。
我们再来看其他方法:
func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
let deadline = DispatchTime.now() + dispatchInterval(dueTime)
let compositeDisposable = CompositeDisposable()
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: deadline, leeway: self.leeway)
// 因篇幅原因,省略部分代码 ...
timer.setEventHandler(handler: {
if compositeDisposable.isDisposed {
return
}
_ = compositeDisposable.insert(action(state))
cancelTimer.dispose()
})
timer.resume()
_ = compositeDisposable.insert(cancelTimer)
return compositeDisposable
}
func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
let initial = DispatchTime.now() + dispatchInterval(startAfter)
var timerState = state
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
// 因篇幅原因,省略部分代码 ...
timer.setEventHandler(handler: {
if cancelTimer.isDisposed {
return
}
timerState = action(timerState)
})
timer.resume()
return cancelTimer
}
以上两个方法中虽然没有直接在当前队列中异步调用闭包,但是创建 timer
时,却是在当前队列中创建的,因此 timer
回调时也是在当前队列执行 eventHandler
,间接实现当前队列下的调度。
以上则是调度器 Scheduler 的介绍及简单的调度分析,若有不足之处,请评论指正。
附图解一张
![]()
网友评论