美文网首页
协程的信号量

协程的信号量

作者: 小城哇哇 | 来源:发表于2023-05-19 13:58 被阅读0次

    开篇预警:继续往下读之前,请确认已具备下面列的知识,本文不做过多的额外讲解哈。包括但不限于:

    • 多线程编程及资源同步
    • Kotlin 基础
    • Kotlin 协程的基本概念及其使用

    信号量

    信号量是什么?借用度娘的解释:

    信号量 (Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。

    其中,关键词有两个:多线程并发。所以说,信号量是解决多线程下并发问题的一个概念

    借着上面的解释,举个不恰当但又容易理解的例子:

    两座山之间,有一道索桥,索桥本身有重量限制,所以不能同时承载超量的人在桥上走。因此,桥两端各有一个路灯,作为信号灯,灯亮,可上桥;灯灭,桥上已有超额的人,有危险,不可上桥,。

    例子很蹩脚,但是,「信号灯」是不是就有了?索桥就是所谓的「有限性资源」,只有信号灯允许时,才能安全地上桥 —— 这也就是信号量的作用:保证资源有限地、安全地访问

    协程的信号量

    协程,在 Kotlin 的世界里,就是一个优雅的后台任务工具,「后台」两个字一出,必然就可能引出多线程安全的问题。

    比如说,应用启动后,有一堆资源由数据库加载而来,但是数据库的内容,又可能需要拉取接口更新。这样一来,数据加载完成的时间就不可控了,如果不加控制,那读取方拿到数据的完整性、可靠性就都不确定了。

    这样的场景,虽然可以通过同步来解决,但是好像又有点儿过于严格了,毕竟,读取也需要锁住啊。

    而信号量在这儿就能派上用场了。信号量,就像是钥匙,可以只有一把,一次允许一个人开门,也可以有多把,分发多人,谁用完,谁就传递给等待钥匙的人;但是使用钥匙的人之间,却互不影响。

    Semaphore

    Kotlin 中的信号量,定义成了一个接口,为 Semaphore,以下,是官方文档的介绍:

    A counting semaphore for coroutines that logically maintains a number of available permits. Each acquire takes a single permit or suspends until it is available. Each release adds a permit, potentially releasing a suspended acquirer. Semaphore is fair and maintains a FIFO order of acquirers.

    解释下意思:Semaphore 控制了可数的信号量,每一个请求,如果有可用的信号量,则消耗一个;如果没有,则悬挂而等待,直到有信号量释放。信号量控制遵循「先进先出」原则,谁先请求,谁先拿到。

    接口定义如下:

    public interface Semaphore {
        /**
         *  信号量允许数
         */
        public val availablePermits: Int
    
        /**
         * 请求信号量
         */
        public suspend fun acquire()
    
        /**
         * 请示信号量(直接给出请求结果,无suspend)
         */
        public fun tryAcquire(): Boolean
    
        /**
         * 释放一个信号量
         */
        public fun release()
    }
    
    

    很简单的四个方法。获取一个 Semaphore 可以通过如下方法:

    public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits)
    
    

    其中,SemaphoreImpl 就是系统的实现。

    案例

    好了,基础知识介绍了,案例才能加深理解。

    1

    private val semaphore = Semaphore(1, 0)
    
    private suspend fun postTask(name: String) {
        println("$name acquiring")
        semaphore.acquire()
        println("$name acquired")
        delay(1000L)
        println("$name released")
        semaphore.release()
    }
    
    fun main(array: Array<String>) = runBlocking  {
    
        for (i in 0 until 3) {
            GlobalScope.launch {
                postTask("task-$i")
            }
        }
        println("all posted")
        GlobalScope.launch {
            while (true) {
                // 进度模拟
                print(".")
                delay(200)
            }
        }
        delay(5_000L)
        println("done")
    }
    
    

    其输出结果:

    all posted
    task-0 acquiring
    task-1 acquiring
    task-0 acquired
    task-2 acquiring
    .....task-0 released
    task-1 acquired
    .....task-1 released
    task-2 acquired
    .....task-2 released
    ..........done
    
    

    好,来具体分析下输出。

    首先,postTask 模拟一个任务,进入时,都需要获取信号量,然后执行 1 秒,完毕后,释放信号量,供可能的其他线程使用。

    main 执行时,循环直接完成了三个任务的启动,所以输出了 「all posted」。

    三个任务基本同时准备请求信号量(「acquiring」),但是只有 0 号任务成功(「acquired」),然后它开始执行任务。

    1 秒后(已经输出「.....」了)任务结束,释放信号量,然后 1 号任务马上请求成功(「acquired」),开始自己的任务执行。同样地,后来是 2 号任务。

    当全部任务执行完毕后,再等几秒,main 结束。

    2

    现在,我们把「信号量允许数」改成 2,即:一次允许两个请求同时满足。

    private val semaphore = Semaphore(2, 0)
    
    

    执行结果:

    all posted
    task-0 acquiring
    task-1 acquiring
    task-2 acquiring
    task-0 acquired
    .task-1 acquired
    ....task-0 released
    task-1 released
    task-2 acquired
    .....task-2 released
    ...............done
    
    

    可以看到,几乎同时,0 号和 1 号任务就拿到了准入,开始执行自己的任务。但因为允许数是 2,2 号任务就只能等到有信号释放才能执行了。

    3

    再来。如果注释掉 semaphore.release(),信号量没有释放,2 号任务就无法执行了:

    all posted
    task-1 acquiring
    task-0 acquiring
    task-0 acquired
    task-2 acquiring
    task-1 acquired
    .....task-1 released
    task-0 released
    ....................done
    
    

    4

    还有一个方法没使用呢:tryAcquire(),来添加一个「尝试执行」的任务:

    private suspend fun tryPostTask(name: String) {
        println("try $name acquiring")
        if (semaphore.tryAcquire()) { // 尝试请求准入
            println("try $name acquired")
            delay(1500)
            println("$name released")
            semaphore.release()
        } else {
            println("try $name failed")
        }
    }
    
    fun main(array: Array<String>) = runBlocking  {
        GlobalScope.launch {
            tryPostTask("before")
        }
        for (i in 0 until 3) {
            GlobalScope.launch {
                postTask("task-$i")
            }
        }
        println("all posted")
        GlobalScope.launch {
            tryPostTask("before")
        }
        
        // ...
    }
    
    

    在循环前后分别加了一个 try 的任务(允许数还是 2)。执行结果:

    try before acquiring
    task-0 acquiring
    task-1 acquiring
    try before acquired
    all posted
    task-2 acquiring
    task-0 acquired
    try after acquiring
    try after failed
    .....task-0 released
    task-1 acquired
    ...before released
    task-2 acquired
    ..task-1 released
    ...task-2 released
    ............done
    
    

    嗯,值得分析。

    首先,before 的 try 任务,率先请求成功并执行,其他三个任务也发出请求,但只有 0 号任务请求成功 —— 因为这两个任务就消耗了所有的信号量。

    后面紧着 after 的 try 任务,因为当前没有可用的信号量,它失败了,不会执行任务而返回。

    0 号任务执行时间比 before 短,先于它完毕,释放了一个信号,1 号任务等到了,开始执行;接着就是 2 号,直到全部结束。

    tryAcquire 的行为适用于那些非强制性但又有资源要求的任务,即:可以不执行,但执行就必须满足条件。

    withPermit

    前面的案例中,任务的执行,其实是一个标准的行为模式:请求准入 --> 获准后执行 --> 释放准入

    这个模式有专门的官方实现,是 Semaphore 的扩展方法 withPermit():

    public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
        contract {
            callsInPlace(action, InvocationKind.EXACTLY_ONCE)
        }
    
        acquire()
        try {
            return action()
        } finally {
            release()
        }
    }
    
    

    标准的步骤:acquire -> action -> release。使用这个方法,就不用操心实际应用的时候,因忘掉释放信号造成未知错误了,这个方法保证请求和释放成对执行。

    可以修改前面的案例任务了:

    private suspend fun postTask(name: String) {
        println("$name acquiring")
        semaphore.withPermit {
            println("$name acquired")
            delay(1000L)
            println("$name released")
        }
    }
    
    

    小结

    从案例的实践可以看到,在合适的应用场景下,利用 Kotlin 协程的信号量,简单的几句调用,就能发挥出「钥匙」之功,实现「门」的作用。「钥匙」数一把、多把皆可。一把的时候,其实就相当于同步锁了,起到独立读写的作用;多把的时候,就是,起到「准入」效果。

    相关文章

      网友评论

          本文标题:协程的信号量

          本文链接:https://www.haomeiwen.com/subject/lpfnsdtx.html