博客 / 詳情

返回

Kotlin coroutine 原理

Coroutine

lifecycleScope.launch {
            Log.d("testCoroutineScope","testCoroutineScope start $this")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope middle1")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope middle2")
            delay(2000)
            Log.d("testCoroutineScope","testCoroutineScope end")
        }

上邊的代碼展示了啓動協程的方法,通常在協程體中會調用到suspend函數。我們都瞭解kotlin中協程的支持除了應用到kotlin的一些語法特性,同時針對協程還進行了編譯器的修改,使得我們在使用協程時更加直觀方便。但是這也帶來了另一個問題,我們更難理解協程的具體工作細節。下面我們從最讓人費解的協程體開始入手。

一、探索協程體到底是什麼?

這裏認為通過launch、async啓動的block塊就是協程體。

協程體在經過編譯器編譯後會生成一個新的對象,具體對象的實現是什麼樣的呢?看看下面反編譯後的代碼與原代碼的比較:

//原來的kotlin代碼
private fun testCoroutineScope() {
        val block: suspend CoroutineScope.() -> Unit = {
            Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }

        lifecycleScope.launch(block = block)
    }

為了方便觀察,我把協程體單獨定義一個block變量。kotlin的協程體只是簡單的調用delay掛起方法並在方法調用前後添加了log。

//反編譯後得到的代碼
private final void testCoroutineScope() {
      Function2 block = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope", "testCoroutineScope start");
               this.label = 1;
               if (DelayKt.delay(2000L, this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            Log.d("testCoroutineScope", "testCoroutineScope end");
            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block, 3, (Object)null);
   }

經過反編譯後的協程體是一個實現Function2接口類型的對象,Function2支持兩個形參的invoke方法。除了invoke方法還有create和invokeSuspend兩個方法,所以協程體更準確的描述應該是實現了Function2接口的對象。反編譯的代碼可以幫我們瞭解大體的思路,但是一些細節還是有問題的。我們在網上搜索關於協程體對象的介紹,介紹中説協程體是SuspendLambda類的子類。那麼它真的是SuspendLambda的子類嗎?我們可以修改代碼簡單驗證下:

private fun testCoroutineScope() {
        val block: suspend CoroutineScope.() -> Unit = {
            Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }
        //查看block對象的方法
        block.javaClass.let { clazz ->
            clazz.methods.forEach { method ->
                Log.d("testCoroutineScope", "method is ${method}")
            }
        }

        lifecycleScope.launch(block = block)
    }

我們通過打印block對象的方法基本可以驗證問題,下面部分是log輸出的內容:

D: method is public kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.BaseContinuationImpl.create(kotlin.coroutines.Continuation)
D: method is public final kotlin.coroutines.Continuation com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.create(java.lang.Object,kotlin.coroutines.Continuation)
D: method is public boolean java.lang.Object.equals(java.lang.Object)
D: method is public int kotlin.coroutines.jvm.internal.SuspendLambda.getArity()
D: method is public kotlin.coroutines.jvm.internal.CoroutineStackFrame kotlin.coroutines.jvm.internal.BaseContinuationImpl.getCallerFrame()
D: method is public final java.lang.Class java.lang.Object.getClass()
D: method is public final kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.BaseContinuationImpl.getCompletion()
D: method is public kotlin.coroutines.CoroutineContext kotlin.coroutines.jvm.internal.ContinuationImpl.getContext()
D: method is public java.lang.StackTraceElement kotlin.coroutines.jvm.internal.BaseContinuationImpl.getStackTraceElement()
D: method is public int java.lang.Object.hashCode()
D: method is public final kotlin.coroutines.Continuation kotlin.coroutines.jvm.internal.ContinuationImpl.intercepted()
D: method is public java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invoke(java.lang.Object,java.lang.Object)
D: method is public final java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invoke(kotlinx.coroutines.CoroutineScope,kotlin.coroutines.Continuation)
D: method is public final java.lang.Object com.dragon.testcoroutine.CoroutineActivity$testCoroutineScope$block$1.invokeSuspend(java.lang.Object)
D: method is public final native void java.lang.Object.notify()
D: method is public final native void java.lang.Object.notifyAll()
D: method is public final void kotlin.coroutines.jvm.internal.BaseContinuationImpl.resumeWith(java.lang.Object)
D: method is public java.lang.String kotlin.coroutines.jvm.internal.SuspendLambda.toString()
D: method is public final void java.lang.Object.wait() throws java.lang.InterruptedException
D: method is public final void java.lang.Object.wait(long) throws java.lang.InterruptedException
D: method is public final native void java.lang.Object.wait(long,int) throws java.lang.InterruptedException

我們從log中可以找到SuspendLambda 的getArity()方法,所以block對象肯定是繼承了SuspendLambda。在log中我們也可以找到反編譯代碼中發現的create方法、invoke方法和invokeSuspend方法。

二、協程體是如何啓動的

這裏以launch方法的啓動過程為準來跟蹤啓動體的啓動過程。先看下啓動方法的實現:

public fun CoroutineScope.launch(
    context: CoroutineContext = EmptyCoroutineContext,
    start: CoroutineStart = CoroutineStart.DEFAULT,
    block: suspend CoroutineScope.() -> Unit
): Job {
    val newContext = newCoroutineContext(context)
    val coroutine = if (start.isLazy)
        LazyStandaloneCoroutine(newContext, block) else
        StandaloneCoroutine(newContext, active = true)
    coroutine.start(start, coroutine, block)
    return coroutine
}

我們這裏只關心協程體block的啓動,所以關於協程的某些細節暫時這裏不深究。從代碼中我們看到協程啓動的方法調用coroutine.start(),我們按照執行標準的協程StandaloneCoroutine來跟蹤查看它的代碼。

public fun <R> start(start: CoroutineStart, receiver: R, block: suspend R.() -> T) {
        start(block, receiver, this)
    }

這個start方法中調用了一個start方法,看着有奇怪。其實方法內部調用的是CoroutineStart類型的invoke方法

public operator fun <R, T> invoke(block: suspend R.() -> T, receiver: R, completion: Continuation<T>): Unit =
        when (this) {
            DEFAULT -> block.startCoroutineCancellable(receiver, completion)
            ATOMIC -> block.startCoroutine(receiver, completion)
            UNDISPATCHED -> block.startCoroutineUndispatched(receiver, completion)
            LAZY -> Unit // will start lazily
        }

我們按照默認的啓動類型走應該調用DEFAULT -> block.startCoroutineCancellable(receiver, completion)。block是協程體對象,這個方法就是啓動協程體的。completion就是前面創建的StandaloneCoroutine對象,receiver也是StandaloneCoroutine。startCoroutineCancellable方法做了什麼呢?

internal fun <R, T> (suspend (R) -> T).startCoroutineCancellable(
    receiver: R, completion: Continuation<T>,
    onCancellation: ((cause: Throwable) -> Unit)? = null
) =
    runSafely(completion) {
        createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)
    }

從代碼的語意上看就是創建協程體並使用攔截器進行啓動。但是協程體對象不是在調用launch方法的時候傳進來了嗎?為什麼這裏還要創建呢?其實協程體是按照狀態機方式來工作的,協程體中每個可掛起函數的調用都對應狀態機的一個狀態,這個狀態被記錄在協程體對象中。為了避免一個協程體被多次啓動後相互干擾狀態,所以這裏會每次都創建一個新的協程體對象再啓動。

createCoroutineUnintercepted擴展方法的具體實現如下:

public actual fun <R, T> (suspend R.() -> T).createCoroutineUnintercepted(
    receiver: R,
    completion: Continuation<T>
): Continuation<Unit> {
    val probeCompletion = probeCoroutineCreated(completion)
    return if (this is BaseContinuationImpl)
        create(receiver, probeCompletion)
    else {
        createCoroutineFromSuspendFunction(probeCompletion) {
            (this as Function2<R, Continuation<T>, Any?>).invoke(receiver, it)
        }
    }
}

