深入了解Apache Spark窗口功能
窗口函數對數據組進行操作,并為每個記錄或組返回值
> Photo by Tom Blackout on Unsplash
在此博客文章中,我們將深入探討Apache Spark窗口函數。 您可能也對我之前有關Apache Spark的帖子感興趣。
- 使用Apache Spark開始您的旅程-第1部分
- 使用Apache Spark開始您的旅程-第2部分
- Apache Spark開始您的旅程-第3部分
- 深入研究Apache Spark DateTime函數
- 在Apache Spark中使用JSON
首先,讓我們看看什么是窗口函數以及何時使用它們。 我們在Apache Spark中使用了各種功能,例如月份(從日期返回月份),四舍五入(舍入值)和地板(為給定的輸入提供底值)等,這些功能將在每條記錄上執行并返回一個值 每條記錄。 然后,我們將對一組數據執行各種聚合函數,并為每個組返回一個值,例如sum,avg,min,max和count。 但是,如果我們想對一組數據執行該操作,并且希望對每個記錄有一個單一的值/結果怎么辦? 在這種情況下,我們可以使用窗口函數。 他們可以定義記錄的排名,累積分布,移動平均值,或標識當前記錄之前或之后的記錄。
讓我們使用一些Scala API示例來了解以下窗口函數:
- 匯總:min, max, avg, count, 和 sum.
- 排名:rank,dense_rank,percent_rank,row_num和ntile
- 分析性:cume_dist,lag和lead
- 自定義邊界:rangeBetween和rowsBetween
為便于參考,GitHub上提供了一個以JSON文件格式導出的Zeppelin筆記本和一個Scala文件。
創建Spark DataFrame
現在,我們創建一個示例Spark DataFrame,我們將在整個博客中使用它。 首先,讓我們加載所需的庫。
- import org.apache.spark.sql.expressions.Window
- import org.apache.spark.sql.functions._
現在,我們將使用一些虛擬數據創建DataFrame,這些虛擬數據將用于討論各種窗口函數。
- case class Salary(depName: String, empNo: Long, salary: Long)
- val empsalary = Seq(
- Salary("sales", 1, 5000),
- Salary("personnel", 2, 3900),
- Salary("sales", 3, 4800),
- Salary("sales", 4, 4800),
- Salary("personnel", 5, 3500),
- Salary("develop", 7, 4200),
- Salary("develop", 8, 6000),
- Salary("develop", 9, 4500),
- Salary("develop", 10, 5200),
- Salary("develop", 11, 5200)).toDF()
這是我們的DataFrame的樣子:

窗口集合函數
讓我們看一些聚合的窗口函數,看看它們如何工作。
首先,我們需要定義窗口的規范。 假設我們要根據部門獲取匯總數據。 因此,在此示例中,我們將基于部門名稱(列:depname)定義窗口。
為聚合函數創建窗口規范
- val byDepName = Window.partitionBy("depName")
在窗口上應用聚合函數
現在,在部門內(列:depname),我們可以應用各種聚合函數。 因此,讓我們嘗試查找每個部門的最高和最低工資。 在這里,我們僅選擇了所需的列(depName,max_salary和min_salary),并刪除了重復的記錄。
- val agg_sal = empsalary
- .withColumn("max_salary", max("salary").over(byDepName))
- .withColumn("min_salary", min("salary").over(byDepName))
- agg_sal.select("depname", "max_salary", "min_salary")
- .dropDuplicates()
- .show()
輸出:
- +---------+----------+----------+
- | depname|max_salary|min_salary|
- +---------+----------+----------+
- | develop| 6000| 4200|
- | sales| 5000| 4800|
- |personnel| 3900| 3500|
- +---------+----------+----------+
現在讓我們看看它是如何工作的。 我們已經按部門名稱對數據進行了分區:

現在,當我們執行合計函數時,它將應用于每個分區并返回合計值(在本例中為min和max)。

注意:可用的匯總函數為最大,最小,總和,平均和計數。
窗口排名功能
在本節中,我們將討論幾種類型的排名函數。
創建用于排序功能的窗口規范
現在,我們要根據員工在部門內的薪水進行排名。 薪水最高的員工將排名第一,薪水最低的員工將排名最后。 在這里,我們將基于部門(列:depname)對數據進行分區,并且在部門內,我們將根據薪水以降序對數據進行排序。
- val winSpec = Window.partitionBy("depName").orderBy("salary".desc)
對于每個部門,記錄將根據薪水以降序排序。

