协程可用多线程调度器(比如默认的 Dispatchers.Default)并行执行。这样就可以提出所有常见的并行问题。主要的问题是同步访问共享的可变状态。
一、问题
使用多线程的 Dispatchers.Default来递增一个共享的可变变量。
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
counter不会等于100000,因为100个协程在后台多个线程中同时递增计数器,但是没有做并发处理。
二、volatile无济于事
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed time ms")
}
@Volatile
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
运行速度变慢,还是不能得到100000。因为 volatile 变量保证可线性化读取和写入变量,但在大量动作发生时并不提供原子性。
三、线程安全的数据结构
一种对线程、协程都有效的常规解决方法,就是使用线程安全(也称为同步的、 可线性化、原子)的数据结构,它为需要在共享状态上执行的相应操作提供所有必需的同步处理。在简单的计数器场景中,我们可以使用具有 incrementAndGet 原子操作的 AtomicInteger 类:
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counter = AtomicInteger()
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
counter.incrementAndGet()
}
}
println("Counter = $counter")
}
它适用于普通计数器、集合、队列和其他标准数据结构以及它们的基本操作。然而,它并不容易被扩展来应对复杂状态、或一些没有现成的线程安全实现的复杂操作。
四、以细粒度限制线程
限制线程是解决共享可变状态问题的一种方案:对特定共享状态的所有访问权都限制在单个线程中。它通常应用于UI 程序中:所有UI状态都局限于单个事件分发线程或应用主线程中。这在协程中很容易实现,通过使用一个单线程上下文:
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 将每次自增限制在单线程上下文中
withContext(counterContext) {
counter++
}
}
}
println("Counter = $counter")
}
这段代码运行非常缓慢,因为它进行了 细粒度 的线程限制。每个增量操作都得使用 [withContext(counterContext)] 块从多线程 Dispatchers.Default上下文切换到单线程上下文。
五、以粗粒度限制线程
在实践中,线程限制是在大段代码中执行的,例如:状态更新类业务逻辑中大部分都是限于单线程中。下面的示例演示了这种情况, 在单线程上下文中运行每个协程。运行更快,且打印出来了正确的结果。
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val counterContext = newSingleThreadContext("CounterContext")
var counter = 0
fun main() = runBlocking {
// 将一切都限制在单线程上下文中
withContext(counterContext) {
massiveRun {
counter++
}
}
println("Counter = $counter")
}
六、互斥
协程中的使用 Mutex解决,不会同时执行关键代码,来保护共享状态的所有修改。它具有 lock和 unlock方法, 可以隔离关键的部分。关键的区别在于 Mutex.lock() 是一个挂起函数,它不会阻塞线程。
还有 withLock扩展函数,可以方便的替代常用的 mutex.lock(); try { …… } finally { mutex.unlock() }
模式:
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同一动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
val mutex = Mutex()
var counter = 0
fun main() = runBlocking {
withContext(Dispatchers.Default) {
massiveRun {
// 用锁保护每次自增
mutex.withLock {
counter++
}
}
}
println("Counter = $counter")
}
此示例中锁是细粒度的,因此会付出一些代价。但是对于某些必须定期修改共享状态的场景,它是一个不错的选择,但是没有自然线程可以限制此状态。
七、Actors
一个 actor是由协程、 被限制并封装到该协程中的状态以及一个与其它协程通信的 通道 组合而成的一个实体。一个简单的 actor 可以简单的写成一个函数, 但是一个拥有复杂状态的 actor 更适合由类来表示。
使用 actor 的第一步是定义一个 actor 要处理的消息类。 密封类很适合这种场景。 使用 IncCounter
消息(用来递增计数器)和 GetCounter
消息(用来获取值)来定义 CounterMsg
密封类。 后者需要发送回复。CompletableDeferred通信原语表示未来可知(可传达)的单个值。
suspend fun massiveRun(action: suspend () -> Unit) {
val n = 100 // 启动的协程数量
val k = 1000 // 每个协程重复执行同个动作的次数
val time = measureTimeMillis {
coroutineScope { // 协程的作用域
repeat(n) {
launch {
repeat(k) { action() }
}
}
}
}
println("Completed ${n * k} actions in $time ms")
}
// 计数器 Actor 的各种类型
sealed class CounterMsg
object IncCounter : CounterMsg() // 递增计数器的单向消息
class GetCounter(val response: CompletableDeferred<Int>) : CounterMsg() // 携带回复的请求
// 这个函数启动一个新的计数器 actor
fun CoroutineScope.counterActor() = actor<CounterMsg> {
var counter = 0 // actor 状态
for (msg in channel) { // 即将到来消息的迭代器
when (msg) {
is IncCounter -> counter++
is GetCounter -> msg.response.complete(counter)
}
}
}
fun main() = runBlocking<Unit> {
val counter = counterActor() // 创建该 actor
withContext(Dispatchers.Default) {
massiveRun {
counter.send(IncCounter)
}
}
// 发送一条消息以用来从一个 actor 中获取计数值
val response = CompletableDeferred<Int>()
counter.send(GetCounter(response))
println("Counter = ${response.await()}")
counter.close() // 关闭该actor
}
actor 本身执行时所处上下文(就正确性而言)无关紧要。一个 actor 是一个协程,而一个协程是按顺序执行的,因此将状态限制到特定协程可以解决共享可变状态的问题。实际上,actor 可以修改自己的私有状态, 但只能通过消息互相影响(避免任何锁定)。
actor 在高负载下比锁更有效,因为在这种情况下它总是有工作要做,而且根本不需要切换到不同的上下文。
actor协程构建器是一个双重的 produce协程构建器。一个 actor 与它接收消息的通道相关联,而一个 producer 与它发送元素的通道相关联。
网友评论