巧用kotlinx.coroutines玩转android

作者: 皮球二二 | 来源:发表于2018-06-05 14:31 被阅读103次

记得1年前我在慕课网看bennyhuo大神录制的《 Kotlin系统入门与进阶》中协程部分内容之后,觉得用协程实现异步数据操作真是好累赘,把简单的事情非要搞这么复杂才开心。当时官方文档还有视频部分并未提及kotlinx.coroutines的存在,所以就从入门到放弃啦。时隔1年之久,无意间看到这个好东西,所以赶紧学一波啦。
协程部分文章我将分为两部分,第一部分是kotlinx.coroutines的使用学习,第二部分是对基础库中协程API的介绍。本篇是系列第一篇,相对简单,涉及到的代码在github上,欢迎star、fork

kotlin

基本概念

Coroutine,翻译为协程,意思是各个子任务协作运行。一些耗时操作(如网络IO、文件IO、CPU/GPU密集型任务等)会阻塞线程直到操作完成。因此Kotlin提供一种避免阻塞且更廉价可控的操作:协程挂起(coroutine suspension)。它将复杂的异步操作放入底层库中,程序逻辑顺序表达,以此简化异步编程。该底层库将用户代码包装为回调/订阅事件在不同线程(甚至不同机器上)调度执行,而代码则保持如同顺序执行一样简单。Kotlin提供的扩展后的实验库为kotlinx.coroutines

launch
kotlinx.coroutines使用起来相对于标准库来说简单太多,而且也能够让我们更好理解协程的概念,所以我们就从它开始进入Coroutine的学习

线程和协程的区别(Blocking VS Suspending)

协程是通过编译技术来实现的(不需要虚拟机VM/操作系统OS的支持),通过插入相关代码来生效。与之相反,线程/进程是需要虚拟机VM/操作系统OS的支持,通过调度CPU执行生效
线程阻塞的代价昂贵,尤其在高负载时的可用线程很少,阻塞线程会导致一些重要任务因为缺少可用线程而被延迟。协程挂起几乎无代价,无需上下文切换或涉及OS。最重要的是协程挂起可由用户控制:可决定挂起时发生什么,并根据需求优化/记录日志/拦截

协程并不是为了取代线程,协程对线程进行抽象,你可以看成协程是一个异步调用的框架,解决了之前线程间协作代码繁琐的问题

初识协程

先来看一个协程的简单例子

val time = SimpleDateFormat("hh:MM:ss")

fun main(args: Array<String>) {
    launch {
        delay(1000)
        println("${time.format(Date())}  World!")
    }
    
    println("${time.format(Date())}  Hello, ")
    Thread.sleep(2000)
}

看不懂没关系,我们就猜一下这段代码的运行结果,它可能是:首先打印一个Hello,,过了1秒之后再打印一个World!,2秒后程序退出。
来验证一下我们的猜想是否正确

初识协程

大概每一个看官应该都能猜对这个程序的运行结果。那我就通过这段代码来让大家了解一下扩展库是如何使用的

main函数的流程分为两部分,主流程与协程流程
主流程做了三件事:通过launch函数来启动协程,完事之后打印了Hello,,主线程sleep2秒
协程流程做了两件事:delay了1秒后,打印World!

流程了解清楚之后,轮到新元素登场,首先是launch
launch函数是最常用的协程构建器,它不阻塞当前线程,而是在后台创建一个新协程。我们也可以对其指定协程调度器。关于协程调度器的知识我放在后面再说

launch里面的元素delay是用来干嘛的呢?根据运行结果我们发现,协程的delay与线程的sleep有异曲同工之处:都是休息一段时间。不过两者还是有点区别的,sleep是线程进入了等待环节,而delay则是执行流进入了等待环节,因此delay不会像sleep一样阻塞其他线程的执行。
来看看delay的源码,有没有注意到suspend?

public expect suspend fun delay(time: Int)

