DDIA:批處理和 MPP 數據庫千絲萬縷
批處理工作流的輸出
我們已經討論了串起 MapReduce 工作流的一些算法,但我們忽略了一個重要的問題:當工作流結束后,處理結果是什么?我們一開始是為什么要跑這些任務來著?
對于數據庫查詢場景,我們會區分事務型處理場景(OLTP)和分析性場景(OLAP,參見事務型還是分析型)。我們觀察到,OLTP 場景下的查詢通常只會涉及很小的一個數據子集,因此通常會使用索引加速查詢,然后將結果展示給用戶(例如,使用網頁展示)。另一方面,分析型查詢通常會掃描大量的數據記錄,執行分組(grouping)和聚集(aggregating)等統計操作,然后以報表的形式呈現給用戶:比如某個指標隨時間的變化曲線、依據某種排序方式的前十個數據條目、將數據按子類分解并統計其分布。這些報表通常會用于輔助分析員或者經理進行商業決策。
那批處理處于一個什么位置呢?它既不是事務型,也不是分析型。當讓,從輸入數據量的角度來說,批處理更接近分析型任務。然而,一組 MapReduce 任務組成的執行流通常和用于分析型的 SQL 查詢并不相同(參見 Hadoop 和分布式數據庫的對比)。批處理的輸出通常不是一個報表,而是另外某種格式的數據。
構建查詢索引
谷歌發明 MapReduce 大數據處理框架的最初動機就是解決搜索引擎的索引問題,開始時通過 5~10 個 MapReduce 工作流來為搜索引擎來構建索引。盡管谷歌后面將 MapReduce 使用拓展到了其他場景,仔細考察構建搜索引擎索引的過程,有助于深入地了解 MapReduce(當然,即使到今天, Hadoop MapReduce 仍不失為一個給 Lucene/Solr 構建索引的好辦法)。
我們在“全文索引和模糊索引”一節粗策略的探討過像 Lucene 這樣的全文索引引擎是如何工作的:倒排索引是一個詞表(the term dictionary),利用該詞表,你可以針對關鍵詞快速地查出對應文檔列表(the postings list)。當然,這是一個很簡化的理解,在實踐中,索引還需要很多其他信息,包括相關度,拼寫訂正,同義詞合并等等,但其背后的原理是不變的。
如果你想在一個固定文檔集合上構建全文索引,批處理非常合適且高效:
- Mapper 會將文檔集合按合適的方式進行分區
- Reducer 會對每個分區構建索引
- 最終將索引文件寫回分布式文件系統
構建這種按文檔分區(document-partitioned,與 term-partitioned 相對,參見分片和次級索引)的索引,可以很好地并發生成。由于使用關鍵詞進行索引查詢是一種只讀操作,因此,這些索引文件一旦構建完成,就是不可變的(immutable)。
如果被索引的文檔集發生變動,一種應對策略是,定期針對所有文檔重跑全量索引構建工作流(workflow),并在索引構建完時使用新的索引對舊的進行整體替換。如果兩次構建之間,僅有一小部分文檔發生了變動,則這種方法代價實在有點高。但也有優點,索引構建過程很好理解:文檔進去,索引出來。
當然,我們也可以增量式的構建索引。我們在第三章討論過,如果你想增加、刪除或者更新文檔集,Lucene 就會構建新的索引片段,并且異步地將其與原有索引進行歸并(merge)和壓實(compact)。我們將會在第十一章就增量更新進行更深入的討論。
以 KV 存儲承接批處理輸出
搜索索引只是批處理工作流一種可能的輸出。批處理其他的用途還包括構建機器學習系統,如分類器(classifiers,如 垃圾郵件過濾,同義詞檢測,圖片識別)和推薦系統(recommendation system,如你可能認識的人,可能感興趣的產品或者相關的檢索)。
這些批處理任務的輸出通常在某種程度是數據庫:如,一個可以通過用戶 ID 來查詢其可能認識的人列表的數據庫,或者一個可以通過產品 ID 來查詢相關產品的數據庫。
web 應用會查詢這些數據庫來處理用戶請求,這些應用通常不會跟 Hadooop 生態部署在一塊。那么,如何讓批處理的輸出寫回數據庫,以應對 web 應用的查詢?
最直觀的做法是,在 Mapper 或者 Reducer 代碼邏輯中,使用相關數據庫的客戶端庫,將 Mapper 或者 Reducer 的輸出直接寫入數據庫服務器,每次一個記錄。這種方式能夠工作(須假設防火墻允許我們直接從 Hadoop 中訪問生產環境的數據庫服務器),但往往并不是一個好的做法:
- 吞吐不匹配。正如之前 join 一節中所討論的,通過網絡一條條寫入記錄的吞吐要遠小于一個批處理任務的吞吐。即使數據庫的客戶端通常支持將多個 record 寫入 batch 成一個請求,性能仍然會比較差。
- 數據庫過載。一個 MapReduce 任務通常會并行地跑很多個子任務。如果所有 Mapper 和 Reducer ,以批處理產生輸出的速率,并發地將輸出寫到同一個數據庫,則該數據庫很容會被打爆(overwhelmed)。此時,其查詢性能大概率非常差,也因此難以對外提供正常服務,從而給系統的其他組件帶來運維問題。
- 可能產生副作用。通常來說,MapReduce 對外提供簡單的“全有或全無(all-or-nothing)”的輸出保證:如果整個任務成功,即使子任務一時失敗重試,但最終的輸出也會看起來像運行了一次;如果整個任務失敗,則沒有任何輸出。但直接從任務內部將輸出寫入外部服務,會產生外部可見的副作用。在這種情況下,你就必須考慮任務的部分成功狀態可能會暴露給其他系統,并要理解 Hadoop 內部重試和推測執行的復雜機制。
一個更好的方案是,在批處理任務內部生成全新的數據庫,并將其以文件的形式寫入分布式系統的文件夾中。一旦任務成功執行,這些數據文件就會稱為不可變的(immutable),并且可以批量加載(bulk loading)進只處理只讀請求的服務中。很多 KV 存儲都支持使用 MapReduce 任務構建數據庫文件,比如 Voldemort,Terrapin, ElephantDB 和 HBase bulk loading。另外 RocksDB 支持 ingest SST 文件,也是類似的情況。
直接構建數據庫底層文件,就是一個 MapReduce 應用的絕佳案例:使用 Mapper 抽取 key,然后利用該 key 進行排序,已經覆蓋了構建索引中的大部分流程。由于大部 KV 存儲都是只讀的(通過批處理任務一次寫入后,即不可變),這些存儲的底層數據結構可以設計的非常簡單。例如,不需要 WAL(參見讓 B 樹更可靠)。
當數據加載進 Voldemort 時,服務器可以利用老文件繼續對外提供服務,新文件會從分布式文件系統中拷貝的 Voldemort 服務本地。一旦拷貝完成,服務器可以立即將外部查詢請求原子地切到新文件上。如果導入過程中發生了任何問題,也可以快速地切回,使用老文件提供服務。因為老文件是不可變的,且沒有立即被刪除。
批處理輸出的哲學
本章稍早我們討論過 Unix 的設計哲學,它鼓勵在做實驗時使用顯式的數據流:每個程序都會讀取輸入,然后將輸出寫到其他地方。在這個過程中,輸入保持不變,先前的輸出被變換為新的輸出,并且沒有任何其他的副作用。這意味著,你可以任意多次的重新跑一個命令,每次可以對命令或者參數進行下微調,或者查看中間結果進行調試,而不用擔心對你原來系統的狀態造成任何影響。
MapReduce 任務在處理輸出時,遵從同樣的哲學。通過不改變輸入、不允許副作用(比如輸出到外部文件),批處理不僅可以獲得較好的性能,同時也變得容易維護:
- 容忍人為錯誤。如果你在代碼中不小心引入了 bug,使得輸出出錯,你可以簡單地將代碼回滾到最近一個正確的版本,然后重新運行任務,則輸出就會變正確?;蛘?,更簡單地,你可將之前正確的輸出保存在其他的文件夾,然后在遇到問題時簡單的切回去即可。使用讀寫事務的數據庫是沒法具有這種性質的:如果你部署了有 bug 的代碼,并且因此往數據庫中寫入了錯誤的數據,回滾代碼版本也并不能修復這些損壞的數據。(從有 bug 的代碼中恢復,稱為容忍人為錯誤,human fault tolerance)。這其實是通過犧牲空間換來的,也是經典的增量更新而非原地更新。
- 便于敏捷開發。相比可能會造成不可逆損壞的環境,由于能夠很方便地進行回滾,可以大大加快功能迭代的速度(因為不需要進行嚴密的測試即可上生產)。最小化不可逆性(minimizing irreversibility)的原則,有助于敏捷軟件開發。
- 簡單重試就可以容錯。如果某個 map 或者 reduce 任務失敗了,MapReduce 框架會自動在相同輸入上對其重新調度。如果失敗是由代碼 bug 引起的,在重試多次后(可以設置某個閾值),會最終引起任務失??;但如果失敗是暫時的,該錯誤就能夠被容忍。這種自動重試的機制之所以安全,是因為輸入是不可變的,且失敗子任務的輸出會被自動拋棄。
- 數據復用。同一個文件集能夠作為不同任務的輸入,包括用于計算指標的監控任務、評估任務的輸出是否滿足預期性質(如,和之前一個任務的比較并計算差異)。
- 邏輯布線分離。和 Unix 工具一樣,MapReduce 也將邏輯和接線分離(通過配置輸入、輸出文件夾),從而分拆復雜度并且提高代碼復用度:一些團隊可以專注于實現干好單件事的任務開發;另一些團隊可以決定在哪里、在何時來組合跑這些代碼。
在上述方面,Unix 中用的很好地一些設計原則也適用 Hadoop——但 Unix 工具和 Hadoop 也有一些不同的地方。比如,大部分 Unix 工具假設輸入輸出是無類型的文本,因此不得不花一些時間進行輸入解析(比如之前的例子中,需要按空格分割,然后取第 7 個字段,以提取 URL)。在 Hadoop 中,通過使用更結構化的數據格式,消除了底層的一些低價值的語法解析和轉換:Avro (參見Avro)和 Parquet 是較常使用的兩種編碼方式,他們提供基于模式的高效編碼方式,并且支持模式版本的演進。
對比 Hadoop 和分布式數據庫
從之前討論我們可以感覺到,Hadoop 很像一個分布式形態的 Unix。其中,HDFS 對標 Unix 中的文件系統,MapReduce 類似于 Unix 進程的一個奇怪實現(在 map 階段和 reduce 階段間必須要進行排序)。在這些源語之上,我們可以實現各種 join 和 group 語義。
MapReduce 被提出時,并非是一種全新的思想。在其十多年前,所有前述小節我們提到的一些并行 join 算法都已經被 MPP (massive parallel processing)數據庫所實現了。如,Gamma data base machine、Teradata 和 Tandem NonStop SQL 都是這個領域的先驅。
當然,如果硬要區分的話:
- MPP 數據庫是在一組機器上分布式地、并行執行分析型的 SQL
- MapReduce 和分布式文件系統提供了一種類似于操作系統的、更為通用的計算方式
存儲類型更為多樣
數據庫要求用戶遵循特定的模式(schema,數據模型,如關系型或者文檔型)組織數據,但分布式系統中的文件是面向字節序列(byte arrary,即內容對于系統是黑盒),用戶可以使用任何必要的方式進行建模和編碼。因此,這些文件既可以是數據庫記錄的集合,也可以是文本、圖像、視頻、傳感器數值、稀疏矩陣、特征向量、基因序列,或者其他任意類型的數據。
換句話說,Hadoop 允許你以任意格式的數據灌入 HDFS,將如何處理的靈活性推到之后(對應之前討論過的 schema-less,或者 schema-on-read )。與之相反,MPP 數據庫通常要求用戶在數據導入之前,就要針對數據類型和常用查詢模式,進行小心的建模(對應 schema-on-write)。
從完美主義者的角度來說,事先對業務場景進行仔細地建模再導入數據才是正道。只有這樣,數據庫用戶才能夠得到更高質量的數據。然而在實踐中,即便不管模式快速導入數據,可能會讓數據處于奇怪、難用、原始的格式,反而會比事先規劃、考究建模后將限制死格式更為有價值。
這種思想和數據庫倉庫很像:在大型組織中,將從不同部門來的數據快速聚集到一塊非常重要,因為這提供了將原先分離的數據進行聯結(join)的各種可能性。MPP 數據庫所要求的小心精確地建模,會嚴重拖慢中心化數據的速度。以原始格式將數據聚集到一塊,之后再去考慮如何進行建模,可以大大加速數據收集速度(這個概念有時也被稱為數據湖,data lake,或者企業數據中心,enterprise data hub)。
無腦數據導入其實是將數據理解的復雜度進行了轉移:數據生產者無需關心數據會被如何使用,這是數據消費者的問題(類似讀時模式,參見文檔模型中 Schema 的靈活性)。在數據生產者和消費者處于不同團隊、具有不同優先級時,這種方式的優勢非常明顯。因為可能沒有一種通用的理想模型,出于不同目的,會有不同的看待數據方式。將數據以原始方式導入,允許之后不同消費者進行不同的數據變換。這種方式被總結為 sushi 原則:數據越原始越好。
因此 Hadoop 經常用于 ETL 處理:將數據以某種原始的格式從事務型的處理系統中引入到分布式文件系統中,然后編寫 MapReduce 任務以處理這些數據,將其轉換回關系形式,進而導入到 MPP 數據倉庫匯總以備進一步分析之用。數據建模依然存在,但是拆解到了其他的步驟,從而與數據收集解耦了開來。由于分布式文件系統不關心應用方以何種方式對數據進行編碼,僅面向字節數組存儲,讓這種解耦成為了可能。
處理模型更為多樣
MPP 數據庫是一種將硬盤上的存儲布局、查詢計劃生成、調度和執行等功能模塊緊密糅合到一塊的整體式軟件。這些組件都可以針對數據庫的特定需求進行調整和優化,針對目標查詢類型,系統在整體上可以獲得很好的性能。此外,SQL 作為一種聲明式的查詢語言, 表達能力很強、語義簡潔優雅,讓人可以通過圖形界面而無需編寫代碼就可以完成對數據進行訪問。
但從另外的角度來說,并非所有類型的數據處理需求都可以合理地表達為 SQL 查詢。例如,如果你想要構建機器學習和推薦系統、支持相關性排序的全文索引引擎、進行圖像分析,則可能需要更為通用的數據處理模型。這些類型的數據處理構建通常都和特定應用強耦合(例如,用于機器學習的特征工程、用于機器翻譯的自然語言模型、用于欺詐預測的風險預估函數),因此不可避免地需要用通用語言寫代碼來實現,而不能僅僅寫一些查詢語句。
MapReduce 使工程師能夠在大型數據集尺度上輕松的運行自己的代碼(而不用關心底層分布式的細節)。如果你已經有 HDFS 集群和 MapReduce 計算框架,你可以基于此構建一個 SQL 查詢執行引擎, Hive 項目就是這么干的。當然,對于一些不適合表達為 SQL 查詢的處理需求,也可以基于 Hadoop 平臺來構建一些其他形式的批處理邏輯。
但后來人們又發現,對于某些類型的數據處理, MapReduce 限制太多、性能不佳,因此基于 Hadoop開發了各種其他的處理模型(在之后 “MapReduce 之外”小節中會提到一些)。僅有 SQL 和 MapReduce 這兩種處理模型是不夠的,我們需要更多的處理模型!由于Hadoop平臺的開放性,我們可以較為容易實現各種處理模型。然而,在 MPP 數據庫的限制下,我們想支持更多處理模型基本是不可能的。
更為重要的是,基于 Hadoop 實現的各種處理模型可以共享集群并行運行,且不同的處理模型都可以訪問 HDFS 上的相同文件。在 Hadoop 生態中,無需將數據在不同的特化系統間倒來倒去以進行不同類型的處理:Hadoop 系統足夠開放,能夠以單一集群支持多種負載類型。無需移動數據讓我們更容易的從數據中挖掘價值,也更容易開發新的處理模型。
Hadoop 生態系統既包括隨機訪問型的 OLTP 數據庫,如HBase(參見“SSTables和LSM-Trees”),也包括 MPP 風格的分析型數據庫,例如 Impala。HBase 和 Impala 都不依賴 MapReduce 進行計算,但兩者都使用 HDFS 作為底層存儲。它們訪問數據和處理數據的方式都非常不同,但卻可以神奇的并存于 Hadoop 生態中。
面向頻繁出錯設計
在對比 MapReduce 和 MPP 數據庫時,我們會發現設計思路上的兩個顯著差異:
- 故障處理方式:取決于對處理成本、故障頻次的假設
- 內存磁盤使用:取決于對數據量的假設
相對在線系統,批處理系統對故障的敏感性要低一些。如果批處理任務失敗,并不會立即影響用戶,而且可以隨時重試。
如果在執行查詢請求時節點崩潰,大多數 MPP 數據庫會中止整個查詢,并讓用戶進行重試或自動重試。由于查詢運行時通常會持續數秒或數分鐘,這種簡單粗暴的重試的處理錯誤的方式還可以接受,畢竟成本不算太高。MPP 數據庫還傾向將數據盡可能地存在內存里(例如在進行 HashJoin 的 HashBuild 時),以避免讀取磁盤的額外損耗。
與之相對,MapReduce 在遇到某個 map 或 reduce 子任務運行出錯時,可以單獨、自動地進行重試,而不會引起整個 MapReduce 任務的重試。此外,MapReduce 傾向于將數據(甚至是 map 到 reduce 中間環節的數據)進行落盤,一方面是為了容錯,另一方面是因為 MapReduce 在設計時假設面對的數據量足夠大,內存通常裝不下。
因此,MapReduce 通常更適合大任務:即那些需要處理大量數據、運行較長時間的任務。而巨量的數據、過長的耗時,都會使得處理過程中遇到故障司空見慣。在這種情況下,由于一個子任務(task) 的故障而重試整個任務(job) 就非常得不償失。當然,即使只在子任務粒度進行重試,也會讓那些并不出錯的任務運行的更慢(數據要持久化)。但對于頻繁出錯的任務場景來說,這個取舍是合理的。
但這種假設在多大程度上是正確的呢?在大多數集群中,機器確實會故障,但非常低頻——甚至可以低到大多任務在運行時不會遇到任何機器故障。在這種情況下,為了容錯引入的巨量額外損耗值得嗎?
為了理解 MapReduce 克制使用內存、細粒度重試的設計原因,我們需要回顧下 MapReduce 的誕生歷程。當時谷歌內部的數據中心很多都是共享使用的——集群中的同一個機器上,既有在線的生產服務,也有離線的批處理任務。每個任務使用容器(虛擬化)的方式進行(CPU、RAM、Disk Space)資源預留。不同任務之間存在優先級,如果某個高優先級的任務需要更多資源,則該任務所在機器上的低優先級任務可能就會被干掉以讓出資源。當然,優先級是和計算資源的價格掛鉤的:團隊需要為用到的資源付費,高優先級的資源要更貴。
這種架構設計的好處是,可以面向非線上服務超發(overcommitted)資源(這也是云計算賺錢的理由之一)。因為系統通過優先級跟用戶約定了,在必要時這些超發的資源都可以被回收。相比在線離線服務分開部署,這種混合部署、超發資源的方式能夠更加充分的利用機器資源。當然代價就是,以低優先級運行的 MapReduce 的任務可能會隨時被搶占。通過這種方式,批處理任務能夠充分地利用在線任務等高優先級任務留下的資源碎片。
統計來說,在谷歌當時集群中,為了讓位給高優先級任務,持續一小時左右 MapReduce 子任務大約有 5% 的概率被中止。這個概率大概比由于硬件問題、機器重啟和其他原因造成的子任務重啟要高一個數量級。在這種搶占率下,對于一個包含 100 個子任務、每個子任務持續 10 分鐘的 MapReduce 任務來說,在運行過程中,有超過一半的概率會發生至少一個子任務被中止。
這就是為什么 MapReduce 面向頻繁異常中止設計的原因:不是為了解決硬件的故障問題,而是給了系統隨意中止子任務的自由,進而在總體上提高計算集群的資源利用率。
但在開源的集群調度系統中,可搶占調度并不普遍。YARN 的 CapacityScheduler 支持搶占以在不同隊列間進行資源的均衡,但到本書寫作時,YARN、Mesos、Kubernetes 都不支持更為通用的按優先級搶占調度。在搶占不頻繁的系統中,MapReduce 這種設計取舍就不太有價值了。在下一節,我們會考察一些做出不同取舍的 MapReduce 的替代品。
參考資料
[1]DDIA 讀書分享會: https://ddia.qtmuniao.com/