數據處理的大一統——從 Shell 腳本到 SQL 引擎
“工業流水線”的鼻祖,福特 T 型汽車[1]的電機裝配,將組裝過程拆成 29 道工序,將裝備時間由平均二十分鐘降到五分鐘,效率提升四倍 ,下圖圖源[2]。
T 型汽車裝配流水線
這種流水線的思想在數據處理過程中也隨處可見。其核心概念是:
- 標準化的數據集合:對應待組裝對象,是對數據處理中各個環節輸入輸出的一種一致性抽象。所謂一致,就是一個任意處理環節的輸出,都可以作為任意處理環節的輸入。
- 可組合的數據變換:對應單道組裝工序,定義了對數據進行變換的一個原子操作。通過組合各種原子操作,可以具有強大的表達力。
則,數據處理的本質是:針對不同需求,讀取并標準化數據集后,施加不同的變換組合。
Unix 管道
Unix 管道是一項非常偉大的發明,體現了 Unix 的一貫哲學:
程序應該只關注一個目標,并盡可能把它做好。讓程序能夠互相協同工作。應該讓程序處理文本數據流,因為這是一個通用的接口。
— Unix Pipe 機制發明者 Malcolm Douglas McIlroy
上述三句話哲學正體現了我們提到的兩點,標準化的數據集合——來自標準輸入輸出的文本數據流,可組合的數據變換——能夠協同工作的程序(如像 sort, head, tail 這種 Unix 自帶的工具,和用戶自己編寫的符合管道要求的程序)。
讓我們來看一個使用 Unix tools 和管道來解決實際問題的例子。假設我們有一些關于服務訪問的日志文件(var/log/nginx/access.log ,例子來自 DDIA[3] 第十章),日志的每一行格式如下:
// $remote_addr - $remote_user [$time_local] "$request"
// $status $body_bytes_sent "$http_referer" "$http_user_agent"
216.58.210.78 - - [27/Feb/2015:17:55:11 +0000] "GET /css/typography.css HTTP/1.1"
200 3377 "http://martin.kleppmann.com/" "Mozilla/5.0 (Macintosh; Intel Mac OS X 10_9_5)
AppleWebKit/537.36 (KHTML, like Gecko) Chrome/40.0.2214.115 Safari/537.36"
我們的需求是,統計出日志文件中最受歡迎的五個網頁。使用 Unix Shell ,我們會寫出類似的命令:
cat /var/log/nginx/access.log | # 讀取文件,打入標準輸出
awk '{print $7}' | # 取出每行按空格分割的第七個字段
sort | # 對每行按字面值進行排序
uniq -c | # 歸并重復行,并給出重復次數
sort -r -n | # 按重復次數降序進行排序
head -n 5 # 輸出前五行
可以看出上述 Shell 命令有以下幾個特點:
- 每個命令實現的功能都很簡單(高內聚)
- 所有命令通過管道進行組合(低耦合),當然這也要求可組合的程序只面向標準輸入、標準輸出進行編程,無其他副作用(比如輸出到文件)
- 輸入輸出面向文本而非二進制
此外,Unix 的管道的另一大優點是——流式的處理數據。也即所有程序中間結果并非都計算完成之后,才送入下一個命令,而是邊算邊送,從而達到多個程序并行執行的效果,這就是流水線的精髓了。
當然,管道也有缺點——只能進行線性的流水線排布,這也限制了他的表達能力。
GFS 和 MapReduce
MapReduce 是谷歌 2004 年的論文 MapReduce: Simplified Data Processing on Large Clusters[4] 提出的,用以解決大規模集群、并行數據處理的一種算法。GFS 是與 MapReduce 配套使用的基于磁盤的分布式文件系統。
MapReduce 算法主要分為三個階段:
- Map:在不同機器上并行的對每個數據分區執行用戶定義的 map() → List<Key, Value> 函數。
- Shuffle:將 map 的輸出結果(KV 對)按 key 進行重新分區,按 key 聚集送到不同機器上, Key→ List<Value>。
- Reduce:在不同機器上并行地對 map 輸出的每個 key 對應的List<Value> 調用 reduce 函數。
DDIA 第十章 MapReduce 執行示意圖
每個 MapReduce 程序就是對存儲在 GFS 上的數據集(標準化的數據集)的一次變換。理論上,我們可以通過組合多個 MapReduce 程序(可組合的變換),來滿足任意復雜的數據處理需求。
但與管道不同的是,每次 MapReduce 的輸出都要進行“物化”,即完全落到分布式文件系統 GFS 上,才會執行下一個 MapReduce 程序。好處是可以進行任意的、非線性的 MapReduce 程序排布。壞處是代價非常高,尤其考慮到 GFS 上的文件是多機多副本的數據集,這意味著大量的跨機器數據傳輸、額外的數據拷貝開銷。
但要考慮到歷史上開創式的創新,縱然一開始缺點多多,但會隨著時間迭代而慢慢克服。GFS + MapReduce 正是這樣一種在工業界開創了在大規模集群尺度上處理海量數據的先河。
Spark
Spark 便是為了解決 MapReduce 中每次數據集都要落盤的一種演進。
首先,Spark 提出了標準的數據集抽象——RDD[5],這是一種通過分片的形式分散在多機上、基于內存的數據集。基于內存可以使得每次處理結果不用落盤,從而處理延遲更低。基于分片可以使得在機器宕機時,只用恢復少量分片,而非整個數據集。邏輯上,我們可以將其當做一個整體來進行變換,物理上,我們使用多機內存承載其每個分片。
其次,基于 RDD,Spark 提供了多種可靈活組合的算子集,這相當于對一些常用的變換邏輯進行“構件化”,可以讓用戶開箱即用。(下面圖源 RDD 論文[6])
RDD 論文中列出的算子
基于此,用戶可以進行任意復雜數據處理,在物理上多個數據集(點)和算子(邊)會構成一個復雜的 DAG (有向無環圖)執行拓撲:
RDD 和算子構成的 DAG
關系型數據庫
關系型數據庫是數據處理系統的集大成者。一方面,它對外提供強大的聲明式查詢語言——SQL,兼顧了靈活性和易用性。另一方面,他對內使用緊湊、索引友好的存儲方式,可以支撐高效的數據查詢需求。關系型數據庫系統同時集計算和存儲于一身,又會充分利用硬盤,甚至網絡(分布式數據庫)特點,是對計算機各種資源全方位使用的一個典范。本文不去過分展開關系型數據庫實現的各個環節,而是聚焦本文重點——標準的數據集和可組合的算子。
關系型數據庫對用戶提供的數據基本組織單位是——關系,或者說表。在 SQL 模型中,這是一種由行列組成的、強模式的二維表。所謂強模式,可以在邏輯上理解為表格中每個單元所存儲的數據必須要符合該列“表頭”的類型定義。針對這種標準的二維表,用戶可以施加各種關系代數算子(選擇、投影、笛卡爾乘積)。
一條 SQL 語句,在進入 RDBMS 之后,經過解析、校驗、優化,最后轉化成算子樹進行執行。對應的 RDBMS 中的邏輯單元,我們通常稱之為——執行引擎,Facebook Velox[7] 就是專門針對該生態位的一個 C++ 庫。
傳統的執行引擎多使用火山模型,一種屬于拉( pull-based )流派的執行方式。其基本概念就是以樹形的方式組織算子,并從根節點開始,自上而下的進行遞歸調用,算子間自下而上的以行(row)或者批(batch)的粒度返回數據。
近些年來,基于推(push-based)的流派漸漸火起來了,DuckDB、Velox 都屬于此流派。類似于將遞歸轉化為迭代,自下而上,從葉子節點進行計算,然后推給父親節點,直到根節點。每個算子樹都可以拆解為多個可以并行執行的算子流水線(下圖源,Facebook Velox 文檔[8])
我們把上圖順時針旋轉九十度,可以發現他和 Spark 的執行方式如出一轍,更多關于 velox 機制的解析,可以參考我寫的這篇文章[9]。
但無論推還是拉,其對數據集和算子的抽象都符合本文一開始提出的理論。
小結
考察完上述四種系統之后,可以看出,數據處理在某種角度上是大一統的——首先抽象出歸一化的數據集,然后提供施加于該數據集之上的運算集,最終通過組合的形式表達用戶的各種數據處理需求。
參考資料
[1]福特 T 型汽車: https://www.youtube.com/watch?v=As0lqsd2-NI
[2]汽車流水線圖源: https://www.motor1.com/features/178264/ford-model-t-factory-cutaway-kimble/
[3]DDIA: https://ddia.qtmuniao.com/
[4]MapReduce 論文: https://research.google.com/archive/mapreduce-osdi04.pdf
[5]RDD 分析: https://www.qtmuniao.com/2019/11/14/rdd/
[6]RDD 論文: https://www.usenix.org/system/files/conference/nsdi12/nsdi12-final138.pdf
[7]Facebook Velox: https://github.com/facebookincubator/velox
[8]Facebook Velox 文檔: https://facebookincubator.github.io/velox/develop/task.html
[9]Facebook velox 運行機制解析: https://zhuanlan.zhihu.com/p/614918289