美文网首页
协程的互斥锁

协程的互斥锁

作者: 小城哇哇 | 来源:发表于2023-05-22 14:14 被阅读0次

    在上一篇《协程的信号量》中,从一个简单的例子,我们一窥 Kotlin 协程的信号量,即 Semaphore 的用法,也了解了适合它的场景。

    今天来讲另一个同步的工具:Mutex,互斥锁

    Mutex

    名为 Mutex,和其他语言里的东西一样,就是一把同步锁,实现互斥机制,对临界区(比如共享资源)加以限制,用以保证多线程的共享资源安全。

    Semaphore 一样,Kotlin 协程里的 Mutex,也是一个接口:

    public interface Mutex {
        /**
         * 标志是否又锁
         */
        public val isLocked: Boolean
    
        /**
         * 尝试锁,如果已锁,则返回 false
         */
        public fun tryLock(owner: Any? = null): Boolean
    
        /**
         * 锁,如果 已锁,则挂起等待
         */
        public suspend fun lock(owner: Any? = null)
    
        
        /**
         * 检查 owner 是否锁,如果没有、或者锁的是其他 owner,返回 false
         */
        public fun holdsLock(owner: Any): Boolean
    
        /**
         * 解锁
         */
        public fun unlock(owner: Any? = null)
    }
    
    

    值得注意的是,所有「锁」方法,都带了一个参数,名为 owner。它的作用就像一个 key 一样,决定了关注的是哪把锁。换句话说:一个 Mutex 可以锁不同的 owner。一旦使用了 owner,但又不符合锁的状态要求而调用了特定的方法,可能会抛异常。

    类似信号量,Mutex 也提供了方法获取实例:

    public fun Mutex(locked: Boolean = false): Mutex =
        MutexImpl(locked)
    
    

    可以通过参数 locked,来设置初始状态是否锁住。

    然后呢,同样也有一个 withXXX 工具函数封装,帮我们解决了「锁/解锁」的操作配对问题:

    public suspend inline fun <T> Mutex.withLock(owner: Any? = null, action: () -> T): T {
        contract { 
            callsInPlace(action, InvocationKind.EXACTLY_ONCE)
        }
    
        lock(owner) // 锁住目标owner
        try {
            return action() // 执行任务
        } finally {
            unlock(owner) // 解锁
        }
    }
    
    

    案例

    看过之前信号量的文章,现在再看 Mutex 的,就能驾轻就熟般地使用它了。借用之前的例子,改造一下任务实现:

    private val mutex = Mutex()
    
    private suspend fun mutexTask(name: String, owner: Any? = null) {
        println("$name locking")
        mutex.withLock(owner) {
            println("$name locked")
            delay(1000L)
            println("$name unlocked")
        }
    }
    
    private suspend fun tryMutexTask(name: String, owner: Any? = null) {
        println("try $name locking")
        if (mutex.tryLock(owner)) {
            println("try $name locked")
            delay(1500)
            println("$name unlocked")
            mutex.unlock(owner)
        } else {
            println("try $name failed")
        }
    }
    
    

    1

    同样的,先看看普通 lock 下的使用:

    for (i in 0 until 3) {
        GlobalScope.launch {
            mutexTask("task-$i")
        }
    }
    println("all posted")
    GlobalScope.launch {
        while (true) {
            print(".")
            delay(200)
        }
    }
    delay(5_000L)
    println("done")
    
    

    结果 :

    all posted
    task-1 locking
    task-1 locked
    task-2 locking
    task-0 locking
    .....task-1 unlocked
    task-2 locked
    .....task-2 unlocked
    task-0 locked
    .....task-0 unlocked
    ..........done
    
    

    三个任务都尝试锁,但只有任务 1 成功;任务 1 完成后解锁,这时,任务 2 成功锁住并执行任务。后面的任务 0 与此类似。

    可以看出,Mutex 就是一种「独占式」的保护门。

    2

    我们再来看看「尝试锁」是怎么样的:

    GlobalScope.launch {
        tryMutexTask("before")
    }
    for (i in 0 until 3) {
        GlobalScope.launch {
            mutexTask("task-$i")
        }
    }
    println("all posted")
    GlobalScope.launch {
        tryMutexTask("after")
    }
    // ...
    
    

    结果:

    task-0 locking
    task-0 locked
    all posted
    task-1 locking
    task-2 locking
    try before locking
    try before failed
    try after locking
    try after failed
    .....task-0 unlocked
    task-2 locked
    .....task-2 unlocked
    task-1 locked
    .....task-1 unlocked
    ..........done
    
    

    这个结果完美体现了多线程的不确定性:率先取得锁的,是任务0,而不是 before。这就导致 before 的 try 失败。其他流程和前面实验 1 一样了。

    再运行一遍:

    try before locking
    try before locked
    task-0 locking
    task-1 locking
    all posted
    task-2 locking
    try after locking
    try after failed
    ........before unlocked
    task-0 locked
    .....task-0 unlocked
    task-1 locked
    .....task-1 unlocked
    task-2 locked
    .....task-2 unlocked
    ..done
    
    

    嗯,这次 before 先锁住了,try 成功。于是乎,后面的三个任务全都锁挂起,after 是 try 锁,直接失败返回。然后同样的,后面就是「锁住、解锁」的成对序列了。

    3

    前面接口说明有提到,如果设置了 owner 的锁,如果调用不合时宜,将抛异常。

    比如下面:

    for (i in 0 until 3) {
        GlobalScope.launch {
            mutexTask("task-$i", "try")
        }
    }
    
    

    预想的崩溃来了:

    all posted
    task-0 locking
    task-1 locking
    task-2 locking
    .task-0 locked
    Exception in thread "DefaultDispatcher-worker-2" Exception in thread "DefaultDispatcher-worker-3" java.lang.IllegalStateException: Already locked by try
        at kotlinx.coroutines.sync.MutexImpl.lockSuspend(Mutex.kt:208)
        at kotlinx.coroutines.sync.MutexImpl.lock(Mutex.kt:186)
        at coroutine.SyncKt.mutexTask(sync.kt:115)
        at coroutine.SyncKt.access$mutexTask(sync.kt:1)
        at coroutine.SyncKt$main$1$1.invokeSuspend(sync.kt:86)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
        Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@663fba34, Dispatchers.Default]
    java.lang.IllegalStateException: Already locked by try
        at kotlinx.coroutines.sync.MutexImpl.lockSuspend(Mutex.kt:208)
        at kotlinx.coroutines.sync.MutexImpl.lock(Mutex.kt:186)
        at coroutine.SyncKt.mutexTask(sync.kt:115)
        at coroutine.SyncKt.access$mutexTask(sync.kt:1)
        at coroutine.SyncKt$main$1$1.invokeSuspend(sync.kt:86)
        at kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(ContinuationImpl.kt:33)
        at kotlinx.coroutines.DispatchedTask.run(DispatchedTask.kt:106)
        at kotlinx.coroutines.scheduling.CoroutineScheduler.runSafely(CoroutineScheduler.kt:570)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.executeTask(CoroutineScheduler.kt:750)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.runWorker(CoroutineScheduler.kt:677)
        at kotlinx.coroutines.scheduling.CoroutineScheduler$Worker.run(CoroutineScheduler.kt:664)
        Suppressed: kotlinx.coroutines.DiagnosticCoroutineContextException: [StandaloneCoroutine{Cancelling}@702043ed, Dispatchers.Default]
    ....task-0 unlocked
    
    

    第一个 owner 为 「try」的锁可以,后面再来就抛异常了。

    当然,改改名就可以解决:

    for (i in 0 until 3) {
        GlobalScope.launch {
            mutexTask("task-$i", "try$i")
        }
    }
    
    

    这样每次的 owner 就都不同了。

    再谈信号量 Semaphore

    看到这里,有没有发现,Mutex 的作用,甚至在使用上,都和信号量 Semaphore 很类似啊?

    没错,其实思考一下就清楚了,对于信号量允许数设置为 1 的 Semaphore,不就是一个 Mutex 吗? 每使用一个信号量,就没有多余的存在,其他请求方必须等这一个释放才能获取,妥妥地就是一个互斥锁。

    小结

    相较于信号量,Mutex 更为简单。不过呢,使用的出错概率也增加了,因为它动不动就有可能来个崩溃,还是小心使用为好。

    相关文章

      网友评论

          本文标题:协程的互斥锁

          本文链接:https://www.haomeiwen.com/subject/anqtedtx.html