Facebook Velox 運行機制全面解析
概述
Facebook Velox 是一個針對 SQL 運行時的 C++ 庫,旨在統一 Facebook 各種計算流,包括 Spark 和 Presto,使用推的模式、支持向量計算。
Velox 接受一棵優化過的 PlanNode Tree,然后將其切成一個個的線性的 Pipeline,Task 負責這個轉變過程,每個 Task 針對一個 PlanTree Segment。大多數算子是一對一翻譯的,但是有一些特殊的算子,通常出現在多個 Pipeline 的切口處,通常來說,這些切口對應計劃樹的分叉處,如 HashJoinNode,CrossJoinNode, MergeJoinNode ,通常會翻譯成 XXProbe 和 XXBuild。但也有一些例外,比如 LocalPartitionNode 和 LocalMergeNode 。
邏輯計劃翻譯成物理計劃,可調整 Pipeline 并發度
為了提高執行的并行度,Velox 引入了 LocalPartitionNode 節點,可以將一個 Pipeline 進行多線程(每個線程一個實例)并行運行,并且互斥的消費數據。其中每個實例稱為 Driver。該算子在輸入計劃樹里并沒有分叉(即沒有多個 source),但在翻譯成物理算子時,會在此節點處進行切開,并在切口前后改變執行的并行度,對應的物理算子是LocalPartition 和 LocalExchange。
調整并發度算子,一個邏輯算子翻譯成兩個物理算子
還有一個特殊節點,稱為 LocalMergeNode,該對輸入有要求:必須有序,然后會進行單線程的歸并排序,從而使輸出全局有序。也因此,由其而切開的消費 Pipeline 一定是單 Driver 的。翻譯成算子,對應兩個 CallbackSink 和 LocalMerge。
Merge 算子,也是一種邏輯翻譯成兩種物理算子
總結一下,上述五個 PlanNode,HashJoinNode,CrossJoinNode, MergeJoinNode ,LocalPartitionNode ,LocalMergeNode 在翻譯時會造成切口,即將邏輯 PlanTree 切成多個物理 Pipeline,因此在切口處會將一個邏輯算子翻譯成多個物理算子,分到不同 Pipeline 上。每個 Pipeline 會有一個從 0 開始的編號:Pipeline ID,是全局粒度的。
并且,可以由 LocalPartitionNode 來按需改變每個 Pipeline 并行度,其中 Pipeline 的每個線程由一個 Driver 來執行。每個 Driver 也有一個從 0 開始的編號:Driver ID,是 Pipeline 粒度的。
其他 PlanNode 到算子的翻譯基本都是一對一的,感興趣的可以看官方文檔的這個頁面:Plan Nodes and Operators。
下面展開一些細節。
Splits
Velox 允許應用層(即 Velox 的使用方)以 Splits (每個算子的輸入片段稱為 Split)的方式給 Pipeline 喂數據,可以流式的喂,因此有兩個 API:
- Task::addSplit(planNodeId, split) :喂一份數據給 Velox
- Task::noMoreSplits() :通知 Velox 我喂完了。
Velox 會使用一個隊列在緩存這些 Splits 數據。在數據喂完之前的任意一個時刻,Pipeline 的葉子算子(對的,外部喂數據只能發生在葉子節點,如 TableScan,Exchange 和 MergeExchange)都可以從隊列中取數據,對應 API 是 Task::getSplitOrFuture(planNodeId) ,返回值有兩種:
- 如果隊列中有數據,則返回一個 Split
- 如果隊列中無數據,但還沒有收到喂完的信號,則返回一個 Future (類似于一個欠條,之后有數據之后,會憑該欠條兌付)。
Task 是 PlanTree Segment 執行單位,可以通過 Splits 方式流式喂數據
Join Bridges and Barriers
Join (HashJoinNode 和 CrossJoinNode)會翻譯成 XXProbe 和 XXBuild 兩個算子,并且通過一個共享的 Bridge 來溝通數據,兩側 Pipeline 都可以通過 Task::getHashJoinBridge() 函數來根據 PlanNodeId 獲取該共享的 Bridge。
為了提高 build 速度,build 側 Pipeline 通常使用多個 Driver 并發執行。但由于只有一個 Bridge,每個 Driver 在結束時可以調用 Task::allPeersFinished() (內部是使用一個 BarrierState 的結構來實現的)來判斷自己是否為最后一個 Driver,如果是,則將所有 Driver 的輸出進行合并后送到 Bridge。
當然,在 RIGHT and FULL OUTER join 情況下,Probe 側也需要將沒有 match 上的數據喂給 Bridge,此時也需要由最后一個 Driver 來負責這件事,于是同樣需要調用 Task::allPeersFinished() 函數。
使用 Bridge 對 Join 兩側 Pipeline 進行數據橋接(Build->Probe)
下面來詳細看下 Join 類算子的切分細節。以 HashJoin 為例,Task 在切分 PlanTree 時,會將邏輯上的一個 HashJoin 算子,轉化成物理上的一對算子:HashProbe 和 HashJoin,并且使用異步機制進行通知:在 HashJoin 完成后,通知 HashProbe 所在 Pipeline 繼續執行,在此之前,后者是阻塞等待的。
Join 兩側 Pipeline 是可以調整并發度的
如上圖,每個 Pipeline 在實例化(邏輯 PlanNode 轉物理 Operator)的時候,可以生成多份,進行并發執行,互斥的消費數據。并且,每個 Pipeline 的并行粒度可以不一樣,如上圖 Probe Pipeline 實例化了兩份,而 Build Pipeline 實例化了三份。并且,Build Pipeline 組中最后一個運行完的 Pipeline 負責將數據通過 Bridge 發送給 Probe Pipeline。
Exchange Clients
Velox 使用 Exchange Clients 來獲取遠程 worker 的數據。分兩個步驟:
第一步,Pipeline 中第一個 Driver (driverId == 0) 的 Exchange 算子從 Task 中獲取一個 Split,并且初始化一個共享 Exchange Client。
第二步,Exchange Client 會為上游每個 Task 構造一個 Exchange Source,并行的拉取每個上游 Task 同一個 Partition (圖中是 Partition-15)數據,然后將其放在 Client 的隊列 Queue 中。Exchange 的每個 Driver 都會去隊列中拉取這些數據。
如何從上游 Task 拉取數據的邏輯,需要由用戶自定義實現 ExchangeSource 和 ExchangeSource::Factory 。每個 ExchangeSource 接受一個上游 Task 的字符串 ID、Partition 編號和一個隊列作為參數。然后會從上游 Task 中拉取該 Partition 的數據,并且放到隊列中。
向上游 Task 遠程(跨進程)拉取數據,也叫 MaterializePage
Local Exchange Queues
Local exchange 用于在一個 Task 內部調整數據并發度,會被翻譯成兩個物理算子:LocalPartition 和 LocalExchange。其中,LocalPartition 在生產側 Pipeline,LocalExchange 在消費側 Pipeline。
中間通過 LocalExchangeQueues 來溝通生產者和消費者,這些隊列在 Task 類中。對于每個消費者(也即 LocalExchange 側 Driver)Task 都會構建一個 LocalExchangeQueue 隊列;每個生產者 (LocalPartition)可以訪問所有隊列。在產生一條數據是,會對其按照某種方式進行 Partition,然后寫到對應隊列中。這個過程類似于 MapReduce 中的 Shuffle 階段。
本地改變并發度時,使用一個隊列進行數據溝通
具體來說,Local Exchange 可以有幾種方式改變并行度。如一改多、多改一。多改一,典型的例子如,并行 sort:先切成多個分片每個分片分別 sort,后通過 Local Exchange 進行 merge sort。不僅單個 Pipeline 的多個 Driver 在進行數據合并時可以用 Local Exchange,多個 Pipeline 的合并也可以用 Local Exchange,不妨稱之為多并一。典型例子有,Union All,將多個數據集合并起來。
多改一
多并一
一改多通常用在,在經歷了某些必須使用單線程的算子后(比如一些 Shuffle 算子),重新對數據分片提高并發度,使用多線程運行。
一改多
Local Merge Sources
LocalMerge 算子和 LocalExchange 算子類似,但對并發數和輸入都有限定。其所在 Pipeline 只會單線程運行,但會接受多線程運行的 Pipeline 的輸入。并且要求所有輸入有序,然后將輸入進行歸并,保證輸出是有序的。
LocalMerge 算子通過 Task::getLocalMergeSources() 來獲取所有待 Merge 的 sources。因此,每個 LocalMergeNode 會初始化給定并發數個 LocalMergeSource。
Merge Join Sources
MergeJoin 算子提供了某種接受右側輸入的方法。Task 會在右側 Pipeline 增加一個 CallbackSink 算子,來匯集數據。左側算子可以通過 Task::getMergeJoinSource() 接口來獲取該 CallbackSink 的輸出。
擴展性
Velox 允許用戶自定義 PlanNode 和 Operator,以及 Join 相關的 Operator 和 Bridge。自定義 Operator 可以訪問 task 中的 splits 并使用 barriers。
但 Exchange clients, local exchange queues 和 local merge sources、 merge join sources 等狀態由于不是通用的,因此訪問不了。
總結
小節一下,Task 負責將由 PlanNode 組成的 PlanTree 翻譯成由 Operator 組成的 Pipeline,并且對 Pipeline 進行并發運行。在此期間,Task 會維護 Operator 間的共享狀態、協調 Operator 間的運行依賴。這些共享狀態包括:
- Splits
- Join bridges and barriers
- Exchange clients
- Local exchange queues
- Local merge sources
- Merge join sources
上述的每個狀態都是和特定 PlanNode 關聯的(即不是全局范圍的,而是和 PlanNode 綁定的),因此 Opeator 需要使用 PlanNodeID 來訪問相關狀態。前兩個狀態是所有算子都有的,因此自定義算子可以訪問到,后幾個狀態是某些算子特有的,因此自定義算子訪問不到。