使用kotlin实现lua的协程(非对称)

发布时间 2023-06-08 00:01:52作者: LCAC

一、非对称所以调用的参数和返回值可以是不同的类型

例如:我们创建一个调用的yield参数返回值

// 模板的参数是P,返回值是R
// yield对应的传入参数是R,返回值是P
interface CoroutineScope<P, R> {
    val parameter: P?
    suspend fun yield(value: R): P
}

如上,yield的参数类型是R,返回值类型是P

具体改yield的实现如下:

    private val scope = object : CoroutineScope<P, R> {
        override var parameter: P? = null

        // 这里通过yield的返回值为P,推出了suspendCoroutine的返回值也是为P
        // 从而也推出了continuation的模板参数也是P,因为把continuation传给了Status.Yielded,从而跟上面的
        // class Yielded<P>(val continuation: Continuation<P>): Status()刚好参数都能对上
        override suspend fun yield(value: R): P = suspendCoroutine { continuation ->
            val previousStatus = status.getAndUpdate {
                when(it) {
                    is Status.Created -> throw IllegalStateException("Never started")
                    is Status.Yielded<*> -> throw IllegalStateException("Already yielded")
                    is Status.Resumed<*> -> Status.Yielded(continuation)
                    is Status.Dead -> throw IllegalStateException("Already Dead")
                }
            }

            (previousStatus as? Status.Resumed<R>)?.continuation?.resume(value)
        }
    }

这里使用成员变量的方式来作为接收者的参数,而不是使用对称的方式原来的类来继承实现

    init {
        val coroutineBlock: suspend CoroutineScope<P, R>.() -> R = {
            block(parameter!!)
        }
        val start = coroutineBlock.createCoroutine(scope, this)
        status = AtomicReference(Status.Created(start))
    }

如上scope即为接收者

 

二、完整的非对称的代码

sealed class Status {
    class Created(val continuation: Continuation<Unit>): Status()
    class Yielded<P>(val continuation: Continuation<P>): Status()
    class Resumed<R>(val continuation: Continuation<R>): Status()
    object Dead: Status()
}

// 模板的参数是P,返回值是R
// yield对应的传入参数是R,返回值是P
interface CoroutineScope<P, R> {
    val parameter: P?
    suspend fun yield(value: R): P
}

class Coroutine<P, R>(
    override val context: CoroutineContext = EmptyCoroutineContext,
    private val block: suspend CoroutineScope<P, R>.(P) -> R
): Continuation<R> {
    companion object {
        fun <P, R> create(
            context: CoroutineContext = EmptyCoroutineContext,
            block: suspend CoroutineScope<P, R>.(P) -> R
        ): Coroutine<P, R> {
            return Coroutine(context, block)
        }
    }

    // 这里使用成员变量scope来作为block的协程创建的接收者
    private val scope = object : CoroutineScope<P, R> {
        override var parameter: P? = null

        // 这里通过yield的返回值为P,推出了suspendCoroutine的返回值也是为P
        // 从而也推出了continuation的模板参数也是P,因为把continuation传给了Status.Yielded,从而跟上面的
        // class Yielded<P>(val continuation: Continuation<P>): Status()刚好参数都能对上
        override suspend fun yield(value: R): P = suspendCoroutine { continuation ->
            val previousStatus = status.getAndUpdate {
                when(it) {
                    is Status.Created -> throw IllegalStateException("Never started")
                    is Status.Yielded<*> -> throw IllegalStateException("Already yielded")
                    is Status.Resumed<*> -> Status.Yielded(continuation)
                    is Status.Dead -> throw IllegalStateException("Already Dead")
                }
            }

            (previousStatus as? Status.Resumed<R>)?.continuation?.resume(value)
        }
    }

    private val status: AtomicReference<Status>
    val isActive: Boolean
    get() = status.get() != Status.Dead

    init {
        val coroutineBlock: suspend CoroutineScope<P, R>.() -> R = {
            block(parameter!!)
        }
        val start = coroutineBlock.createCoroutine(scope, this)
        status = AtomicReference(Status.Created(start))
    }

    // 这里同yield
    suspend fun resume(value: P): R = suspendCoroutine { continuation ->
        val previousStatus = status.getAndUpdate {
            when(it) {
                is Status.Created -> {
                    scope.parameter = value
                    Status.Resumed(continuation)
                }
                is Status.Yielded<*> -> {
                    Status.Resumed(continuation)
                }
                is Status.Resumed<*> -> throw IllegalStateException("Already Resumed")
                is Status.Dead -> throw IllegalStateException("Already Dead")
            }
        }

        when(previousStatus) {
            is Status.Created -> previousStatus.continuation.resume(Unit)
            is Status.Yielded<*> -> (previousStatus as Status.Yielded<P>).continuation.resume(value)
            else -> Unit
        }
    }

    override fun resumeWith(result: Result<R>) {
        val previousStatus = status.getAndUpdate {
            when(it) {
                is Status.Created ->  throw IllegalStateException("Never started")
                is Status.Yielded<*> -> throw IllegalStateException("Already Yielded")
                is Status.Resumed<*> -> Status.Dead
                Status.Dead -> throw IllegalStateException("Already Dead")
            }
        }
        (previousStatus as? Status.Resumed<R>)?.continuation?.resumeWith(result)
    }

}
View Code

 

三、对如上的调用

suspend fun testLua() {
    val producer = Coroutine.create<Unit, Int> {
        for (i in 0..3) {
            println("send $i")
            yield(i)
        }
        200
    }

    val consumer = Coroutine.create<Int, Unit> { param: Int ->
        println("start $param")
        for (i in 0..3) {
            val value = yield(Unit)
            println("receive1 $value")
        }
    }

    while (producer.isActive && consumer.isActive) {
        val  result = producer.resume(Unit)
        consumer.resume(result)
    }
}