既然说到不会阻塞,那就不得不提suspend以特殊修饰符suspend修饰的函数被称为挂起函数。挂起函数只能在协程中和其他挂起函数中调用,不能在其他部分使用,所以要启动一个协程,挂起函数是必须的
delay是挂起函数,但是launch不是挂起函数,所以没有限制launch在哪里运行。请注意一下最后一个参数,它是suspend修饰的lambda表达式,所以launch内运行的那个函数是一个挂起函数

public actual fun launch(
    context: CoroutineContext = DefaultDispatcher,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    parent: Job? = null,
    block: suspend CoroutineScope.() -> Unit
): Job

你可以试试看把delay写在一般函数中,一般会有这个报错提示

没有把delay运行在协程或者挂起函数中

所以如果我们将这个函数分解得更细的话,会变成这样

val time = SimpleDateFormat("hh:MM:ss")

fun main(args: Array<String>) {
    launch {
        writeWorld()
    }

    println("${time.format(Date())}  Hello, ")
    Thread.sleep(2000)

}

suspend fun writeWorld() {
    delay(1000)
    println("${time.format(Date())}  World!")
}

suspend函数只是比普通函数只是多了个suspend关键字, 它的本质是异步返回,而不是通过回调的方式对数据进行操作,这样获取异步数据的代码在写法上看上去就跟同步异步一样。
对比一下写法上的差异

private interface DataImpl {
    fun getDataValue(value: String)
}

private fun getA(impl: DataImpl) {
    Thread(Runnable {
        Thread.sleep(2000)
        impl.getDataValue("value")
    }).start()
}

getA(object : DataImpl {
    override fun getDataValue(value: String) {
        println(value)
    }
})

异步的方式无非就是代码量比较大,与协程相比,流程就不够清晰了

private suspend fun getA1(): String {
    delay(2000)
    return "value"
}

runBlocking {
    val job = async {
        getA1()
    }
    println(job.await())
}

这里有个runBlocking,不用太在意它,它只是用来连接阻塞与非阻塞世界的梯子,但是它会阻塞当前线程一直到协程运行完。它主要是为main函数和测试设计的,一般情况下使用不多。在这个范例中,在runBlocking内通过async启动一个新的子协程去拉取数据,这个新的子协程启动后父协程就被挂起,此时数据还没有返回。过了一段时间,子协程把数据拉回来之后会恢复它的父协程。父协程继续执行,打印最后结果。这也是一般情况下suspend函数的执行流程。

这里我们又看到一个新的启动协程元素:async。与launch一样,async也是创建协程并立即启动,但是launch返回值类型是Job,没法携带返回值,而async的返回值类型是Deferred,是Job的子类,通过调用Deferred里的await函数,可以得到协程的返回值

public actual interface Deferred<out T> : Job {
    public actual suspend fun await(): T
}

我们可以通过async起多个协程,他们会同时运行

主线程如果不主动挂起或者阻塞以等待协程里面的代码执行完成就直接退出的话,那协程也会因此被直接结束,所以扩展库提供了join函数,可以使主线程等待协程执行完再执行。不过这种情况在Android中不太会存在,因为UI线程是一直在跑着的。

fun main(args: Array<String>) {
    val job = launch {
        delay(1000)
        println("World!")
        println(Thread.currentThread())
    }
    runBlocking {
        println("Hello,")
        job.join()
    }
}

协程可以被取消,也可以设置超时。刚才我们说了launch函数有一个Job类型的返回值。Job有一个cancel函数,调用cancel函数可以取消协程

fun main(args: Array<String>) {
    val job = launch {
        repeat(1000) {
            println("${time.format(Date())}  I'm sleeping $it ...")
            delay(1000)
        }
    }
    runBlocking {
        delay(1300)
        println("${time.format(Date())}  I'm tired of waiting!")
        job.cancel()
        job.join()
        println("${time.format(Date())}  Now I can quit.")
    }
}

来看看效果

协程取消

需要注意的是,协程job由于使用了delay函数挂起,所以在调用cancel之后实现了真正的取消;但如果你将delay函数去除,那该协程存在循环计算而没有发生挂起操作,在这种情况下即使调用了cancel,协程状态也变为停止,但是循环操作仍然在继续。在这种情况下,你可以使用isActive来判断是不是发生了取消

