美文网首页KotlinAndroid技术知识Kotlin
Android Kotlin(8)之《协程3》

Android Kotlin(8)之《协程3》

作者: 小强彬 | 来源:发表于2017-08-10 18:31 被阅读1588次

    Android Kotlin第八篇 协程3。Kotlin系列源码在源码下载这里下载。我们一起来了解下Kotlin的协程,协程也是Kotlin重点。也许我有的地方没有写好,也欢迎大家提出问题,纠正问题。

    前面我们已经简单了解了协程,这篇我们来看下协程的调度线程与Channels(通道)

    一、调度线程

    在前面我们经常用到“CommonPool”共享的线程池,那么除了共享的线程池以为还有哪些呢,如下:

    fun test21() = runBlocking<Unit> {
            val jobs = arrayListOf<Job>()
            jobs += launch(Unconfined) { // not confined -- will work with main thread
                log("      'Unconfined': I'm working in thread ${Thread.currentThread().name}")
            }
            jobs += launch(coroutineContext) { // context of the parent, runBlocking coroutine
                log("'coroutineContext': I'm working in thread ${Thread.currentThread().name}")
            }
            jobs += launch(CommonPool) { // will get dispatched to ForkJoinPool.commonPool (or equivalent)
                log("      'CommonPool': I'm working in thread ${Thread.currentThread().name}")
            }
            jobs += launch(newSingleThreadContext("MyOwnThread")) { // will get its own new thread
                log("          'newSTC': I'm working in thread ${Thread.currentThread().name}")
            }
            jobs.forEach { it.join() }
        }
    
    • Unconfined:无限制的,自由的,不限于任何特定线程
    • coroutineContext:返回这个coroutine的上下文。
    • CommonPool:共享线程的公共池,类似java里线程池,只不过这里是公共的
    • newSingleThreadContext:创建一个单独的线程,在作业被取消时回收
      目前线程调度还未完全搞懂,只了解了部分,不敢妄加猜测,调度线程就先了解到这里,后续我在补充

    二、Channels(通道)

    Channels,它不是一个阻止放操作,而是一个挂起发送,而不是一个阻塞操作,它有一个暂停接收。可用于线程间传递数据。
    我们先来看一个简单的示例:

    fun test22() = runBlocking<Unit> {
            //定义一个通道
            val channel = Channel<Int>()
            launch(CommonPool) {
                for (x in 1..5) {
                    delay(1000L)
                    //在这里发送
                    channel.send(x * x)
                }
            }
            repeat(5) {
                //在这里接收
                log(channel.receive().toString())
            }
            log("Done!")
        }
    

    输出:

    08-10 14:48:52.452 10568-10568/com.xiaoqiang.kotlin I/test: 1
    08-10 14:48:53.452 10568-10568/com.xiaoqiang.kotlin I/test: 4
    08-10 14:48:54.452 10568-10568/com.xiaoqiang.kotlin I/test: 9
    08-10 14:48:55.452 10568-10568/com.xiaoqiang.kotlin I/test: 16
    08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: 25
    08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: Done!
    08-10 14:48:56.462 10568-10568/com.xiaoqiang.kotlin I/test: 结束
    

    1、关闭通道

    假设发送的时候,遇到特殊情况,不在发生了,要关闭通道,怎么做呢,例如:

    fun test23() = runBlocking<Unit> {
            //定义一个通道
            val channel = Channel<Int>()
            launch(CommonPool) {
                for (x in 1..5) {
                    delay(1000L)
                    //在这里发送
                    channel.send(x * x)
                    if(x == 4){
                        log("关闭通道")
                        channel.close()
                    }
                }
            }
            repeat(5) {
                //在这里接收
                try {
                    var a = channel.receive()
                    log(a.toString())
                }catch (e: ClosedReceiveChannelException){
                    log("关闭通道报出异常ClosedReceiveChannelException")
                }
                log("等待接收")
            }
            log("Done!")
        }
    

    注意:我实际测试的时候发现,如果不抛出ClosedReceiveChannelException异常,那么会导致程序直接奔溃,所有这里你只需要在接收方抛出异常即可
    我们也可把通道写成方法,实际测试发现方法里结束通道时在接收方可以不写抛出异常,例如:

    fun produceSquares() = produce<Int>(CommonPool) {
            for (x in 1..5) {
                delay(1000L)
                send(x * x)
                if (x == 4){
                    channel.close()
                }
            }
        }
        fun test24() = runBlocking<Unit> {
            val squares = produceSquares()
            squares.consumeEach { log(it.toString()) }
            log("Done!")
        }
    

    2、Pipelines

    在这里我理解为管道运输,也就是说上面produce生成通道是可以两个链接起来,每块做不同的工作,然后输出,例如:

    fun produceSquares1() = produce<Int>(CommonPool) {
            for (x in 1..5) {
                delay(1000L)//这里延迟就失效了,具体原因不清楚为啥会失效
                send(x)
            }
        }
        fun produceSquares2(numbers: ReceiveChannel<Int>) = produce<Int>(CommonPool) {
            for (x in 1..5) {
                delay(1000L)
                send((x * x)+1)
            }
        }
        fun test25() = runBlocking<Unit> {
            //始发地
            val numbers = produceSquares1()
            //通道二次加工
            val squares = produceSquares2(numbers)
            //最后输出
            squares.consumeEach { log(it.toString()) }
            log("Done!")
            //关闭通道,回收
            numbers.cancel()
            squares.cancel()
        }
    

    输出:

    08-10 14:56:31.742 19083-19083/com.xiaoqiang.kotlin I/test: 2
    08-10 14:56:32.742 19083-19083/com.xiaoqiang.kotlin I/test: 5
    08-10 14:56:33.742 19083-19083/com.xiaoqiang.kotlin I/test: 10
    08-10 14:56:34.742 19083-19083/com.xiaoqiang.kotlin I/test: 17
    08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: 26
    08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: Done!
    08-10 14:56:35.752 19083-19083/com.xiaoqiang.kotlin I/test: 结束
    

    3、Fan-out

    多个coroutines 可以从一个通道接收,例如:

    fun produceNumbers() = produce<Int>(CommonPool) {
            var a = 1
            while (true){
                delay(100L)
                send(a++)
            }
        }
        fun launchProcessor(id: Int, channel: ReceiveChannel<Int>) = launch(CommonPool) {
            channel.consumeEach {
                log("launch#$id:收到: $it")
            }
        }
        fun test26() = runBlocking<Unit> {
            val producer = produceNumbers()
            repeat(5) { launchProcessor(it, producer) }
            delay(950)
            producer.cancel() // cancel producer coroutine and thus kill them all
        }
    

    输出:

    08-10 15:01:07.228 24523-24563/? I/test: launch#0:收到: 1
    08-10 15:01:07.338 24523-24563/? I/test: launch#1:收到: 2
    08-10 15:01:07.438 24523-24562/? I/test: launch#2:收到: 3
    08-10 15:01:07.538 24523-24563/? I/test: launch#4:收到: 4
    08-10 15:01:07.638 24523-24562/? I/test: launch#3:收到: 5
    08-10 15:01:07.738 24523-24563/? I/test: launch#0:收到: 6
    08-10 15:01:07.838 24523-24563/? I/test: launch#1:收到: 7
    08-10 15:01:07.938 24523-24562/? I/test: launch#2:收到: 8
    08-10 15:01:08.038 24523-24563/com.xiaoqiang.kotlin I/test: launch#4:收到: 9
    08-10 15:01:08.088 24523-24523/com.xiaoqiang.kotlin I/test: 结束
    

    4、Fan-in

    多个coroutines 可以发送到一个通道,例如

    suspend fun sendString(channel: SendChannel<String>, s: String, time: Long) {
            while (true) {
                delay(time)
                channel.send(s)
            }
        }
        fun test27() = runBlocking<Unit> {
            val channel = Channel<String>()
            //这里两个launch间隔着向通道发送数据
            launch(coroutineContext) { sendString(channel, "foo", 200L) }
            launch(coroutineContext) { sendString(channel, "BAR!", 500L) }
            repeat(6) {
                //循环接收数据
                log(channel.receive())
            }
        }
    

    输出:

    08-10 14:58:46.088 21748-21748/com.xiaoqiang.kotlin I/test: foo
    08-10 14:58:46.288 21748-21748/com.xiaoqiang.kotlin I/test: foo
    08-10 14:58:46.388 21748-21748/com.xiaoqiang.kotlin I/test: BAR!
    08-10 14:58:46.488 21748-21748/com.xiaoqiang.kotlin I/test: foo
    08-10 14:58:46.688 21748-21748/com.xiaoqiang.kotlin I/test: foo
    08-10 14:58:46.888 21748-21748/com.xiaoqiang.kotlin I/test: BAR!
    08-10 14:58:46.888 21748-21748/com.xiaoqiang.kotlin I/test: 结束
    

    5、通道缓冲区

    到目前为止所显示的通道没有缓冲区。当发送方和接收方相遇(又称会合)时,未缓冲通道传输元素。如果首先调用send,那么它将被挂起,直到被调用接收,如果首先调用接收,它将暂停,直到调用send。例如:

    fun test28() = runBlocking<Unit> {
            //创建缓冲区大小为4的通道
            val channel = Channel<Int>(2)
            launch(CommonPool) {
                repeat(5) {
                    log("Sending $it")
                    channel.send(it) //当通道缓冲区满的时候挂起,直到下方接收的时候在继续
                    delay(100L)
                }
            }
            delay(3000)
            log("开始接收")
            launch(CommonPool) {
                repeat(5) {
                    val a= channel.receive()//开始接收通道缓冲区里数据,上方发送被激活,继续发送
                    log("Receive $a")
                }
            }
        }
    

    输出:

    08-10 18:29:07.920 21685-21744/com.xiaoqiang.kotlin I/test: Sending 0
    08-10 18:29:08.020 21685-21744/com.xiaoqiang.kotlin I/test: Sending 1
    08-10 18:29:08.130 21685-21744/com.xiaoqiang.kotlin I/test: Sending 2
    08-10 18:29:10.920 21685-21685/com.xiaoqiang.kotlin I/test: 开始接收
    08-10 18:29:10.920 21685-21685/com.xiaoqiang.kotlin I/test: 结束
    08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 0
    08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 1
    08-10 18:29:10.920 21685-21744/com.xiaoqiang.kotlin I/test: Receive 2
    08-10 18:29:11.030 21685-21744/com.xiaoqiang.kotlin I/test: Sending 3
    08-10 18:29:11.030 21685-21745/com.xiaoqiang.kotlin I/test: Receive 3
    08-10 18:29:11.130 21685-21744/com.xiaoqiang.kotlin I/test: Sending 4
    08-10 18:29:11.130 21685-21745/com.xiaoqiang.kotlin I/test: Receive 4
    

    如果我有未写好的地方,欢迎大家提出意见,谢谢大家的观赏!

    全套源码下载这里源码会随着后面发布的Kotlin逐渐完善

    a

    相关文章

      网友评论

        本文标题:Android Kotlin(8)之《协程3》

        本文链接:https://www.haomeiwen.com/subject/oxsolxtx.html