开篇预警:继续往下读之前,请确认已具备下面列的知识,本文不做过多的额外讲解哈。包括但不限于:
- 多线程编程及资源同步
- Kotlin 基础
- Kotlin 协程的基本概念及其使用
信号量
信号量是什么?借用度娘的解释:
信号量 (Semaphore),有时被称为信号灯,是在多线程环境下使用的一种设施,是可以用来保证两个或多个关键代码段不被并发调用。
其中,关键词有两个:多线程,并发。所以说,信号量是解决多线程下并发问题的一个概念。
借着上面的解释,举个不恰当但又容易理解的例子:
两座山之间,有一道索桥,索桥本身有重量限制,所以不能同时承载超量的人在桥上走。因此,桥两端各有一个路灯,作为信号灯,灯亮,可上桥;灯灭,桥上已有超额的人,有危险,不可上桥,。
例子很蹩脚,但是,「信号灯」是不是就有了?索桥就是所谓的「有限性资源」,只有信号灯允许时,才能安全地上桥 —— 这也就是信号量的作用:保证资源有限地、安全地访问
协程的信号量
协程,在 Kotlin 的世界里,就是一个优雅的后台任务工具,「后台」两个字一出,必然就可能引出多线程安全的问题。
比如说,应用启动后,有一堆资源由数据库加载而来,但是数据库的内容,又可能需要拉取接口更新。这样一来,数据加载完成的时间就不可控了,如果不加控制,那读取方拿到数据的完整性、可靠性就都不确定了。
这样的场景,虽然可以通过同步来解决,但是好像又有点儿过于严格了,毕竟,读取也需要锁住啊。
而信号量在这儿就能派上用场了。信号量,就像是钥匙,可以只有一把,一次允许一个人开门,也可以有多把,分发多人,谁用完,谁就传递给等待钥匙的人;但是使用钥匙的人之间,却互不影响。
Semaphore
Kotlin 中的信号量,定义成了一个接口,为 Semaphore
,以下,是官方文档的介绍:
A counting semaphore for coroutines that logically maintains a number of available permits. Each acquire takes a single permit or suspends until it is available. Each release adds a permit, potentially releasing a suspended acquirer. Semaphore is fair and maintains a FIFO order of acquirers.
解释下意思:Semaphore
控制了可数的信号量,每一个请求,如果有可用的信号量,则消耗一个;如果没有,则悬挂而等待,直到有信号量释放。信号量控制遵循「先进先出」原则,谁先请求,谁先拿到。
接口定义如下:
public interface Semaphore {
/**
* 信号量允许数
*/
public val availablePermits: Int
/**
* 请求信号量
*/
public suspend fun acquire()
/**
* 请示信号量(直接给出请求结果,无suspend)
*/
public fun tryAcquire(): Boolean
/**
* 释放一个信号量
*/
public fun release()
}
很简单的四个方法。获取一个 Semaphore
可以通过如下方法:
public fun Semaphore(permits: Int, acquiredPermits: Int = 0): Semaphore = SemaphoreImpl(permits, acquiredPermits)
其中,SemaphoreImpl
就是系统的实现。
案例
好了,基础知识介绍了,案例才能加深理解。
1
private val semaphore = Semaphore(1, 0)
private suspend fun postTask(name: String) {
println("$name acquiring")
semaphore.acquire()
println("$name acquired")
delay(1000L)
println("$name released")
semaphore.release()
}
fun main(array: Array<String>) = runBlocking {
for (i in 0 until 3) {
GlobalScope.launch {
postTask("task-$i")
}
}
println("all posted")
GlobalScope.launch {
while (true) {
// 进度模拟
print(".")
delay(200)
}
}
delay(5_000L)
println("done")
}
其输出结果:
all posted
task-0 acquiring
task-1 acquiring
task-0 acquired
task-2 acquiring
.....task-0 released
task-1 acquired
.....task-1 released
task-2 acquired
.....task-2 released
..........done
好,来具体分析下输出。
首先,postTask
模拟一个任务,进入时,都需要获取信号量,然后执行 1 秒,完毕后,释放信号量,供可能的其他线程使用。
main 执行时,循环直接完成了三个任务的启动,所以输出了 「all posted」。
三个任务基本同时准备请求信号量(「acquiring」),但是只有 0 号任务成功(「acquired」),然后它开始执行任务。
1 秒后(已经输出「.....」了)任务结束,释放信号量,然后 1 号任务马上请求成功(「acquired」),开始自己的任务执行。同样地,后来是 2 号任务。
当全部任务执行完毕后,再等几秒,main 结束。
2
现在,我们把「信号量允许数」改成 2,即:一次允许两个请求同时满足。
private val semaphore = Semaphore(2, 0)
执行结果:
all posted
task-0 acquiring
task-1 acquiring
task-2 acquiring
task-0 acquired
.task-1 acquired
....task-0 released
task-1 released
task-2 acquired
.....task-2 released
...............done
可以看到,几乎同时,0 号和 1 号任务就拿到了准入,开始执行自己的任务。但因为允许数是 2,2 号任务就只能等到有信号释放才能执行了。
3
再来。如果注释掉 semaphore.release()
,信号量没有释放,2 号任务就无法执行了:
all posted
task-1 acquiring
task-0 acquiring
task-0 acquired
task-2 acquiring
task-1 acquired
.....task-1 released
task-0 released
....................done
4
还有一个方法没使用呢:tryAcquire()
,来添加一个「尝试执行」的任务:
private suspend fun tryPostTask(name: String) {
println("try $name acquiring")
if (semaphore.tryAcquire()) { // 尝试请求准入
println("try $name acquired")
delay(1500)
println("$name released")
semaphore.release()
} else {
println("try $name failed")
}
}
fun main(array: Array<String>) = runBlocking {
GlobalScope.launch {
tryPostTask("before")
}
for (i in 0 until 3) {
GlobalScope.launch {
postTask("task-$i")
}
}
println("all posted")
GlobalScope.launch {
tryPostTask("before")
}
// ...
}
在循环前后分别加了一个 try 的任务(允许数还是 2)。执行结果:
try before acquiring
task-0 acquiring
task-1 acquiring
try before acquired
all posted
task-2 acquiring
task-0 acquired
try after acquiring
try after failed
.....task-0 released
task-1 acquired
...before released
task-2 acquired
..task-1 released
...task-2 released
............done
嗯,值得分析。
首先,before 的 try 任务,率先请求成功并执行,其他三个任务也发出请求,但只有 0 号任务请求成功 —— 因为这两个任务就消耗了所有的信号量。
后面紧着 after 的 try 任务,因为当前没有可用的信号量,它失败了,不会执行任务而返回。
0 号任务执行时间比 before 短,先于它完毕,释放了一个信号,1 号任务等到了,开始执行;接着就是 2 号,直到全部结束。
tryAcquire
的行为适用于那些非强制性但又有资源要求的任务,即:可以不执行,但执行就必须满足条件。
withPermit
前面的案例中,任务的执行,其实是一个标准的行为模式:请求准入
--> 获准后执行
--> 释放准入
。
这个模式有专门的官方实现,是 Semaphore
的扩展方法 withPermit()
:
public suspend inline fun <T> Semaphore.withPermit(action: () -> T): T {
contract {
callsInPlace(action, InvocationKind.EXACTLY_ONCE)
}
acquire()
try {
return action()
} finally {
release()
}
}
标准的步骤:acquire -> action -> release。使用这个方法,就不用操心实际应用的时候,因忘掉释放信号造成未知错误了,这个方法保证请求和释放成对执行。
可以修改前面的案例任务了:
private suspend fun postTask(name: String) {
println("$name acquiring")
semaphore.withPermit {
println("$name acquired")
delay(1000L)
println("$name released")
}
}
小结
从案例的实践可以看到,在合适的应用场景下,利用 Kotlin 协程的信号量,简单的几句调用,就能发挥出「钥匙」之功,实现「门」的作用。「钥匙」数一把、多把皆可。一把的时候,其实就相当于同步锁了,起到独立读写的作用;多把的时候,就是门,起到「准入」效果。
网友评论