成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

使用Spark Streaming轉換不同的JSON有效負載

譯文
開發 開發工具 Spark
使用 Spark Streaming,你只需要從數據源創建一個讀流,這樣就可以創建寫入流將數據加載到目標數據源中。

【51CTO.com快譯】Spark Streaming 是底層基于 Spark Core 的對大數據進行實時計算的框架,可以流方式從源讀取數據。只需要從數據源創建一個讀取流,然后我們可以創建寫入流以將數據加載到目標數據源中。

?[[418750]]?

接下來的演示,將假設我們有不同的 JSON 有效負載進入一個 kafka 主題,我們需要將其轉換并寫入另一個 kafka 主題。

創建一個ReadStream

為了能連續接收JSON有效負載作為消息。我們需要首先讀取消息并使用spark的readstream創建數據幀。Spark 中提供了 readStream 函數,我們可以使用這個函數基本上創建一個 readStream。這將從 kafka 主題中讀取流負載。 

val df = spark
.readStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("subscribe", "topic1")
.load()

我們可以創建一個 case-class(例如CustomerUnion),它將包含JSON有效負載的所有可能字段。這樣,我們就能在數據幀上運行select查詢而不會失敗。 

val rawDfValue = rawData.selectExpr("CAST(value AS STRING)").as[String]

val schema = ScalaReflection.schemaFor[CustomerUnion].dataType.asInstanceOf[StructType]

val extractedDFWithSchema = rawDfValue.select(from_json(col("value"), schema).as("data")).select("data.*")

extractedDFWithSchema.createOrReplaceTempView(“tempView”)

這將為我們提供一個數據幀提取的 DFWithSchema,其中包含作為有效負載字段的列。

示例輸入負載

這是兩個樣本輸入有效負載,但也可以有更多的有效負載,有些字段不存在(變量)。 

{
“id”: 1234,
“firstName”:”Jon”,
“lastName”:”Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}


{
“firstName”:”Jon”,
“lastName”:”Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

樣例輸出負載

根據id字段,我們將決定輸出有效負載。如果存在一個 id 字段,我們將把它視為一個用戶更新案例,并且在輸出有效負載中只發送“Email”和“Phone”。我們可以根據某些條件配置任何字段。這只是一個例子。

如果 id 不存在,我們將發送所有字段。下面是兩個輸出載荷的示例: 

{
“userid”: 1234,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}


{
“fullname”:”Jon Butler”,
“City”:”Newyork”,
“Email”:abc@gmail.com,
“Phone”:”2323123”
}

開始WriteStreams

一旦我們有了數據幀,我們就可以運行盡可能多的sql查詢,并根據所需的有效負載寫入 kafka 主題。因此,我們可以創建一個包含所有sql查詢的列表,并通過該列表進行循環,并調用writeStream函數。讓我們假設,我們有一個名為 queryList 的列表,它只包含字符串(即sql查詢)。

下面為寫入流定義的一個函數: 

def startWriteStream(query: String): Unit = {

val transformedDf = spark.sql(query)
transformedDf
.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
.writeStream
.format("kafka")
.option("kafka.bootstrap.servers", "host1:port1,host2:port2")
.option("topic", "topic1")
.start()

}

這將啟動列表中每個查詢的寫入流。 

queryList.foreach(startWriteStream)
spark.streams.awaitAnyTermination()

如果我們知道輸入有效負載的所有可能字段,那么即使有一些字段不存在,我們的sql查詢也不會失敗。我們已經將有效負載的模式指定為case-class,它將為缺席字段創建指定 NULL 的數據幀。

通過這種方式,我們可以使用 spark-streaming 在所需的轉換/過濾器之后將多個有效負載從同一主題寫入不同的主題。

【51CTO譯稿,合作站點轉載請注明原文譯者和出處為51CTO.com】


責任編輯:黃顯東 來源: dzone.com
相關推薦

2017-08-14 10:30:13

SparkSpark Strea擴容

2017-06-06 08:31:10

Spark Strea計算模型監控

2016-12-19 14:35:32

Spark Strea原理剖析數據

2019-10-17 09:25:56

Spark StreaPVUV

2016-01-28 10:11:30

Spark StreaSpark大數據平臺

2017-10-13 10:36:33

SparkSpark-Strea關系

2018-04-09 12:25:11

2016-05-11 10:29:54

Spark Strea數據清理Spark

2023-10-24 20:32:40

大數據

2019-12-13 08:25:26

FlinkSpark Strea流數據

2021-07-09 10:27:12

SparkStreaming系統

2018-04-18 08:54:28

RDD內存Spark

2010-01-08 10:24:38

轉換JSON

2017-09-26 09:35:22

2010-05-04 13:59:09

負載均衡技術

2010-01-08 10:49:21

JSON 轉換工具

2017-10-11 11:10:02

Spark Strea大數據流式處理

2017-06-27 15:08:05

大數據Apache SparKafka Strea

2010-06-28 17:00:58

FTP傳輸模式

2009-06-15 15:10:02

Java數據轉換JSON
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 久久精品国产久精国产 | 国产精品自拍av | 亚洲精品乱码久久久久v最新版 | www.国产精品 | 国产精品海角社区在线观看 | 91麻豆精品国产91久久久更新资源速度超快 | 欧美全黄| 欧美综合国产精品久久丁香 | 国产一级视屏 | 国产精品成人一区二区 | 日韩在线播放第一页 | 亚洲视频观看 | 久久伊人操 | 中文字幕在线视频免费视频 | 国产一区二区三区视频免费观看 | 91欧美| 日本精品裸体写真集在线观看 | 夜夜爽99久久国产综合精品女不卡 | 日本精品视频在线观看 | 日韩成人精品一区二区三区 | 亚洲一区二区不卡在线观看 | 天天综合亚洲 | 欧美影院 | 在线观看中文字幕 | 久久免费精品 | 在线观看视频亚洲 | jvid精品资源在线观看 | 国产清纯白嫩初高生在线播放视频 | 玖玖视频国产 | 91久久久久久 | 国产区第一页 | 日本成人免费网站 | 国内久久精品 | 粉嫩av久久一区二区三区 | 国产精品国产亚洲精品看不卡15 | 日日草夜夜草 | 成人精品视频在线观看 | 中文字幕高清在线 | 久久久久亚洲精品中文字幕 | 亚洲网在线 | 日本成人免费网站 |