美文网首页Swift编程swift
RxSwift核心之调度器Schedulers

RxSwift核心之调度器Schedulers

作者: 慕_風 | 来源:发表于2019-08-01 17:45 被阅读0次

    Schedulers 是RxSwift实现多线程的核心模块。它主要用于控制任务在哪个线程或队列运行。

    小伙伴在平时的开发过程中,肯定都使用过网络请求,网络请求是在后台执行的,获取到数据之后,再在主线程更新UI。

    来一段网络请求伪代码。

    DispatchQueue.global(qos: .userInitiated).async {
        
        let data = try? Data(contentsOf: url)
        
        DispatchQueue.main.async {
            // 更新UI
        }
    }
    

    如果用RxSwift来实现上面的网络请求,则大致是这样的:

    let rxData: Observable<Data> = ...
    
    rxData
        .subscribeOn(ConcurrentDispatchQueueScheduler(qos: .userInitiated))
        .observeOn(MainScheduler.instance)
        .subscribe(onNext: { [weak self](data) in
            // 更新UI
        })
        .disposed(by: disposeBag)
    

    说明:

    1. 我们用 subscribeOn 来决定数据序列的构建函数在哪个 Scheduler 上运行。
      在上面的例子中,由于获取 Data 需要花费很长的时间,所以用 subsribeOn 切换到 后台Scheduler 来获取 Data 。这样就可以避免阻塞主线程。
    2. 我们用 observeOn 来决定在哪个 Scheduler 监听这个数据序列。
      在上面的例子中,通过 observerOn 方法切换到主线程来监听并处理结果。

    下面👇介绍一下RxSwift中的几种调度器

    MainScheduler

    MainScheduler 代表 主线程。如果需要执行和UI相关的任务,就需要切换到该 Scheduler 运行。

    查看其源码:

    可以清晰的知道,在初始化时,在 MainScheduler 对象的内部,绑定了主队列 DispatchQueue.main

    SerialDispatchQueueScheduler

    SerialDispatchQueueScheduler 抽象了 串行DispatchQueue。如果需要执行一些串行任务,可以切换到这个 Scheduler 执行。

    查看其源码:

    在初始化 SerialDispatchQueueScheduler 对象时,需要传入一个 DispatchQueue,保存在 self.configuration 结构体中。

    ConcurrentDispatchQueueScheduler

    ConcurrentDispatchQueueScheduler 抽象了 并行DispatchQueue。如果需要执行一些并发任务,可以切换到这个 Scheduler执行。

    查看其源码:

    ConcurrentDispatchQueueSchedulerSerialDispatchQueueScheduler 思路是一样的。也是在初始化时使用 self.configuration 保存了传入的 DispatchQueue 对象。

    OperationQueueScheduler

    OperationQueueScheduler 抽象了 NSOperationQueue。它具备一些 NSOperationQueue 的特点。例如,可以通过设置 maxConcurrentOperationCount 来控制同时执行并发任务的最大数量。

    查看其源码:

    在初始化 OperationQueueScheduler 对象时,需要传入 OperationQueue优先级queuePriority,作为初始化参数。

    Scheduler的调度执行

    从上一小节的几种调度器的源码可以发现,所有的调度器 Scheduler 都继承自 ImmediateSchedulerType 协议。

    而这个协议只声明了一个 schedule 方法,而通过注释可以知道,在调度器调度执行的时候都会调用这个 schedule 方法。

    SerialDispatchQueueScheduler 调度器为例:

    public final func schedule<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.scheduleInternal(state, action: action)
    }
    
    func scheduleInternal<StateType>(_ state: StateType, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.configuration.schedule(state, action: action)
    }
    
    public final func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
        return self.configuration.scheduleRelative(state, dueTime: dueTime, action: action)
    }
    
    public func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
        return self.configuration.schedulePeriodic(state, startAfter: startAfter, period: period, action: action)
    }
    

    以上四个方法,都是 SerialDispatchQueueScheduler 调度器中的调度方法。

    分析以上方法会发现,最终都会调用 self.configuration 的某个方法。而且查看几种调度器的源码可以知道,Scheduler 中都有一个重要的属性 let configuration: DispatchQueueConfiguration。其中保存了我们需要的队列和leeway信息。

    那么,我们就来分析 DispatchQueueConfiguration 中的方法。

    首先分析 schedule 方法,虽然 schedule 方法中只有寥寥几句代码,但是也清晰的展示其 核心逻辑就是在当前队列下面,异步调度执行了闭包 action(state)

    我们再来看其他方法:

    func scheduleRelative<StateType>(_ state: StateType, dueTime: Foundation.TimeInterval, action: @escaping (StateType) -> Disposable) -> Disposable {
        let deadline = DispatchTime.now() + dispatchInterval(dueTime)
        
        let compositeDisposable = CompositeDisposable()
        
        let timer = DispatchSource.makeTimerSource(queue: self.queue)
        timer.schedule(deadline: deadline, leeway: self.leeway)
        
        // 因篇幅原因,省略部分代码 ...
        timer.setEventHandler(handler: {
            if compositeDisposable.isDisposed {
                return
            }
            _ = compositeDisposable.insert(action(state))
            cancelTimer.dispose()
        })
        timer.resume()
        
        _ = compositeDisposable.insert(cancelTimer)
        
        return compositeDisposable
    }
    
    func schedulePeriodic<StateType>(_ state: StateType, startAfter: TimeInterval, period: TimeInterval, action: @escaping (StateType) -> StateType) -> Disposable {
        let initial = DispatchTime.now() + dispatchInterval(startAfter)
        
        var timerState = state
        
        let timer = DispatchSource.makeTimerSource(queue: self.queue)
        timer.schedule(deadline: initial, repeating: dispatchInterval(period), leeway: self.leeway)
        
        // 因篇幅原因,省略部分代码 ...
        
        timer.setEventHandler(handler: {
            if cancelTimer.isDisposed {
                return
            }
            timerState = action(timerState)
        })
        timer.resume()
        
        return cancelTimer
    }
    

    以上两个方法中虽然没有直接在当前队列中异步调用闭包,但是创建 timer 时,却是在当前队列中创建的,因此 timer 回调时也是在当前队列执行 eventHandler,间接实现当前队列下的调度。

    以上则是调度器 Scheduler 的介绍及简单的调度分析,若有不足之处,请评论指正。

    附图解一张

    相关文章

      网友评论

        本文标题:RxSwift核心之调度器Schedulers

        本文链接:https://www.haomeiwen.com/subject/pupcdctx.html