目录
一.Continuation
Continuation接口是协程中最核心的接口,代表着挂起点之后的续体,代码如下:
public interface Continuation<in T> { // 续体的上下文 public val context: CoroutineContext // 该方法用于恢复续体的执行 // result为挂起点执行完成的返回值,T为返回值的类型 public fun resumeWith(result: Result<T>) }
Continuation图解
二.ContinuationInterceptor
ContinuationInterceptor接口继承自Element接口,是协程中的续体拦截器,代码如下:
public interface ContinuationInterceptor : CoroutineContext.Element { // 拦截器的Key companion object Key : CoroutineContext.Key<ContinuationInterceptor> // 拦截器对续体进行拦截时会调用该方法,并对continuation进行缓存 // 拦截判断:根据传入的continuation对象与返回的continuation对象是否相同 public fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> // 当interceptContinuation方法拦截的协程执行完毕后,会调用该方法 public fun releaseInterceptedContinuation(continuation: Continuation<*>) { /* do nothing by default */ } // get方法多态实现 public override operator fun <E : CoroutineContext.Element> get(key: CoroutineContext.Key<E>): E? { @OptIn(ExperimentalStdlibApi::class) if (key is AbstractCoroutineContextKey<*, *>) { @Suppress("UNCHECKED_CAST") return if (key.isSubKey(this.key)) key.tryCast(this) as? E else null } @Suppress("UNCHECKED_CAST") return if (ContinuationInterceptor === key) this as E else null } // minusKey方法多态实现 public override fun minusKey(key: CoroutineContext.Key<*>): CoroutineContext { @OptIn(ExperimentalStdlibApi::class) if (key is AbstractCoroutineContextKey<*, *>) { return if (key.isSubKey(this.key) && key.tryCast(this) != null) EmptyCoroutineContext else this } return if (ContinuationInterceptor === key) EmptyCoroutineContext else this } }
三.CoroutineDispatcher
CoroutineDispatcher类继承自AbstractCoroutineContextElement类,实现了ContinuationInterceptor接口,是协程调度器的基类,代码如下:
public abstract class CoroutineDispatcher : AbstractCoroutineContextElement(ContinuationInterceptor), ContinuationInterceptor { // ContinuationInterceptor的多态实现,调度器本质上就是拦截器 @ExperimentalStdlibApi public companion object Key : AbstractCoroutineContextKey<ContinuationInterceptor, CoroutineDispatcher>( ContinuationInterceptor, { it as? CoroutineDispatcher }) // 用于判断调度器是否要调用dispatch方法进行调度,默认为true public open fun isDispatchNeeded(context: CoroutineContext): Boolean = true // 调度的核心方法,在这里进行调度,执行block public abstract fun dispatch(context: CoroutineContext, block: Runnable) // 如果调度是由Yield方法触发的,默认通过dispatch方法实现 @InternalCoroutinesApi public open fun dispatchYield(context: CoroutineContext, block: Runnable): Unit = dispatch(context, block) // ContinuationInterceptor接口的方法,将续体包裹成DispatchedContinuation,并传入当前调度器 public final override fun <T> interceptContinuation(continuation: Continuation<T>): Continuation<T> = DispatchedContinuation(this, continuation) // 释放父协程与子协程的关联。 @InternalCoroutinesApi public override fun releaseInterceptedContinuation(continuation: Continuation<*>) { (continuation as DispatchedContinuation<*>).reusableCancellableContinuation?.detachChild() } // 重载了"+"操作,直接返回others // 因为两个调度器相加没有意义,同一个上下文中只能有一个调度器 // 如果需要加的是调度器对象,则直接替换成最新的,因此直接返回 public operator fun plus(other: CoroutineDispatcher): CoroutineDispatcher = other override fun toString(): String = "$classSimpleName@$hexAddress" }
四.EventLoop
EventLoop类继承自CoroutineDispatcher类,用于协程中任务的分发执行,只在runBlocking方法中和Dispatchers.Unconfined调度器中使用。与Handler中的Looper类似,在创建后会存储在当前线程的ThreadLocal中。EventLoop本身不支持延时执行任务,如果需要可以自行继承EventLoop并实现Delay接口,EventLoop中预留了一部分变量和方法用于延时需求的扩展。
为什么协程需要EventLoop呢?协程的本质是续体传递,而续体传递的本质是回调,假设在Dispatchers.Unconfined调度下,要连续执行多个suspend方法,就会有多个续体传递,假设suspend方法达到一定数量后,就会造成StackOverflow,进而引起崩溃。同样的,我们知道调用runBlocking会阻塞当前线程,而runBlocking阻塞的原理就是执行“死循环”,因此需要在循环中做任务的分发,去执行内部协程在Dispatchers.Unconfined调度器下加入的任务。
EventLoop代码如下:
internal abstract class EventLoop : CoroutineDispatcher() { // 用于记录使用当前EventLoop的runBlocking方法和Dispatchers.Unconfined调度器的数量 private var useCount = 0L // 表示当前的EventLoop是否被暴露给其他的线程 // runBlocking会将EventLoop暴露给其他线程 // 因此,当runBlocking使用时,shared必须为true private var shared = false // Dispatchers.Unconfined调度器的任务执行队列 private var unconfinedQueue: ArrayQueue<DispatchedTask<*>>? = null // 处理任务队列的下一个任务,该方法只能在EventLoop所在的线程调用 // 返回值<=0,说明立刻执行下一个任务 // 返回值>0,说明等待这段时间后,执行下一个任务 // 返回值为Long.MAX_VALUE,说明队列里没有任务了 public open fun processNextEvent(): Long { if (!processUnconfinedEvent()) return Long.MAX_VALUE return 0 } // 队列是否为空 protected open val isEmpty: Boolean get() = isUnconfinedQueueEmpty // 下一个任务多长时间后执行 protected open val nextTime: Long get() { val queue = unconfinedQueue ?: return Long.MAX_VALUE return if (queue.isEmpty) Long.MAX_VALUE else 0L } // 任务的核心处理方法 public fun processUnconfinedEvent(): Boolean { // 若队列为空,则返回 val queue = unconfinedQueue ?: return false // 从队首取出一个任务,如果为空,则返回 val task = queue.removeFirstOrNull() ?: return false // 执行 task.run() return true } // 表示当前EventLoop是否可以在协程上下文中被调用 // EventLoop本质上也是协程上下文 // 如果EventLoop在runBlocking方法中使用,必须返回true public open fun shouldBeProcessedFromContext(): Boolean = false // 向队列中添加一个任务 public fun dispatchUnconfined(task: DispatchedTask<*>) { // 若队列为空,则创建一个新的队列 val queue = unconfinedQueue ?: ArrayQueue<DispatchedTask<*>>().also { unconfinedQueue = it } queue.addLast(task) } // EventLoop当前是否还在被使用 public val isActive: Boolean get() = useCount > 0 // EventLoop当前是否还在被Unconfined调度器使用 public val isUnconfinedLoopActive: Boolean get() = useCount >= delta(unconfined = true) // 判断队列是否为空 public val isUnconfinedQueueEmpty: Boolean get() = unconfinedQueue?.isEmpty ?: true // 下面三个方法用于计算使用当前的EventLoop的runBlocking方法和Unconfined调度器的数量 // useCount是一个64位的数, // 它的高32位用于记录Unconfined调度器的数量,低32位用于记录runBlocking方法的数量 private fun delta(unconfined: Boolean) = if (unconfined) (1L shl 32) else 1L fun incrementUseCount(unconfined: Boolean = false) { useCount += delta(unconfined) // runBlocking中使用,shared为true if (!unconfined) shared = true } fun decrementUseCount(unconfined: Boolean = false) { useCount -= delta(unconfined) // 如果EventLoop还在被使用 if (useCount > 0) return assert { useCount == 0L } // 如果EventLoop不被使用了,并且在EventLoop中使用过 if (shared) { // 关闭相关资源,并在ThreadLocal中移除 shutdown() } } protected open fun shutdown() {} }
协程中提供了EventLoopImplBase类,间接继承自EventLoop,实现了Delay接口,用来延时执行任务。同时,协程中还提供单例对象ThreadLocalEventLoop用于EventLoop在ThreadLocal中的存储。