RxSwift还封装了定时器,下面是一个每隔1秒在主线程执行回调的定时器:
Observable.interval(1, scheduler: MainScheduler.instance)
.subscribe(onNext: { (num) in
print("定时器执行")
}).disposed(by: disposbag)
第一个参数是时间间隔,第二个参数是执行回调的调用环境。
- 我们马上进入源码看一看这个定时器是怎么封装的:
extension ObservableType where Element : RxAbstractInteger {
public static func interval(_ period: Foundation.TimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
return interval(.milliseconds(Int(period * 1000.0)), scheduler: scheduler)
}
}
extension ObservableType where Element : RxAbstractInteger {
public static func interval(_ period: RxTimeInterval, scheduler: SchedulerType)
-> Observable<Element> {
return Timer(
dueTime: period,
period: period,
scheduler: scheduler
)
}
}
在这个Timer
内部可以看到它继承了Producer
,所以Timer
也是一个序列
final private class Timer<Element: RxAbstractInteger>: Producer<Element> {
...
init(dueTime: RxTimeInterval, period: RxTimeInterval?, scheduler: SchedulerType) {
self._scheduler = scheduler
self._dueTime = dueTime
self._period = period
}
override func run<Observer: ObserverType>(_ observer: Observer, cancel: Cancelable) -> (sink: Disposable, subscription: Disposable) where Observer.Element == Element {
if self._period != nil {
let sink = TimerSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
else {
let sink = TimerOneOffSink(parent: self, observer: observer, cancel: cancel)
let subscription = sink.run()
return (sink: sink, subscription: subscription)
}
}
}
- 根据RxSwift核心逻辑,定时器序列在
subscribe
订阅之后便会来到run
函数。在run
函数里面得知,下沉业务由TimerSink
和TimerOneOffSink
负责,我们先看TimerSink
,看懂了TimerSink
自然能懂TimerOneOffSink
:
final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger {
...
init(parent: Parent, observer: Observer, cancel: Cancelable) {
self._parent = parent
super.init(observer: observer, cancel: cancel)
}
func run() -> Disposable {
return self._parent._scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self._parent._dueTime, period: self._parent._period!) { state in
...
}
}
}
-
_scheduler
是一个调度者,以后会详细说说。先看后面的执行函数:
public class SerialDispatchQueueScheduler : SchedulerType {
...
init(serialQueue: DispatchQueue, leeway: DispatchTimeInterval = DispatchTimeInterval.nanoseconds(0)) {
self.configuration = DispatchQueueConfiguration(queue: serialQueue, leeway: leeway)
}
public func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
}
}
- 最后来到重点,
schedulePeriodic
函数里封装了GCD定时器,所以这个RxSwift定时器不会受到滑动屏幕的影响而停止执行。
extension DispatchQueueConfiguration {
...
func schedulePeriodic<StateType>(_ state: StateType, startAfter: RxTimeInterval, period: RxTimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
let initial = DispatchTime.now() + startAfter
var timerState = state
//真正作用的定时器
let timer = DispatchSource.makeTimerSource(queue: self.queue)
timer.schedule(deadline: initial, repeating: period, leeway: self.leeway)
var timerReference: DispatchSourceTimer? = timer
let cancelTimer = Disposables.create {
timerReference?.cancel()//停止定时器
timerReference = nil
}
//定时执行
timer.setEventHandler(handler: {
if cancelTimer.isDisposed {
return
}
timerState = action(timerState)//执行外面的尾随闭包
})
timer.resume()//定时器启动
return cancelTimer
}
}
- 定时器启动后,便定时执行
action(timerState)
回到外面的尾随闭包,接着执行self.forwardOn(.next(state))
发送响应:
final private class TimerSink<Observer: ObserverType> : Sink<Observer> where Observer.Element : RxAbstractInteger {
...
func run() -> Disposable {
return self._parent._scheduler.schedulePeriodic(0 as Observer.Element, startAfter: self._parent._dueTime, period: self._parent._period!) { state in
self._lock.lock(); defer { self._lock.unlock() }
self.forwardOn(.next(state))//发送响应
return state &+ 1
}
}
}
最后根据RxSwift核心逻辑,最终实现定时响应
.subscribe(onNext: { (num) in
print("定时器执行")
}).disposed(by: disposbag)
网友评论