協程體繼承關係:協程體-》SuspendLambda-》ContinuationImpl-》BaseContinuationImpl-》Continuation

因為協程體繼承了BaseContinuationImpl,所以這裏調用create(receiver, probeCompletion)方法創建協程。我們在前面反編譯協程體的代碼中也看到了create方法的實現:

         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

當然反編譯的代碼不太準確,實際就是創建協程體對象並返回。

createCoroutineUnintercepted(receiver, completion).intercepted().resumeCancellableWith(Result.success(Unit), onCancellation)

創建完新的協程體後調用協程體的intercepted()方法,這裏涉及到協程中線程切換的內容,這裏先不深究它,簡單可以認為他把協程體包裝成在某個線程池中運行的協程。resumeCancellableWith()方法便是真正的啓動了協程體。

三、協程體如何掛起,如何恢復

前面已經介紹過協程體就是SuspendLambda對象,它的內部按照狀態機的方式運行。協程體的每次喚醒都只能運行當前狀態的代碼並更改到下個狀態。

private final void testCoroutineScope() {
      Function2 block = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope", "testCoroutineScope start");
               this.label = 1;
               if (DelayKt.delay(2000L, this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            Log.d("testCoroutineScope", "testCoroutineScope end");
            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block, 3, (Object)null);
   }

反編譯的代碼中可以看到label變量,這個變量在維護着狀態機的狀態。invokeSuspend方法調用一次,label的狀態變被推進一步。説到這裏大家就能夠理解了掛起和喚醒的實現原理了,首先invokeSuspend被調用一次只執行了一個狀態的代碼,後面狀態的代碼都沒有執行,這時進入掛起狀態。當異步耗時操作執行結束後再次調用invokeSuspend方法,既是協程體被喚醒。

支持喚醒掛起的對象都繼承了BaseContinuationImpl,在BaseContinuationImpl中我們可以找到調用invokeSuspend方法的地方。

// This implementation is final. This fact is used to unroll resumeWith recursion.
    public final override fun resumeWith(result: Result<Any?>) {
        // This loop unrolls recursion in current.resumeWith(param) to make saner and shorter stack traces on resume
        var current = this
        var param = result
        while (true) {
            // Invoke "resume" debug probe on every resumed continuation, so that a debugging library infrastructure
            // can precisely track what part of suspended callstack was already resumed
            probeCoroutineResumed(current)
            with(current) {
                val completion = completion!! // fail fast when trying to resume continuation without completion
                val outcome: Result<Any?> =
                    try {
                        val outcome = invokeSuspend(param)
                        if (outcome === COROUTINE_SUSPENDED) return
                        Result.success(outcome)
                    } catch (exception: Throwable) {
                        Result.failure(exception)
                    }
                releaseIntercepted() // this state machine instance is terminating
                if (completion is BaseContinuationImpl) {
                    // unrolling recursion via loop
                    current = completion
                    param = outcome
                } else {
                    // top-level completion reached -- invoke and return
                    completion.resumeWith(outcome)
                    return
                }
            }
        }
    }

從代碼我們瞭解到invokeSuspend的返回值為COROUTINE_SUSPENDED時,resumeWith才算真正結束並通過調用completion.resumeWith(outcome)來喚醒父協程的執行。invokeSuspend返回值不是COROUTINE_SUSPENDED時,當前的協程體沒有執行完並等待下次通過本協程體的resumeWith喚醒再次執行。

四、suspend 函數是什麼

launch、async方法啓動的協程體就是一個suspend函數,他的定義如下:

val block: suspend CoroutineScope.() -> Unit = {
            Log.d("testCoroutineScope", "testCoroutineScope start")
            delay(2000)
            Log.d("testCoroutineScope", "testCoroutineScope end")
        }

它經過編譯後就是SuspendLambda對象。

那麼普通的suspend是怎麼樣的呢?

首先我們先看下這段測試代碼:

private fun testCoroutineScope2() {
        val block2: suspend CoroutineScope.() -> Unit = {
            testSuspendMethod()
        }
        lifecycleScope.launch(block = block2)
    }

    private suspend fun testSuspendMethod() {
        Log.d("testSuspendMethod", "testSuspendMethod start")
        delay(3000)
        Log.d("testSuspendMethod", "testSuspendMethod end")
    }

我們在協程中調用了掛起函數,並在掛起函數中調用了delay方法。這段代碼反編譯後的結果入下:

private final void testCoroutineScope2() {
      Function2 block2 = (Function2)(new Function2((Continuation)null) {
         int label;

         @Nullable
         public final Object invokeSuspend(@NotNull Object $result) {
            Object var2 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
            switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               CoroutineActivity var10000 = CoroutineActivity.this;
               this.label = 1;
               if (var10000.testSuspendMethod(this) == var2) {
                  return var2;
               }
               break;
            case 1:
               ResultKt.throwOnFailure($result);
               break;
            default:
               throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
            }

            return Unit.INSTANCE;
         }

         @NotNull
         public final Continuation create(@Nullable Object value, @NotNull Continuation completion) {
            Intrinsics.checkNotNullParameter(completion, "completion");
            Function2 var3 = new <anonymous constructor>(completion);
            return var3;
         }

         public final Object invoke(Object var1, Object var2) {
            return ((<undefinedtype>)this.create(var1, (Continuation)var2)).invokeSuspend(Unit.INSTANCE);
         }
      });
      BuildersKt.launch$default((CoroutineScope)LifecycleOwnerKt.getLifecycleScope(this), (CoroutineContext)null, (CoroutineStart)null, block2, 3, (Object)null);
   }

