美文网首页
Kotlin 协程的并发安全之道

Kotlin 协程的并发安全之道

作者: BlueSocks | 来源:发表于2023-11-13 17:54 被阅读0次

    前言

    Kotlin 协程作为异步编程的强大工具,带来了便捷和高效。然而,随着多个协程共同操作共享数据,我们面临竞态条件和数据竞争的挑战。本文将深入探讨 Kotlin 协程中的并发安全性问题,提供解决方案和最佳实践。

    协程并发安全实战

    1. 单线程调度(Main Thread)

    var countVar = 0
    
    fun main() = runBlocking{
    
        val jobs = mutableListOf<Job>()
        val timeCost = measureTimeMillis {
            repeat(1000){
                val job = launch {
                    delay(100)
                    countVar++
                }
                jobs.add(job)
            }
            jobs.forEach{
                it.join()
            }
        }
        log("timeCost =$timeCost")
        log("count =$countVar")
    }
    
    

    第一个例子:count结果是1000,因为共享数据没有发生线程切换,并不会出现并发安全,所以答案是1000。

    2. 多线程调度

    fun main() = runBlocking{
    
        val jobs = mutableListOf<Job>()
        val mutex = Mutex()
        val timeCost = measureTimeMillis {
            repeat(1000){
                val job = launch(Dispatchers.Default) {
                    delay(100)
                    countVar++
                }
                jobs.add(job)
            }
            jobs.forEach{
                it.join()
            }
        }
        log("timeCost =$timeCost")
        log("count =$countVar")
    }
    
    

    第二个例子:count的结果肯定小于等于1000,因为多线程访问,会出现并发安全问题。需要同步。

    3. 单线程调度串行执行

    fun main() = runBlocking {
    
        val jobs = mutableListOf<Job>()
        val timeCost = measureTimeMillis {
            val job = launch(Dispatchers.Default) {
                repeat(1000) {
                    delay(100)
                    countVar++
                }
            }
            jobs.add(job)
            jobs.forEach {
                it.join()
            }
        }
        log("timeCost =$timeCost")
        log("count =$countVar")
    }
    
    

    第三个例子:count结果是1000,和第一个例子一样,因为共享数据没有发生线程切换,并不会出现并发安全,但是串行执行的,所以答案是1000。

    所以说 协程作用域是否安全取决于共享数据有没有发生线程切换。若发生线程切换,则需要额外的同步,否则数据不安全。

    协程并发安全几种同步方式

    CAS 乐观锁

    /**
     * CAS 乐观锁
     */
    fun main() = runBlocking {
        val atomicInteger = AtomicInteger()
        val jobs = mutableListOf<Job>()
        val timeCost = measureTimeMillis {
            repeat(1000) {
                val job = launch(Dispatchers.Default) {
                    delay(100)
                    atomicInteger.incrementAndGet()
                }
                jobs.add(job)
            }
            jobs.forEach {
                it.join()
            }
        }
        log("timeCost =$timeCost")
        log("count =${atomicInteger.get()}")
    }
    
    

    sychronized 高阶函数

    /**
     * synchronized
     */
    fun main() = runBlocking {
    
        val jobs = mutableListOf<Job>()
        val lock = Any()
        val timeCost = measureTimeMillis {
            repeat(100) {
                val job = launch(Dispatchers.Default) {
                    delay(100)
                    synchronized(lock) {
                        countVar++
                    }
                }
                jobs.add(job)
            }
    
            jobs.forEach {
                it.join()
            }
        }
        log("timeCost =$timeCost")
        log("count =$countVar")
    }
    
    

    mutex

    /**
     * 多线程调度器
     * 需要配合mutex
     */
    fun main() = runBlocking{
    
        val jobs = mutableListOf<Job>()
        val mutex = Mutex()
        val timeCost = measureTimeMillis {
            repeat(1000){
                val job = launch(Dispatchers.Default) {
                    delay(100)
                    mutex.withLock{
                        countVar++
                    }
    
                }
                jobs.add(job)
            }
            jobs.forEach{
                it.join()
            }
        }
        log("timeCost =$timeCost")
        log("count =$countVar")
    }
    
    

    无锁实现

    协程内部访问外部count实现自增改为返回增量结果

    fun main() = runBlocking {
    
        val count = 0
        val timeCost = measureTimeMillis {
            val result = count + List(1000) {
                GlobalScope.async {
                    delay(100)
                    1
                }
            }.sumOf {
                it.await()
            }
            log("result -->$result")
        }
    
        log("timeCost -->$timeCost")
        
    }
    
    

    协程并发另外一个例子

    并发获取User,使用count 记录还剩多少user没有获取

    /**
     * 并发获取User
     */
    fun main() = runBlocking {
    
         val startTime = System.currentTimeMillis()
         val userIds: MutableList<Int> = ArrayList()
         for (i in 1..1000) {
              userIds.add(i)
         }
         var count = userIds.size
         val map: MutableMap<Int, User> = HashMap()
         val deferredResults = userIds.map { userId ->
              async {
                   val user = getUserAsync(userId)
                   //log("userId-->$userId :::: user --->  $user")
                   map[userId] = user
                   map
              }
         }
    
    
         // 获取每个 async 任务的结果
         val results = deferredResults.map { deferred ->
              count--
              log("count  $count")
              deferred.await()
         }
         val costTime = (System.currentTimeMillis() - startTime) / 1000
         log("count -> $count")
         log("costTime-->$costTime")
         log("user size -> ${results.size}")
    
    }
    
    /**
     * 异步同步化
     */
    suspend fun getUserAsync(userId: Int): User = suspendCoroutine { continuation ->
         ClientManager.getUser(userId) {
              continuation.resume(it)
         }
    }
    
    

    其中 ClientManager:

    /**
     * 模拟客户端请求
     */
    object ClientManager {
    
        var executor: Executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2)
        val customDispatchers = executor.asCoroutineDispatcher()
    
        /**
         * getUser
         */
        fun getUser(userId: Int, callback: (User) -> Unit) {
            executor.execute {
                val sleepTime = Random().nextInt(100)
                Thread.sleep(sleepTime.toLong())
                callback(User(userId, sleepTime.toString(), "avatar", ""))
    
            }
        }
        /**
         * getAvatar
         */
        fun getUserAvatar(user: User, callback: (User) -> Unit) {
            executor.execute {
                val sleepTime = Random().nextInt(1000)
                try {
                    Thread.sleep(sleepTime.toLong())
                } catch (e: InterruptedException) {
                    e.printStackTrace()
                }
                user.file = "$sleepTime.png"
                callback(user)
            }
        }
        
    }
    
    

    思考,这里面的count 为何不需要同步就能正确获取数据呢?(因为count写操作发生在单线程调度器上)

    协程并发总结

    避免多线程访问外部可变状态

    出现并发安全问题,无非是多线程访问公共变量,如果我们能在单线程调度器的情况下去访问公共变量,就不会出现并发安全问题。

    总而言之,如非必须,则避免访问外部可变状态; 如无必要,则避免使用可变状态。这样可以有效降低并发问题的出现概率,使代码更加稳定可靠。

    在协程并发中,几种同步方式(CAS 乐观锁、synchronized 高阶函数、mutex)都是为了保护共享的可变状态,确保在任意时刻只有一个协程能够修改它,从而避免数据竞争和不一致的结果。

    结尾

    通过本文,我们深入了解了 Kotlin 协程中的并发安全性问题,并提供了一些解决方案和最佳实践。在实际应用中,根据具体情况选择适当的同步机制,保证在并发环境中代码的正确性和稳定性。协程作为异步编程的强大工具,能够更方便地处理并发问题,但也需要谨慎使用,特别是在多线程环境下。

    相关文章

      网友评论

          本文标题:Kotlin 协程的并发安全之道

          本文链接:https://www.haomeiwen.com/subject/kssdwdtx.html