如何從 Spark 的 DataFrame 中取出具體某一行?
如何從 Spark 的 DataFrame 中取出具體某一行?
根據阿里專家Spark的DataFrame不是真正的DataFrame-秦續業的文章-知乎[1]的文章:
- DataFrame 應該有『保證順序,行列對稱』等規律
- 因此「Spark DataFrame 和 Koalas 不是真正的 DataFrame」
確實可以運行,但卻看到一句話,大意是數據會被放到一個分區來執行,這正是因為數據本身之間并不保證順序,因此只能把數據收集到一起,排序,再調用 shift。這樣就不再是一個分布式的程序了,甚至比 pandas 本身更慢。
我們可以明確一個前提:Spark 中 DataFrame 是 RDD 的擴展,限于其分布式與彈性內存特性,我們沒法直接進行類似 df.iloc(r, c) 的操作來取出其某一行。
但是現在我有個需求,分箱,具體來講,需要『排序后遍歷每一行及其鄰居比如 i 與 i+j』,因此,我們必須能夠獲取數據的某一行!
不知道有沒有高手有好的方法?我只想到了以下幾招!
1/3排序后select再collect
collect 是將 DataFrame 轉換為數組放到內存中來。但是 Spark 處理的數據一般都很大,直接轉為數組,會爆內存。
因此不能直接 collect 。
要處理哪一列,就直接 select('列名') 取出這一列就好,再 collect 。我的數據有 2e5 * 2e4 這么多,因此 select 后只剩一列大小為 2e5 * 1 ,還是可以 collect 的。
這顯然不是個好方法!因為無法處理真正的大數據,比如行很多時。
2/3排序后加index然后用SQL查找
給 DataFrame 實例 .sort("列名") 后,用 SQL 語句查找:
- select 列名 from df_table where 索引列名 = i
我對于 SQL 不是很了解,因此這個做法只是在構思階段。
此外,我不清楚 SQL 的性能!我要調用很多次 df.iloc[i, 列] ,那這樣會不會太慢了?
3/3排序后加index然后轉置查找列名
這個想法也只是停留在腦子里!因為會有些難度。
給每一行加索引列,從0開始計數,然后把矩陣轉置,新的列名就用索引列來做。
之后再取第 i 個數,就 df(i.toString) 就行。
這個方法似乎靠譜。
附加方案:ml.feature.Bucketizer
- import org.apache.spark.ml.feature.{Bucketizer, QuantileDiscretizer}
spark中 Bucketizer 的作用和我實現的需求差不多(盡管細節不同),我猜測其中也應該有相似邏輯。有能力和精力了應該去讀讀源碼,看看官方怎么實現的。
參考資料
[1]Spark的DataFrame不是真正的DataFrame-秦續業的文章-知乎:
https://zhuanlan.zhihu.com/p/135329592