StarRocks VS ClickHouse,攜程大住宿智能數據平臺的應用
攜程是全球領先的一站式旅行平臺,現有員工約30000人,公司旗下的平臺可面向全球用戶提供一套完整的旅行產品、服務及差異化的旅行內容。攜程大住宿部是國內最大的酒店分銷電子商務平臺,在全球擁有約63萬家國內酒店和70萬家國際酒店。攜程大住宿數據智能平臺中70%的實時數據場景已經接入StarRocks,查詢響應速度平均在200ms左右,超過500ms的慢查詢數大幅度減少,同時人力和硬件成本大大降低。后續會將剩余的實時場景和離線場景全部遷入StarRocks。
(作者:史文俊攜程大住宿數據智能部資深開發工程師,負責攜程大住宿數據智能平臺的研發)
平臺現狀
大住宿數據智能平臺(簡稱HData)是一個為攜程大住宿業務提供數據可視化的平臺,簡而言之,就是用圖表的形式更為直觀地展示與解讀數據,幫助業務獲得知識和洞察,形成正確的決策,做出快速決策,少犯錯誤。在大住宿內部,每個部門關心的指標側重點會不同,權限控制也都會不一樣,所以數據展示的方式也是多樣化。

HData每天有將近2200左右的UV,10w左右的PV來訪問我們的系統,而節假日期間的訪問量基本都會翻2到3倍。

從2018年開始使用ClickHouse以來,我們90%的業務線都強依賴于ClickHouse,95%左右的接口響應時長都在1s以內,ClickHouse強悍的查詢性能得到了充分體現。
現在總數據行數大概700億左右,每天有超過2000個左右的流程,需要更新的數據行數大概有150億左右。
未壓縮前的數據總容量:8T,壓縮后的數據總容量:1.75T。

但是ClickHouse無法支持高并發查詢的缺陷也很明顯,現在CPU大部分情況下消耗是在30%以內,不過當有用戶大量查詢時CPU使用率可能就會被拉的很高。并且如果出現一個復雜的高消耗查詢,只靠人工手刷,可能在很短的時間內就可以把40C的CPU使用率打滿:

工作日的早上9點一般會有一波訪問高峰,為了保持系統穩定,我們采用主動建立緩存+用戶被動觸發緩存的機制來降低ClickHouse服務器的壓力。
一方面我們會將一些高頻訪問的頁面查詢結果進行緩存。另一方面,在離線數據更新完成后,我們通過分析用戶行為數據,主動給最近5天來訪問過相關數據的用戶緩存默認條件的數據,降低波峰。
現在的主動緩存+被動緩存取代了原本需要從ClickHouse上很大一部分的查詢量,這樣可以避免我們無限的擴容服務器。同時也可以把因為集中并發的查詢拉起來的峰刺打平。

現階段痛點
在節假日期間,實時數據是關注的重點,以今年勞動節為例,實時看板的訪問量要比平時高10倍左右。

工作日期間,CPU使用率一般不會超過30%。

節假日期間,CPU使用率一度超過70%,這對服務器的穩定性造成了很大隱患。

面對這種情況,一方面我們在前端做了節流來防止用戶高頻查詢,同時在后端也做了緩存,但是實時數據的緩存時間不能太久,一般1~2分鐘已經是用戶可接受的極限。通過下圖可以發現,離線數據的緩存命中率一般都會比較高,基本能達到50%以上甚至更高,但對于實時數據,命中率則只有10%左右:

另一方面,我們在服務端啟用了分流機制:實際應用場景中有一些業務的權限比較小,對應需要查詢的數據量也會比較小,我們通過分析定義出了一個閾值,比如權限數小于5000的用戶從MySQL請求數據,這部分用戶即使通過MySQL查詢速度也很快。讓權限大的用戶通過ClickHouse請求數據,這樣可以引流很大一部分用戶。

