Apache Celeborn 社區的今天和明天
一、Celeborn 是什么
1、計算引擎中間數據帶來的挑戰
大數據計算就是數據的轉換和流轉。
數據的轉換指的是數據發生變化、產生新的數據或者被過濾掉等。具體的實現方式包括算子和 UDF,如常見的 Filter、Project、Aggregation、Join,內置 function,自定義 UDF 等。
數據流轉包括兩種:
一種是持久化數據的流轉,通常是表數據。一個大數據計算作業從對象存儲或分布式存儲讀取結構化、半結構化或者非結構化的數據,進行各種分布式計算處理,最后寫回存儲系統,這就是持久化數據的流轉。這種數據流轉目前主要由湖格式來完成。
另一種是中間(臨時)數據流轉。這里的中間數據指的是大數據計算引擎在執行某個作業的過程中所產生的臨時數據,一般是由于內存中放不下溢出到外部存儲,在作業完成之后可以安全刪除。典型的中間數據有三種:Shuffle 數據、Spill 數據、Cache 數據。
Apache Celeborn 正是專注于中間數據的處理,以提高這些數據的流轉效率。
在 Celeborn 誕生之前,大數據計算引擎在中間數據的處理上存在著諸多挑戰。下面以 Shuffle 為例說明。Shuffle 的過程如下圖所示:
MapTask 將其產生的 shuffle 數據根據 Partition id 做本地排序,當內存不夠用時會引入外排,而外排會產生對本地磁盤寫放大的問題。最終生成數據文件和 Index 文件。接下來,下游的每個 ReduceTask 會從每個 Shuffle 數據文件中讀取 Partition 數據。從單個文件看,假設下游有幾千上萬個 ReduceTask,就會被讀取幾千上萬次,且每次讀取的是隨機位置和小的數據量,因此磁盤的 IOPS 會非常大,導致穩定性和性能問題。
另外,中間數據存儲在本地磁盤,這就要求計算節點有大容量的本地磁盤。現在的數據湖架構或者云原生架構一般采用存算分離,存算分離的好處是根據功能的不同最優化節點(機器)配置,能更好地適應不同的計算負載和存儲需求,提高資源利用率。如果計算過程中需要大容量磁盤存儲中間數據,就難以存算分離。第二,難以及時縮容,像 Spark 計算引擎,作業在運行過程中可以動態資源伸縮,Executor 處于 idle 一段時間就可以釋放,但是如果本地磁盤存儲了下游所需的中間數據,那么就無法釋放節點,特別是 Spark on K8s 的場景。
2、 Celeborn 的前世今生
Celeborn 的定位是通過接管中間數據來解決上述問題。在具體介紹 Celeborn 之前,先來看一下其發展歷程。
- Celeborn 最早于 2020 年在阿里云內部開發。
- 2021 年 12 月正式對外開源,趣頭條和小米成為種子用戶。
- 2022 年 10 月發布 0.1 系列,實現生產可用,支持 Spark,使用本地盤存儲,采用 Standalone 的方式部署。同年,正式捐贈給了Apache 軟件基金會(ASF),并更名為 Celeborn,正式進入 Apache 孵化。
- 2023 年 3 月發布 0.2 系列,是捐贈后的首個版本,支持 K8s 部署,提升了穩定性和性能。
- 2023 年 7 月發布 0.3.0,支持 Flink batch,也支持 Native Spark,在存儲層支持把數據存儲在 HDFS,并支持優雅升級。
- 2023 年 10 月 13 日,發布了 0.3.1。
截至目前,參與貢獻和使用 Celeborn 的企業來自各個行業和海內外。
3、整體架構
下圖是 Celeborn 的主要架構,其中藍色部分是 Celeborn 的組件。
從上圖可以看出 Apache Celeborn 是一個 Server-Client 的架構。
服務端包括 Master 和 Worker 節點:
Master 的主要職責是:管理整個集群的狀態;負責負載的分配;同時基于 Raft 實現高可用。
Worker 的主要職責是:接收、存儲和服務 shuffle 數據。實現了多層存儲,目前已經支持 Local Disks 和 DFS ,Memory 正在開發中。
Spark和Flink計算引擎分兩種角色:一是Driver/JobMaster,負責整個Application 生命周期的管理、作業的調度;另一個是 Executor/TaskManager,實現真正的分布式計算執行。
相對應的,Celeborn 也分兩部分,一個是 Lifecycle Manager,另一個是 Shuffle Client。Lifecycle Manager 的職責是管理當前作業的 shuffle metadata,把 shuffle 元數據從 Master 轉移到 Celeborn Application,讓 Application 自己管理自己的 shuffle,從而大幅降低 Master 的負載。Shuffle Client 存在于每一個Executor 或 TaskManager 上,負責具體推送和讀取 shuffle 數據。
Celeborn 的 Master、Worker、Lifecycle Manager 和 Shuffle Client 是與引擎無關的,沒有引入任何引擎的依賴。各計算引擎通過 Shuffle Client 的 API 來集成 Celeborn。
目前,Celeborn 社區官方已集成 Spark、Flink 和 MapReduce 三種計算引擎。另外,有公司基于 Shuffle Client 的 API 實現了 MR3 引擎的集成。
從上圖中可以看到,shuffle 數據不再存儲于本地磁盤,而是存儲到 Celeborn cluster 上,這就解除了計算節點對本地磁盤的依賴。
而穩定性和性能問題,Celeborn 是通過 Partition 的聚合來解決的。
4、核心設計
MapTask在推送數據時,把屬于同一個Partition的數據都推給同一個 CelebornWorker,Worker 把接收到的數據聚合之后寫入磁盤,這樣一個 Reducer 只需要從一個 Worker 上讀取相應的 Partition 文件即可。經過聚合后在shuffle read 階段,網絡連接數變成了 n,單個文件被讀取數就變成了 1,從而很好地解決了磁盤 IOPS 的問題。單個 Partition 文件過大時,可能會對磁盤造成過大的壓力,系統會檢測每個 Partition 文件的大小,如果超過預設的閾值,會對 Partition 做切分,Partiton 切分相關信息會存儲到 Lifecycle Manager 上,不會造成 Reduce 丟失數據。
下圖展示了使用 Celeborn 之后 shuffle 的生命周期。
首先,MapTask 在第一次需要寫 shuffle 數據時,向 Lifecycle Manager 發起 RegisterShuffle 請求,Lifecycle Manager 向 Celeborn Master 發起RequestSlots 請求,Master 選擇一部分 Worker 為當前的 shuffle 請求服務。Lifecycle Manager 向這些 Worker 發起 ReserveSlots 請求,這些 Worker 會做相應的準備工作。接下來,Lifecycle Manager 會把 PartitionLocations 信息返回給 MapTask,MapTask 就可以持續往這些 Worker 推送 shuffle 數據。Worker 接收到 shuffle 數據后在內存中緩存,如果開啟了多副本,會在兩個 Worker 做數據冗余。單個 Partition 數據在內存中超過默認的 256k 時,就會 Append 到 Partition 文件中。每個 MapTask 結束時會向 Lifecycle Manager 發送 MapperEnd,當所有 MapTask 都結束之后,Lifecycle Manager 向所有服務本次shuffle 的 Worker 發送 Commit files 請求,Worker 會把尚未 flush 的內存數據 flush 到磁盤。至此,就完成了整個 shuffle write 的過程。
ReduceTask 在啟動時向 Lifecycle Manager 獲取屬于自己的 PartitionLocations 信息,然后向相應的 Worker 讀取數據。
Celeborn 的另一個核心設計點是容錯和保證精準一次。
Fault tolerance
Celeborn 中最高頻的是 Push data,當其失敗時,Celeborn 不會認為這個Worker 失敗了,之前推送的數據就丟失了,而是認為當前推送臨時出問題,后面的推送會嘗試申請新的 Worker,繼續往新 Worker 推送。檢測數據是否丟失則是在Commit files 階段完成,Lifecycle Manager 既往之前失敗的 Worker 發送 commit 請求、也會往新的 Worker 上發送 commit 請求,只有當兩個 Pair 至少都有一個副本 commit 成功,才會判定當前數據沒有丟失。
Exactly once
另一個問題是,假設 Client 認為數據推送失敗了,重新申請 Worker 推送數據,但實際上 Worker 成功接收數據,并且 commit 成功了。這時需要一個機制來保證精準一次推送,Celeborn 是通過在每一個推送數據的 batch 上添加三元組 header(MapId、AttemptId和BatchId)保證不會有數據重復。Reducer 在讀取數據時會過濾掉非 successful attempt id 的數據和重復的 batch id 數據。
ShuffleClient 提供了 API 幫助計算引擎集成 Celeborn,主要包括以下四個 API:
(1)pushData
registerShuffle 會包含在 pushData 的實現里,引擎只需要創建 ShuffleClient 實例,調用 pushData 即可。
(2)mapperEnd
shuffle write 階段,除了 pushData,還需一個 mapperEnd API,告知 Celeborn shuffle write 完成。
(3)readPartition
在 shuffle read 階段,只有一個 readPartition 接口,Celeborn 會返回一個InputStream 對象,計算引擎接收這個 InputStream 完成后續工作。
(4)unregisterShuffle
當計算引擎認為一個 shuffle 已經完全結束,后續也不會再使用,可以調用 unregisterShuffle 通知 Celeborn 該 shuffle 生命周期結束。
除了上面介紹的一些核心設計,Celeborn 還具有如下特性,這里不做展開介紹。
- 滾動升級:升級時不影響當前正在運行的作業,下線時也不需要等待長尾作業的完成。
- 優雅下線
- 多租戶
- 負載均衡
- MapPartition
- 流量控制
- Spark AQE
- 列式 shuffle
5、效果評估
下圖是 Celeborn 0.3.0 發布時做的一個純 shuffle 作業的對比測試結果,對比的是 Spark 原生的 External Shuffle Service (ESS) 和 Celeborn 0.2.1 版本。
可以看到 Celeborn 在性能上優于 ESS,且規模越大,優勢越明顯。同時 0.3.0 版本比 0.2.1 版本的性能也有提升。
對比 ESS,Celeborn 的優勢主要在 shuffle read 階段,這與前面介紹的穩定性和性能主要在于 shuffle read 時的非常低效的 IO 模式一致。shuffle write 階段,Celeborn 需要通過一層網絡,而 ESS 直接寫本地文件。但 Celeborn 最新版本在 shuffle write 過程相對于 ESS 并沒有明顯的性能降低。
二、Celeborn 的社區/用戶/生產介紹
1、Celeborn 多元社區
Celeborn 于 2022 年 10 月捐贈給 Apache 軟件基金會,隨后(2022-10-18)進入孵化器,經過一年的發展,Celeborn 已經變成一個多元的社區。
目前 Celeborn 社區擁有:
- 7 位 PPMC,除了阿里,還有兩位分別來自網易和 Shopee。一位是來自 Shopee的朱夷 @Angerszhuuuu,是第一個大規模使用 Celeborn 的用戶,踩了很多坑,對 Celeborn 的發展做出了很多貢獻;另一位是來自網易的潘成(Cheng Pan),也是一位非常重要的 PMC。后面還會有更多 PMC 加入 Celeborn 社區。
- 6 位 Committers,其中三位來自阿里之外。
- 72 位 Contributors。
2、Celeborn 用戶場景
目前用戶使用 Celeborn 的方式有三種:混部、獨立部署、完全存算分離。
- 混部:把 Celeborn 集群跟現有的 HDFS/YARN 集群部署在一起。這種一般是已經存在 HADOOP 集群,又不想或基于成本考慮不能對集群進行升級改造,則可以把 Celeborn 直接部署在原集群上。
- Celeborn 獨立部署:這種方式的好處是可以把 shuffle 相關的 IO 跟 HDFS 的 IO 隔離開來,能夠有效提升穩定性。
- 完全存算分離:這種方式是把計算集群、Celeborn 集群、存儲集群完全分開,三方集群都可以很方便地進行擴展,能得到很好的性價比。
3、Celeborn 生產場景
下圖展示了阿里和其他用戶的 Celeborn 真實生產場景的情況,一些大用戶的 Celeborn 集群每天服務的 shuffle 數據總量達到若干個 PB、數萬個作業,單 Shuffle 數據達到數百 T。
4、 Celeborn 用戶反饋
下面摘錄一些正向的用戶反饋。
當然也有一些問題反饋,針對問題反饋,社區會積極解決。
5、Celeborn 與其他社區
(1)Kyuubi + Celeborn
接下來介紹 Celeborn 跟其他社區或項目的結合情況。
首先是 Kyuubi 和 Celeborn 的結合使用。Kyuubi 是網易開源的一個很優秀的開源項目。阿里云的 EMR 產品較早的時候就引入了 Kyuubi,用戶在創建 EMR 集群時可以直接選擇 Kyuubi 組件。
同時,網易通過 Kyuubi + Celeborn 在內部一些生產場景有效提升了性價比。
Kyuubi 社區和 Celeborn 社區聯系緊密。Kyuubi 的一些 PM 和 Committers 也在為 Celeborn 社區做貢獻。雙方都希望 Kyuubi 聯合 Celeborn 將來能做出更廣為大眾接受的云原生方案。
(2)Gluten + Celeborn
Spark 社區近年來一直希望能做出一個 Photon 的翻版組件。Databricks 的 Photon 是一個高效的 Native SQL 執行引擎,不開源。
Gluten 就是做這樣一件事情的產品,它將 Velox、ClickHouse 等 Native 的 SQL 執行引擎通過友好的方式集成到 Spark 里面。
Gluten 還提供了高效的 Columnar Shuffle 機制,下圖是 Gluten Columnar Shuffle 的示意圖。 Gluten 采用高效的設計讓 RecordBatch 數據結構有比較高效的 shuffle 性能,但是其在 shuffle 的數據傳輸上沿用了 Spark External Shuffle Service 的方式,因此也存在前面介紹的各種問題。
為解決 shuffle 數據上的問題,過去一段時間,Gluten 社區和 Celeborn 社區做了很好的聯合,把 Celeborn 集成到了 Gluten 里面。
編譯 Gluten 時指定 RSS Profile,就會默認使用 Celeborn 的 Client 接管 shuffle 數據。實現方式是在 Gluten 的 Native Partitioner 引入了 Celeborn 的 SDK,在推送和讀取 shuffle 數據時,通過 Celeborn SDK 連接 Celeborn Cluster。要說明的一點是,Celeborn 的引入與 Gluten 本身的 Native Partitoner 是正交關系,并不是替代關系。
(3)MR3 + Celeborn
MR3 是一個韓國教授團隊通過 API 自發的集成 Celeborn,社區也幫助 MR3 團隊解決了很多問題。MR3 + Celeborn 已完成第一個 Release 版本的發布。
三、Celeborn 未來發展規劃
Celeborn 未來的發展規劃包括三個方向。
1、社區
希望有更多 PPMC、Committer 和 Contributor 能夠加入社區,大家一起把社區做好,也希望社區更多元化,有更多公司的同事加入社區。
大家可以順手點個 star https://github.com/apache/incubator-celeborn。
2、用戶
持續推廣用戶,特別是希望有機會推廣到海外。
3、Feature 規劃
- 實現 Spill / Cache Data
- 對接更多的引擎,比如:Tez/Trino/Ray/...
- 繼續完善多層存儲
- 實現認證和安全隔離,Linkedin 團隊已在開發中
- Stage Rerun(螞蟻團隊)
- 更好的AQE支持
- 實現 C++ SDK,使得在集成 Ray 和 Native 時更加順暢
以上就是本次分享的內容,謝謝大家。