ClickHouse作為目前業內主流的列式存儲數據庫(DBMS)之一,擁有著同類型DBMS難以企及的查詢速度。作為該領域中的后起之秀,ClickHouse已憑借其性能優勢引領了業內新一輪分析型數據庫的熱潮。但隨著企業業務數據量的不斷擴大,在復雜query場景下,ClickHouse容易存在查詢異常問題,影響業務正常推進。
字節跳動作為國內最大規模的ClickHouse使用者,在對ClickHouse的應用與優化過程中積累了大量技術經驗。在近日的【T·Talk】系列技術分享活動的第11期中,我們特別邀請到了字節跳動數據平臺資深研發工程師董一峰老師為廣大聽眾解析ClickHouse的復雜查詢問題,董一峰老師也在直播過程中首次公開分享了字節跳動解決ClickHouse復雜查詢問題的優化思路與技術細節。【T·Talk】將本次的核心內容進行了整理,希望能給大家帶來一些啟發:
?
項目背景
ClickHouse的執行模式與Druid、ES等大數據引擎類似,其基本的查詢模式可分為兩個階段。第一階段,Coordinator在收到查詢后,將請求發送給對應的Worker節點。第二階段,Worker節點完成計算,Coordinator在收到各Worker節點的數據后進行匯聚和處理,并將處理后的結果返回。
兩階段的執行模式能夠較為高效地支持目前許多常見的業務場景,例如各類大寬表單的查詢,這也是ClickHouse最擅長的場景。ClickHouse的優點是簡單、高效,通常來說,簡單就意味著高效。但隨著企業業務的持續發展,愈加復雜的業務場景對ClickHouse提出了以下三類挑戰。
第一類,當一階段返回的數據較多,且二階段計算較為復雜時,Coordinator會承受較大壓力,容易成為Query的瓶頸。例如一些重計算的Agg算子,如Count Distinct,若采用哈希表的方式進行去重,第二階段需在Coordinator單機上去合并各個Worker的哈希表。這個計算量會很重且無法并行。
第二類,由于目前ClickHouse模式并不支持Shuffle,因此對于Join而言,右表必須為全量數據。無論是普通Join還是Global Join,當右表的數據量較大時,若將數據都放到內存中,會比較容易OOM。若將數據spill到磁盤,雖然可以解決內存問題,但由于有磁盤 IO 和數據序列化、反序列化的代價,因此查詢的性能會受到影響。特別是當Join采用Hash Join時,如果右表是一張大表,構建也會比較慢。針對構建問題,近期社區也進行了一些右表并行構建的優化,數據按照Join key進行Split來并行地構建多個Hash Table,但額外的代價是左右表都需要增加一次Split操作。
第三類,則是關于復雜查詢(如多表 Join、嵌套多個子查詢、window function 等),ClickHouse對這類需求場景的支持并不是特別友好,由于ClickHouse并不能通過Shuffle來分散數據增加執行并行度,并且其生成的Pipeline在一些case下并不能充分并行。因此在某些場景下,難以發揮集群的全部資源。
隨著企業業務復雜度的不斷提升,復雜查詢,特別是有多輪的分布式Join,且有很多agg的計算的需求會越來越強烈。在這種情況下,業務并不希望所有的Query都按照ClickHouse擅長的模式進行,即通過上游數據 ETL 來產生大寬表。這樣做對ETL的成本較大,并且可能會有一些數據冗余。
企業的集群資源是有限的,但整體的數據量會持續增長,因此在這種情況下,我們希望能夠充分地去利用機器的資源,來應對這種越來越復雜的業務場景和SQL。所以我們的目標是基于ClickHouse能夠高效支持復雜查詢。
?
技術方案
對于ClickHouse復雜查詢的實現,我們采用了分Stage的執行方式,來替換掉目前ClickHouse的兩階段執行方式。類似于其他的分布式數據庫引擎,例如Presto等,會將一個復雜的Query按數據交換情況切分成多個 Stage,各Stage之間則通過Exchange完成數據交換。Stage之間的數據交換主要有以下三種形式。
- 按照單個或者多個key進行Shuffle
- 將單個或者多個節點的數據匯聚到一個節點上,稱為Gather
- 將同一份數據復制到多個節點上,稱為Broadcast或廣播
對于單個Stage執行,繼續復用ClickHouse目前底層的執行方式。開發上按照不同功能切分不同模塊。各個模塊預定接口,減少彼此的依賴與耦合。即使模塊發生變動或內部邏輯調整,也不會影響其他模塊。其次,對模塊采用插件架構,允許模塊按照靈活配置支持不同的策略。這樣便能夠根據不同業務場景實現不同的策略。
首先,當Coordinator接受復雜的查詢以后,它會在當前的語法樹的基礎上,根據節點類型和數據分布情況,插入Exchange節點,并生成一個分布式Plan。其次,Coordinator節點會根據ExchangeNode類型切分Plan,并生成每個Stage執行計劃片段。
接著,Coordinator節點會調用SegmentScheduler調度器,將各Stage的PlanSegment發送給Worker節點。當Worker接收到PlanSegment后,InterpreterPlanSegment會完成數據的讀取和執行,通過ExchangeManager完成數據的交互。最后,Coordinator從最后一輪Stage所對應的ExchangeManager中去讀取數據,并返回給Client。
查詢片段調度器SegmentScheduler負責調度查詢不同的PlanSegment,根據上下游依賴關系和數據分布,以及Stage并行度和worker分布和狀態信息,按照一定的調度策略,將PlanSemgent發給不同的 Worker 節點。
目前而言,我們在進行計劃下發和調度時,主要實現了兩種策略。
第一種是依賴調度,根據Stage依賴關系定義拓撲結構,產生DAG圖,并根據DAG圖調度Stage。依賴調度要等到依賴Stage啟動以后,才會調度對應的Stage。例如兩表Join,會先調度左右表讀取Stage,之后再調度Join這個Stage,因為Join的Stage依賴于左右表的Stage。
第二種是AllAtOnce策略,先計算每個Stage的相關信息,后一次性調度所有Stage。
相比而言,這兩種策略是在容錯、資源使用和延時上去做取舍。第一種策略依賴調度,可以實現更好的容錯。由于ClickHouse數據可以有多個副本,讀數據時,如部分節點連接失敗,可以嘗試它的副本節點。對后續依賴的節點的Stage來說,并不需要感知到前面 Stage 的執行情況。非Source Stage,本身沒有對數據的依賴,所以容錯能力會更強,只要保證Stage并行度的節點存活即可。甚至極端情況下,如需保證Query正常執行,也可以降低Stage的并行度。但調度存在依賴關系,并不能完全并行,會增加調度的時長。Stage較多的情況下,調度延時可能會占據SQL整體不小的比例。針對上述問題的可做如下優化:對于一些沒有依賴關系的,盡可能支持并行。例如同一個Stage的不同節點,可以并行。沒有依賴關系的Stage,也可以并行。
AllAtOnce策略,通過并行可以極大降低調度延時。為防止出現大量網絡IO線程,可以通過異步化手段控制線程數目。AllAtOnce策略的缺點是容錯性沒有依賴調度好,每一個Stage的Worker在調度前就已經確定了,調度過程中有一個Worker出現連接異常,則整個Query都會失敗。另一類情況,Stage在上游數據還沒有ready,就被調度起來了,則需要較長時間等數據。例如Final的agg Stage,要等Partial agg完成以后才能夠拿到對應的數據。雖然我們也對此進行了一些優化,并不會長時間空跑,浪費CPU資源。但是其實也消耗了一部分資源,例如需要去創建這些執行的線程。
ClickHouse的查詢節點執行主要是以SQL形式在節點間互相交互。在切分Stage后,我們需要支持能夠執行一個單獨的PlanSegment的執行計劃。因此,InterpreterPlanSegment主要的作用就是接受一個序列化后的PlanSegment,能夠在Worker節點上去運行整個PlanSegment的邏輯。此外,我們也進行了功能和性能上的增強,例如支持一個Stage處理多個Join,這樣便可以減少Stage的數目和一些不必要的傳輸,用一個Stage就可以完成整個Join的過程。InterpreterPlanSegment的執行會上報對應的狀態信息,如出現執行異常,會將異常信息報告給查詢片段調度器,調度器會取消Query其他的Stage的Worker執行。
ExchangeManager是PlanSegment數據交換的媒介,能平衡數據上下游處理的能力。整體而言,我們的設計采用Push與隊列的方式,當上游的數據ready時,主動推送給下游,并在這個基礎上支持了反壓的能力。
在整個流程中,上下游都會通過隊列來優化發送和讀取,上游與下游會有一個自己的隊列。當隊列飽和的時候,會通過類似反壓的機制來控制上游這個執行速度,若上游計算快,下游處理能力比較慢,出現下游處理不過來的情況,則會通過反壓的方式來控制上游執行的速度。
由于采用push和隊列,因此要考慮一個相對比較特殊的場景,在某些case的情況下,下游的Stage并不需要讀取全部的上游的數據。例如Limit100,下游只需讀取100條數據,而上游可能會產生非常大規模的數據。因此在這種情況下,當下游的Stage讀取到足夠的數據后,它需要能夠主動取消上游Stage的執行,并且清空隊列。
ExchangeManager考慮的優化點較多,例如細粒度的內存控制,能夠按照實例、Query、Segment等多個層次進行內存控制,避免OOM。更長期的考慮是在一些對延遲要求不高、數據量大的場景。第一,通過將數據 Spill 到磁盤,降低內存的使用。
第二,為了提升傳輸效率,小數據要做Merge,大數據要做Split。同時,在網絡傳輸和處理某些場景的時候,需要做一種有序性的保證。例如在Sort的場景,Partial Sort和Merge Sort的網絡傳輸過程必須要保證是有序的,傳輸數據不能出現亂序的情況,否則進行Merge Sort時數據就會出問題,并影響最終結果。
第三,連接的復用和網絡的優化,包括上下游在同一個節點,盡可能走內存交換,而不走網絡。這樣可以減少網絡開銷以及數據的序列化和反序列化的代價。此外,ClickHouse在計算上做了非常充足的優化,因此其在某些場景中,內存帶寬會成為瓶頸,在ExchangeManager的一些場景中,可以用一些零拷貝和其他優化,盡量減少內存的拷貝。
第四,異常處理和監控。相比于單機,分布式情況下異常情況會更加復雜,且更加難以感知。通過重試能夠避免一些節點短時性的高負載或者異常對查詢的影響。做好監控,在出問題的時候,能快速感知,并進行排查,也能夠針對性地去做優化。
優化與診斷
首先是Join的多種實現和優化。根據數據的規模和分布,可以根據不同的場景去選擇合適的Join的實現方式:
- Shuffle Join,是目前使用方式最多,也是最常見的。
- Broadcast Join,大表Join小表場景,將右表廣播到左表的所有Worker節點上面,這樣可以避免左表大表的數據傳輸。
- Colocate Join,如果左右表都已按照Join key分布,并且它們是相通的分布的話,其實不需要去做數據的exchange,可以將數據的傳輸減到最小。
網絡連接的優化,核心本質是減少連接的建立和使用,特別是在數據需要Shuffle時,下一輪Stage中的每一個節點都要從上游的Stage中的每個節點去拉取數據。若集群整體的節點數較多,且存在很多較復雜的Query,就會建立非常多的連接。
目前在字節內部,ClickHouse集群的規模非常大,在當前 ClickHouse 二階段執行的高并發情況下,單機最大可能會建立幾萬個連接。因此必須要進行網絡連接的優化,特別是支持連接的復用,每個連接上可以跑多個Stage查詢。通過盡可能去復用連接,在不同的節點之間,能夠建立固定數目的連接,不同的Query、Stage都會復用這些連接,連接數并不會隨著Query和Stage的規模的增長而增長。
網絡傳輸優化,在數據中心內,遠程的直接的內存訪問,通常指RDMA,是一種能夠超過遠程主機操作系統的內核,去訪問內存里的數據的技術。由于這種技術不需要經過操作系統,所以不僅節省了大量的CPU資源,同樣也提升了系統吞吐量,降低了系統的網絡通信延遲,尤其適合大規模并行的計算機集群。由于 ClickHouse 在計算層面做了很多優化,而網絡帶寬相比于內存帶寬要小不少,在一些數據量傳輸特別大的場景,網絡傳輸會成為一定的瓶頸。為了提升網絡傳輸的效率和提升數據 exchange 的吞吐,一方面可以引入壓縮來降低傳輸數據量,另一方面可以引入 RDMA 來減少一定的開銷。經過測試,在一些數據傳輸量大的場景,有不小的收益。
利用Runtime Filter的優化在不少數據庫也有使用。Join的算子通常是OLAP引擎里最耗時的算子,優化Join算子有兩種思路。一種思路是可以提升Join算子的性能。比如對于 HashJoin,可以優化 HashTable 實現,也可以實現更好的哈希算法,包括做一些更好的并行的方式。
另一種思路是,如果本身算子耗時比較重,可以減少參與算子計算的數據。Runtime Filter是在一些場景下特別是事實表Join多張維度表的星型模型場景有比較好的效果。在此類場景下,通常事實表的規模會非常大,而大部分的過濾條件都是在維度表上面。
Runtime Filter的作用,是通過在Join的Probe端,提前過濾掉并不會命中Join條件的輸入數據,從而大幅減少Join中的數據傳輸和計算。通過這種方式,能夠減少整體的執行時間。因此我們在復雜查詢上也支持了Runtime Filter,目前主要支持Min Max和Bloom Filter。
如果 runtime filter 的列(join column)構建了索引(主鍵、skip index…),是需要重新生成 pipeline 的。因為命中索引后,可能會減少數據的讀取,pipeline 并行度和對應數據的處理 range 都可能發生變化。如果 runtime filter 的列跟索引無關,可以在計劃生成的時候預先帶上過濾條件,一開始為空,只是占位,runtime filter 下發的時候把占位信息改成真正的過濾條件即可。這樣即使 runtime filter 下發超時了,查詢片段已經開始執行,只要查詢片段沒有執行完,之后的數據仍然可以進行過濾。
但需要注意的是,Runtime Filter是一種特殊場景下的優化,針對場景是右表數據量不大,并且構建的Runtime Filter對左表有比較好的過濾效果。若右表數據量較大,構建的Runtime Filter的時間比較久,或對左表的數據過濾沒有效果。Runtime Filter反而會增加查詢的耗時和計算的開銷。因此要根據數據的特征和規模來決定是否開啟優化。
性能診斷和分析對復雜查詢很關鍵,由于引入了復雜查詢的多Stage模型,SQL執行的模式會變得復雜。對此的優化首先是盡可能完善各類Metrics,包括Query執行時間、不同Stage執行時間、起始時間、結束時間、處理的IO數據量、算子處理的數據、執行情況,以及各類的算子Metrics和一些Profile Events(例如Runtime Filter會有構建時間、過濾數據量等Metrics)。
其次,我們記錄了反壓信息與上下游的隊列長度,以此推斷Stage的執行情況和瓶頸。
通??梢杂腥缦屡袛啵?/span>
- 輸入和輸出隊列數目同為低或同為高分別表明當前 stage 處理正?;蛱幱诒幌掠畏磯?,此時可以通過反壓信息來進一步判斷
- 當輸入和輸出隊列數目不一樣,這可能是出于反壓傳導的中間狀態或者該 stage 就是反壓的根源
- 如果一個 stage 的輸出隊列數目很多,且經常被反壓,通常是被下游 stage 所影響,所以可以排除它本身是反壓根源的可能性,更多關注它的下游
- 如果一個 stage 的輸出隊列數目很少,但其輸入隊列的數目很高,則表明它有可能是反壓的根源。優化目標是提升這個 stage 的處理能力
總的來說,SQL的場景包羅萬象,非常復雜的場景有時還是需要對引擎有一定了解的同學去診斷和分析,給出優化建議。字節目前也在不斷完善這些經驗,希望能夠通過不斷完善Metrics和分析的路徑,持續減輕Oncall的負擔,在某些場景下能夠更加準確地給出優化建議。
?
效果與展望
根據上述所提,目前執行模型存在三個缺點,我們進行了復雜查詢的優化,因此需要驗證這種新的模式是否能夠解決發現的問題,測試場景如下:
- 第二階段計算較復雜,且第一階段數據較多
- Hash Join右表是大表
- 多表Join,模擬復雜Query
- 以SSB 1T數據作為數據集,環境則是構建了8個節點的集群
Case1——二階段計算復雜。我們看到有一個比較重的計算算子UniqExact,就是count distinct的計算方式,通過Hash表做去重。count distinct默認采用這種算法,當我們使用復雜查詢后,Query的執行時間從8.5秒減少到2.198秒。第二階段 agg uniqExact 算子的合并原本由coordinator單點合并,現在通過按照group by key shuffle后可以由多個節點并行完成。因此通過shuffle減輕了coordinator的 merge agg 壓力。
Case2——右表為大表。由于 ClickHouse 對多表的優化做的還不是很到位。這里采用子查詢來下推過濾的條件。在這個case中,Lineorder是一張大表,采用復雜查詢的模式以后,Query執行時間從17秒優化到了1.7秒。由于Lineorder是一張大表,通過Shuffle可以將數據按照Join key Shuffle到各Worker節點上,這樣就減少了右表構建的壓力。
Case3——多表Join。開啟復雜查詢后,Query的執行時間從8.58秒優化到4.464秒,所有的右表都可以同時開始數據的處理和構建。為了和現有模式做對比,復雜查詢這里并沒有開啟 runtime filter,開啟 runtime filter 后效果會更好。
事實上,優化器對復雜查詢的性能提升也非常大,通過一些RBO的規則,例如常見的謂詞下推、相關子查詢的處理等,可以極大提升SQL的執行效率。在復雜查詢的模式下,由于有優化器的存在,用戶甚至不需要寫得非常復雜,優化器自動去完成這些下推和RBO規則優化。
此外,選擇用哪一種Join的實現,也會對Join的性能影響較大。若能夠滿足Join Key分布,使用Colocate Join可以減少左右表Shuffle的傳輸代價。在多表Join的情況下,Join的順序和Join的實現方式對執行的時長影響,會比兩表Join更大。借助這種數據的統計信息,通過一些CBO的優化,可以得到一個比較好的執行模式。
有了優化器,業務同學可以按照業務邏輯來寫任何的 SQL,引擎自動計算出相對最優的 SQL 計劃并執行,加速查詢的執行。
總結一下,ClickHouse目前的執行模式在很多單表的場景下表現非常優異,我們主要針對復雜場景做優化,通過實現多Stage的模式,實現了Stage之間的數據的傳輸,從工程實踐上做了較多嘗試和優化,去提升執行和網絡傳輸的性能。并希望通過完善Metrics和智能診斷來降低SQL分析和調優的門檻。目前已經實現了第一步,未來字節仍有很多努力的方向。
首先,是要繼續去提升執行和Exchange的性能。這里不談論引擎執行通用的優化,比如更好的索引或者算子的優化,主要是跟復雜查詢模式有關。舉一個例子,比如 Stage 復用,在 SQL 出現子查詢結果被反復使用的場景,比如一些多表 join 和 CTE 場景可能有幫助。通過 Stage 復用可以減少相同數據的多次讀取。Stage 復用我們之前就已經支持,但是用的場景比較少,未來準備更靈活和通用。
其次,Metrics和智能診斷加強。SQL的靈活度很高,因此一些復雜查詢如果沒有Metrics其實幾乎很難去做診斷和調優。以上都是字節跳動數據平臺在未來會長期的持續去發力的方向。
?
嘉賓介紹
董一峰,字節跳動數據平臺資深研發工程師。負責字節跳動企業級實驗平臺團隊,致力于打造業界最先進好用的實驗平臺,把A/B測試變成驅動業務增長的新基建。從0到1參與搭建了字節內實驗中臺Libra,服務于抖音、Tiktok、今日頭條等500多個業務線;對外發布火山引擎A/B測試(aka DataTester)、BytePlus Optimize等產品。