成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

深入了解Apache Spark窗口功能

大數據 Spark
在此博客文章中,我們將深入探討Apache Spark窗口函數。 您可能也對我之前有關Apache Spark的帖子感興趣。

窗口函數對數據組進行操作,并為每個記錄或組返回值

 

[[331416]]
> Photo by Tom Blackout on Unsplash

在此博客文章中,我們將深入探討Apache Spark窗口函數。 您可能也對我之前有關Apache Spark的帖子感興趣。

  • 使用Apache Spark開始您的旅程-第1部分
  • 使用Apache Spark開始您的旅程-第2部分
  • Apache Spark開始您的旅程-第3部分
  • 深入研究Apache Spark DateTime函數
  • 在Apache Spark中使用JSON

首先,讓我們看看什么是窗口函數以及何時使用它們。 我們在Apache Spark中使用了各種功能,例如月份(從日期返回月份),四舍五入(舍入值)和地板(為給定的輸入提供底值)等,這些功能將在每條記錄上執行并返回一個值 每條記錄。 然后,我們將對一組數據執行各種聚合函數,并為每個組返回一個值,例如sum,avg,min,max和count。 但是,如果我們想對一組數據執行該操作,并且希望對每個記錄有一個單一的值/結果怎么辦? 在這種情況下,我們可以使用窗口函數。 他們可以定義記錄的排名,累積分布,移動平均值,或標識當前記錄之前或之后的記錄。

讓我們使用一些Scala API示例來了解以下窗口函數:

  • 匯總:min, max, avg, count, 和 sum.
  • 排名:rank,dense_rank,percent_rank,row_num和ntile
  • 分析性:cume_dist,lag和lead
  • 自定義邊界:rangeBetween和rowsBetween

為便于參考,GitHub上提供了一個以JSON文件格式導出的Zeppelin筆記本和一個Scala文件。

創建Spark DataFrame

現在,我們創建一個示例Spark DataFrame,我們將在整個博客中使用它。 首先,讓我們加載所需的庫。

  1. import org.apache.spark.sql.expressions.Window 
  2. import org.apache.spark.sql.functions._ 

現在,我們將使用一些虛擬數據創建DataFrame,這些虛擬數據將用于討論各種窗口函數。

  1. case class Salary(depName: String, empNo: Long, salary: Long) 
  2.  
  3. val empsalary = Seq( 
  4.   Salary("sales", 1, 5000), 
  5.   Salary("personnel", 2, 3900), 
  6.   Salary("sales", 3, 4800), 
  7.   Salary("sales", 4, 4800), 
  8.   Salary("personnel", 5, 3500), 
  9.   Salary("develop", 7, 4200), 
  10.   Salary("develop", 8, 6000), 
  11.   Salary("develop", 9, 4500), 
  12.   Salary("develop", 10, 5200), 
  13.   Salary("develop", 11, 5200)).toDF() 

這是我們的DataFrame的樣子:

 

深入了解Apache Spark窗口功能

 

窗口集合函數

讓我們看一些聚合的窗口函數,看看它們如何工作。

首先,我們需要定義窗口的規范。 假設我們要根據部門獲取匯總數據。 因此,在此示例中,我們將基于部門名稱(列:depname)定義窗口。

為聚合函數創建窗口規范

  1. val byDepName = Window.partitionBy("depName"

在窗口上應用聚合函數

現在,在部門內(列:depname),我們可以應用各種聚合函數。 因此,讓我們嘗試查找每個部門的最高和最低工資。 在這里,我們僅選擇了所需的列(depName,max_salary和min_salary),并刪除了重復的記錄。

  1. val agg_sal = empsalary 
  2.            .withColumn("max_salary"max("salary").over(byDepName)) 
  3.            .withColumn("min_salary"min("salary").over(byDepName)) 
  4.                  
  5.  
  6. agg_sal.select("depname""max_salary""min_salary"
  7.         .dropDuplicates() 
  8.         .show() 

輸出:

  1. +---------+----------+----------+ 
  2. |  depname|max_salary|min_salary| 
  3. +---------+----------+----------+ 
  4. |  develop|      6000|      4200| 
  5. |    sales|      5000|      4800| 
  6. |personnel|      3900|      3500| 
  7. +---------+----------+----------+ 

現在讓我們看看它是如何工作的。 我們已經按部門名稱對數據進行了分區:

 

深入了解Apache Spark窗口功能

現在,當我們執行合計函數時,它將應用于每個分區并返回合計值(在本例中為min和max)。

 

深入了解Apache Spark窗口功能

注意:可用的匯總函數為最大,最小,總和,平均和計數。

窗口排名功能

在本節中,我們將討論幾種類型的排名函數。

創建用于排序功能的窗口規范

現在,我們要根據員工在部門內的薪水進行排名。 薪水最高的員工將排名第一,薪水最低的員工將排名最后。 在這里,我們將基于部門(列:depname)對數據進行分區,并且在部門內,我們將根據薪水以降序對數據進行排序。

  1. val winSpec = Window.partitionBy("depName").orderBy("salary".desc

對于每個部門,記錄將根據薪水以降序排序。

 

深入了解Apache Spark窗口功能

1.等級功能:等級

此函數將返回分區中每個記錄的等級,并跳過任何重復等級之后的后續等級:

  1. val rank_df = empsalary.withColumn("rank", rank().over(winSpec)) 
  2. rank_df.show() 

輸出:

  1. +---------+-----+------+----+ 
  2. |  depName|empNo|salary|rank| 
  3. +---------+-----+------+----+ 
  4. |  develop|    8|  6000|   1| 
  5. |  develop|   11|  5200|   2| 
  6. |  develop|   10|  5200|   2| 
  7. |  develop|    9|  4500|   4| 
  8. |  develop|    7|  4200|   5| 
  9. |    sales|    1|  5000|   1| 
  10. |    sales|    4|  4800|   2| 
  11. |    sales|    3|  4800|   2| 
  12. |personnel|    2|  3900|   1| 
  13. |personnel|    5|  3500|   2| 
  14. +---------+-----+------+----+ 

在這里我們可以看到某些等級重復,而有些等級丟失。 例如,在開發部門中,我們有2名等級= 2的員工,而沒有等級= 3的員工,因為等級函數將為相同的值保留相同的等級,并相應地跳過下一個等級。

2.密集等級:densed_rank

此函數將返回分區中每個記錄的等級,但不會跳過任何等級。

  1. val dense_rank_df = empsalary.withColumn("dense_rank", dense_rank().over(winSpec)) 
  2.  
  3. dense_rank_df.show() 

輸出:

  1. +---------+-----+------+-----------+ 
  2. |  depName|empNo|salary|desnse_rank| 
  3. +---------+-----+------+-----------+ 
  4. |  develop|    8|  6000|          1| 
  5. |  develop|   10|  5200|          2| 
  6. |  develop|   11|  5200|          2| 
  7. |  develop|    9|  4500|          3| 
  8. |  develop|    7|  4200|          4| 
  9. |    sales|    1|  5000|          1| 
  10. |    sales|    3|  4800|          2| 
  11. |    sales|    4|  4800|          2| 
  12. |personnel|    2|  3900|          1| 
  13. |personnel|    5|  3500|          2| 
  14. +---------+-----+------+-----------+ 

在這里,我們可以看到某些等級是重復的,但是排名并沒有像我們使用等級功能時那樣丟失。 例如,在開發部門中,我們有2名員工,等級=2。density_rank函數將為相同的值保留相同的等級,但不會跳過下一個等級。

3.行號函數:row_number

此功能將在窗口內分配行號。 如果2行的排序列值相同,則不確定將哪個行號分配給具有相同值的每一行。

  1. val row_num_df = empsalary.withColumn("row_number", row_number().over(winSpec)) 
  2.  
  3. row_num_df.show() 

輸出:

  1. +---------+-----+------+----------+ 
  2. |  depName|empNo|salary|row_number| 
  3. +---------+-----+------+----------+ 
  4. |  develop|    8|  6000|         1| 
  5. |  develop|   10|  5200|         2| 
  6. |  develop|   11|  5200|         3| 
  7. |  develop|    9|  4500|         4| 
  8. |  develop|    7|  4200|         5| 
  9. |    sales|    1|  5000|         1| 
  10. |    sales|    3|  4800|         2| 
  11. |    sales|    4|  4800|         3| 
  12. |personnel|    2|  3900|         1| 
  13. |personnel|    5|  3500|         2| 
  14. +---------+-----+------+----------+ 

4.百分比排名函數:percent_rank

此函數將返回分區中的相對(百分數)等級。

  1. val percent_rank_df = empsalary.withColumn("percent_rank", percent_rank().over(winSpec)) 
  2.  
  3. percent_rank_df.show() 

輸出:

  1. +---------+-----+------+------------+ 
  2. |  depName|empNo|salary|percent_rank| 
  3. +---------+-----+------+------------+ 
  4. |  develop|    8|  6000|         0.0| 
  5. |  develop|   10|  5200|        0.25| 
  6. |  develop|   11|  5200|        0.25| 
  7. |  develop|    9|  4500|        0.75| 
  8. |  develop|    7|  4200|         1.0| 
  9. |    sales|    1|  5000|         0.0| 
  10. |    sales|    3|  4800|         0.5| 
  11. |    sales|    4|  4800|         0.5| 
  12. |personnel|    2|  3900|         0.0| 
  13. |personnel|    5|  3500|         1.0| 
  14. +---------+-----+------+------------+ 

5. N-tile功能:ntile

此功能可以根據窗口規格或分區將窗口進一步細分為n個組。 例如,如果需要將部門進一步劃分為三類,則可以將ntile指定為3。

  1. val ntile_df = empsalary.withColumn("ntile", ntile(3).over(winSpec)) 
  2.  
  3. ntile_df.show() 

輸出:

  1. +---------+-----+------+-----+ 
  2. |  depName|empNo|salary|ntile| 
  3. +---------+-----+------+-----+ 
  4. |  develop|    8|  6000|    1| 
  5. |  develop|   10|  5200|    1| 
  6. |  develop|   11|  5200|    2| 
  7. |  develop|    9|  4500|    2| 
  8. |  develop|    7|  4200|    3| 
  9. |    sales|    1|  5000|    1| 
  10. |    sales|    3|  4800|    2| 
  11. |    sales|    4|  4800|    3| 
  12. |personnel|    2|  3900|    1| 
  13. |personnel|    5|  3500|    2| 
  14. +---------+-----+------+-----+ 

 


窗口分析功能

接下來,我們將討論諸如累積分布,滯后和超前的分析功能。

1.累積分布函數:cume_dist

此函數提供窗口/分區的值的累積分布。

  1. val winSpec = Window.partitionBy("depName").orderBy("salary"
  2.  
  3. val cume_dist_df =  
  4.               empsalary.withColumn("cume_dist",cume_dist().over(winSpec)) 
  5.  
  6. cume_dist_df.show() 

定義窗口規范并應用cume_dist函數以獲取累積分布。

輸出:

  1. +---------+-----+------+------------------+ 
  2. |  depName|empNo|salary|         cume_dist| 
  3. +---------+-----+------+------------------+ 
  4. |  develop|    7|  4200|               0.2| 
  5. |  develop|    9|  4500|               0.4| 
  6. |  develop|   10|  5200|               0.8| 
  7. |  develop|   11|  5200|               0.8| 
  8. |  develop|    8|  6000|               1.0| 
  9. |    sales|    4|  4800|0.6666666666666666| 
  10. |    sales|    3|  4800|0.6666666666666666| 
  11. |    sales|    1|  5000|               1.0| 
  12. |personnel|    5|  3500|               0.5 
  13. |personnel|    2|  3900|               1.0| 
  14. +---------+-----+------+------------------+ 

2.滯后功能:滯后

此函數將在從DataFrame偏移行之前返回該值。

lag函數采用3個參數(lag(col,count = 1,默認= None)),col:定義需要在其上應用函數的列。 count:需要回顧多少行。 default:定義默認值。

  1. val winSpec = Window.partitionBy("depName").orderBy("salary"
  2.  
  3. val lag_df =  
  4.           empsalary.withColumn("lag", lag("salary", 2).over(winSpec)) 
  5.  
  6. lag_df.show() 

輸出:

  1. +---------+-----+------+----+ 
  2. |  depName|empNo|salary| lag| 
  3. +---------+-----+------+----+ 
  4. |  develop|    7|  4200|null
  5. |  develop|    9|  4500|null
  6. |  develop|   10|  5200|4200| 
  7. |  develop|   11|  5200|4500| 
  8. |  develop|    8|  6000|5200| 
  9. |    sales|    4|  4800|null
  10. |    sales|    3|  4800|null
  11. |    sales|    1|  5000|4800| 
  12. |personnel|    5|  3500|null
  13. |personnel|    2|  3900|null
  14. +---------+-----+------+----+ 

例如,讓我們在當前行之前查找薪水2行。

  • 對于depname = develop,salary = 4500。沒有這樣的行,該行在該行之前2行。 因此它將為空。

 

深入了解Apache Spark窗口功能
  • 對于部門名稱=發展,薪水= 6000(以藍色突出顯示)。 如果我們提前兩排,我們將獲得5200的薪水(以綠色突出顯示)。

 

深入了解Apache Spark窗口功能

3.導聯功能:導聯

此函數將返回DataFrame的偏移行之后的值。

  1. val winSpec = Window.partitionBy("depName").orderBy("salary"
  2.  
  3. val lead_df =  
  4.           empsalary.withColumn("lead", lead("salary", 2).over(winSpec)) 
  5.  
  6. lead_df.show() 

lead函數采用3個參數(lead(col,count = 1,默認= None))col:定義需要在其上應用函數的列。 count:對于當前行,我們需要向前/向后查找多少行。 default:定義默認值。

輸出:

  1. +---------+-----+------+----+ 
  2. |  depName|empNo|salary| lag| 
  3. +---------+-----+------+----+ 
  4. |  develop|    7|  4200|5200| 
  5. |  develop|    9|  4500|5200| 
  6. |  develop|   10|  5200|6000| 
  7. |  develop|   11|  5200|null
  8. |  develop|    8|  6000|null
  9. |    sales|    3|  4800|5000| 
  10. |    sales|    4|  4800|null
  11. |    sales|    1|  5000|null
  12. |personnel|    5|  3500|null
  13. |personnel|    2|  3900|null
  14. +---------+-----+------+----+ 

讓我們嘗試從當前行中查找前進/后兩行的薪水。

  • 對于depname =開發人員,薪水= 4500(以藍色突出顯示)。 如果我們在前進/后退兩行,我們將獲得5200的薪水(以綠色突出顯示)。

 

深入了解Apache Spark窗口功能
  • 對于depname =人員,薪水=3500。在此分區中,沒有此行向前2行/在該行之后。 因此我們將獲得空值。

 

深入了解Apache Spark窗口功能

 

深入了解Apache Spark窗口功能

自定義窗口定義

默認情況下,窗口的邊界由分區列定義,我們可以通過窗口規范指定順序。 例如,對于開發部門,窗口的開始是工資的最小值,窗口的結束是工資的最大值。

但是,如果我們想更改窗口的邊界怎么辦? 以下功能可用于定義每個分區內的窗口。

1. rangeBetween

使用rangeBetween函數,我們可以顯式定義邊界。例如,從當前薪水開始,將其定義為100,然后將其定義為300,并查看其含義。 從100開始表示窗口將從100個單位開始,從當前值開始以300個值結束(包括開始值和結束值)。

  1. val winSpec = Window.partitionBy("depName"
  2.           .orderBy("salary"
  3.           .rangeBetween(100L, 300L) 

定義窗口規格

起始值和結束值后的L表示該值是Scala Long類型。

  1. val range_between_df = empsalary.withColumn("max_salary"max("salary").over(winSpec)) 
  2.  
  3. range_between_df.show() 

應用自定義窗口規范

輸出:

  1. +---------+-----+------+----------+ 
  2. |  depName|empNo|salary|max_salary| 
  3. +---------+-----+------+----------+ 
  4. |  develop|    7|  4200|      4500| 
  5. |  develop|    9|  4500|      null
  6. |  develop|   10|  5200|      null
  7. |  develop|   11|  5200|      null
  8. |  develop|    8|  6000|      null
  9. |    sales|    3|  4800|      5000| 
  10. |    sales|    4|  4800|      5000| 
  11. |    sales|    1|  5000|      null
  12. |personnel|    5|  3500|      null
  13. |personnel|    2|  3900|      null
  14. +---------+-----+------+----------+ 

現在,讓我們嘗試了解輸出。

  • 對于depname = developer,salary = 4200,窗口的開始將是(當前值+開始),即4200 + 100 =4300。窗口的結束將是(當前值+結束),即4200 + 300 = 4500。

由于只有一個薪水值在4300到4500之間,包括開發部門的4500,所以我們將4500作為max_salary作為4200(上面的檢查輸出)。

 

深入了解Apache Spark窗口功能
  • 同樣,對于depname = develop,salary = 4500,窗口將為(開始:4500 + 100 = 4600,結束:4500 + 300 = 4800)。 但是開發部門沒有薪水值在4600到4800之間,因此最大值不會為空(上面的檢查輸出)。

 

深入了解Apache Spark窗口功能

這里有一些特殊的邊界值可以使用。

  • Window.currentRow:指定一行中的當前值。
  • Window.unboundedPreceding:這可以用于使窗口無限制地開始。
  • Window.unbounded以下:此方法可用于使窗口具有無限的末端。

例如,我們需要從員工工資中找到最高工資,該最高工資大于300。 因此,我們將起始值定義為300L,將結束值定義為Window.unboundedFollowing:

  1. val winSpec = Window.partitionBy("depName").orderBy("salary"
  2.              .rangeBetween(300L, Window.unboundedFollowing) 
  3.  
  4. val range_unbounded_df = empsalary.withColumn("max_salary"max("salary").over(winSpec)) 
  5.  
  6. range_unbounded_df.show() 

輸出:

  1. +---------+-----+------+----------+ 
  2. |  depName|empNo|salary|max_salary| 
  3. +---------+-----+------+----------+ 
  4. |  develop|    7|  4200|      6000| 
  5. |  develop|    9|  4500|      6000| 
  6. |  develop|   10|  5200|      6000| 
  7. |  develop|   11|  5200|      6000| 
  8. |  develop|    8|  6000|      null
  9. |    sales|    3|  4800|      null
  10. |    sales|    4|  4800|      null
  11. |    sales|    1|  5000|      null
  12. |personnel|    5|  3500|      3900| 
  13. |personnel|    2|  3900|      null
  14. +---------+-----+------+----------+ 

因此,對于depname =人員,薪水=3500。窗口將是(開始:3500 + 300 = 3800,結束:無邊界)。 因此,此范圍內的最大值為3900(檢查上面的輸出)。

同樣,對于depname = sales,salary = 4800,窗口將為(開始:4800 + 300、5100,結束:無邊界)。 由于銷售部門的值不大于5100,因此結果為空。

2.rowsBetween

通過rangeBetween,我們使用排序列的值定義了窗口的開始和結束。 但是,我們也可以使用相對行位置定義窗口的開始和結束。

例如,我們要創建一個窗口,其中窗口的開始是當前行之前的一行,結束是當前行之后的一行。

定義自定義窗口規范

  1. val winSpec = Window.partitionBy("depName"
  2.             .orderBy("salary").rowsBetween(-1, 1) 

應用自定義窗口規范

  1. val rows_between_df = empsalary.withColumn("max_salary"max("salary").over(winSpec)) 
  2.  
  3. rows_between_df.show() 

輸出:

  1. +---------+-----+------+----------+ 
  2. |  depName|empNo|salary|max_salary| 
  3. +---------+-----+------+----------+ 
  4. |  develop|    7|  4200|      4500| 
  5. |  develop|    9|  4500|      5200| 
  6. |  develop|   10|  5200|      5200| 
  7. |  develop|   11|  5200|      6000| 
  8. |  develop|    8|  6000|      6000| 
  9. |    sales|    3|  4800|      4800| 
  10. |    sales|    4|  4800|      5000| 
  11. |    sales|    1|  5000|      5000| 
  12. |personnel|    5|  3500|      3900| 
  13. |personnel|    2|  3900|      3900| 
  14. +---------+-----+------+----------+ 

現在,讓我們嘗試了解輸出。

  • 對于depname =開發,salary = 4500,將定義一個窗口,該窗口在當前行之前和之后一行(以綠色突出顯示)。 因此窗口內的薪水為(4200、4500、5200),最高為5200(上面的檢查輸出)。

 

深入了解Apache Spark窗口功能
  • 同樣,對于depname = sales,salary = 5000,將在當前行的前后定義一個窗口。 由于此行之后沒有行,因此該窗口將只有2行(以綠色突出顯示),其薪水分別為(4800,5000)和max為5000(上面的檢查輸出)。

 

深入了解Apache Spark窗口功能

我們還可以像以前使用rangeBetween一樣使用特殊邊界Window.unboundedPreceding,Window.unboundedFollowing和Window.currentRow。

注意:rowsBetween不需要排序,但是我使用它來使每次運行的結果保持一致。

責任編輯:未麗燕 來源: 今日頭條
相關推薦

2023-10-06 00:04:02

2019-06-28 14:31:03

網絡協議棧Flink數據

2010-06-23 20:31:54

2010-07-13 09:36:25

2010-11-19 16:22:14

Oracle事務

2022-08-26 13:48:40

EPUBLinux

2009-08-25 16:27:10

Mscomm控件

2020-09-21 09:53:04

FlexCSS開發

2020-07-20 06:35:55

BashLinux

2018-09-26 11:12:35

iOS蘋果功能

2010-02-03 17:08:48

千兆交換機

2010-01-11 09:46:56

智能交換機

2010-02-22 10:08:33

MySQL 5.5分區

2011-07-18 15:08:34

2022-06-03 10:09:32

威脅檢測軟件

2010-11-15 11:40:44

Oracle表空間

2018-06-22 13:05:02

前端JavaScript引擎

2021-01-19 12:00:39

前端監控代碼

2010-09-27 09:31:42

JVM內存結構

2021-04-28 10:13:58

zookeeperZNode核心原理
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 中文字幕av网址 | 欧美午夜剧场 | 区一区二在线观看 | 久久久久久久久久久久久9999 | 亚洲欧美一区二区三区国产精品 | 免费视频99 | 日韩精品一区二区三区在线播放 | 久久精品免费看 | 成人免费xxxxx在线视频 | 日韩欧美一级片 | 色偷偷噜噜噜亚洲男人 | 97伦理电影 | 久久一区二 | 精品欧美黑人一区二区三区 | 久久99精品久久 | 超碰97人人人人人蜜桃 | 精品一二三区 | 美女爽到呻吟久久久久 | 午夜www | 日本又色又爽又黄的大片 | 国产一区久久精品 | 91最新视频 | 日韩精品一区二区三区第95 | 99久9| 国产一区二区久久 | 国产一区二区在线免费观看 | 国产农村妇女精品一二区 | 久久成人av电影 | 亚洲欧美在线一区 | 国产一区在线免费观看视频 | 亚洲精品在线视频 | 国产精品无码久久久久 | 国产a一区二区 | 亚洲精品免费视频 | 亚洲欧美激情精品一区二区 | 日本精品久久久久 | 久久久精品网 | 91传媒在线播放 | 日韩精品一区二区三区在线播放 | 久久国产麻豆 | av一二三四 |