fun main(args: Array<String>) {
    val job = launch {
        var i = 0
        while (i< 1000 && isActive) {
            println("${time.format(Date())}  I'm sleeping $i ...")
            i++
        }
    }
    runBlocking {
        delay(2)
        println("${time.format(Date())}  I'm tired of waiting!")
        job.cancel()
        job.join()
        println("${time.format(Date())}  Now I can quit.")
    }
}

来看看效果

isActive标志位

也可以将canceljoin函数合并成一个cancelAndJoin函数来使用

协程的超时设置也很简单,直接在协程代码块中包裹一个withTimeout函数

fun main(args: Array<String>) {
    runBlocking {
        withTimeout(1300) {
            repeat(1000) {
                println("I'm sleeping $it ...")
                delay(500)
            }
        }
    }
}

withTimeout函数在执行完之后会抛出CancellationException异常,如果你不想处理这个异常,你可以使用withTimeoutOrNull函数屏蔽它

作为一个轻量级的线程,协程也有自己运行所在的环境。CoroutineContext包含一个协程调度器CoroutineDispatcher,它代表一个协程执行的场环境。CoroutineDispatcher可以决定协程所在的线程或线程池,它可以指定协程运行于特定的一个线程、一个线程池或者不指定任何线程。
launchasync一样,所有协程构造器都实现了CoroutineContext接口来显式的为新协程或者其他上下文元素指定调度器。默认的调度器为CommonPool

public actual val DefaultDispatcher: CoroutineDispatcher = CommonPool

除了CommonPool之外,还有UnconfinedcoroutineContextnewSingleThreadContext等。

fun main(args: Array<String>) {
    runBlocking {
        val jobs = arrayListOf<Job>()
        jobs.add(launch(Unconfined) {
            println("'Unconfined': I'm working in thread ${Thread.currentThread()}")
            delay(500)
            println("'Unconfined': after I'm working in thread ${Thread.currentThread()}")
        })
        jobs.add(launch(coroutineContext) {
            println("'coroutineContext': I'm working in thread ${Thread.currentThread()}")
            delay(500)
            println("'coroutineContext': after I'm working in thread ${Thread.currentThread()}")
        })
        jobs.add(launch(CommonPool) {
            println("'CommonPool': I'm working in thread ${Thread.currentThread()}")
            delay(500)
            println("'CommonPool': after I'm working in thread ${Thread.currentThread()}")
        })
        jobs.add(launch(newSingleThreadContext("MyOwnThread")) {
            println("'newSingleThreadContext': I'm working in thread ${Thread.currentThread()}")
            delay(500)
            println("'newSingleThreadContext': after I'm working in thread ${Thread.currentThread()}")
        })
        jobs.forEach {
            it.join()
        }
    }
}

来看看返回结果

协程上下文

来对运行结果做一个说明:
CommonPool协程调度器使用的是ForkJoinPool线程池。
Unconfined协程调度器是一种无限制上下文,协程会在当前调用栈中执行直到第一次挂起。恢复运行后线程由被调用的挂起函数决定。当协程没有耗费CPU时间或者没有更新任何局限在特定线程内的共享数据(例如 UI),无限制的调度器是合适的。
coroutineContext继承父调用器的协程调度器,这里由于runBlocking的协程调度器EmptyCoroutineContext运行在主线程,所以结果返回的线程也是在主线程
newSingleThreadContext这个没什么好说的了,它是在单进程中运行

来看看线程切换的例子,这里就是从newSingleThreadContext切换到CommonPool

fun main(args: Array<String>) {
    val job = launch(newSingleThreadContext("ctx1")) {
        println("'ctx1 ${Thread.currentThread()}")
        val job2 = launch(CommonPool) {
            println("'ctx2 ${Thread.currentThread()}")
        }
        job2.join()
        println("'ctx3 ${Thread.currentThread()}")
    }

    runBlocking {
        job.join()
    }
}

来看看结果

线程切换

在上面的代码中,如果不加join函数,ctx2势必会在ctx3后打印,因为它们是同步执行的,还有一个函数可以来解决这个问题,就是withContextwithContext不会创建新的协程,它在指定协程上运行挂起代码块,并挂起该协程直至代码块运行完成。