協程體的內容我們之前已經瞭解過了,但是調用掛起函數的地方有些不同。在調用掛起函數testSuspendMethod的時候多了一個this參數,實際聲明testSuspendMethod的時候沒有這個參數。我們再看下testSuspendMethod的反編譯結果:

private final Object testSuspendMethod(Continuation var1) {
      Object $continuation;
      label20: {
         if (var1 instanceof <undefinedtype>) {
            $continuation = (<undefinedtype>)var1;
            if ((((<undefinedtype>)$continuation).label & Integer.MIN_VALUE) != 0) {
               ((<undefinedtype>)$continuation).label -= Integer.MIN_VALUE;
               break label20;
            }
         }

         $continuation = new ContinuationImpl(var1) {
            // $FF: synthetic field
            Object result;
            int label;

            @Nullable
            public final Object invokeSuspend(@NotNull Object $result) {
               this.result = $result;
               this.label |= Integer.MIN_VALUE;
               return CoroutineActivity.this.testSuspendMethod(this);
            }
         };
      }

      Object $result = ((<undefinedtype>)$continuation).result;
      Object var4 = IntrinsicsKt.getCOROUTINE_SUSPENDED();
      switch(((<undefinedtype>)$continuation).label) {
      case 0:
         ResultKt.throwOnFailure($result);
         Log.d("testSuspendMethod", "testSuspendMethod start");
         ((<undefinedtype>)$continuation).label = 1;
         if (DelayKt.delay(3000L, (Continuation)$continuation) == var4) {
            return var4;
         }
         break;
      case 1:
         ResultKt.throwOnFailure($result);
         break;
      default:
         throw new IllegalStateException("call to 'resume' before 'invoke' with coroutine");
      }

      Log.d("testSuspendMethod", "testSuspendMethod end");
      return Unit.INSTANCE;
   }

testSuspendMethod多出的參數是continuation類型的,在調用的地方會把SuspendLambda對象傳遞過來。testSuspendMethod也通過狀態機的方式來管理掛起和喚醒,但是與協程體有所不同。協程體是SuspendLambda對象,協程體的狀態由SuspendLambda對象保存。testSuspendMethod方法的狀態通過ContinuationImpl對象來保存。

