前言
Kotlin 协程作为异步编程的强大工具,带来了便捷和高效。然而,随着多个协程共同操作共享数据,我们面临竞态条件和数据竞争的挑战。本文将深入探讨 Kotlin 协程中的并发安全性问题,提供解决方案和最佳实践。
协程并发安全实战
1. 单线程调度(Main Thread)
var countVar = 0
fun main() = runBlocking{
val jobs = mutableListOf<Job>()
val timeCost = measureTimeMillis {
repeat(1000){
val job = launch {
delay(100)
countVar++
}
jobs.add(job)
}
jobs.forEach{
it.join()
}
}
log("timeCost =$timeCost")
log("count =$countVar")
}
第一个例子:count结果是1000,因为共享数据没有发生线程切换,并不会出现并发安全,所以答案是1000。
2. 多线程调度
fun main() = runBlocking{
val jobs = mutableListOf<Job>()
val mutex = Mutex()
val timeCost = measureTimeMillis {
repeat(1000){
val job = launch(Dispatchers.Default) {
delay(100)
countVar++
}
jobs.add(job)
}
jobs.forEach{
it.join()
}
}
log("timeCost =$timeCost")
log("count =$countVar")
}
第二个例子:count的结果肯定小于等于1000,因为多线程访问,会出现并发安全问题。需要同步。
3. 单线程调度串行执行
fun main() = runBlocking {
val jobs = mutableListOf<Job>()
val timeCost = measureTimeMillis {
val job = launch(Dispatchers.Default) {
repeat(1000) {
delay(100)
countVar++
}
}
jobs.add(job)
jobs.forEach {
it.join()
}
}
log("timeCost =$timeCost")
log("count =$countVar")
}
第三个例子:count结果是1000,和第一个例子一样,因为共享数据没有发生线程切换,并不会出现并发安全,但是串行执行的,所以答案是1000。
所以说 协程作用域是否安全取决于共享数据有没有发生线程切换。若发生线程切换,则需要额外的同步,否则数据不安全。
协程并发安全几种同步方式
CAS 乐观锁
/**
* CAS 乐观锁
*/
fun main() = runBlocking {
val atomicInteger = AtomicInteger()
val jobs = mutableListOf<Job>()
val timeCost = measureTimeMillis {
repeat(1000) {
val job = launch(Dispatchers.Default) {
delay(100)
atomicInteger.incrementAndGet()
}
jobs.add(job)
}
jobs.forEach {
it.join()
}
}
log("timeCost =$timeCost")
log("count =${atomicInteger.get()}")
}
sychronized 高阶函数
/**
* synchronized
*/
fun main() = runBlocking {
val jobs = mutableListOf<Job>()
val lock = Any()
val timeCost = measureTimeMillis {
repeat(100) {
val job = launch(Dispatchers.Default) {
delay(100)
synchronized(lock) {
countVar++
}
}
jobs.add(job)
}
jobs.forEach {
it.join()
}
}
log("timeCost =$timeCost")
log("count =$countVar")
}
mutex
/**
* 多线程调度器
* 需要配合mutex
*/
fun main() = runBlocking{
val jobs = mutableListOf<Job>()
val mutex = Mutex()
val timeCost = measureTimeMillis {
repeat(1000){
val job = launch(Dispatchers.Default) {
delay(100)
mutex.withLock{
countVar++
}
}
jobs.add(job)
}
jobs.forEach{
it.join()
}
}
log("timeCost =$timeCost")
log("count =$countVar")
}
无锁实现
协程内部访问外部count实现自增改为返回增量结果
fun main() = runBlocking {
val count = 0
val timeCost = measureTimeMillis {
val result = count + List(1000) {
GlobalScope.async {
delay(100)
1
}
}.sumOf {
it.await()
}
log("result -->$result")
}
log("timeCost -->$timeCost")
}
协程并发另外一个例子
并发获取User,使用count 记录还剩多少user没有获取
/**
* 并发获取User
*/
fun main() = runBlocking {
val startTime = System.currentTimeMillis()
val userIds: MutableList<Int> = ArrayList()
for (i in 1..1000) {
userIds.add(i)
}
var count = userIds.size
val map: MutableMap<Int, User> = HashMap()
val deferredResults = userIds.map { userId ->
async {
val user = getUserAsync(userId)
//log("userId-->$userId :::: user ---> $user")
map[userId] = user
map
}
}
// 获取每个 async 任务的结果
val results = deferredResults.map { deferred ->
count--
log("count $count")
deferred.await()
}
val costTime = (System.currentTimeMillis() - startTime) / 1000
log("count -> $count")
log("costTime-->$costTime")
log("user size -> ${results.size}")
}
/**
* 异步同步化
*/
suspend fun getUserAsync(userId: Int): User = suspendCoroutine { continuation ->
ClientManager.getUser(userId) {
continuation.resume(it)
}
}
其中 ClientManager:
/**
* 模拟客户端请求
*/
object ClientManager {
var executor: Executor = Executors.newFixedThreadPool(Runtime.getRuntime().availableProcessors() * 2)
val customDispatchers = executor.asCoroutineDispatcher()
/**
* getUser
*/
fun getUser(userId: Int, callback: (User) -> Unit) {
executor.execute {
val sleepTime = Random().nextInt(100)
Thread.sleep(sleepTime.toLong())
callback(User(userId, sleepTime.toString(), "avatar", ""))
}
}
/**
* getAvatar
*/
fun getUserAvatar(user: User, callback: (User) -> Unit) {
executor.execute {
val sleepTime = Random().nextInt(1000)
try {
Thread.sleep(sleepTime.toLong())
} catch (e: InterruptedException) {
e.printStackTrace()
}
user.file = "$sleepTime.png"
callback(user)
}
}
}
思考,这里面的count 为何不需要同步就能正确获取数据呢?(因为count写操作发生在单线程调度器上)
协程并发总结
避免多线程访问外部可变状态
出现并发安全问题,无非是多线程访问公共变量,如果我们能在单线程调度器的情况下去访问公共变量,就不会出现并发安全问题。
总而言之,如非必须,则避免访问外部可变状态; 如无必要,则避免使用可变状态。这样可以有效降低并发问题的出现概率,使代码更加稳定可靠。
在协程并发中,几种同步方式(CAS 乐观锁、synchronized 高阶函数、mutex)都是为了保护共享的可变状态,确保在任意时刻只有一个协程能够修改它,从而避免数据竞争和不一致的结果。
结尾
通过本文,我们深入了解了 Kotlin 协程中的并发安全性问题,并提供了一些解决方案和最佳实践。在实际应用中,根据具体情况选择适当的同步机制,保证在并发环境中代码的正确性和稳定性。协程作为异步编程的强大工具,能够更方便地处理并发问题,但也需要谨慎使用,特别是在多线程环境下。
网友评论