fun main(args: Array<String>) {
    val job = launch(newSingleThreadContext("ctx1")) {
        println("'ctx1 ${Thread.currentThread()}")
        val job2 = withContext(CommonPool) {
            println("'ctx2 ${Thread.currentThread()}")
            "job2"
        }
        println(job2)
        println("'ctx3 ${Thread.currentThread()}")
    }

    runBlocking {
        job.join()
    }
}

withContext会将异步返回的返回值给返回出来

withContext

基本介绍就此结束,回归到android开发上来

在android中使用协程

我们先在gradle中添加扩展库,当前最新的扩展库版本是0.22.5。我们只需将android的库放进去即可

implementation "org.jetbrains.kotlinx:kotlinx-coroutines-android:0.22.5"

相比默认扩展库core,它特别的为android app添加了UI线程环境。后面我们在更新UI的时候就可以将CoroutineDispatcher指定为UI

val UI = HandlerContext(Handler(Looper.getMainLooper()), "UI")

再添加一句这个,让IDE不再提示kotlinx-coroutines是实验性质的功能

kotlin {
    experimental {
        coroutines 'enable'
    }
}

开始写代码了。我们在日常开发中,最常见的异步操作无非是网络请求。常规情况下,我们创建一个子线程去请求数据,然后通过接口,将数据通过handler传递回UI线程。现在我们来看看协程的写法

launch(UI, parent = job) {
    showNetworkDialog("正在加载数据")

    val result = withContext(CommonPool) {
        val responseBody = httpHelper.okHttpUtils.syncGet("http://www.mocky.io/v2/5943e4dc1200000f08fcb4d4").body()
        if (responseBody == null) {
            throw Exception("出现异常")
        }
        else {
            responseBody.string()
        }
    }

    try {
        tv_main.text = result
    } catch (e: Exception) {

    } finally {
        dismissNetworkDialog()
    }

}.invokeOnCompletion {
    if (it != null) {
        dismissNetworkDialog()
    }
}

我把代码分成了4个部分
第一部分是在UI线程上启动加载框,一个转圈的弹出框
第二部分是在CommonPool线程池中进行数据请求,并判断数据格式是否正确
第三部分是回到UI线程,将结果显示到textview中,并将加载框关闭
第四部分是处理协程被取消的情况。协程被取消的话会执行invokeOnCompletion函数,并且抛出CancellationException异常

来看看运行效果

网络请求

这里还有一个补充,我们一般通过launch(coroutineContext, parent = job)这种形式为当前协程明确指定它的父协程,这样一旦父协程发送取消,所有子协程都被取消了

其实这些操作最好能够跟Lifecycle绑定起来进行操作。我把代码稍微改造一下。先监听一下onDestory的场景,把请求deferred送进来。在onDestory的时候将其取消

class CoroutineLifecycleListener<T>(val deferred: Deferred<T>) : LifecycleObserver {
    @OnLifecycleEvent(Lifecycle.Event.ON_DESTROY)
    fun cancelCoroutine() {
        if (!deferred.isCancelled)
            deferred.cancel()
    }
}

请求部分改造如下,由之前的withContext替换成async,这样可以得到deferred

val deferred = async(CommonPool) {
    val responseBody = httpHelper.okHttpUtils.syncGet("http://www.mocky.io/v2/5943e4dc1200000f08fcb4d4").body()
    if (responseBody == null) {
        throw Exception("出现异常")
    }
    else {
        responseBody.string()
    }
}
lifecycle.addObserver(CoroutineLifecycleListener(deferred))

赋值部分就是tv_main.text = deferred.await()

简单吧?确实简单,更多深入的部分需要我们自行学习,这里是官方提供的example

参考文章

开始使用Kotlin协程
Kotlin-24.协程和线程(Coroutine & Thread)
kotlinx.coroutines

相关文章

网友评论

本文标题:巧用kotlinx.coroutines玩转android

本文链接:https://www.haomeiwen.com/subject/mqdesftx.html