再談協程之 Suspend 到底掛起了啥
Kotlin編譯器會給每一個suspend函數生成一個狀態機來管理協程的執行。
Coroutines簡化了Android上的異步操作。正如文檔中所解釋的,我們可以用它們來管理異步任務,否則可能會阻塞主線程,導致你的應用程序Crash。
Coroutines也有助于用命令式的代碼取代基于回調的API。
作為例子,我們先看看這個使用回調的異步代碼。
- // Simplified code that only considers the happy path
- fun loginUser(userId: String, password: String, userResult: Callback<User>) {
- // Async callbacks
- userRemoteDataSource.logUserIn { user ->
- // Successful network request
- userLocalDataSource.logUserIn(user) { userDb ->
- // Result saved in DB
- userResult.success(userDb)
- }
- }
- }
這些回調可以使用coroutines轉換為順序的函數調用。
- suspend fun loginUser(userId: String, password: String): User {
- val user = userRemoteDataSource.logUserIn(userId, password)
- val userDb = userLocalDataSource.logUserIn(user)
- return userDb
- }
在coroutines代碼中,我們給函數添加了suspend修飾符。這將告訴編譯器,這個函數需要在一個coroutine內執行。作為一個開發者,你可以把suspend函數看作是一個普通的函數,但它的執行可能被掛起,并在某個時候恢復。
簡而言之,suspend就是一種編譯器生成的回調。
與回調不同的是,coroutines提供了一種在線程之間切換和處理異常的簡單方法。
但是,當我們把函數標記為suspend時,編譯器實際上在幕后做了什么?
Suspend到底做了什么
回到loginUser的suspend函數,注意它調用的其他函數也是suspend函數。
- suspend fun loginUser(userId: String, password: String): User {
- val user = userRemoteDataSource.logUserIn(userId, password)
- val userDb = userLocalDataSource.logUserIn(user)
- return userDb
- }
- // UserRemoteDataSource.kt
- suspend fun logUserIn(userId: String, password: String): User
- // UserLocalDataSource.kt
- suspend fun logUserIn(userId: String): UserDb
簡而言之,Kotlin編譯器將使用有限狀態機(我們將在后面介紹)把suspend函數轉換為優化版本的回調實現。你說對了,編譯器會幫你寫這些回調,它們的本質,依然是回調!
Continuation的真面目
suspend函數之間的通信方式是使用Continuation對象。一個Continuation只是一個帶有一些額外信息的通用回調接口。正如我們稍后將看到的,它將代表一個suspend函數的生成狀態機。
讓我們看一下它的定義。
- interface Continuation<in T> {
- public val context: CoroutineContext
- public fun resumeWith(value: Result<T>)
- }
context是在continuation中使用的CoroutineContext。
resumeWith用一個Result來恢復Coroutine的執行,這個Result可以包含一個導致suspend的計算結果的值或者是一個異常。
注意:從Kotlin 1.3開始,你還可以使用擴展函數resume(value: T)和resumeWithException(exception: Throwable),它們是resumeWith調用的特殊版本。
編譯器將使用函數簽名中的額外參數completion(Continuation類型)替換suspend修飾符,該參數將用于將suspend函數的結果傳達給調用它的coroutine。
- fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
- val user = userRemoteDataSource.logUserIn(userId, password)
- val userDb = userLocalDataSource.logUserIn(user)
- completion.resume(userDb)
- }
為了簡單起見,我們的例子將返回Unit而不是User。User對象將在添加的Continuation參數中被 "返回"。
suspend函數的字節碼實際上返回 Any? 因為它是 (T | COROUTINE_SUSPENDED)的聯合類型。這允許函數在可以時同步返回。
注意:如果你用suspend修飾符標記一個不調用其他suspend函數的函數,編譯器也會添加額外的Continuation參數,但不會對它做任何事情,函數體的字節碼看起來就像一個普通函數。
你也可以在其他地方看到Continuation接口。
當使用suspendCoroutine或suspendCancellableCoroutine將基于回調的API轉換為coroutine時(你應該總是傾向于使用這種方法),你直接與Continuation對象交互,以恢復在運行時被suspend的作為參數傳遞的代碼塊。
你可以使用suspend函數上的startCoroutine擴展函數來啟動一個coroutine。它接收一個Continuation對象作為參數,當新的coroutine完成時,無論是結果還是異常,都會被調用。
切換不同的Dispatchers
你可以在不同的Dispatchers之間進行交換,在不同的線程上執行計算。那么Kotlin如何知道在哪里恢復一個暫停的計算?
Continuation有一個子類型,叫做DispatchedContinuation,它的resume函數可以對CoroutineContext中可用的Dispatcher進行調度調用。除了Dispatchers.Unconfined的isDispatchNeeded函數覆蓋(在dispatch之前調用)總是返回false,所有Dispatcher都會調用dispatch。
在協程中,有個不成文的約定,那就是,suspend函數默認是不阻塞線程的,也就是說,suspend函數的調用者,不用為suspend函數運行在哪個線程而擔心,suspend函數會自己處理它工作的線程,不大部分時候,都是通過withContext來進行切換的。
生成狀態機
免責聲明:文章其余部分所展示的代碼將不完全符合編譯器所生成的字節碼。它將是足夠準確的Kotlin代碼,使你能夠理解內部真正發生的事情。這種表示法是由Coroutines 1.3.3版本生成的,在該庫的未來版本中可能會發生變化。
Kotlin編譯器將識別函數何時可以在內部suspend。每個suspend point都將被表示為有限狀態機中的一個狀態。這些狀態由編譯器用標簽表示,前面示例中的suspend函數在編譯后,會產生類似下面的偽代碼。
- fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
- // Label 0 -> first execution
- val user = userRemoteDataSource.logUserIn(userId, password)
- // Label 1 -> resumes from userRemoteDataSource
- val userDb = userLocalDataSource.logUserIn(user)
- // Label 2 -> resumes from userLocalDataSource
- completion.resume(userDb)
- }
為了更好地表示狀態機,編譯器將使用一個when語句來實現不同的狀態。
- fun loginUser(userId: String, password: String, completion: Continuation<Any?>) {
- when(label) {
- 0 -> { // Label 0 -> first execution
- userRemoteDataSource.logUserIn(userId, password)
- }
- 1 -> { // Label 1 -> resumes from userRemoteDataSource
- userLocalDataSource.logUserIn(user)
- }
- 2 -> { // Label 2 -> resumes from userLocalDataSource
- completion.resume(userDb)
- }
- else -> throw IllegalStateException(...)
- }
- }
編譯器將suspend函數編譯成帶有Continuation參數的方法叫做CPS(Continuation-Passing-Style)變換。
這段代碼是不完整的,因為不同的狀態沒有辦法分享信息。編譯器會在函數中使用相同的Continuation對象來做這件事。這就是為什么Continuation的泛型是Any? 而不是原始函數的返回類型(即User)。
此外,編譯器將創建一個私有類,1)持有所需的數據,2)遞歸地調用loginUser函數以恢復執行。你可以看看下面這個生成的類的近似值。
免責聲明:注釋不是由編譯器生成的。我添加它們是為了解釋它們的作用,并使跟隨代碼更容易理解。
- fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
- class LoginUserStateMachine(
- // completion parameter is the callback to the function
- // that called loginUser
- completion: Continuation<Any?>
- ): CoroutineImpl(completion) {
- // Local variables of the suspend function
- var user: User? = null
- var userDb: UserDb? = null
- // Common objects for all CoroutineImpls
- var result: Any? = null
- var label: Int = 0
- // this function calls the loginUser again to trigger the
- // state machine (label will be already in the next state) and
- // result will be the result of the previous state's computation
- override fun invokeSuspend(result: Any?) {
- this.result = result
- loginUser(null, null, this)
- }
- }
- ...
- }
由于invokeSuspend將僅用Continuation對象的信息來再次調用loginUser,loginUser函數簽名中的其余參數都變成了空值。在這一點上,編譯器只需要添加如何在狀態之間轉移的信息。
它需要做的第一件事是知道1)這是函數第一次被調用,或者2)函數已經從之前的狀態恢復。它通過檢查傳入的continuation是否是LoginUserStateMachine類型來實現。
- fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
- ...
- val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
- ...
- }
如果是第一次,它將創建一個新的LoginUserStateMachine實例,并將收到的完成實例作為一個參數存儲起來,這樣它就能記住如何恢復調用這個實例的函數。如果不是這樣,它將只是繼續執行狀態機(suspend函數)。
現在,讓我們看看編譯器為在狀態間移動和在狀態間共享信息而生成的代碼。
- /* Copyright 2019 Google LLC.
- SPDX-License-Identifier: Apache-2.0 */
- fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
- ...
- val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
- when(continuation.label) {
- 0 -> {
- // Checks for failures
- throwOnFailure(continuation.result)
- // Next time this continuation is called, it should go to state 1
- continuation.label = 1
- // The continuation object is passed to logUserIn to resume
- // this state machine's execution when it finishes
- userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
- }
- 1 -> {
- // Checks for failures
- throwOnFailure(continuation.result)
- // Gets the result of the previous state
- continuation.user = continuation.result as User
- // Next time this continuation is called, it should go to state 2
- continuation.label = 2
- // The continuation object is passed to logUserIn to resume
- // this state machine's execution when it finishes
- userLocalDataSource.logUserIn(continuation.user, continuation)
- }
- ... // leaving out the last state on purpose
- }
- }
花點時間瀏覽一下上面的代碼,看看你是否能發現與前面的代碼片斷的不同之處。讓我們看看編譯器生成了什么。
- when語句的參數是LoginUserStateMachine實例中的Label。
- 每次處理一個新的狀態時,都會有一個檢查,以防這個函數suspend時發生異常。
- 在調用下一個suspend函數(即logUserIn)之前,LoginUserStateMachine實例的Label將被更新為下一個狀態。
- 當在這個狀態機內部有一個對另一個suspend函數的調用時,continuation的實例(LoginUserStateMachine類型)被作為一個參數傳遞。要調用的suspend函數也已經被編譯器轉化了,它是另一個像這樣的狀態機,它把一個continuation對象也作為參數!當那個suspend函數的狀態機完成后,它將恢復這個狀態機的執行。
最后一個狀態是不同的,因為它必須恢復調用這個函數的執行,正如你在代碼中看到的,它對存儲在LoginUserStateMachine中的cont變量(在構造時)調用resume。
- /* Copyright 2019 Google LLC.
- SPDX-License-Identifier: Apache-2.0 */
- fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
- ...
- val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
- when(continuation.label) {
- ...
- 2 -> {
- // Checks for failures
- throwOnFailure(continuation.result)
- // Gets the result of the previous state
- continuation.userDb = continuation.result as UserDb
- // Resumes the execution of the function that called this one
- continuation.cont.resume(continuation.userDb)
- }
- else -> throw IllegalStateException(...)
- }
- }
正如你所看到的,Kotlin編譯器為我們做了很多事情!從這個suspend函數功能來舉例。
- suspend fun loginUser(userId: String, password: String): User {
- val user = userRemoteDataSource.logUserIn(userId, password)
- val userDb = userLocalDataSource.logUserIn(user)
- return userDb
- }
編譯器為我們生成了下面這一切。
- /* Copyright 2019 Google LLC.
- SPDX-License-Identifier: Apache-2.0 */
- fun loginUser(userId: String?, password: String?, completion: Continuation<Any?>) {
- class LoginUserStateMachine(
- // completion parameter is the callback to the function that called loginUser
- completion: Continuation<Any?>
- ): CoroutineImpl(completion) {
- // objects to store across the suspend function
- var user: User? = null
- var userDb: UserDb? = null
- // Common objects for all CoroutineImpl
- var result: Any? = null
- var label: Int = 0
- // this function calls the loginUser again to trigger the
- // state machine (label will be already in the next state) and
- // result will be the result of the previous state's computation
- override fun invokeSuspend(result: Any?) {
- this.result = result
- loginUser(null, null, this)
- }
- }
- val continuation = completion as? LoginUserStateMachine ?: LoginUserStateMachine(completion)
- when(continuation.label) {
- 0 -> {
- // Checks for failures
- throwOnFailure(continuation.result)
- // Next time this continuation is called, it should go to state 1
- continuation.label = 1
- // The continuation object is passed to logUserIn to resume
- // this state machine's execution when it finishes
- userRemoteDataSource.logUserIn(userId!!, password!!, continuation)
- }
- 1 -> {
- // Checks for failures
- throwOnFailure(continuation.result)
- // Gets the result of the previous state
- continuation.user = continuation.result as User
- // Next time this continuation is called, it should go to state 2
- continuation.label = 2
- // The continuation object is passed to logUserIn to resume
- // this state machine's execution when it finishes
- userLocalDataSource.logUserIn(continuation.user, continuation)
- }
- 2 -> {
- // Checks for failures
- throwOnFailure(continuation.result)
- // Gets the result of the previous state
- continuation.userDb = continuation.result as UserDb
- // Resumes the execution of the function that called this one
- continuation.cont.resume(continuation.userDb)
- }
- else -> throw IllegalStateException(...)
- }
- }
Kotlin編譯器將每個suspend函數轉化為一個狀態機,在每次函數需要suspend時使用回調進行優化。
現在你知道了編譯器在編譯時到底做了什么,你就可以更好地理解為什么一個suspend函數在它執行完所有工作之前不會返回。另外,你也會知道,代碼是如何在不阻塞線程的情況下進行suspend的——這是因為,當函數恢復時需要執行的信息被存儲在Continuation對象中!
向大家推薦下我的網站 https://xuyisheng.top/ 專注 Android-Kotlin-Flutter 歡迎大家訪問