美文网首页Kotlin开发指南简书人物Kotlin
使用kotlin协程撸一个生成器

使用kotlin协程撸一个生成器

作者: 027f63d16800 | 来源:发表于2017-12-01 17:14 被阅读79次

今天偶然看到了关于js的生成器,代码如下:

function *g() {
  yield 1
  yield 2
  yield 3
}
var o = g()
console.log(o.next()) // {value: 1, done: false}
console.log(o.next()) // {value: 2, done: false}
console.log(o.next()) // {value: 3, done: false}
console.log(o.next()) // {value: undefined, done: true}

作者:luobo_tang
链接:http://www.jianshu.com/p/911f93208335
來源:简书
著作权归作者所有。商业转载请联系作者获得授权,非商业转载请注明出处。

现在很多语言都提供了生成器,包括kotlin,官方也基于协程提供了生成器,具体的使用可以参考官方的说明:
https://github.com/Kotlin/kotlin-coroutines/blob/master/kotlin-coroutines-informal.md
今天,我们来使用kotlin的协程机制,实现一个自己的生成器,最终的效果:

fun main(vararg args: String) {
    var g = Generater.build {
        yield(1)    //1
        println("resumed")
        yield(2)
    }

    println(g.next())
    println("resume next")  
    println(g.next())
}
#output:
1
resume next
resumed
2

根据输出结果,我们可以很明显的看到,在第一次调用next时,只执行了注释1处的代码
分析上面的代码,我们发现主要的方法有buildyieldnext,三个函数的作用很明显:

  • build:创建一个生成器
  • yield:产生一个值
  • next:获取下一个值

显而易见,一个生成器就是一个状态机,通过 next驱动状态机执行,当执行到resume时,状态机更换状态并返回,因此每次驱动状态机执行时,都处于不同的状态。

image.png
生成器(也是一种协程)的一种比较常见的实现原理则是将其编译成一个状态机:
image.png
因为声明生成器的方法与普通的方法不同,比如js中使用function *,解释器对这个方法进行特殊的处理,将其转换成一个状态机。

kotlin中,对协程的处理就是将其编译成一个状态机,使用kotlin的协程接口,我们完全可以自己实现一个生成器。

接下来,code show time!

首先,我们明确一点,我们的生成器不是对应特定类型的,因此我们首先定义一个泛型类:

class Generater<T : Any> private constructor() 

这里的泛型参数T上界强制指定为Any而不使用默认的Any?,这是另一个故事。我的个人习惯是泛型参数的上界都显示指定为不为空类型,如果需要可为空在使用泛型参数时显示使用T?
然后,我们需要提供一个build函数来获取一个生成器,这个函数接收一个lambda函数:

class Generater<T : Any> private constructor() {

    //使用一个AtomicReference来保存协程的执行体
    private var mContinuation: AtomicReference<Continuation<Unit>?> = AtomicReference(null)

    //使用一个ThreadLocal来保存每个线程产生的值
    private var values: ThreadLocal<T?> = ThreadLocal()

    /**
     * -1:结束
     *  0:未创建
     *  1:可执行
     */
    @Volatile
    private var status: Int = 0    //生成器的状态,考虑并发环境下的使用,将其声明为volatile

    companion object {
        fun <T : Any> build(block: suspend Generater<T>.() -> Unit): Generater<T> {
            val g = Generater<T>()  

            //用于协程结束之后的回调
            var c = object : Continuation<Unit> {
                override val context: CoroutineContext
                    get() = EmptyCoroutineContext  //不需要context,使用空实现

                override fun resume(value: Unit) {
                    g.status = -1  //正常结束则修改生成器状态为结束
                }


                override fun resumeWithException(exception: Throwable) {
                    g.status = -1
                    throw exception  //如果发生异常,则停止并抛出异常
                }
            }
            //创建一个协程,并将其保存到AtomicReference,初始值肯定为null
            g.mContinuation.compareAndSet(null, block.createCoroutine(g, c))
            g.status = 1  //设置状态为可执行
            return g
        }
    }
}

我们声明了一个AtomicReference来保存协程的执行体。协程本身会被编译成一个Continuation,并且驱动一个协程执行就是调用resume方法。

接着来实现next函数:

    fun next(): T? {
        while (true) {
            //如果已经结束,则返回null
            if (status ==-1) {
                values.set(null)
                break
            }  
            //如果不可执行,则抛出异常
            if (status == 0){
                throw IllegalStateException("生成器未启动")
            }
            //获取continuation并将AtomicReference的内容设置为null
            val c = mContinuation.getAndSet(null)   
            c ?: continue  //如果获取的是`null`,则表明当前有其他线程正在生成新值,重试
            //这里需要加线程锁,防止协程还未暂停就调用resume
             synchronized(this) {
                c.resume(Unit)
            }
            break
        }
        return values.get()    //返回ThreadLocal内的值,即新产生的值
    }

我们看是,上面返回的新值是从ThreadLocal中获取的,因此在yield,我们要做的就是将生成的值保存到ThreadLocal中,并暂停协程

    suspend fun yield(t: T?) {
        suspendCoroutine<Unit> {
            values.set(t)    //设置生成的值到ThreadLocal
            mContinuation.compareAndSet(null, it)    //重新设置AtomicReference的内容
            //我们没有执行it.resume(Unit),因此会暂停协程
        }
    }

我们来看suspendCoroutine的实现:

public inline suspend fun <T> suspendCoroutine(crossinline block: (Continuation<T>) -> Unit): T =
        suspendCoroutineOrReturn { c: Continuation<T> ->
            val safe = SafeContinuation(c)  //包装continuation
            block(safe)  //调用block,也就是我们上面的设置ThreadLocal和mContinuation内容
 //该方法会暂停协程,为了保证resume在协程暂停后执行,在next中我们需要使用线程锁
            safe.getResult() 
        }

我们进入SafeContinuation.getResult

internal fun getResult(): Any? {
        var result = this.result // atomic read
        //一开始的默认值是UNDECIDED,如果我们没有执行`it.resume(Unit)`则走这个分支
        if (result === UNDECIDED) {
            //设置为COROUTINE_SUSPENDED,并返回COROUTINE_SUSPENDED
            if (RESULT.compareAndSet(this, UNDECIDED, COROUTINE_SUSPENDED)) 
                //请求暂停协程
                return COROUTINE_SUSPENDED
            //如果执行下面这条语句,则说明result的值已经被更新,不需要暂停协程
            result = this.result // reread volatile var
        }
       ...
    }

协程的驱动是从resume进入的,在里面如果发现返回值是COROUTINE_SUSPENDED,则会更改状态机状态并返回,也就是暂停协程。
我们来看看COROUTINE_SUSPENDED

public val COROUTINE_SUSPENDED: Any = Any()  //这是一个对象,也就是唯一的

至此,我们的一个生成器就开发完成了,跑一个斐波那数列:

fun main(vararg args: String) {
    var g = Generater.build {
        yield(0)
        var i = 0
        var j = 1
        while (true) {
            yield(j)
            var next = i + j
            i = j
            j = next
        }
    }

    for (i in 1..10)
        println(g.next())
}
#output:
0
1
1
2
3
5
8
13
21
34

可以看到,kotlin提供的协程不是简单的像生成器这样的接口,而是一套可以让代码编译成状态机的机制,基于这个机制,我们可以实现自己的协程库,比如生成器,或者是异步调用接口( 使用Kotlin协程撸一个简易异步调用库),可见,kotlin的协程机制更加强大

相关文章

网友评论

    本文标题:使用kotlin协程撸一个生成器

    本文链接:https://www.haomeiwen.com/subject/hfbbbxtx.html