這樣做雖然暫時解決了眼下的問題,不過新的問題又接踵而至:
·數據需要雙寫到ClickHouse和MySQL,無法保證兩邊數據的一致性
·同時存在兩套數據,導致硬件成本增加
·ClickHouse不支持標準SQL語法,所以代碼也需要維護兩套,開發成本增加
針對上述問題的挑戰,我們的目標是尋求一個新的OLAP引擎來減少開發和運維成本,同時還要兼顧查詢性能,并在高并發和高吞吐的場景下有較好的適用性。
為此我們嘗試了一些市面上其他引擎,如Ingite、CrateDB、Kylin等,每種引擎從硬件成本或性能上都有自己特有的優勢,不過綜合到使用場景,最終我們選擇了StarRocks。
StarRocks介紹
·StarRocks是一個高性能分布式關系型列式數據庫,通過MPP執行框架,單節點每秒可處理多達100億行數據,同時支持星型模型和雪花模型。
·StarRocks集群由FE和BE構成,可以使用MySQL客戶端訪問StarRocks集群。
·FE接收MySQL客戶端的連接,解析并執行SQL語句,管理元數據,執行SQL DDL命令,用Catalog記錄庫、表、分區,tablet副本等信息。
·BE管理tablet副本,tablet是table經過分區分桶形成的子表,采用列式存儲。BE受FE指導,創建或刪除子表。
·BE接收FE分發的物理執行計劃并指定BE coordinator節點,在BE coordinator的調度下,與其他BE worker共同協作完成執行。
·BE讀本地的列存儲引擎,獲取數據,通過索引和謂詞下沉快速過濾數據。

我們選擇StarRocks主要基于以下幾方面的考慮:
1.亞秒級查詢延時
2.在高并發查詢、多表關聯等復雜多維分析場景有良好的性能表現
3.支持彈性擴展,擴容不影響線上業務,后臺自動完成數據rebalance
4.集群中服務有熱備,多實例部署,節點的宕機、下線、異常都不會影響集群服務的整體穩定性。
5.支持物化視圖和Online Schema Change
6.兼容MySQL協議,支持標準的SQL語法
性能測試
HData上的數據以多表關聯為主,在這種場景下,ClickHouse單機性能相比集群性能要好,因而在這里選取ClickHouse單機做對比。下面用3個測試用例分別對StarRocks和ClickHouse進行對比,我們用6臺虛擬機構建成了一個集群,3臺FE、BE混部,3臺BE,機器配置如下:

軟件版本:StarRocks標準版1.16.2
ClickHouse配置如下:

軟件版本:ClickHouse20.8

測試用例1

·StarRocks用時:547ms
·ClickHouse用時:1814ms
測試用例2

·StarRocks用時:126ms
·ClickHouse用時:142ms
測試用例3

·StarRocks用時:387ms
·ClickHouse用時:884ms
可以看到,StarRocks的查詢性能完全不遜色于ClickHouse,甚至更快。
數據更新機制
StarRocks根據攝入數據和實際存儲數據之間的映射關系,將數據表的明細表,聚合表和更新表,分別對應有明細模型,聚合模型和更新模型。
·明細模型:表中存在主鍵重復的數據行,和攝入數據行一一對應,用戶可以召回所攝入的全部歷史數據。
·聚合模型:表中不存在主鍵重復的數據行,攝入的主鍵重復的數據行合并為一行,這些數據行的指標列通過聚合函數合并,用戶可以召回所攝入的全部歷史數據的累積結果,但無法召回全部歷史數據。
·更新模型:聚合模型的特殊情形,主鍵滿足唯一性約束,最近攝入的數據行,替換掉其他主鍵重復的數據行。相當于在聚合模型中,為數據表的指標列指定的聚合函數為REPLACE,REPLACE函數返回一組數據中的最新數據。
·StarRocks系統提供了5種不同的導入方式,以支持不同的數據源(如HDFS、Kafka、本地文件等),或者按不同的方式(異步或同步)導入數據。
·Broker Load:Broker Load通過Broker進程訪問并讀取外部數據源,然后采用MySQL協議向StarRocks創建導入作業。適用于源數據在Broker進程可訪問的存儲系統(如HDFS)中。
·Spark Load:Spark Load通過Spark資源實現對導入數據的預處理,提高StarRocks大數據量的導入性能并且節省StarRocks集群的計算資源。
·Stream Load:Stream Load是一種同步執行的導入方式,通過HTTP協議發送請求將本地文件或數據流導入到StarRocks中,并等待系統返回導入的結果狀態,從而判斷導入是否成功。
·Routine Load:Routine Load提供了一種自動從指定數據源進行數據導入的功能。用戶通過MySQL協議提交例行導入作業,生成一個常駐線程,不間斷的從數據源(如Kafka)中讀取數據并導入到StarRocks中。
·Insert Into:類似MySQL中的Insert語句,可以通過INSERT INTO tbl SELEC...或INSERT INTO tbl VALUES(...)等語句插入數據。
·HData中的數據主要分為實時數據和離線T+1數據。
實時數據主要通過Routine load的方式導入,以使用更新模型為主
離線T+1數據主要使用Zeus平臺,通過Stream load的方式導入,以使用明細模型為主

