在 Android 開發中使用協程 | 代碼實戰
本文是介紹 Android 協程系列中的第三部分,這篇文章通過發送一次性請求來介紹如何使用協程處理在實際編碼過程中遇到的問題。在閱讀本文之前,建議您先閱讀本系列的前兩篇文章,關于在 Android 開發中使用協程的背景介紹和上手指南。
使用協程解決實際編碼問題
前兩篇文章主要是介紹了如何使用協程來簡化代碼,在 Android 上保證主線程安全,避免任務泄漏。以此為背景,我們認為使用協程是在處理后臺任務和簡化 Android 回調代碼的絕佳方案。
目前為止,我們主要集中在介紹協程是什么,以及如何管理它們,本文我們將介紹如何使用協程來完成一些實際任務。協程同函數一樣,是在編程語言特性中的一個常用特性,您可以使用它來實現任何可以通過函數和對象能實現的功能。但是,在實際編程中,始終存在兩種類型的任務非常適合使用協程來解決:
- 一次性請求 (one shot requests) 是那種調用一下就請求一下,請求獲取到結果后就結束執行;
- 流式請求 (streaming request) 在發出請求后,還一直監聽它的變化并返回給調用方,在拿到第一個結果之后它們也不會結束。
協程對于處理這些任務是一個絕佳的解決方案。在這篇文章中,我們將會深入介紹一次性請求,并探索如何在 Android 中使用協程實現它們。
一次性請求
一次性請求會調用一次就請求一次,獲取到結果后就結束執行。這個模式同調用常規函數很像 —— 調用一次,執行,然后返回。正因為同函數調用相似,所以相對于流式請求它更容易理解。
一次性請求會調用一次就請求一次,獲取到結果后就結束執行。
舉例來說,您可以把它類比為瀏覽器加載頁面。當您點擊了這篇文章的鏈接后,瀏覽器向服務器發送了網絡請求,然后進行頁面加載。一旦頁面數據傳輸到瀏覽器后,瀏覽器就有了所有需要的數據,然后停止同后端服務的對話。如果服務器后來又修改了這篇文章的內容,新的更改是不會顯示在瀏覽器中的,除非您主動刷新了瀏覽器頁面。
盡管這樣的方式缺少了流式請求那樣的實時推送特性,但是它還是非常有用的。在 Android 的應用中您可以用這種方式解決很多問題,比如對數據的查詢、存儲或更新,它還很適用于處理列表排序問題。
問題:展示一個有序列表
我們通過一個展示有序列表的例子來探索一下如何構建一次性請求。為了讓例子更具體一些,我們來構建一個用于商店員工使用的庫存應用,使用它能夠根據上次進貨的時間來查找相應商品,并能夠以升序和降序的方式排列。因為這個倉庫中存儲的商品很多,所以對它們進行排序要花費將近 1 秒鐘,因此我們需要使用協程來避免阻塞主線程。
在應用中,所有的數據都會存儲到 Room 數據庫中。由于不涉及到網絡請求,因此我們不需要進行網絡請求,從而專注于一次性請求這樣的編程模式。由于無需進行網絡請求,這個例子會很簡單,盡管如此它仍然展示了該使用怎樣的模式來實現一次性請求。
為了使用協程來實現此需求,您需要在協程中引入 ViewModel、Repository 和 Dao。讓我們逐個進行介紹,看看如何把它們同協程整合在一起。
- class ProductsViewModel(val productsRepository: ProductsRepository): ViewModel() {
- private val _sortedProducts = MutableLiveData<List<ProductListing>>()
- val sortedProducts: LiveData<List<ProductListing>> = _sortedProducts
- /**
- * 當用戶點擊相應排序按鈕后,UI 進行調用
- */
- fun onSortAscending() = sortPricesBy(ascending = true)
- fun onSortDescending() = sortPricesBy(ascending = false)
- private fun sortPricesBy(ascending: Boolean) {
- viewModelScope.launch {
- // suspend 和 resume 使得這個數據庫請求是主線程安全的,所以 ViewModel 不需要關心線程安全問題
- _sortedProducts.value =
- productsRepository.loadSortedProducts(ascending)
- }
- }
- }
ProductsViewModel 負責從 UI 層接受事件,然后向 repository 請求更新的數據。它使用 LiveData 來存儲當前排序的列表數據,以供 UI 進行展示。當出現某個新事件時,sortProductsBy 會啟動一個新的協程對列表進行排序,當排序完成后更新 LiveData。在這種架構下,通常都是使用 ViewModel 啟動協程,因為這樣做的話可以在 onCleared 中取消所啟動的協程。當用戶離開此界面后,這些任務就沒必要繼續進行了。
- LiveData:https://developer.android.google.cn/topic/libraries/architecture/livedata
- ViewModel:https://developer.android.google.cn/topic/libraries/architecture/viewmodel
*如果您之前沒有用過 LiveData,您可以看看這篇由 @CeruleanOtter 寫的文章,它介紹了 LiveData 是如何為 UI 保存數據的 —— ViewModels: A Simple Example。
- @CeruleanOtter:https://twitter.com/CeruleanOtter
- ViewModels: A Simple Example:https://medium.com/androiddevelopers/viewmodels-a-simple-example-ed5ac416317e
這是在 Android 上使用協程的通用模式。由于 Android framework 不會主動調用掛起函數,所以您需要配合使用協程來響應 UI 事件。最簡單的方法就是來一個事件就啟動一個新的協程,最適合處理這種情況的地方就是 ViewModel 了。
在 ViewModel 中啟動協程是很通用的模式。
ViewModel 實際上使用了 ProductsRepository 來獲取數據,示例代碼如下:
- class ProductsRepository(val productsDao: ProductsDao) {
- /**
- 這是一個普通的掛起函數,也就是說調用方必須在一個協程中。repository 并不負責啟動或者停止協程,因為它并不負責對協程生命周期的掌控。
- 這可能會在 Dispatchers.Main 中調用,同樣它也是主線程安全的,因為 Room 會為我們保證主線程安全。
- */
- suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
- return if (ascending) {
- productsDao.loadProductsByDateStockedAscending()
- } else {
- productsDao.loadProductsByDateStockedDescending()
- }
- }
- }
ProductsRepository 提供了一個合理的同商品數據進行交互的接口,此應用中,所有內容都存儲在本地 Room 數據庫中,它為 @Dao 提供了針對不同排序具有不同功能的兩個接口。
repository 是 Android 架構組件中的一個可選部分,如果您在應用中已經集成了它或者其他的相似功能的模塊,那么它應該更偏向于使用掛起函數。因為 repository 并沒有生命周期,它僅僅是一個對象,所以它不能處理資源的清理工作,所以默認情況下,repository 中啟動的所有協程都有可能出現泄漏。
使用掛起函數除了避免泄漏之外,在不同的上下文中也可以重復使用 repository,任何知道如何創建協程的都可以調用 loadSortedProducts,例如 WorkManager 所調度管理的后臺任務就可以直接調用它。
repository 應該使用掛起函數來保證主線程安全。
注意: 當用戶離開界面后,有些在后臺中處理數據保存的操作可能還要繼續工作,這種情況下脫離了應用生命周期來運行是沒有意義的,所以大部分情況下 viewModelScope 都是一個好的選擇。
再來看看 ProductsDao,示例代碼如下:
- @Dao
- interface ProductsDao {
- // 因為這個方法被標記為了 suspend,Room 將會在保證主線程安全的前提下使用自己的調度器來運行這個查詢
- @Query("select * from ProductListing ORDER BY dateStocked ASC")
- suspend fun loadProductsByDateStockedAscending(): List<ProductListing>
- // 因為這個方法被標記為了 suspend,Room 將會在保證主線程安全的前提下使用自己的調度器來運行這個查詢
- @Query("select * from ProductListing ORDER BY dateStocked DESC")
- suspend fun loadProductsByDateStockedDescending(): List<ProductListing>
- }
ProductsDao 是一個 Room @Dao,它對外提供了兩個掛起函數,因為這些函數都增加了 suspend 修飾,所以 Room 會保證它們是主線程安全的,這也意味著您可以直接在 Dispatchers.Main 中調用它們。
*如果您沒有在 Room 中使用過協程,您可以先看看這篇由 @FMuntenescu 寫的文章: Room 🔗 Coroutines
- @FMuntenescu:https://twitter.com/FMuntenescu
- Room 🔗 Coroutines:https://medium.com/androiddevelopers/room-coroutines-422b786dc4c5
不過要注意的是,調用它的協程將會在主線程上執行。所以,如果您要對執行結果做一些比較耗時的操作,比如對列表內容進行轉換,您要確保這個操作不會阻塞主線程。
注意: Room 使用了自己的調度器在后臺線程上進行查詢操作。您不應該再使用 withContext(Dispatchers.IO) 來調用 Room 的 suspend 查詢,這只會讓您的代碼變復雜,也會拖慢查詢速度。
Room 的掛起函數是主線程安全的,并運行于自定義的調度器中。
一次性請求模式
這是在 Android 架構組件中使用協程進行一次性請求的完整模式,我們將協程添加到了 ViewModel、Repository 和 Room 中,每一層都有著不同的責任分工。
- ViewModel 在主線程上啟動了協程,一旦有結果后就結束執行;
- Repository 提供了保證主線程安全的掛起函數;
- 數據庫和網絡層提供了保證主線程安全的掛起函數。
ViewModel 負責啟動協程,并保證用戶離開了相應界面時它們就會被取消。它本身并不會做一些耗時的操作,而是依賴別的層級來做。一旦有了結果,就使用 LiveData 將數據發送到 UI 層。因為 ViewModel 并不做一些耗時操作,所以它是在主線程啟動協程的,以便能夠更快地響應用戶事件。
Repository 提供了掛起函數用來訪問數據,它通常不會啟動一些生命周期比較長的協程,因為它們一旦啟動了便無法取消。無論何時 Repository 想要做一些耗時操作,比如對列表內容進行轉換,都應該使用 withContext 來提供主線程安全的接口。
數據層 (網絡或數據庫) 總是會提供掛起函數,使用 Kotlin 協程的時候要保證這些掛起函數是主線程安全的,Room 和 Retrofit 都遵循了這一點。
在一次性請求中,數據層只提供掛起函數,調用方如果想要獲取最新的值,只能再次進行調用,這就像瀏覽器中的刷新按鈕一樣。
花點時間讓您了解一次性請求的模式是值得,它在 Android 協程中是比較通用的模式,您會一直用到它。
第一個 bug 出現了
在經過測試后,您部署到了生產環境,運行了幾周都感覺良好,直到您收到了一個很奇怪的 bug 報告:
標題:🐞 — 排序錯誤!
錯誤報告: 當我非常快速地點擊排序按鈕時,排序的結果偶爾是錯的,這還不是每次都能復現的🙃。
您研究了一下,不禁問自己哪里出錯了?這個邏輯很簡單:
- 開始執行用戶請求的排序操作;
- 在 Room 調度器中開始進行排序;
- 展示排序結果。
您覺得這個 bug 不存在準備關閉它,因為解決方案很簡單,"不要那么快地點擊按鈕",但是您還是很擔心,覺得還是哪個地方出了問題。于是在代碼中加入一些日志,并跑了一堆測試用例后,您終于知道問題出在什么地方了!
看起來應用內展示的排序結果并不是真正的 "排序結果",而是上一次完成排序的結果。當用戶快速點擊按鈕時,就會同時觸發多個排序操作,這些操作可能以任意順序結束。
當啟動一個新的協程來響應 UI 事件時,要去考慮一下用戶若在上一個任務未完成之前又開始了新的任務,會有什么樣的后果。
這其實是一個并發導致的問題,它和是否使用了協程其實沒有什么關系。如果您使用回調、Rx 或者是 ExecutorService,還是可能會遇到這樣的 bug。
有非常多方案能夠解決這個問題,既可以在 ViewModel 中解決,又可以在 Repository 中解決。我們來看看怎么才能讓一次性請求按照我們所期望的順序返回結果。
最佳解決方案:禁用按鈕
核心問題出在我們做了兩次排序,要修復的話我們可以只讓它排序一次。最簡單的解決方法就是禁用按鈕,不讓它發出新的事件就可以了。
這看起來很簡單,而且確實是個好辦法。實現起來的代碼也很簡單,還容易測試,只要它能在 UI 中體現出來這個按鈕的狀態,就完全可以解決問題。
要禁用按鈕,只需要告訴 UI 在 sortPricesBy 中是否有正在處理的排序請求,示例代碼如下:
- // 方案 0: 當有任何排序正在執行時,禁用排序按鈕
- class ProductsViewModel(val productsRepository: ProductsRepository): ViewModel() {
- private val _sortedProducts = MutableLiveData<List<ProductListing>>()
- val sortedProducts: LiveData<List<ProductListing>> = _sortedProducts
- private val _sortButtonsEnabled = MutableLiveData<Boolean>()
- val sortButtonsEnabled: LiveData<Boolean> = _sortButtonsEnabled
- init {
- _sortButtonsEnabled.value = true
- }
- /**
- 當用戶點擊排序按鈕時,調用
- */
- fun onSortAscending() = sortPricesBy(ascending = true)
- fun onSortDescending() = sortPricesBy(ascending = false)
- private fun sortPricesBy(ascending: Boolean) {
- viewModelScope.launch {
- // 只要有排序在進行,禁用按鈕
- _sortButtonsEnabled.value = false
- try {
- _sortedProducts.value =
- productsRepository.loadSortedProducts(ascending)
- } finally {
- // 排序結束后,啟用按鈕
- _sortButtonsEnabled.value = true
- }
- }
- }
- }
使用 sortPricesBy 中的 _sortButtonsEnabled 在排序時禁用按鈕
好了,這看起來還行,只需要在調用 repository 時在 sortPricesBy 內部禁用按鈕就好了。
大部分情況下,這都是最佳解決方案,但是如果我們想在保持按鈕可用的前提下解決 bug 呢?這樣的話有一點困難,在本文剩余的部分看看該怎么做。
注意: 這段代碼展示了從主線程啟動的巨大優勢,點擊之后按鈕立刻變得不可點了。但如果您換用了其他的調度程序,當出現某個手速很快的用戶在運行速度較慢的手機上操作時,還是可能出現發送多次點擊事件的情況。
并發模式
下面幾個章節我們探討一些比較高級的話題,如果您才剛剛接觸協程,可以不去理解這一部分,使用禁用按鈕這一方案就是解決大部分類似問題的最佳方案。
在剩余部分我們將探索在不禁用按鈕的前提下,確保一次性請求能夠正常運行。我們可以通過控制何時讓協程運行 (或者不運行) 來避免剛剛出現的并發問題。
有三個基本的模式可以讓我們確保在同一時間只會有一次請求進行:
- 在啟動更多協程之前取消之前的任務;
- 讓下一個任務排隊等待前一個任務執行完成;
- 如果有一個任務正在執行,返回該任務,而不是啟動一個新的任務。
當介紹完這三個方案后,您可能會發現它們的實現都挺復雜的。為了專注于設計模式而不是實現細節,我創建了一個 gist 來提供這三個模式的實現作為可重用抽象 。
方案 1:取消之前的任務
在排序這種情況下,獲取新的事件后就意味著可以取消上一個排序任務了。畢竟用戶通過這樣的行為已經表明了他們不想要上次的排序結果了,繼續進行上一次排序操作沒什么意義了。
要取消上一個請求,我們首先要以某種方式追蹤它。在 gist 中的 cancelPreviousThenRun 函數就做到了這個。
來看看如何使用它修復這個 bug:
- // 方案 1: 取消之前的任務
- // 對于排序和過濾的情況,新請求進來,取消上一個,這樣的方案是很適合的。
- class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
- var controlledRunner = ControlledRunner<List<ProductListing>>()
- suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
- // 在開啟新的排序之前,先取消上一個排序任務
- return controlledRunner.cancelPreviousThenRun {
- if (ascending) {
- productsDao.loadProductsByDateStockedAscending()
- } else {
- productsDao.loadProductsByDateStockedDescending()
- }
- }
- }
- }
使用 cancelPreviousThenRun 來確保同一時間只有一個排序任務在進行
看一下 gist 中 cancelPreviousThenRun 中的代碼實現,您可以學習到如何追蹤正在工作的任務。
- // see the complete implementation at
- // 在 https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7 中查看完整實現
- suspend fun cancelPreviousThenRun(block: suspend () -> T): T {
- // 如果這是一個 activeTask,取消它,因為它的結果已經不需要了
- activeTask?.cancelAndJoin()
- // ...
簡而言之,它會通過成員變量 activeTask 來保持對當前排序的追蹤。無論何時開始一個新的排序,都立即對當前 activeTask 中的所有任務執行 cancelAndJoin 操作。這樣會在開啟一次新的排序之前就會把正在進行中的排序任務給取消掉。
使用類似于 ControlledRunner 這樣的抽象實現來對邏輯進行封裝是比較好的方法,比直接混雜并發與應用邏輯要好很多。
選擇使用抽象來封裝代碼邏輯,避免混雜并發和應用邏輯代碼。
注意: 這個模式不適合在全局單例中使用,因為不相關的調用方是不應該相互取消。
- gist:https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L19
- 代碼實現:https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L91
- cancelAndJoin:https://kotlin.github.io/kotlinx.coroutines/kotlinx-coroutines-core/kotlinx.coroutines/cancel-and-join.html
方案 2::讓下一個任務排隊等待
這里有一個對并發問題總是有效的解決方案。
讓任務去排隊等待依次執行,這樣同一時間就只會有一個任務會被處理。就像在商場里進行排隊,請求將會按照它們排隊的順序來依次處理。
對于這種特定的排序問題,其實選擇方案 1 比使用本方案要更好一些,但還是值得介紹一下這個方法,因為它總是能夠有效的解決并發問題。
- // 方案 2: 使用互斥鎖
- // 注意: 這個方法對于排序或者是過濾來說并不是一個很好的解決方案,但是它對于解決網絡請求引起的并發問題非常適合。
- class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
- val singleRunner = SingleRunner()
- suspend fun loadSortedProducts(ascending: Boolean): List<ProductListing> {
- // 開始新的任務之前,等待之前的排序任務完成
- return singleRunner.afterPrevious {
- if (ascending) {
- productsDao.loadProductsByDateStockedAscending()
- } else {
- productsDao.loadProductsByDateStockedDescending()
- }
- }
- }
- }
無論何時進行一次新的排序, 都使用一個 SingleRunner 實例來確保同時只會有一個排序任務在進行。
它使用了 Mutex,可以把它理解為一張單程票 (或是鎖),協程在必須要獲取鎖才能進入代碼塊。如果一個協程在運行時,另一個協程嘗試進入該代碼塊就必須掛起自己,直到所有的持有 Mutex 的協程完成任務,并釋放 Mutex 后才能進入。
Mutex 保證同時只會有一個協程運行,并且會按照啟動的順序依次結束。
- Mutex:https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L49
方案 3:復用前一個任務
第三種可以考慮的方案是復用前一個任務,也就是說新的請求可以重復使用之前存在的任務,比如前一個任務已經完成了一半進來了一個新的請求,那么這個請求直接重用這個已經完成了一半的任務,就省事很多。
但其實這種方法對于排序來說并沒有多大意義,但是如果是一個網絡數據請求的話,就很適用了。
對于我們的庫存應用來說,用戶需要一種方式來從服務器獲取最新的商品庫存數據。我們提供了一個刷新按鈕這樣的簡單操作來讓用戶點擊一次就可以發起一次新的網絡請求。
當請求正在進行時,禁用按鈕就可以簡單地解決問題。但是如果我們不想這樣,或者說不能這樣,我們就可以選擇這種方法復用已經存在的請求。
查看下面的來自 gist 的使用了 joinPreviousOrRun 的示例代碼:
- class ProductsRepository(val productsDao: ProductsDao, val productsApi: ProductsService) {
- var controlledRunner = ControlledRunner<List<ProductListing>>()
- suspend fun fetchProductsFromBackend(): List<ProductListing> {
- // 如果已經有一個正在運行的請求,那么就返回它。如果沒有的話,開啟一個新的請求。
- return controlledRunner.joinPreviousOrRun {
- val result = productsApi.getProducts()
- productsDao.insertAll(result)
- result
- }
- }
- }
上面的代碼行為同 cancelPreviousAndRun 相反,它會直接使用之前的請求而放棄新的請求,而 cancelPreviousAndRun 則會放棄之前的請求而創建一個新的請求。如果已經存在了正在運行的請求,它會等待這個請求執行完成,并將結果直接返回。只有不存在正在運行的請求時才會創建新的請求來執行代碼塊。
您可以在 joinPreviousOrRun 開始時看到它是如何工作的,如果 activeTask 中存在任何正在工作的任務,就直接返回它。
- // 在 https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L124 中查看完整實現
- suspend fun joinPreviousOrRun(block: suspend () -> T): T {
- // 如果存在 activeTask,直接返回它的結果,并不會執行代碼塊
- activeTask?.let {
- return it.await()
- }
- // ...
這個模式很適合那種通過 id 來查詢商品數據的請求。您可以使用 map 來建立 id 到 Deferred 的映射關系,然后使用相同的邏輯來追蹤同一個產品之前的請求數據。
直接復用之前的任務可以有效避免重復的網絡請求。
- joinPreviousOrRun:https://gist.github.com/objcode/7ab4e7b1df8acd88696cb0ccecad16f7#file-concurrencyhelpers-kt-L124
下一步
在這篇文章中,我們探討了如何使用 Kotlin 協程來實現一次性請求。我們實現了如何在 ViewModel 中啟動協程,然后在 Repository 和 Room Dao 中提供公開的 suspend function,這樣形成了一個完整的編程范式。
對于大部分任務來說,在 Android 上使用 Kotlin 協程按照上面這些方法就已經足夠了。這些方法就像上面所說的排序一樣可以應用在很多場景中,您也可以使用這些方法來解決查詢、保存、更新網絡數據等問題。
然后我們探討了一下可能出現 bug 的地方,并給出了解決方案。最簡單 (往往也是最好的) 的方案就是從 UI 上直接更改,排序運行時直接禁用按鈕。
最后,我們探討了一些高級并發模式,并介紹了如何在 Kotlin 協程中實現它們。雖然這些代碼有點復雜,但是為一些高級協程方面的話題做了很好的介紹。
【本文是51CTO專欄機構“谷歌開發者”的原創稿件,轉載請聯系原作者(微信公眾號:Google_Developers)】