基于ClickHouse造實時計算引擎,百億數據秒級響應!
前言
為了能夠實時地了解線上業務數據,京東算法智能應用部打造了一款基于ClickHouse的實時計算分析引擎,給業務團隊提供實時數據支持,并通過預警功能發現潛在的問題。
本文結合了引擎開發過程中對資源位數據進行聚合計算業務場景,對數據實時聚合計算實現秒級查詢的技術方案進行概述。ClickHouse是整個引擎的基礎,故下文首先介紹了ClickHouse的相關特性和適合的業務場景,以及最基礎的表引擎MergeTree。接下來詳細的講述了技術方案,包括Kafka數據消費到數據寫入、結合ClickHouse特性建表、完整的數據監控,以及從幾十億數據就偶現查詢超時到幾百億數據也能秒級響應的優化過程。
ClickHouse
- ClickHouse是Yandex公司內部業務驅動產出的列式存儲數據庫。為了更好地幫助自身及用戶分析網絡流量,開發了ClickHouse用于在線流量分析,一步一步最終形成了現在的ClickHouse。在存儲數據達到20萬億行的情況下,也能做到90%的查詢能夠在1秒內返回結果。
- ClickHouse能夠實現實時聚合,一切查詢都是動態、實時的,用戶發起查詢的那一刻起,整個過程需要能做到在一秒內完成并返回結果。ClickHouse的實時聚合能力和我們面對的業務場景非常符合。
- ClickHouse支持完整的DBMS。支持動態創建、修改或刪除數據庫、表和視圖,可以動態查詢、插入、修改或刪除數據。
- ClickHouse采用列式存儲,數據按列進行組織,屬于同一列的數據會被保存在一起,這是后續實現秒級查詢的基礎。
列式存儲能夠減少數據掃描范圍,數據按列組織,數據庫可以直接獲取查詢字段的數據。而按行存逐行掃描,獲取每行數據的所有字段,再從每一行數據中返回需要的字段,雖然只需要部分字段還是掃描了所有的字段,按列存儲避免了多余的數據掃描。
另外列式存儲壓縮率高,數據在網絡中傳輸更快,對網絡帶寬和磁盤IO的壓力更小。
除了完整的DBMS、列式存儲外,還支持在線實時查詢、擁有完善的SQL支持和函數、擁有多樣化的表引擎滿足各類業務場景。
正因為ClickHouse的這些特性,在它適合的場景下能夠實現動態、實時的秒級別查詢。
適合的場景
讀多于寫。數據一次寫入,多次查詢,從各個角度對數據進行挖掘,發現數據的價值。
大寬表,讀大量行聚合少量列。選擇少量的維度列和指標列,對大寬表的數據做聚合計算,得出少量的結果集。
數據批量寫入,不需要經常更新、刪除。數據寫入完成后,相關業務不要求經常對數據更新或刪除,主要用于查詢分析數據的價值。
ClickHouse適合用于商業智能領域,廣泛應用于廣告流量、App流量、物聯網等眾多領域。借助ClickHouse可以實時計算線上業務數據,如資源位的點擊情況,以及并對各資源位進行bi預警。
MergeTree
MergeTree系列引擎是最基礎的表引擎,提供了主鍵索引、數據分區等基本能力。了解這部分內容,是后續開發和優化的基礎和方向。
分區
指定表數據分區方式,支持多個列,但單個列分區查詢效果最好。有數據寫入時屬于同一分區的數據最終會被合并到同一個分區目錄,不同分區的數據永遠不會被合并在一起。結合業務場景設置合理的分區可以減少查詢時數據文件的掃描范圍。
排序
在一個數據片段內,數據以何種方式排序。當使用多個字段排序時ORDER BY(T1,T2),先按照T1排序,相同值再按照T2排序。
MergeTree存儲結構
一張數據表的完整物理結構依次是數據表、分區以及各分區下具體的數據文件。分區下具體的數據文件包括一級索引、每列壓縮文件、每列字段標記文件,了解他們的存儲和查詢原理,為后面建表、聚合計算的優化提供方向。
- 一級索引文件,存放稀疏索引,通過ORDER BY或PRIMARY KEY聲明,使用少量的索引能夠記錄大量數據的區間位置信息,內容生成規則跟排序字段有關,且索引數據常駐內存,取用速度快。借助稀疏索引,可以排除主鍵范圍外的數據文件,從而有效減少數據掃描范圍,加速查詢速度;
- 每列壓縮數據文件,存儲每一列的數據,每一列字段都有獨立的數據文件;
- 每列字段標記文件,每一列都有對應的標記文件,保存了列壓縮文件中數據的偏移量信息,與稀疏索引對齊,又與壓縮文件對應,建立了稀疏索引與數據文件的映射關系。不能常駐內存,使用LRU緩存策略加快其取用速度。
在讀取數據時,需通過標記數據的位置信息才能夠找到所需要的數據,分為讀取壓縮數據塊和讀取數據塊兩個步驟。
掌握數據存儲和查詢的過程,后續建表和查詢有理論支持。
1)數據寫入
每批數據的寫入,都會生成一個新的分區目錄,后續會異步的將相同分區的目錄進行合并。按照索引粒度,會分別生成一級索引文件、每個字段的標記和壓縮數據文件。寫入過程如下圖:
2)查詢過程
查詢過程通過指定WHERE條件,不斷縮小數據范圍。借助分區能找到數據所在的數據塊,一級索引查找具體的行數區間信息,從標記文件中獲取數據壓縮文件中的壓縮文件信息。查詢過程如下圖:
查詢語句如果沒有匹配到任務索引,會掃描所有分區目錄,這種操作給整個集群造成較大壓力。
引用官方文檔中的例子對查詢過程進行說明。以(CounterID, Date) 為主鍵,排序好的索引的圖示會是下面這樣:
- 指定查詢如下:
- CounterID in ('a', 'h'),服務器會讀取標記號在[0, 3)和[6, 8) 區間中的數據。
- CounterID IN ('a', 'h') AND Date = 3,服務器會讀取標記號在[1, 3)和[7, 8)區間中的數據。
- Date = 3,服務器會讀取標記號在[1, 10]區間中的數據。
ClickHouse支持集群部署,在查詢分布式表時,集群會將每個節點的數據進行合并,得到所有節點的數據后返回結果。MergeTree系列表引擎支持副本,如ReplicatedMergeTree表引擎建表存放明細數據,接下來介紹的兩種表引擎都繼承自MergeTree,但又有各自的特殊功能。
- ReplacingMergeTree實現數據去重
在建表時設置ORDER BY排序字段作為判斷重復數據的唯一鍵,在合并分區的時候會觸發刪除重復數據,能夠一定程度上解決數據重復的問題。
- AggregatingMergeTree
在合并分區的時候按照定義的條件聚合數據,將需要聚合的數據預先計算出來,在聚合查詢時直接使用結果數據,以空間換時間的方法提高查詢性能。該引擎需要使用AggregateFunction類型來處理所有列。
了解了ClickHouse相關內容后,接下來將介紹完整的技術方案。
技術方案及查詢優化
資源位的數據來源包括Kafka的實時數據和hdfs里面存儲的離線數據。實時數據通過Flink實時任務寫入ClickHouse,離線數據通過建立MapReduce定時任務寫入ClickHouse。
架構圖
實時數據入庫
實時數據從實時數據到寫入CK過程:
- 各業務線產生的實時數據寫入kafka通道,根據數據量分配不同的分區個數。
- 創建的flink任務對各個業務的kafka數據進行消費,每個業務處理過程會有所不同。一般包括過濾算子、數據加工算子、寫入算子。
過濾算子,過濾掉不需要的數據,這個步驟非常重要,設置嚴格的數據評估標準,防止臟數據、不符合規則的數據寫入集群。另外對臟數據的過濾要做好記錄,在數據完整性測試過程中會用到。
數據加工算子,主要負責從實時數據流中解析出業務需要的數據,這個過程也要設置嚴格的校驗邏輯,保證數據整潔;若涉及數據加工邏輯更新,要保證加工邏輯及時更新。
寫入算子,采用批量寫入方式,根據集群情況,設置合理的批次,實時查詢和寫入性能達到均衡。
寫入ck過程可以通過域名連接分布式表,也可以通過nginx進程掌握一份集群機器IP列表,每個nginx進程自己輪詢,均衡寫入集群的每臺機器,但需要保證寫入ClickHouse的QPS不能太小,防止出現寫入不均衡情況。
離線數據入庫
- 離線數據建立定時任務,將hive表中的數據加工好,通過建立MapReduce定時任務,將加工后的數據寫入ClickHouse。
- 離線數據入庫過程同樣包括過濾、數據加工、寫入ClickHouse過程。
批量寫入
在前面merge章節有介紹,每次數據寫入都會產生臨時分區目錄,后續會異步的將相同分區的目錄進行合并。寫入過程會消耗集群的資源,所以一定采用批量寫入方式,每批次寫入條數看集群和數據情況(1萬、5萬、10萬每批次可作為參考)。采用JDBC方式實現批量寫入程序如下:
JDBC驅動,可以使用官方提供的驅動程序:
- <dependency>
- <groupId>ru.yandex.clickhouse</groupId>
- <artifactId>clickhouse-jdbc</artifactId>
- <version>0.2.4</version>
- </dependency>
初始化Connection:
- Class.forName(Ck.DRIVER);
- Connection connection = DriverManager.getConnection(Ck.URL, Ck.USERNAME, Ck.PASSWORD);
- connection.setAutoCommit(false);
批量寫入:
- PreparedStatement state = null;
- try {
- state = connection.prepareStatement(INSERT_SQL);
- for(控制寫入批次)
- {
- state.set...(index, value);
- state.addBatch();
- }
- state.executeBatch();
- connection.commit();
- }catch (SQLException e) {
建表
在開始建表前,對業務進行充分理解,了解集群數據的查詢場景,在建表時規劃好分區字段和排序規則,這個過程非常重要,是集群查詢性能良好的基礎。
例如我們面臨的業務場景為,計算移動App每個點擊按鈕聚合PV和UV(需要去重),按天或者小時聚合計算,還有商品各種屬性聚合計算的PV和UV。
選擇分區字段。正如前面MergeTree章節介紹,ClickHouse支持分區,分區字段是每張表整個數據目錄最外層結構,可以很大程度加快查詢速度。
另外分區字段不易過多,分區過多就意味著數據目錄更加復雜,在進行聚合計算時,肯定會影響整個集群的查詢性能。目前我們遇到的業務場景,適合以時間字段(時分秒)來作為分區字段,toYYYYMMDD(ts)。
設置排序規則。數據會按照設置的排序字段先后順序來進行存儲,在進行聚合計算時也會按照聚合條件對相鄰數據進行計算,但如果聚合條件不在排序字段里,集群會對當前分區的所有數據掃描一遍,這種查詢就會慢很多,大量消耗集群的內存、CPU資源。我們應該避免這種情況出現,設置合理的排序規則才能以最快的速度聚合出我們想要的結果。
當前業務場景下,我們可以選擇代表各個按鈕的id和商品的屬性作為排序字段。在進行聚合查詢時,where條件下選擇分區,排序規則卡出來需要的數據,能夠很大程度提高查詢速度。
所以在建表階段就要充分了解未來的查詢場景,選擇合適的分區字段和排序規則。
另外,建表時候最重要的是選擇合適的表引擎,每種表引擎的使命都不同,根據自身業務選擇出最合適表引擎。當前業務場景我們可以選擇ReplicatedMergeTree引擎存明細數據。
建表實例:
- CREATE TABLE table_name
- (
- Event_ts DateTime,
- T1 String,
- T2 UInt32,
- T3 String
- ) ENGINE = ReplicatedMergeTree('/clickhouse/ck.test/tables/{layer}-{shard}/table_name', '{replica}')
- PARTITION BY toYYYYMM(Event_ts)
- ORDER BY (T1, T2)
進行到這里,完成了建表和數據寫入,集群的查詢速度一般還是可以的,在集群硬件還不差的情況下滿足每次10幾億的數據的聚合查詢沒有問題,當然前提是是選擇了分區和卡排序字段的基礎上。
但數據再進一步多到百億甚至近千億數據,只是簡單的設置分區和優化排序字段是很難做到實時秒級查詢了。
查詢優化
雖然在查詢時卡了分區和排序條件,但隨著存儲的數據量增多,ClickHouse集群的查詢壓力會逐漸增加,出現查詢速度慢情況。如果有大SQL請求發給了集群,會造成整個集群的CPU和內存升高,直到把整個集群內存打滿,集群基本會處于癱瘓狀態。對查詢進行優化非常重要。
排查耗時SQL。耗時的SQL對整個集群造成很大的壓力,要先找到解決耗時SQL的優化方案。當前業務場景下,能很容易發現聚合計算UV(去重)是比較消耗集群資源的。
對于聚合結果的場景,我們多次嘗試優化方案后,通過建立物化視圖,以空間換取時間,大部分聚合查詢速度能提高10幾倍。建立物化視圖同樣要先去了解業務場景,選擇分區字段、ORDER BY字段,并選擇count、sum、uniq等聚合函數。
物化視圖建表語句:
- CREATE MATERIALIZED VIEW test_db.app_hp_btn_event_test ON CLUSTER test_cluster ENGINE = ReplicatedAggregatingMergeTree( '/clickhouse/ck.test/tables/{layer}-{shard}/test_db/app_hp_btn_event_test', '{replica}') PARTITION BY toYYYYMMDD(time) ORDER BY(btn_id,cate2) TTL time + toIntervalDay(3) SETTINGS index_granularity = 8192
- AS
- SELECT
- toStartOfHour(event_time) AS time,
- btn_id,
- countState(uid) PV,
- uniqState(uid) AS UV
- FROM
- test_db.app_hp_btn_event_test
- GROUP BY
- btn_id,
- toStartOfHour(event_time)
查詢實例:
- hour from test_db.app_hp_btn_event_test where toYYYYMMDD(time) = 20200608 group by hour
避免明細數據join。ClickHouse更適合大寬表數據聚合查詢,對于明細數據join的場景盡量避免出現。
集群硬件升級。軟件的優化總是有限的,觀察集群的CPU、內存、硬盤情況,集群的日常CPU、內存較高時,及時升級機器。
數據監控報警
完善的監控體系讓我們及時得知引擎異常,同時也能時刻觀測數據寫入查詢情況,掌握整個引擎的運行情況。
- 數據從消費到寫入各個階段異常信息。主要包括java.lang.NullPointerException、java.lang.ArrayIndexOutOfBoundsException等異常信息,大部分是因為數據源有所調整引起;
- 各個階段添加報警功能,Kafka添加積壓報警、核心算子計算邏輯添加異常報警、ck集群在mdc系統添加硬盤、cpu、內存預警;
- Grafana查詢系統。主要包括CPU、內存、硬盤使用情況;
- 大SQL監控。查詢耗時SQL和沒有卡分區和排序字段的查詢。
最后
ClickHouse自身有處理萬億數據的能力。在掌握了它的存儲、查詢、MergeTree原理后,創建符合業務要求的數據庫表,執行符合ClickHouse特性的查詢SQL,實現1000億數據的秒級聚合查詢并不是難事。
ClickHouse還有很多特性,需要在開發過程中不斷地摸索和嘗試。