王晶晶:京東零售海量日志數據處理實踐
01 京東零售流量數倉架構
1. 京東零售——流量簡介
① 什么是流量?
簡單來說,流量就是用戶作用在京東頁面上,產生一系列行為數據的集合。
② 流量數據的來源
數據來源主要是移動端和PC端,以及線下店、外部采買、合作商的數據等。
?
這些數據是如何流轉到數倉的呢?
2. 京東零售——流量數據處理架構
由架構圖可以看出,對不同的終端采取不同的采集模式;例如,對APP原生頁面采取SDK的采集模式,對于PC、H5頁面是JS采集,數據采集后按照實時和離線雙寫,離線直接寫到CFS分布式文件系統中,每小時從CFS拉取數據文件,同時對數據文件大小、采集ip進行監控,防止數據丟失;實時是以白名單的方式動態配置,寫到kafka中,最后將數據入倉。
?
3. 京東零售——流量數倉分層介紹
數據流轉到數倉會進行一些統一化的管理,數倉是如何分層的呢?
受京東業務復雜度和數據體量的影響,整體分層較細,分為:數據緩沖層(BDM)、貼源數據層(FDM)、基礎數據層(GDM)、公共數據層(ADM)、應用數據層(APP)五層。
① BDM層
是源業務系統的一些數據,會進行永久性保存。
② FDM層
主要是從報文日志轉化成業務格式,對業務字段進行拆解、排序和數據回寫等,例如用戶逛京東時前期未登錄,最終下單時才登陸,那對用戶全鏈路回寫便是在這一層進行。
③ GDM層
按照主題域進行標準化封裝,整體會屏蔽生產系統干擾,同時會處理數據回灌事情。
④ ADM層
ADM是公共數據層,面向主題、面向業務過程的數據整合,目前劃分成兩層:ADM-D、ADM-S。
ADM-D負責統一的數據口徑封裝,提供各主題統一維度和指標的最細粒度數據;
ADM-S提供各主題統一維度和指標的聚合數據, 為各業務方提供統一口徑的共享數據。
⑤ APP層
數據看板的數據整合,也可以進行一些跨主題的聚合數據處理。
⑥ 維度層
DIM層主要就是一些通用的維度數據。
基于以上的數倉分層方案,來看下京東流量數倉架構在離線和實時上別分是如何處理的。
4. 京東零售-流量離線數倉架構
① 基礎數據層
離線數倉最下面一部分是基礎數據,主要面向實體模型建設,按照數據渠道和不同類型做數據整合,例如渠道:app、pc、m等;日志類型:瀏覽、點擊、曝光等。
② 公共數據層
這一層也是大家應用比較廣泛的一層,上面也提到了adm面向業務過程的模型建設,這層也是分成了明細和匯總兩層。在明細層,我們會把所有的業務口徑沉淀到adm明細中,封裝各種業務標識,保障數據口徑統一管理,避免口徑二義性,同時,為數據可視化管理,提供源數據依賴。
③ 應用數據層
應用層主要是面向數據看板的建設,提供預計算和OLAP兩種方式服務模式,這一層整體上會很薄,重點解決數據引擎查詢效率問題,高頻訪問的維度提供預計算、低頻應用的數據由OLAP方式提供數據服務。
④ 數據服務層
面向多維數據分析場景,進行指標和維度的統一管理,以及服務接口的可視化管理,對外提供統一的數據服務。
5. 京東零售——流量實時數倉架構
實時數倉與離線數倉的建設理念是基本一致的。
RDDM是分渠道、分站點、分日志類型的實時數據流,構建過程中主要考慮解耦,如果只消費部分數據,依然需要全量讀取,對帶寬、i/o都是一種浪費。同時,也方便下游按照業務實際情況進行數據融合。
RADM面向業務場景,在RDDM的基礎上進行整體封裝整合,例如商詳、來源去向、路徑樹等業務場景。
在整體封裝后,數據會接入到指標市場,按照統一的接口協議和元數據管理規范進行錄入,對外提供統一的數據服務。
以上主要介紹了京東流量場景的數據處理架構,接下來我們結合一個京東實際案例,講述京東特殊場景下的數據處理方案。
?
02 京東零售場景的數據處理
1. 京東零售——流量挑戰
首先是數據爆炸式的增長。2015年至今,整體的數據量翻了約十幾倍,但資源情況并沒有相應成比例的增長。其次,業務的復雜度升高,包括新增了小程序、開普勒、線下店的一些數據以及并購的企業的數據等,因此整體的數據格式以及完備度上還是存在較大差異的。再次,隨著業務發展,流量精細化運營的場景增多,但數據服務的時效并沒有較大變化,需要我們在有限時間內處理一些更多更大體量的數據,以滿足更多場景化應用。特別是京東刷崗這樣的場景,對數據的范圍、需要處理的數據量,以及數據時效都是一個比較大的挑戰。
?
2. 海量數據更新實踐——刷崗
什么是刷崗?將發生在該SKU的歷史事實數據,按照最新的SKU對應運營人員、崗位、部門等維度信息,進行歷史數據回刷。
?
刷崗在京東也經歷了多個階段,從最初數據量較小,采取全量刷崗的模式,后續逐漸升級成增量的刷崗。后續采取OLAP的刷崗模式,也就是將數據寫到CK中,通過Local join進行關聯查詢。目前我們通過iceberg+olap的方式來實現數據刷崗。
首先構建iceberg表;其次、對流量商品表的更新處理,將所有會發生變化的字段拼接做MD5的轉化,后續每天做這種差異化的判斷,如果有變化就做upsert操作;最后,生成的流量商品表與事實表進行merge into,進而得到刷崗更新后的數據;同時在此數據基礎上,針對不同應用頻率的數據,采取了預計算和OLAP兩種數據服務模式。
通過數據湖的方式來實現數據更新,相比于hive存儲格式,支持多版本并發控制,同時支持ACID事務語義,保障他的一致性,數據在同一個批次內提交,要么全對,要么全錯,不會更新一部分。另外,支持增量數據導入和更新刪除能力,支持upsert操作,整天數據處理的復雜度要降低很多,同時在資源的消耗和性能以及數據處理范圍上較hive端模式都有了極大的提升。
基于數據湖的模式進行刷崗目前還面臨數據傾斜的問題需要解決。
3. 數據傾斜治理方案
?
① 數據傾斜的原因及處理方式
數據傾斜出現的一個主要原因是數據分布不均,出現熱點key。對于數據傾斜的處理方案,比較常見的有:優化參數,如增加reduce的個數、過濾一些異常值、賦隨機值,或者按經驗值設置固定閾值,把大于某閾值的數據單獨處理。賦隨機數的處理方式,當任務執行過程中,某個節點異常,切換新節點重新執行,隨機數據會發生變化,導致數據異常。通過這種經驗值設定閾值的一個弊端是,在不同的場景下,不容易界定閾值大小,包括對于熱點key的識別,通常也只能事后發現處理。
② 數據傾斜的解決方案
基于此,我們在探索的過程中建立了一套智能監測傾斜的任務。
首先,利用實時的數據,提前對數據進行監測,針對數據分布特點,通過3倍標準差確定離群點,離群點即傾斜閾值。
其次,根據傾斜閾值計算分桶數量。
最后,按照對列資源在不同時段的健康度進行作業編排。
③ 如何尋找熱點key及傾斜閾值
熱key尋找的核心思想,就是根據數據的分布特點,通過3倍標準差確定離群點,離群點即傾斜閾值,如下圖所示,整體的數據是呈右偏分布,我們通過兩次3倍標準差得到最后的傾斜閾值X2。
第二步計算分桶的數量,根據整體的數據分布情況看,第一階段的拒絕域面積與第二階段的拒絕域面積相等。根據積分原理,頻率絕對數與頻次絕對數呈反比,因概率密度分布曲線未知,所以用兩次離群點的頻次均值比例,代表兩次抽樣數據量比例,進而得到分桶數。
?
④ 數據分桶作業
最后是作業編排,一次性起多個任務會出現資源獲取不到,一直處于等待狀態,同時對其他的任務也會產生較大影響,并發少了又會帶來資源浪費,針對這類問題,我按照對列資源的健康度,對執行的任務做了編排,由整體串聯執行和固化并發,調整為按資源健康度動態擴展,實現資源利用最大化。
03 數據處理架構未來探索
?未來探索方向
首先,目前我們基于Flink+Spark的方式來做流批一體的探索。圖中可以看到傳統的Lambda數據架構有一個很大的特點:實時和離線是兩套不同的數據鏈路。整體的數據處理過程中,研發的運維成本相對較高,而且兩條不同的數據鏈路,會容易導致數據口徑上的差異。
后續通過FlinkSQL+數據湖存儲實現同一套代碼兩種計算模式,同時保證計算口徑一致性。同時也會有一些挑戰,開發模式的改變,CDC(change data capture)延遲目前是分鐘級延遲,如果調整為秒級,頻繁提交,會生成很多小版本,對數據湖的吞吐量造成影響,總體來說,在部分應用場景下存在一定局限性,但分鐘級延遲可以滿足大多數的實時應用場景,對于研發成本和效率都會有較大提升,當然,目前也在不斷的完成和探索??傮w來說,目前在一些特殊場景下具有一定的局限性。
?
04 問答環節
Q:分桶的應用效果?
A:總結成幾個點就是:
- 從事后處理轉變為事前監測。
- 不同周期、不同場景下動態計算傾斜閾值和分桶數量。
- 根據對列資源健康度動態擴展任務并發數量,實現資源利用最大化。
Q:Spark的應用在京東場景里最小的延遲是多少?
A:目前主要是基于小站點數據去做探索,數據處理量級比較小,目前延遲大概在分鐘級左右,如提交的頻率增大,對于io的性能會是一個很大的考驗。
Q:Spark應該是不支持行級別的upsert,京東這邊是怎么去解決這個問題的問題,分區和小文件的合并有哪些相關的經驗分享?
A:目前的版本可以支持行級更新,關于分區這部分主要還是結合業務特性,在設計分區時,盡量讓變化的數據都集中到少部分文件上,降低文件更新范圍。
今天的分享就到這里,謝謝大家。