协程是什么
从使用的角度来说,协程可以简单理解为轻量化的线程,比线程更低一级的代码执行单元。
准备工作
implementation 'org.jetbrains.kotlinx:kotlinx-coroutines-core:1.1.1'
//针对 android 项目
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:1.1.1"
为什么需要协程
首先看一下,客户端从服务器获取数据的一般流程:
- 从本地缓存或服务器获取 token
- 从服务器获取数据
- 处理数据
理想的操作是这样的:
fun requestToken():Token {...}
fun createPost(token: Token, item: Item): Post {...}
fun processPost(post: Post)
fun postItem(item: Item) {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
}
现实是残酷的,对于 Android 平台而言,获取 token 和数据的操作是耗时的,会阻塞主线程,导致页面无响应。
刚开始我们使用新线程 + 回调 来解决这个问题:
//启动新线程发起数据请求,当数据获取到后调用回调函数,必要的情况下,获取到数据后可以切换到主线程
fun requestToken(cb: (Token) -> Unit) {...}
fun createPost(token: Token, item: Item, cb: (Post) -> Unit) {...}
fun processPost(post: Post) {...}
fun postItem(item: Item) {
requestToken { token ->
createPost(token, item) { post ->
processPost(post)
}
}
}
问题来了,过多的回调使得代码变得冗长而难以阅读,这里使用了 lamada 表达式使得情况稍微好点,不用 lamada,再来个四五层的回调,会疯的。
很多人都受不了了,陆续出了三个方案来解决这个问题:
- Future
- Promise
- Rx
他们本质是一样的,通过数据的封装来减少回调:
fun requestToken():Promise<Token> {...}
fun createPost(token: Token, item: Item): Promise<Token> {...}
fun processPost(post: Post)
fun postItem(item: Item) {
requestToken()
.thenCompose { token -> createPost(token, item) }
.thenAccept {post -> processPost(post)}
}
fun requestToken(): Token { ... }
fun createPost(token: Token, item: Item): Post { ... }
fun processPost(post: Post) { ... }
fun postItem(item: Item) {
Single.fromCallable { requestToken() }
.map { token -> createPost(token, item) }
.subscribe(
{ post -> processPost(post) }, // onSuccess
{ e -> e.printStackTrace() } // onError
)
}
问题大为改观,巴特, 为了使用这些库你需要记忆诸如 thenCompose, thenAccept 这样的操作符,不同的库,操作符可能名字还不相同。新功能的增加往往伴随操作符的增加,给使用者带来的极大的学习负担。我大概 16 年开始使用 Rxjava,Rxjava 的操作符到现在我还没彻底搞清楚。可能是我太懒了吧。
最后我们看下,协程在这个问题的解决上究竟有多香!
suspend fun requestToken():Token {...}
suspend fun createPost(token: Token, item: Item): Post {...}
suspend fun processPost(post: Post)
suspend fun postItem(item: Item) {
val token = requestToken()
val post = createPost(token, item)
processPost(post)
}
无限接近,最开始提出的理想方案———顺序执行。
协程是如何做到的如此 nb 的 !?
如何启动协程
线程 -> 协程
我们的代码默认执行在线程环境下,runBlocking 像一个转换器,把线程环境转换为协程环境。
import kotlinx.coroutines.*
fun main() {
println("Hello,")
runBlocking { // 线程 -> 协程
delay(2000L) //main 线程阻塞两秒
}
}
runBlocking 还可以直接启动一个 main 协程:
import kotlinx.coroutines.*
fun main() = runBlocking {
println("Hello")
}
通过 CoroutineScope 接口的扩展方法 launch 可以启动一个协程。因为 CoroutineScope 是一个接口,不能直接调用其扩展方法,需要定义 CoroutineScope 接口的实现类才能使用。协程库提供了一个 CoroutineScope 的实现 GlobalScope。
import kotlinx.coroutines.*
fun main() {
//启动协程,就像启动一个线程一样,在后台运行
GlobalScope.launch {
delay(1000L)
println("World!")
}
println("Hello,")
//主线程不 sleep,则不会打印 World,程序直接结束,不会等待协程执行完毕
Thread.sleep(2000L)
}
我们也可以自己写 CoroutineScope 的实现类。实际上,不需要这么麻烦,协程库给我们提供了 CoroutineScope 方法来快速实现一个 CoroutineScope 对象。
val uiScope: CoroutineScope = CoroutineScope(Dispatchers.Main)
val ioScope: CoroutineScope = CoroutineScope(Dispatchers.IO)
val computeScope: CoroutineScope = CoroutineScope(Dispatchers.Default)
uiScope.launch {
delay(1000)
}
ioScope.launch {
delay(1000)
}
computeScope.launch {
delay(1000)
}
这里,Dispatchers.Main,Dispatchers.IO,Dispatchers.Default 用于指定协程运行的线程环境:
- Dispatchers.Main: 主线程
- Dispatchers.IO: IO 线程
- Dispatchers.Default: 计算线程
协程 -> 协程
我们也可以在协程环境下启动一个新的协程
通过 launch,async 在协程中启动协程:
fun main() {
runBlocking {
val job:Job = launch {
delay(1000)
println("子协程1执行完毕")
}
val deferred:Deferred<Int> = async {
delay(1200)
println("子协程2执行完毕")
3 //协程的返回值
}
delay(100)
println("父线程执行完毕")
//父协程等待子协程结束
}
}
launch 方法返回一个 Job 对象,async 返回一个 Deferred 对象,Deferred 是带返回值的,可以通过 deferred.await() 获取到返回值。
另外需要注意的是,协程内部启动的协程称为子协程,父协程会等待子协程执行完后才会结束,而不会直接结束。这里可以和之前的 GlobalScope 对比一下。
coroutineScope 也可以在协程环境下启动一个新的协程,通常用于完成并行任务。
import kotlinx.coroutines.*
fun main() = runBlocking {
coroutineScope {
launch {
delay(500L)
println("Task from nested launch")
}
delay(100L)
println("Task from coroutine scope")
}
//coroutineScope 执行完了,才会执行后续代码
println("Coroutine scope is over")
}
挂起 vs 阻塞 (suspend vs block)
在线程环境下,如果一个操作是耗时的,该耗时操作执行完成后,才能执行同一线程后续代码,我们称该操作 阻塞(block) 了线程。
在协程环境下,如果一个操作是耗时的,该耗时操作执行完成后,才能执行同一协程后续代码,我们称该操作 挂起(suspend) 了协程。
就是多了个叫法,方便区分。
kotlin 中有一个关键字 suspend,用于修饰方法。
- 被 suspend 修饰的方法通常表示该方法是耗时的,会挂起协程。
- suspend 方法执行完了,才会执行后续代码。
- suspend 方法只能在 suspend 方法内部或协程内部调用
import kotlinx.coroutines.*
fun main() = runBlocking {
launch { doWorld() }
println("Hello,")
}
suspend fun doWorld() {
delay(1000L)
println("World!")
}
用于启动协程的 coroutineScope 就是一个 suspend 方法。所以,coroutineScope 执行完了,才会执行后续代码。
原理
以上就是协程最基础的部分。了解一下协程的工作原理:
查看下源码:
public fun <T> runBlocking(context: CoroutineContext = EmptyCoroutineContext, block: suspend CoroutineScope.() -> T): T
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
public fun <T> CoroutineScope.async(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> T
): Deferred<T>
public suspend fun <R> coroutineScope(block: suspend CoroutineScope.() -> R): R
可以看出,启动协程时,我们传入的 block 都是 CoroutineScope 的扩展函数:
public interface CoroutineScope {
public val coroutineContext: CoroutineContext
}
CoroutineScope 有一个成员变量 coroutineContext,在协程中我们都可以
访问到这个成员变量
fun main() {
GlobalScope.launch {
println(coroutineContext)
}
Thread.sleep(1000)
}
CoroutineContext 是一个接口,功能类似于 Map,用于保存 key-value 型数据。
public interface CoroutineContext {
//'Map' 中的 key,E 是对于 Value 的类型
public interface Key<E : Element>
//'Map' 中的 value
public interface Element : CoroutineContext {
public val key: Key<*>
public override operator fun <E : Element> get(key: Key<E>): E? =
@Suppress("UNCHECKED_CAST")
if (this.key == key) this as E else null
public override fun <R> fold(initial: R, operation: (R, Element) -> R): R =
operation(initial, this)
public override fun minusKey(key: Key<*>): CoroutineContext =
if (this.key == key) EmptyCoroutineContext else this
}
//根据 key 获取 value,重载操作符
public operator fun <E : Element> get(key: Key<E>): E?
//遍历 ‘map’,进行累积操作
public fun <R> fold(initial: R, operation: (R, Element) -> R): R
//重载操作符 ‘+’ , 用于将两个 'map' 合并
public operator fun plus(context: CoroutineContext): CoroutineContext =
if (context === EmptyCoroutineContext) this else // fast path -- avoid lambda creation
context.fold(this) { acc, element ->
val removed = acc.minusKey(element.key)
if (removed === EmptyCoroutineContext) element else {
// make sure interceptor is always last in the context (and thus is fast to get when present)
val interceptor = removed[ContinuationInterceptor]
if (interceptor == null) CombinedContext(removed, element) else {
val left = removed.minusKey(ContinuationInterceptor)
if (left === EmptyCoroutineContext) CombinedContext(element, interceptor) else
CombinedContext(CombinedContext(left, element), interceptor)
}
}
}
//从 ‘map’ 中删除键值对
public fun minusKey(key: Key<*>): CoroutineContext
}
CoroutineContext 表示协程工作的上下文。主要包含了下面几类对象:
- ContinuationInterceptor:用于指定协程运行在哪个线程之上
- Job:代表一个协程对象,就像 Thread 代表一个线程一样
- CoroutineExceptionHandler:用于协程的异常处理
- CoroutineName:协程的名字
- CoroutineId:协程的 id
通过重载操作符我们都可以访问到这些对象
fun main() {
GlobalScope.launch {
val job = coroutineContext[Job]
println(job)
val continnuation = coroutineContext[ContinuationInterceptor]
println(continnuation)
val exceptionHandler = coroutineContext[CoroutineExceptionHandler]
println(exceptionHandler)
val name = coroutineContext[CoroutineName]
println(name)
}
Thread.sleep(1000)
}
小结一下:
每个用于启动协程的 block 都是 CoroutineScope 接口的扩展方法,都继承了 coroutineContext 成员变量。CoroutineScope 用于定义协程的作用域(scope),
coroutineContext 表示协程工作的上下文,类似一个map,保存协程作用域中的重要对象:
- ContinuationInterceptor:用于指定协程运行在哪个线程之上
- Job:代表一个协程对象,就像 Thread 代表一个线程一样
- CoroutineExceptionHandler:用于协程的异常处理
- CoroutineName:协程的名字
- CoroutineId:协程的 id
从源代码可以看出 CoroutineContext 相对于 Map 的几点优势:
- 重载了 plus 操作符,可以方便的合并两个 CoroutineContext
- 存储的数据 Element 也实现了 CoroutineContext 接口,内部含有 Key 成员变量,简化了 CoroutineContext 中数据的添加。
- get 方法直接返回泛型类型,无需使用强制类型装换
我从协库里找了点代码(格式上稍作修改),来看看 CoroutineContext 到底有多 nice:
@ExperimentalCoroutinesApi
public actual fun CoroutineScope.newCoroutineContext(context: CoroutineContext): CoroutineContext {
//合并两个 CoroutineContext,后者权重更高,就是说,如果两个 context 内部有相同的 key,取加号右边的 value
val combined = coroutineContext + context
val debug = if (DEBUG) {
//CoroutineContext 与 Element 合并,加号右侧权重更高
combined + CoroutineId(COROUTINE_ID.incrementAndGet())
} else {
combined
}
return if (combined !== Dispatchers.Default && combined[ContinuationInterceptor] == null) {
//CoroutineContext 与 Element 合并,加号右侧权重更高
debug + Dispatchers.Default
} else {
debug
}
}
最后,再来看看协程的启动的一些细节
public fun CoroutineScope.launch(
context: CoroutineContext = EmptyCoroutineContext,
start: CoroutineStart = CoroutineStart.DEFAULT,
block: suspend CoroutineScope.() -> Unit
): Job
通过 launch 启动的协程,其上下文对象 CoroutineContext 由父协程的 CoroutineContext 对象和 launch 方法的第一个参数共同决定。具体规则如下图所示:
协程原理.png新协程context = 父协程context + 参数 context + child Job
启动过程中,会创建一个新的 Job 对象 child Job,父协程的 job 对象是它的 parent。内部是通过 attachChild 方法来确定 job 之间的关系的。
线程切换
在启动一个线程时我们可以指定一个线程:
//在 main 线程之上运行协程
launch(Dispatchers.Main) {
}
也可通过 withContext 方法切换线程
launch(Dispatchers.Main) {
withContext(Dispatchers.IO) {
}
}
用于指定线程的参数包括:
- Dispatchers.Main:主线程
- Dispatchers.IO:不限制数量的线程池
- Dispatchers.Default:线程数量等于 cpu 核心数的线程池。不指定的情况下都是这个
- Dispatchers.Unconfined:不改变线程,执行完一个 suspend 方法后,改变为 suspend 中的线程
fun main() {
GlobalScope.launch {
launch(Dispatchers.Unconfined) {
println("1 I'm working in thread ${Thread.currentThread().name}")
withContext(Dispatchers.IO) {
println("2 I'm working in thread ${Thread.currentThread().name}")
}
println("3 I'm working in thread ${Thread.currentThread().name}")
}
}
Thread.sleep(1000)
}
输出:
1 I'm working in thread DefaultDispatcher-worker-1
2 I'm working in thread DefaultDispatcher-worker-2
3 I'm working in thread DefaultDispatcher-worker-2
协程方法
join
协程的 join 方法和线程的 join 方法类似,都是让出当前执行权,让其它协程先执行。
import kotlinx.coroutines.*
fun main() = runBlocking {
//返回一个 job 对象
val job = GlobalScope.launch {
delay(1000L)
println("World!")
}
println("Hello,")
job.join() //外部也成让出执行权,执行 job 协程
//job 协程执行完后,才会执行后续代码
println("Hello,")
}
yiedld
用于让出当前线程,让其它协程执行。如果协程是通过 Dispatchers.Unconfined 启动的,yiedld 方法什么都不做。
fun main() {
GlobalScope.launch {
yield()
}
Thread.sleep(1000)
}
协程的退出
可以通过协程的 cancel 方法来退出一个可退出的(cancellable)协程。
那什么是可退出(cancellable)的协程呢?有两种情况:
情况一:协程中耗时任务都来自协程库中的 suspend 方法。
val job = launch {
repeat(1000) { i ->
println("I'm sleeping $i ...")
//程库中的 suspend 方法
delay(500L)
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancel() // cancels the job
job.join() // waits for job's completion
println("main: Now I can quit.")
协程库中的 suspend 方法在中断时都会抛出 CancellationException 异常,这个异常主要用于调试,通常无需单独处理。
情况二:使用 isActive 决定协程是否继续执行
val startTime = System.currentTimeMillis()
val job = launch(Dispatchers.Default) {
var nextPrintTime = startTime
var i = 0
while (isActive) { // cancellable computation loop
// print a message twice a second
if (System.currentTimeMillis() >= nextPrintTime) {
println("I'm sleeping ${i++} ...")
nextPrintTime += 500L
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
协程调用 cancle 后,isActive 会被置为 false。
有的时候,我们希望在协程被中断后做一些清理工作,可以使用 try finally
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
println("I'm running finally")
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
finally 中是不能调用 suspend 方法,虽然在 finally 中调用 suspend 情况很少,但是还是可以通过 withContext(NonCancellable) {...} 来实现
val job = launch {
try {
repeat(1000) { i ->
println("I'm sleeping $i ...")
delay(500L)
}
} finally {
withContext(NonCancellable) {
println("I'm running finally")
delay(1000L)
println("And I've just delayed for 1 sec because I'm non-cancellable")
}
}
}
delay(1300L) // delay a bit
println("main: I'm tired of waiting!")
job.cancelAndJoin() // cancels the job and waits for its completion
println("main: Now I can quit.")
异常处理
通过 launch 启动的协程使用 CoroutineExceptionHandler 处理异常
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
val job = GlobalScope.launch(handler) {
throw AssertionError()
}
job.join()
通过 async 启动的协程使用 try catch 处理异常
val deferred = GlobalScope.async {
println("Throwing exception from async")
throw ArithmeticException() // Nothing is printed, relying on user to call await
}
try {
deferred.await()
println("Unreached")
} catch (e: ArithmeticException) {
println("Caught ArithmeticException")
}
当协程抛出异常时,协程的执行会结束,同时协程会将这个异常传递给父协程,父协程的其他子协程及父协程的执行也会结束。
val handler = CoroutineExceptionHandler { _, exception ->
println("Caught $exception")
}
val job = GlobalScope.launch(handler) {
launch { // the first child
try {
delay(Long.MAX_VALUE)
} finally {
withContext(NonCancellable) {
println("Children are cancelled, but exception is not handled until all children terminate")
delay(100)
println("The first child finished its non cancellable block")
}
}
}
launch { // the second child
delay(10)
println("Second child throws an exception")
throw ArithmeticException()
}
}
job.join()
但是,有一个特殊的异常 CancellationException,它只会导致抛出该异常的协程结束,不会传递给其他协程。
有的时候,我们希望抛出异常时,可以单独结束一个协程。这时候可以使用 Supervision job
import kotlinx.coroutines.*
fun main() = runBlocking {
val supervisor = SupervisorJob()
with(CoroutineScope(coroutineContext + supervisor)) {
// launch the first child -- its exception is ignored for this example (don't do this in practice!)
val firstChild = launch(CoroutineExceptionHandler { _, _ -> }) {
println("First child is failing")
throw AssertionError("First child is cancelled")
}
// launch the second child
val secondChild = launch {
firstChild.join()
// Cancellation of the first child is not propagated to the second child
println("First child is cancelled: ${firstChild.isCancelled}, but second one is still active")
try {
delay(Long.MAX_VALUE)
} finally {
// But cancellation of the supervisor is propagated
println("Second child is cancelled because supervisor is cancelled")
}
}
// wait until the first child fails & completes
firstChild.join()
println("Cancelling supervisor")
supervisor.cancel()
secondChild.join()
}
}
android 实用代码
这里是协程库中注释给出的代码:
class MyActivity : AppCompatActivity(), CoroutineScope {
lateinit var job: Job
override val coroutineContext: CoroutineContext
get() = Dispatchers.Main + job
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
job = Job()
}
override fun onDestroy() {
super.onDestroy()
job.cancel() // Cancel job on activity destroy. After destroy all children jobs will be cancelled automatically
}
/*
* Note how coroutine builders are scoped: if activity is destroyed or any of the launched coroutines
* in this method throws an exception, then all nested coroutines are cancelled.
*/
fun loadDataFromUI() = launch { // <- extension on current activity, launched in the main thread
val ioData = async(Dispatchers.IO) { // <- extension on launch scope, launched in IO dispatcher
// blocking I/O operation
}
// do something else concurrently with I/O
val data = ioData.await() // wait for result of I/O
draw(data) // can draw in the main thread
}
}
我觉得可以稍加改进下:
class MainScope : CoroutineScope, LifecycleObserver {
private val job = SupervisorJob()
override val coroutineContext: CoroutineContext
get() = job + Dispatchers.Main
@OnLifecycleEvent(Lifecycle.Event.ON_PAUSE)
fun destroy() = coroutineContext.cancelChildren()
}
// usage
class MainFragment : Fragment() {
private val uiScope = MainScope()
override fun onCreate(savedInstanceState: Bundle?) {
super.onCreate(savedInstanceState)
lifecycle.addObserver(mainScope)
}
private fun loadData() = uiScope.launch {
val result = withContext(bgDispatcher) {
// your blocking call
}
}
}
参考资料
- Guide to kotlinx.coroutines by example
- KotlinConf 2017 - Introduction to Coroutines by Roman Elizarov
- Blocking threads, suspending coroutines
- Coroutines: runBlocking vs coroutineScope
- Coroutine Context and Scope
- The reason to avoid GlobalScope
- Coroutines in Kotlin 1.3 explained: Suspending functions, contexts, builders and scopes
- Demystifying CoroutineContext
- Android Coroutine Recipes
网友评论