實時數據通過攜程自研的消息隊列系統QMQ實現,下圖是原先的實時數據導入流程:

接入StarRocks后的實時數據導入流程:

很快我們就遇到了一個難題:有一個場景是訂閱訂單狀態變化的消息,下游我們以訂單號作為主鍵,使用更新模型來將數據落地。對外我們提供訂單狀態為非取消的數據進行展示。
在收到消息后,我們還需要調用外部接口來補全一些其他字段,最后再把數據落地。但如果收到一條消息就調用一次接口,這么做會對接口造成壓力,所以我們采取了批處理的方式。
不過這樣做產生了一個問題:Kafka本身無法保證全局消息是有序的,只能保證partition內的有序性。同一個批次同一個訂單,但訂單狀態不同的2條數據如果分別落在了不同的partition,routine load時無法保證哪條數據會先被消費。如果訂單狀態為取消的消息先被消費,而其他訂單狀態的消息后被消費,這樣會造成原本應該取消的訂單重新變成了非取消訂單,從而影響統計的準確性。
我們也考慮過不通過QMQ而改用原生的Kafka,將訂單號作為key來指定發送到哪個partition中,不過這樣做需要二次開發,而且改動的成本也不低。
為了解決這個問題,我們選擇了一個折中的辦法:在消息落地同時,又用明細模型落地了一個日志表,表里只需要存訂單號、訂單狀態以及消息發送時間。同時,有一個定時任務每隔一段時間會對該表內相同訂單號的數據進行排序,取消息發送時間最新的一條數據,用訂單號與正式表中訂單狀態不一致的數據進行匹配然后進行更新,以這樣的形式對數據進行一個補償。
T+1數據我們通過攜程自研的數據同步平臺Zeus進行ETL和導入:

DR和高可用
攜程對DR有著很高的要求,每隔一段時間都會有公司級的DR演練。StarRocks本身已經具備了十分優秀的DR機制,在此基礎之上,我們構建了一套適合自己的高可用體系:
·服務器分別部署在2個機房,以5:5的流量對外提供服務。對外提供服務的FE節點的負載均衡以配置項的形式實現,可以動態修改,實時生效(主要是考慮有服務器打補丁、版本升級等需要手動拉出的情況)。
·每個FE和BE進程全部都用supervisor進行進程守護,保證進程出現意外退出時可以被自動拉起。
·當FE節點出現故障時,存活的follower會立即選舉出一個新的leader節點提供服務,但是應用端卻無法立即感知,為了應對這種情況,我們起了一個定時任務,每隔一段時間對FE服務器進行health check,一旦發現FE節點故障,則立即將故障節點拉出集群,同時以短信方式通知開發人員。

·當BE節點出現故障時,StarRocks內部會自動進行副本均衡,對外仍可繼續提供服務,同時我們也會有一個定時任務對其進行health check,每當發現有BE節點故障,則會以郵件形式通知開發人員。

·同時,我們針對每臺服務器的硬件指標也配置了告警,通過攜程自研的智能告警中臺,一旦服務器的CPU、Mem、磁盤空間等指標發生異常,開發人員可以立即感知并介入。


總結和后期規劃
現在HData中70%的實時數據場景已經接入StarRocks,查詢響應速度平均在200ms左右,耗時500ms以上的查詢只占總查詢量的1%;并且數據和代碼也只需要維護一套,人力和硬件成本大大降低。


后期規劃
·將剩余的實時場景全部遷入StarRocks。
·離線場景也逐漸遷入StarRocks,逐步用StarRocks來統一OLAP分析全場景。
·進一步完善對StarRocks的監控機制,使其更健壯。
·通過讀取Hive外表的形式做數據冷熱分離,減少硬件成本。