1.等級功能:等級
此函數將返回分區中每個記錄的等級,并跳過任何重復等級之后的后續等級:
- val rank_df = empsalary.withColumn("rank", rank().over(winSpec))
- rank_df.show()
輸出:
- +---------+-----+------+----+
- | depName|empNo|salary|rank|
- +---------+-----+------+----+
- | develop| 8| 6000| 1|
- | develop| 11| 5200| 2|
- | develop| 10| 5200| 2|
- | develop| 9| 4500| 4|
- | develop| 7| 4200| 5|
- | sales| 1| 5000| 1|
- | sales| 4| 4800| 2|
- | sales| 3| 4800| 2|
- |personnel| 2| 3900| 1|
- |personnel| 5| 3500| 2|
- +---------+-----+------+----+
在這里我們可以看到某些等級重復,而有些等級丟失。 例如,在開發部門中,我們有2名等級= 2的員工,而沒有等級= 3的員工,因為等級函數將為相同的值保留相同的等級,并相應地跳過下一個等級。
2.密集等級:densed_rank
此函數將返回分區中每個記錄的等級,但不會跳過任何等級。
- val dense_rank_df = empsalary.withColumn("dense_rank", dense_rank().over(winSpec))
- dense_rank_df.show()
輸出:
- +---------+-----+------+-----------+
- | depName|empNo|salary|desnse_rank|
- +---------+-----+------+-----------+
- | develop| 8| 6000| 1|
- | develop| 10| 5200| 2|
- | develop| 11| 5200| 2|
- | develop| 9| 4500| 3|
- | develop| 7| 4200| 4|
- | sales| 1| 5000| 1|
- | sales| 3| 4800| 2|
- | sales| 4| 4800| 2|
- |personnel| 2| 3900| 1|
- |personnel| 5| 3500| 2|
- +---------+-----+------+-----------+
在這里,我們可以看到某些等級是重復的,但是排名并沒有像我們使用等級功能時那樣丟失。 例如,在開發部門中,我們有2名員工,等級=2。density_rank函數將為相同的值保留相同的等級,但不會跳過下一個等級。
3.行號函數:row_number
此功能將在窗口內分配行號。 如果2行的排序列值相同,則不確定將哪個行號分配給具有相同值的每一行。
- val row_num_df = empsalary.withColumn("row_number", row_number().over(winSpec))
- row_num_df.show()
輸出:
- +---------+-----+------+----------+
- | depName|empNo|salary|row_number|
- +---------+-----+------+----------+
- | develop| 8| 6000| 1|
- | develop| 10| 5200| 2|
- | develop| 11| 5200| 3|
- | develop| 9| 4500| 4|
- | develop| 7| 4200| 5|
- | sales| 1| 5000| 1|
- | sales| 3| 4800| 2|
- | sales| 4| 4800| 3|
- |personnel| 2| 3900| 1|
- |personnel| 5| 3500| 2|
- +---------+-----+------+----------+
4.百分比排名函數:percent_rank
此函數將返回分區中的相對(百分數)等級。
- val percent_rank_df = empsalary.withColumn("percent_rank", percent_rank().over(winSpec))
- percent_rank_df.show()
輸出:
- +---------+-----+------+------------+
- | depName|empNo|salary|percent_rank|
- +---------+-----+------+------------+
- | develop| 8| 6000| 0.0|
- | develop| 10| 5200| 0.25|
- | develop| 11| 5200| 0.25|
- | develop| 9| 4500| 0.75|
- | develop| 7| 4200| 1.0|
- | sales| 1| 5000| 0.0|
- | sales| 3| 4800| 0.5|
- | sales| 4| 4800| 0.5|
- |personnel| 2| 3900| 0.0|
- |personnel| 5| 3500| 1.0|
- +---------+-----+------+------------+
5. N-tile功能:ntile
此功能可以根據窗口規格或分區將窗口進一步細分為n個組。 例如,如果需要將部門進一步劃分為三類,則可以將ntile指定為3。
- val ntile_df = empsalary.withColumn("ntile", ntile(3).over(winSpec))
- ntile_df.show()
輸出:
- +---------+-----+------+-----+
- | depName|empNo|salary|ntile|
- +---------+-----+------+-----+
- | develop| 8| 6000| 1|
- | develop| 10| 5200| 1|
- | develop| 11| 5200| 2|
- | develop| 9| 4500| 2|
- | develop| 7| 4200| 3|
- | sales| 1| 5000| 1|
- | sales| 3| 4800| 2|
- | sales| 4| 4800| 3|
- |personnel| 2| 3900| 1|
- |personnel| 5| 3500| 2|
- +---------+-----+------+-----+
窗口分析功能
接下來,我們將討論諸如累積分布,滯后和超前的分析功能。
1.累積分布函數:cume_dist
此函數提供窗口/分區的值的累積分布。
- val winSpec = Window.partitionBy("depName").orderBy("salary")
- val cume_dist_df =
- empsalary.withColumn("cume_dist",cume_dist().over(winSpec))
- cume_dist_df.show()
定義窗口規范并應用cume_dist函數以獲取累積分布。
輸出:
- +---------+-----+------+------------------+
- | depName|empNo|salary| cume_dist|
- +---------+-----+------+------------------+
- | develop| 7| 4200| 0.2|
- | develop| 9| 4500| 0.4|
- | develop| 10| 5200| 0.8|
- | develop| 11| 5200| 0.8|
- | develop| 8| 6000| 1.0|
- | sales| 4| 4800|0.6666666666666666|
- | sales| 3| 4800|0.6666666666666666|
- | sales| 1| 5000| 1.0|
- |personnel| 5| 3500| 0.5
- |personnel| 2| 3900| 1.0|
- +---------+-----+------+------------------+
2.滯后功能:滯后
此函數將在從DataFrame偏移行之前返回該值。
lag函數采用3個參數(lag(col,count = 1,默認= None)),col:定義需要在其上應用函數的列。 count:需要回顧多少行。 default:定義默認值。
- val winSpec = Window.partitionBy("depName").orderBy("salary")
- val lag_df =
- empsalary.withColumn("lag", lag("salary", 2).over(winSpec))
- lag_df.show()
輸出:
- +---------+-----+------+----+
- | depName|empNo|salary| lag|
- +---------+-----+------+----+
- | develop| 7| 4200|null|
- | develop| 9| 4500|null|
- | develop| 10| 5200|4200|
- | develop| 11| 5200|4500|
- | develop| 8| 6000|5200|
- | sales| 4| 4800|null|
- | sales| 3| 4800|null|
- | sales| 1| 5000|4800|
- |personnel| 5| 3500|null|
- |personnel| 2| 3900|null|
- +---------+-----+------+----+
例如,讓我們在當前行之前查找薪水2行。
- 對于depname = develop,salary = 4500。沒有這樣的行,該行在該行之前2行。 因此它將為空。

- 對于部門名稱=發展,薪水= 6000(以藍色突出顯示)。 如果我們提前兩排,我們將獲得5200的薪水(以綠色突出顯示)。

3.導聯功能:導聯
此函數將返回DataFrame的偏移行之后的值。
- val winSpec = Window.partitionBy("depName").orderBy("salary")
- val lead_df =
- empsalary.withColumn("lead", lead("salary", 2).over(winSpec))
- lead_df.show()
lead函數采用3個參數(lead(col,count = 1,默認= None))col:定義需要在其上應用函數的列。 count:對于當前行,我們需要向前/向后查找多少行。 default:定義默認值。
輸出:
- +---------+-----+------+----+
- | depName|empNo|salary| lag|
- +---------+-----+------+----+
- | develop| 7| 4200|5200|
- | develop| 9| 4500|5200|
- | develop| 10| 5200|6000|
- | develop| 11| 5200|null|
- | develop| 8| 6000|null|
- | sales| 3| 4800|5000|
- | sales| 4| 4800|null|
- | sales| 1| 5000|null|
- |personnel| 5| 3500|null|
- |personnel| 2| 3900|null|
- +---------+-----+------+----+
讓我們嘗試從當前行中查找前進/后兩行的薪水。
- 對于depname =開發人員,薪水= 4500(以藍色突出顯示)。 如果我們在前進/后退兩行,我們將獲得5200的薪水(以綠色突出顯示)。

- 對于depname =人員,薪水=3500。在此分區中,沒有此行向前2行/在該行之后。 因此我們將獲得空值。


自定義窗口定義
默認情況下,窗口的邊界由分區列定義,我們可以通過窗口規范指定順序。 例如,對于開發部門,窗口的開始是工資的最小值,窗口的結束是工資的最大值。
但是,如果我們想更改窗口的邊界怎么辦? 以下功能可用于定義每個分區內的窗口。
1. rangeBetween
使用rangeBetween函數,我們可以顯式定義邊界。例如,從當前薪水開始,將其定義為100,然后將其定義為300,并查看其含義。 從100開始表示窗口將從100個單位開始,從當前值開始以300個值結束(包括開始值和結束值)。
- val winSpec = Window.partitionBy("depName")
- .orderBy("salary")
- .rangeBetween(100L, 300L)
定義窗口規格
起始值和結束值后的L表示該值是Scala Long類型。
- val range_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))
- range_between_df.show()
應用自定義窗口規范
輸出:
- +---------+-----+------+----------+
- | depName|empNo|salary|max_salary|
- +---------+-----+------+----------+
- | develop| 7| 4200| 4500|
- | develop| 9| 4500| null|
- | develop| 10| 5200| null|
- | develop| 11| 5200| null|
- | develop| 8| 6000| null|
- | sales| 3| 4800| 5000|
- | sales| 4| 4800| 5000|
- | sales| 1| 5000| null|
- |personnel| 5| 3500| null|
- |personnel| 2| 3900| null|
- +---------+-----+------+----------+
現在,讓我們嘗試了解輸出。
- 對于depname = developer,salary = 4200,窗口的開始將是(當前值+開始),即4200 + 100 =4300。窗口的結束將是(當前值+結束),即4200 + 300 = 4500。
由于只有一個薪水值在4300到4500之間,包括開發部門的4500,所以我們將4500作為max_salary作為4200(上面的檢查輸出)。

- 同樣,對于depname = develop,salary = 4500,窗口將為(開始:4500 + 100 = 4600,結束:4500 + 300 = 4800)。 但是開發部門沒有薪水值在4600到4800之間,因此最大值不會為空(上面的檢查輸出)。

這里有一些特殊的邊界值可以使用。
- Window.currentRow:指定一行中的當前值。
- Window.unboundedPreceding:這可以用于使窗口無限制地開始。
- Window.unbounded以下:此方法可用于使窗口具有無限的末端。
例如,我們需要從員工工資中找到最高工資,該最高工資大于300。 因此,我們將起始值定義為300L,將結束值定義為Window.unboundedFollowing:
- val winSpec = Window.partitionBy("depName").orderBy("salary")
- .rangeBetween(300L, Window.unboundedFollowing)
- val range_unbounded_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))
- range_unbounded_df.show()
輸出:
- +---------+-----+------+----------+
- | depName|empNo|salary|max_salary|
- +---------+-----+------+----------+
- | develop| 7| 4200| 6000|
- | develop| 9| 4500| 6000|
- | develop| 10| 5200| 6000|
- | develop| 11| 5200| 6000|
- | develop| 8| 6000| null|
- | sales| 3| 4800| null|
- | sales| 4| 4800| null|
- | sales| 1| 5000| null|
- |personnel| 5| 3500| 3900|
- |personnel| 2| 3900| null|
- +---------+-----+------+----------+
因此,對于depname =人員,薪水=3500。窗口將是(開始:3500 + 300 = 3800,結束:無邊界)。 因此,此范圍內的最大值為3900(檢查上面的輸出)。
同樣,對于depname = sales,salary = 4800,窗口將為(開始:4800 + 300、5100,結束:無邊界)。 由于銷售部門的值不大于5100,因此結果為空。
2.rowsBetween
通過rangeBetween,我們使用排序列的值定義了窗口的開始和結束。 但是,我們也可以使用相對行位置定義窗口的開始和結束。
例如,我們要創建一個窗口,其中窗口的開始是當前行之前的一行,結束是當前行之后的一行。
定義自定義窗口規范
- val winSpec = Window.partitionBy("depName")
- .orderBy("salary").rowsBetween(-1, 1)
應用自定義窗口規范
- val rows_between_df = empsalary.withColumn("max_salary", max("salary").over(winSpec))
- rows_between_df.show()
輸出:
- +---------+-----+------+----------+
- | depName|empNo|salary|max_salary|
- +---------+-----+------+----------+
- | develop| 7| 4200| 4500|
- | develop| 9| 4500| 5200|
- | develop| 10| 5200| 5200|
- | develop| 11| 5200| 6000|
- | develop| 8| 6000| 6000|
- | sales| 3| 4800| 4800|
- | sales| 4| 4800| 5000|
- | sales| 1| 5000| 5000|
- |personnel| 5| 3500| 3900|
- |personnel| 2| 3900| 3900|
- +---------+-----+------+----------+
現在,讓我們嘗試了解輸出。
- 對于depname =開發,salary = 4500,將定義一個窗口,該窗口在當前行之前和之后一行(以綠色突出顯示)。 因此窗口內的薪水為(4200、4500、5200),最高為5200(上面的檢查輸出)。

- 同樣,對于depname = sales,salary = 5000,將在當前行的前后定義一個窗口。 由于此行之后沒有行,因此該窗口將只有2行(以綠色突出顯示),其薪水分別為(4800,5000)和max為5000(上面的檢查輸出)。

我們還可以像以前使用rangeBetween一樣使用特殊邊界Window.unboundedPreceding,Window.unboundedFollowing和Window.currentRow。
注意:rowsBetween不需要排序,但是我使用它來使每次運行的結果保持一致。