五、suspendCoroutineCancelable到底是什麼

我們對下面這段代碼進行反編譯

//原始代碼
private fun testCoroutineScope3() {
        val block: suspend CoroutineScope.() -> Unit = {
            Log.d("testCoroutineScope3", "testCoroutineScope3 start")

            val value = suspendCancellableCoroutine<Int> {
                Log.d("testCoroutineScope3", "suspendCancellableCoroutine start")
                Handler(Looper.getMainLooper()).postDelayed({ it.resume(202) }, 2000)
                Log.d("testCoroutineScope3", "suspendCancellableCoroutine end")
            }

            Log.d("testCoroutineScope3", "testCoroutineScope3 end value:$value")
        }
        lifecycleScope.launch(block = block)
    }
//編譯後的代碼,這裏只把suspendCancellableCoroutine調用的地方截取出來
switch(this.label) {
            case 0:
               ResultKt.throwOnFailure($result);
               Log.d("testCoroutineScope3", "testCoroutineScope3 start");
               $i$f$suspendCancellableCoroutine = false;
               this.L$0 = this;
               this.label = 1;
               int var6 = false;
               CancellableContinuationImpl cancellable$iv = new CancellableContinuationImpl(IntrinsicsKt.intercepted(this), 1);
               cancellable$iv.initCancellability();
               CancellableContinuation it = (CancellableContinuation)cancellable$iv;
               int var9 = false;
               Log.d("testCoroutineScope3", "suspendCancellableCoroutine start");
               (new Handler(Looper.getMainLooper())).postDelayed((Runnable)(new CoroutineActivity$testCoroutineScope3$block$1$value$1$1(it)), 2000L);
               Log.d("testCoroutineScope3", "suspendCancellableCoroutine end");
               var10000 = cancellable$iv.getResult();
               if (var10000 == IntrinsicsKt.getCOROUTINE_SUSPENDED()) {
                  DebugProbesKt.probeCoroutineSuspended(this);
               }

               if (var10000 == var10) {
                  return var10;
               }
               break;

從代碼可以看出suspendCancellableCoroutine的本質就是創建了一個CancellableContinuationImpl對象,這個對象維護的狀態機結束後會喚醒當前的狀態機。CancellableContinuationImpl的運行沒有結束時,getResult返回COROUTINE_SUSPENDED並使當前狀態機掛起。

簡單理解suspendCancellableCoroutine就是為用户提供了自定義的協程掛起方式,用户通過他可以方便的把線程的異步調用轉換成協程的掛起調用。有類似功能的還有suspendCoroutine方法。

六、總結

  • 協程實現的核心機制就是狀態機,協程中所有的掛起點都被分解成一個單個狀態,狀態機的每次執行都只能執行當前狀態對應的代碼並更改狀態機到下一個狀態,然後狀態機等待再次啓動,也就是所謂的協程掛起。當異步任務執行完成後會再次啓動狀態機,這時狀態機便執行下一個狀態的代碼,按照這樣的規律以此類推。
  • 協程可以嵌套調用,那麼就會生成狀態機嵌套的結構。狀態 機創建時可以指定狀態機執行完成後要執行的父狀態機,這樣保證了樹狀結構組織的狀態機能夠有序執行。
  • 協程中定義的狀態機對象都實現了Continuation接口,在協程的創建時,父協程也是通過Continuation接口把自己傳遞給子協程的。在調用Continuation接口的resumeWith方法便啓動協程執行,也就是啓動了狀態機。BaseContinuationImpl就是狀態機的模板類,控制着狀態機的執行流程,在狀態機結束時還會調用到父協程的狀態機對象。

我的公眾號已經開通,公眾號會同步發佈。
請關注我的公眾號

user avatar
0 位用戶收藏了這個故事!

發佈 評論

Some HTML is okay.