基于 MaxCompute 分布式 Python 能力的大規模數據科學分析
一、Python 生態的重要性
Why Python
Python has grown to become the dominant language both in data analytics, and general programming。
根據技術問答網站stack overflow統計,Python、C#、Javascript、java、php、C++、SQL、R、statistics這些編程語言從2009年至2021年的趨勢圖如下圖所示。可以看出Python的趨勢是呈現上漲趨勢,特別是在數據分析和數據科學領域,幾乎是top one的編程語言。這是Python生態的發展趨勢。當然,在數據分析數據科學機器學習這個領域,不只是有編程語言這一個因素。
統計來自 https://insights.stackoverflow.com/trends
數據科學技術棧
在數據科學領域編程語言只是一個方面,語言不止包含Python,也有數據分析人員用SQL,或者傳統分析語言R,或者是函數式編程語言Scala。第二個方面需要有數據分析對應的庫,比如NumPy、pandas等,或者是基于可視化的庫會在里面。Python運行的集群還會有一些運維的技術棧在里面,比如可以運行在docker或者是kubernetes上。如果要做數據分析數據科學,前期需要對數據進行清洗,有一些ETL的過程。有一些清洗不只是一兩步能完成的,需要用工作流去完成整體的ETL的過程。里面涉及到最流行的組件比如Spark,整個工作流調度Airflow,最終結果做一個呈現,就需要存儲,一般用PostgreSQL數據庫或者內存數據庫redis,對外再連接一個BI工具,做最終結果的展示。還有比如機器學習的一些組件或者平臺,TensorFlow、PyTorch等。如果是設計到Web開發,快速搭建起一個前端平臺,還會用的比如Flask等。最后包括一個商業智能的軟件,比如有BI工具tableau、Power BI,或者是數據科學領域經常用到的軟件SaaS。
這就是整個數據科學技術棧比較完整的一個視圖。我們從編程語言切入,發現如果要實現大規模數據的數據科學是需要方方面面的考量。
二、MaxCompute 分布式 Python 能力介紹
MaxCompute 分布式 Python 技術 - PyODPS
MaxCompute是一款SaaS模式的云數據倉庫,基于MaxCompute是有兼容Python的能力。
PyODPS 是 MaxCompute 的 Python 版本的 SDK, 它提供了對 MaxCompute 對象的基本操作;并提供了 DataFrame 框架(二維表結構,可以進行增刪改查操作),能在 MaxCompute 上進行數據分析。
PyODPS 提交的 SQL 以及 DataFrame作業都會轉換成 MaxCompute SQL 分布式運行;如果第三方庫,能以 UDF+SQL 的形式運行,也可以分布式運行。
如果需要 Python 把作業拆成子任務分布式來運行,比如大規模的向量計算原生 Python 沒有分布式能力,這時候推薦用 MaxCompute Mars。是可以把Python任務拆分成子任務進行運行的框架。
自定義函數中使用三方包
假如不是單純運行Python,需要借助一些Python第三方包,這個MaxCompute也是支持的。
流程如下:
Step1
確定使用到的第三方包
sklearn,scipy
Step2
找到對應報的所有依賴
sklearn,scipy,pytz,pandas,six,python-dateutil
Step3
下載對應的三方包(pypi)
python-dateutil-2.6.0.zip,
pytz-2017.2.zip, six-1.11.0.tar.gz,
pandas-0.20.2-cp27-cp27m-manylinux1_x86_64.zip,
scipy-0.19.0-cp27-cp27m-manylinux1_x86_64.zip,
scikit_learn-0.18.1-cp27-cp27m-manylinux1_x86_64.zip
Step4
上傳資源變成MaxCompute的一個Resource對象。
這樣我們去創建函數,再引用自定義函數,就能夠使用到第三方包。
自定義函數代碼
- def test(x): from sklearn import datasets, svm from scipy import misc import numpy as np iris = datasets.load_iris() clf = svm.LinearSVC() clf.fit(iris.data, iris.target) pred = clf.predict([[5.0, 3.6, 1.3, 0.25]]) assert pred[0] == 0 assert misc.face().shape is not None return x
MaxCompute 分布式 Python 技術 - Mars
項目名字 Mars
最早是 MatrixandArray;登陸火星
為什么要做 Mars
為大規模科學計算設計的:大數據引擎編程接口對科學計算不太友好,框架設計不是為科學計算模型考慮的
傳統科學計算基于單機,大規模科學計算需要用到超算
Tips科學計算:計算機梳理數據: Excel-> 數據庫 (MySQL)-> Hadoop, Spark, MaxCompute 數據量有 了很大變化,計算模型沒有變化,二維表,投影、切片、聚合、篩選和排序,基于關系代數,集合論;科學計算基礎結構不是二維表:例如圖片2維度,每個像素點不是一個數字(RGB+α 透明通道)
傳統 SQL 模型處理能力不足:線性代數,行列式的相乘,現有數據庫效率低
現狀 R,Numpy 單機基于單機; Python 生態的 Dask 大數據到科學計算的橋梁
案例
客戶A MaxCompute 現有數據,需要針對這些 百億數據 TB 級別的數據相乘;現有 MapReduce 模式性能低;用 Mars 就可以高效的解決;目前是唯一一個大規模科學計算引擎
加速數據科學的新方式
加速數據科學的方式如下圖所示。
基于DASK或者是 MaxCompute Mars其實是 Scale up 和 Scale out 兼容的方式。在下圖左下代表單機運行Python 的庫做數據科學的一個方式。大規模超算的思路是Scale up,也就是線上垂直擴散,增加硬件能力,比如可以利用多核,當前每臺電腦或服務器上不止一核,包括GPU、TPU、NPU等做深度學習的硬件。可以把Python移植到這些硬件上做一些加速。這里的技術包含比如Modin是做多核加速pandas。在右下,也有一些框架在做分布式Python,比如RAY是螞蟻的一款框架服務,本質上Mars是可以運行在RAY上,相當于Python生態的一個調度,一個kubernetes。DASK也是在做分布式Python,包括Mars。當然,最佳的模式是 Scale up 和 Scale out 兩種做一個組合。這樣的好處是,可以做分布式,在單節點上也可以利用硬件能力。Mars當前只能在大規模集群上,單機配置在GPU集群。
分布 Python 的設計邏輯
Mars本質上設計思路是把數據科學庫分布式化掉,比如Python,可以把Dataframe做一個拆分,包括Numpy,Scikit-Learn。
把大規模作業拆分成小作業來做分布式計算。本身框架就是拆成作業用的,首先客戶端提交一個作業,Mars框架把作業拆分,做一個DAG圖,最后匯總收集計算結果。
Mars 場景1 CPU和GPU混合計算
1、安全和金融領域,傳統大數據平臺挖掘周期長,資源緊張,等待周期長。
2、Mars DataFrame加速數據處理:大規模排序;統計;聚合分析。
3、Mars learn 加速無監督學習;Mars拉起分布式深度學習計算。
4、 使用 GPU 加速特定計算。
Mars 場景2 可解釋性計算
1、廣告歸因&洞察特征的解釋算法,本身計算量巨大,耗時長。
2、使用 Mars Remote 將計算用數十臺服務器進行加速,提升百倍性能。
Mars場景3 大規模k-最鄰近算法
1、Embedding 的流行使得用向量表述實體非常常見。
2、Mars 的NearestNeighbors算法兼容 scikit-learn。暴力算法在300萬向量和300萬向量計算top10相似計算(9萬億次向量比對)中,用20個worker兩個小時計算完成,大數據平臺基于SQL+UDF的方式無法完成計算。更小規模 Mars 相比大數據平臺性能提升百倍。
3、Mars 支持分布式的方式加速Faiss、Proxima(阿里達摩院向量檢索庫),達到千萬和億級別規模。
三、最佳實踐
Mars本身會集成一些Python第三方包,基本主流機器學習和深度學習的庫都包含在里面。下方Demo講一個使用Mars做智能推薦,用lightgbm做一個分類算法,比如有一些優惠判斷是不是給某些用戶做推送。
Mars 包括的第三方包:
https://pyodps.readthedocs.io/zh_CN/latest/mars-third-parties.html
第一張圖上主要步驟是通過 AK、project 名字、Endpoint 信息連接到 MaxCompute。接下來創建一個4節點,每個節點8 core,32G 的集群,應用 extended 擴展包,并生成 100w 用戶 64維度描述信息的訓練數據。
利用 Lightgbm 2分類算法的模型訓練:
將模型以 Create resource 方式傳到 MaxCompute 作為 resource 對象,準備測試集數據
使用測試測試集數據驗證模型,得出分類: