成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

Spark 3.0—簡而言之的新功能

大數據 Spark
本文通篇介紹了Spark SQL和Scala API中用于DataFrame操作訪問的Spark新功能,以及從Spark SQL移植到Scala API以進行編程訪問的功能。

Spark 3.0—簡而言之的新功能

最近,Apache Spark社區發布了Spark 3.0的預覽版,該預覽版包含許多重要的新功能,這些功能將幫助Spark創造強大的影響力,在此大數據和數據科學時代,該產品已擁有廣泛的企業用戶和開發人員。

在新版本中,Spark社區已將一些功能從Spark SQL移植到了編程的Scala API(

org.apache.spark.sql.functions),這鼓勵開發人員直接將此功能用作其DataFrame轉換的一部分,而不是直接輸入 進入SQL模式或創建視圖,并使用此函數以及SQL表達式或callUDF函數。

社區還辛苦地引入了一些新的數據轉換功能和partition_transforms函數,這些功能在與Spark的新DataFrameWriterv2一起使用以將數據寫到某些外部存儲時非常有用。

Spark 3中的一些新功能已經是Databricks Spark以前版本的一部分。 因此,如果您在Databricks云中工作,您可能會發現其中一些熟悉的功能。

本文通篇介紹了Spark SQL和Scala API中用于DataFrame操作訪問的Spark新功能,以及從Spark SQL移植到Scala API以進行編程訪問的功能。

Spark SQL中的Spark 3.0中引入的功能以及用于DataFrame轉換的功能

from_csv

像from_json一樣,此函數解析包含CSV字符串的列,并將其轉換為Struct類型。 如果CSV字符串不可解析,則將返回null。

例:

  • 該函數需要一個Struct模式和一些選項,這些模式和選項指示如何解析CSV字符串。 選項與CSV數據源相同。
    ss="dp-sql">
  1. ss="alt">val studentInfo = ss="string">"1,Jerin,CSE"::ss="string">"2,Jerlin,ECE"::ss="string">"3,Arun,CSE"::Nil 
  2. ss="">val ss="keyword">schema = new StructType()  
  3. ss="alt">            .ss="keyword">add(ss="string">"Id",IntegerType) 
  4. ss="">            .ss="keyword">add(ss="string">"Name",StringType) 
  5. ss="alt">            .ss="keyword">add(ss="string">"Dept",StringType) 
  6. ss="">val options = Map(ss="string">"delimiter" ->ss="string">","
  7. ss="alt">val studentDF = studentInfo.toDF(ss="string">"Student_Info"
  8. ss="">.withColumn(ss="string">"csv_struct",from_csv('Student_Info, ss="keyword">schema,options)) 
  9. ss="alt">studentDF.show() 

to_csv

要將"結構類型"列轉換為CSV字符串。

例:

  • 與Struct type列一起,此函數還接受可選的options參數,該參數指示如何將Struct列轉換為CSV字符串。
    ss="dp-sql">
  1. ss="alt">studentDF 
  2. ss="">.withColumn(ss="string">"csv_string",to_csv($ss="string">"csv_struct",Map.empty[String, String].asJava)) 
  3. ss="alt">.show 

推斷CSV字符串的模式,并以DDL格式返回模式。

例:

  • 該函數需要一個CSV字符串列和一個可選參數,其中包含如何解析CSV字符串的選項。
    ss="dp-sql">
  1. ss="alt">studentDF 
  2. ss="">  .withColumn(ss="string">"schema",schema_of_csv(ss="string">"csv_string")) 
  3. ss="alt">  .show 

for_all

將給定謂詞應用于數組中的所有元素,并且僅當數組中的所有元素求值為true時返回true,否則返回false。

例:

  • 檢查給定Array列中的所有元素是否均是偶數。
    ss="dp-sql">
  1. ss="alt">val  df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"int_array"
  2. ss="">df.withColumn(ss="string">"flag",forall($ss="string">"int_array",(x:ss="keyword">Column)=>(lit(x%2==0)))) 
  3. ss="alt">.show 

transform

將函數應用于數組中的所有元素后,返回一個新數組。

例:

  • 將" 1"添加到數組中的所有元素。
    ss="dp-sql">
  1. ss="alt">val df = Seq((Seq(2,4,6)),(Seq(5,10,3))).toDF(ss="string">"num_array"
  2. ss="">df.withColumn(ss="string">"num_array",transform($ss="string">"num_array",x=>x+1)).show 

overlay

要替換列的內容,請使用從指定字節位置到可選的指定字節長度的實際替換內容。

例:

  • 將特定人員的問候語更改為傳統的" Hello World"

這里我們用世界替換人名,因為名字的起始位置是7,并且我們要在替換內容之前刪除完整的姓名,需要刪除的字節位置的長度應大于或等于最大值 列中名稱的長度。

因此,我們將替換詞傳遞為"world",將內容替換為" 7"的特定起始位置,從指定起始位置移除的位置數為" 12"(如果未指定,則該位置是可選的 函數只會從指定的起始位置將源內容替換為替換內容)。

覆蓋替換了StringType,TimeStampType,IntegerType等中的內容。但是Column的返回類型將始終為StringType,而與Column輸入類型無關。

    ss="dp-sql">
  1. ss="alt">val greetingMsg = ss="string">"Hello Arun"::ss="string">"Hello Mohit Chawla"::ss="string">"Hello Shaurya"::Nil 
  2. ss="">val greetingDF = greetingMsg.toDF(ss="string">"greet_msg"
  3. ss="alt">greetingDF.withColumn(ss="string">"greet_msg",overlay($ss="string">"greet_msg",lit(ss="string">"World"),lit(ss="string">"7"),lit(ss="string">"12"))) 
  4. ss="">.show 

分裂

根據給定的正則表達式和指定的限制拆分字符串表達式,該限制指示將正則表達式應用于給定的字符串表達式的次數。

如果指定的限制小于或等于零,則正則表達式將在字符串上應用多次,并且結果數組將根據給定的正則表達式包含所有可能的字符串拆分。

如果指定的限制大于零,則將使用不超過該限制的正則表達式

例:

  • 根據正則表達式將給定的字符串表達式拆分為兩個。 即 字符串定界符。
    ss="dp-sql">
  1. ss="alt">val num = ss="string">"one~two~three"::ss="string">"four~five"::Nil 
  2. ss="">val numDF = num.toDF(ss="string">"numbers"
  3. ss="alt">numDF 
  4. ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",2)) 
  5. ss="alt">.show 

將同一個字符串表達式分成多個部分,直到出現分隔符

    ss="dp-sql">
  1. ss="alt">numDF 
  2. ss="">.withColumn(ss="string">"numbers",split($ss="string">"numbers",ss="string">"~",0)) 
  3. ss="alt">.show 

map_entries

將映射鍵值轉換為無序的條目數組。

例:

  • 獲取數組中Map的所有鍵和值。
    ss="dp-sql">
  1. ss="alt">val df = Seq(Map(1->ss="string">"x",2->ss="string">"y")).toDF(ss="string">"key_values"
  2. ss="">df.withColumn(ss="string">"key_value_array",map_entries($ss="string">"key_values")) 
  3. ss="alt">.show 

map_zip_with

使用功能根據鍵將兩個Map合并為一個。

例:

  • 要計算跨部門員工的總銷售額,并通過傳遞一個函數,該函數將基于鍵匯總來自兩個不同"地圖"列的總銷售額,從而在單個地圖中獲取特定員工的總銷售額。
    ss="dp-sql">
  1. ss="alt">val df = Seq((Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000), 
  2. ss="">             Map(ss="string">"EID_1"->1000,ss="string">"EID_2"->2500)))   .toDF(ss="string">"emp_sales_dept1",ss="string">"emp_sales_dept2"
  3. ss="alt"> 
  4. ss="">df. 
  5. ss="alt">withColumn(ss="string">"total_emp_sales",map_zip_with($ss="string">"emp_sales_dept1",$ss="string">"emp_sales_dept2",(k,v1,v2)=>(v1+v2))) 
  6. ss="">.show 

map_filter

返回僅包含滿足給定謂詞功能的Map值的新鍵值對。

例:

  • 僅篩選出銷售值高于20000的員工
    ss="dp-sql">
  1. ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) 
  2. ss="">          .toDF(ss="string">"emp_sales"
  3. ss="alt"> 
  4. ss="">df 
  5. ss="alt">.withColumn(ss="string">"filtered_sales",map_filter($ss="string">"emp_sales",(k,v)=>(v>20000))) 
  6. ss="">.show 

transform_values

根據給定的函數操作Map列中所有元素的值。

例:

  • 通過給每個雇員加薪5000來計算雇員薪水
    ss="dp-sql">
  1. ss="alt">val df = Seq(Map(ss="string">"EID_1"->10000,ss="string">"EID_2"->25000)) 
  2. ss="">         .toDF(ss="string">"emp_salary"
  3. ss="alt"> 
  4. ss="">df 
  5. ss="alt">.withColumn(ss="string">"emp_salary",transform_values($ss="string">"emp_salary",(k,v)=>(v+5000))) 
  6. ss="">.show 

transform_keys

根據給定的函數操作Map列中所有元素的鍵。

例:

  • 要將公司名稱" XYZ"添加到員工編號。
    ss="dp-sql">
  1. ss="alt">val df = Seq(Map(ss="string">"EID_1" -> 10000, ss="string">"EID_2" -> 25000)) 
  2. ss="">        .toDF(ss="string">"employees"
  3. ss="alt">df 
  4. ss="">.withColumn(ss="string">"employees", transform_keys($ss="string">"employees", (k, v) => concat(k,lit(ss="string">"_XYZ")))) 
  5. ss="alt">.show 

xhash64

要計算給定列內容的哈希碼,請使用64位xxhash算法并將結果返回為long。

從Spark SQL移植到Spark 3.0中的Scala API進行DataFrame轉換的功能

Scala API可使用大多數Spark SQL函數,該函數可將相同的函數用作DataFrame操作的一部分。 但是仍然有一些功能不能作為編程功能使用。 要使用這些功能,必須進入Spark SQL模式并將這些功能用作SQL表達式的一部分,或使用Spark" callUDF"功能使用相同的功能。 隨著功能的普及和使用不斷發展,這些功能中的某些功能過去曾被移植到新版本的程序化Spark API中。 以下是從以前版本的Spark SQL函數移植到Scala API(

org.spark.apache.sql.functions)的函數

date_sub

從日期,時間戳記和字符串數據類型中減去天數。 如果數據類型為字符串,則其格式應可轉換為日期" yyyy-MM-dd"或" yyyy-MM-dd HH:mm:ss.ssss"

例:

  • 從eventDateTime中減去" 1天"。

如果要減去的天數為負,則此功能會將給定的天數添加到實際日期中。

    ss="dp-sql">
  1. ss="alt">var df = Seq( 
  2. ss="">        (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), 
  3. ss="alt">        (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), 
  4. ss="">        (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), 
  5. ss="alt">        (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) 
  6. ss="">         ) 
  7. ss="alt">     .toDF(ss="string">"typeId",ss="string">"eventDateTime"
  8. ss=""> 
  9. ss="alt"> df.withColumn(ss="string">"Adjusted_Date",date_sub($ss="string">"eventDateTime",1)).show() 

date_add

與date_sub相同,但是將天數添加到實際天數中。

例:

  • 將" 1天"添加到eventDateTime
    ss="dp-sql">
  1. ss="alt">var df = Seq( 
  2. ss="">         (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), 
  3. ss="alt">         (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), 
  4. ss="">         (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), 
  5. ss="alt">         (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) 
  6. ss="">         ) 
  7. ss="alt">    .toDF(ss="string">"Id",ss="string">"eventDateTime"
  8. ss="">df 
  9. ss="alt">.withColumn(ss="string">"Adjusted Date",date_add($ss="string">"eventDateTime",1)) 
  10. ss="">.show() 

months_add

像date_add和date_sub一樣,此功能有助于添加月份。

減去月份,將要減去的月份數設為負數,因為沒有單獨的減去函數用于減去月份

例:

  • 從eventDateTime添加和減去一個月。
    ss="dp-sql">
  1. ss="alt">var df = Seq( 
  2. ss="">    (1, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-01 23:00:01")), 
  3. ss="alt">    (2, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-02 12:40:32")), 
  4. ss="">    (3, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-03 09:54:00")), 
  5. ss="alt">    (4, ss="keyword">Timestamp.valueOf(ss="string">"2020-01-04 10:12:43")) 
  6. ss="">     ).toDF(ss="string">"typeId",ss="string">"eventDateTime"
  7. ss="alt">//ss="keyword">To ss="keyword">add one months 
  8. ss=""> df 
  9. ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",1)) 
  10. ss="">.show() 
  11. ss="alt">//ss="keyword">To subtract one months 
  12. ss="">df 
  13. ss="alt">.withColumn(ss="string">"Adjusted Date",add_months($ss="string">"eventDateTime",-1)) 
  14. ss="">.show() 

zip_with

通過應用函數合并左右數組。

此函數期望兩個數組的長度都相同,如果其中一個數組比另一個數組短,則將添加null以匹配更長的數組長度。

例:

  • 將兩個數組的內容相加并合并為一個
    ss="dp-sql">
  1. ss="alt">val df = Seq((Seq(2,4,6),Seq(5,10,3))) 
  2. ss="">         .toDF(ss="string">"array_1",ss="string">"array_2"
  3. ss="alt">   
  4. ss=""> df 
  5. ss="alt">.withColumn(ss="string">"merged_array",zip_with($ss="string">"array_1",$ss="string">"array_2",(x,y)=>(x+y))) 
  6. ss=""> .show 

將謂詞應用于所有元素,并檢查數組中的至少一個或多個元素是否對謂詞函數成立。

例:

  • 檢查數組中至少一個元素是否為偶數。
    ss="dp-sql">
  1. ss="alt">val df= Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array"
  2. ss="">df.withColumn(ss="string">"flag",exists($ss="string">"num_array", x =>lit(x%2===0))) 
  3. ss="alt">.show 

過濾

將給定謂詞應用于數組中的所有元素,并過濾掉謂詞為true的元素。

例:

  • 僅過濾掉數組中的偶數元素。
    ss="dp-sql">
  1. ss="alt">val df = Seq(Seq(2,4,6),Seq(5,10,3)).toDF(ss="string">"num_array"
  2. ss="">df.withColumn(ss="string">"even_array",filter($ss="string">"num_array", x =>lit(x%2===0))) 
  3. ss="alt">.show 

聚合 aggregate

使用給定函數將給定數組和另一個值/狀態簡化為單個值,并應用可選的finish函數將縮減后的值轉換為另一個狀態/值。

例:

  • 將10加到數組的總和并將結果乘以2
    ss="dp-sql">
  1. ss="alt">val df = Seq((Seq(2,4,6),3),(Seq(5,10,3),8)) 
  2. ss="">  .toDF(ss="string">"num_array",ss="string">"constant"
  3. ss="alt">df.withColumn(ss="string">"reduced_array",aggregate($ss="string">"num_array", $ss="string">"constant",(x,y)=>x+y,x => x*2)) 
  4. ss="">  .show 

Spark 3.0中為Spark SQL模式引入的功能

以下是新的SQL函數,您只能在Spark SQL模式下才能使用它們。

acosh

查找給定表達式的雙曲余弦的倒數。

asinh

找出給定表達式的雙曲正弦的逆。

atanh

查找給定表達式的雙曲正切的逆。

bit_and,bit_or和bit_xor

計算按位AND,OR和XOR值

bit_count

返回計數的位數。

bool_and和bool_or

驗證表達式的所有值是否為真或驗證表達式中的至少一個為真。

count_if

返回一列中的真值數量

例:

  • 找出給定列中的偶數值
    ss="dp-sql">
  1. ss="alt">var df = Seq((1),(2),(4)).toDF(ss="string">"num"
  2. ss=""> 
  3. ss="alt"> df.createOrReplaceTempView(ss="string">"table"
  4. ss="">spark.sql(ss="string">"select count_if(num %2==0) from table").show 

date_part

提取日期/時間戳的一部分,例如小時,分鐘等…

div

用于將表達式或帶有另一個表達式/列的列分開

every 和 sum

如果給定的表達式對每個列的所有列值都求值為true,并且至少一個值對某些值求得true,則此函數返回true。

make_date,make_interval和make_timestamp

構造日期,時間戳和特定間隔。

例:

    ss="dp-sql">
  1. ss="alt">ss="keyword">SELECT make_timestamp(2020, 01, 7, 30, 45.887) 

max_by和min_by

比較兩列并返回與右列的最大值/最小值關聯的左列的值

例:

    ss="dp-sql">
  1. ss="alt">var df = Seq((1,1),(2,1),(4,3)).toDF(ss="string">"x",ss="string">"y"
  2. ss=""> 
  3. ss="alt"> df.createOrReplaceTempView(ss="string">"table"
  4. ss="">spark.sql(ss="string">"select max_by(x,y) from table").show 

類型

返回列值的數據類型

返回Spark版本及其git版本

justify_days,justify_hours和justify_interval

新引入的對齊功能用于調整時間間隔。

例:

  • 表示30天為一個月,
    ss="dp-sql">
  1. ss="alt">ss="keyword">SELECT justify_days(interval ss="string">'30 day'

分區轉換功能

從Spark 3.0及更高版本開始,存在一些新功能,這些功能有助于對數據進行分區,我將在另一篇文章中介紹。

總體而言,我們已經分析了所有數據轉換和分析功能,這些功能是3.0版本中產生的火花。 希望本指南有助于您了解這些新功能。 這些功能肯定會加速火花開發工作,并有助于建立堅固有效的火花管道。

如果您有任何疑問,請在Twitter上將其扔給我。

責任編輯:未麗燕 來源: 今日頭條
相關推薦

2009-09-17 09:39:28

Chrome 3.0谷歌瀏覽器

2009-06-17 16:21:43

Spring3.0新功

2009-04-20 08:40:19

Iphone蘋果移動OS

2024-01-03 08:08:51

Pulsar版本數據

2011-11-06 21:27:38

Eclipse

2021-06-04 17:57:04

微信微信圈子騰訊

2020-10-24 17:52:10

工業物聯網IIOT物聯網

2011-02-28 17:41:20

SQL Server

2020-11-09 07:15:51

Fedora 33WorkstationLinux

2009-06-19 12:53:56

Spring 2.0

2020-08-16 09:25:21

Windows 10Windows操作系統

2023-03-29 13:08:27

Ubuntu新功能

2022-08-26 13:52:20

FedoraFedora 37

2012-07-31 15:25:46

Word 2013Office 2013

2013-11-13 10:07:26

Visual Stud微軟

2022-10-20 09:41:49

XubuntuLinux軟件

2013-07-01 09:58:58

Windows 8.1

2012-09-13 11:08:53

IBMdw

2012-07-20 10:21:13

Ubuntu開源

2024-04-26 07:36:42

Hudi 1.0數據湖倉數據查詢
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费在线观看黄色av | 精品国产一区二区三区久久久蜜月 | 男女羞羞视频在线 | 成年人在线观看视频 | 黄色免费av | 蜜桃毛片 | 日本电影网站 | 中文字幕成人在线 | 波多野结衣先锋影音 | 精品一区在线看 | 性高朝久久久久久久3小时 av一区二区三区四区 | 久久精品国产亚洲夜色av网站 | 亚洲一区在线日韩在线深爱 | 伊人网综合 | 久久久久久女 | 超碰在线国产 | 久久久.com| 午夜精品视频一区 | 国产精品一区二区视频 | 国产精品一区二区三区免费观看 | 另类专区成人 | 午夜天堂精品久久久久 | 免费一级毛片 | 国产精品美女久久久久久久网站 | 久久精品天堂 | 欧美乱淫视频 | 国产男女猛烈无遮掩视频免费网站 | 日韩免费成人av | 日韩精品一区二区三区高清免费 | 视频在线一区二区 | 手机在线一区二区三区 | 黑人巨大精品欧美黑白配亚洲 | 欧美一级欧美一级在线播放 | 国产精品久久久久aaaa樱花 | 国产精品一区在线播放 | 亚洲狠狠| 手机在线一区二区三区 | 精品毛片 | 精品伊人 | 毛片一级片 | 在线a视频 |