成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

如何將kafka中的數據快速導入Hadoop?

大數據 Kafka Hadoop
Kafka是一個分布式發布—訂閱系統,由于其強大的分布式和性能特性,迅速成為數據管道的關鍵部分。它可完成許多工作,例如消息傳遞、指標收集、流處理和日志聚合。Kafka的另一個有效用途是將數據導入Hadoop。

Kafka是一個分布式發布—訂閱系統,由于其強大的分布式和性能特性,迅速成為數據管道的關鍵部分。它可完成許多工作,例如消息傳遞、指標收集、流處理和日志聚合。Kafka的另一個有效用途是將數據導入Hadoop。使用Kafka的關鍵原因是它將數據生產者和消費者分離,允許擁有多個獨立的生產者(可能由不同的開發團隊編寫)。同樣,還有多個獨立的消費者(也可能由不同的團隊編寫)。此外,消費者可以是實時/同步或批量/離線/異步。當對比RabbitMQ等其他pub-sub工具時,后一個屬性有很大區別。

要使用Kafka,有一些需要理解的概念:

  • topic—topic是相關消息的訂閱源;
  • 分區—每個topic由一個或多個分區組成,這些分區是由日志文件支持的有序消息隊列;
  • 生產者和消費者—生產者和消費者將消息寫入分區并從分區讀取。

Brokers—Brokers是管理topic和分區并為生產者和消費者請求提供服務的Kafka流程。

Kafka不保證對topic的“完全”排序,只保證組成topic的各個分區是有序的。消費者應用程序可以根據需要強制執行對“全局”topic排序。 

如何將kafka中的數據快速導入Hadoop?

圖5.14 顯示了Kafka的概念模型

如何將kafka中的數據快速導入Hadoop?

圖5.15 顯示了如何在Kafka部署分發分區的示例

 

為了支持容錯,可以復制topic,這意味著每個分區可以在不同主機上具有可配置數量的副本。這提供了更高的容錯能力,這意味著單個服務器死亡對數據或生產者和消費者的可用性來說不是災難性的。

此處采用Kafka版本0.8和Camus的0.8.X。

實踐:使用Camus將Avro數據從Kafka復制到HDFS

該技巧在已經將數據流入Kafka用于其他目的并且希望將數據置于HDFS中的情況下非常有用。

問題

希望使用Kafka作為數據傳遞機制來將數據導入HDFS。

解決方案

使用LinkedIn開發的解決方案Camus將Kafka中的數據復制到HDFS。

討論

Camus是LinkedIn開發的一個開源項目。Kafka在LinkedIn大量部署,而Camus則用作將數據從Kafka復制到HDFS。

開箱即用,Camus支持Kafka中的兩種數據格式:JSON和Avro。在這種技術中,我們將通過Camus使用Avro數據。Camus對Avro的內置支持要求Kafka發布者以專有方式編寫Avro數據,因此對于這種技術,我們假設希望在Kafka中使用vanilla序列化數據。

讓這項技術發揮作用需要完成三個部分的工作:首先要將一些Avro數據寫入Kafka,然后編寫一個簡單的類來幫助Camus反序列化Avro數據,最后運行一個Camus作業來執行數據導入。

為了把Avro記錄寫入Kafka,在以下代碼中,需要通過配置必需的Kafka屬性來設置Kafka生成器,從文件加載一些Avro記錄,并將它們寫出到Kafka: 

如何將kafka中的數據快速導入Hadoop?

可以使用以下命令將樣本數據加載到名為test的Kafka的topic中:

如何將kafka中的數據快速導入Hadoop?

Kafka控制臺使用者可用于驗證數據是否已寫入Kafka,這會將二進制Avro數據轉儲到控制臺:

如何將kafka中的數據快速導入Hadoop?

完成后,編寫一些Camus代碼,以便可以在Camus中閱讀這些Avro記錄。

實踐:編寫Camus和模式注冊表

首先,需要了解三種Camus概念:

  • 解碼器—解碼器的工作是將從Kafka提取的原始數據轉換為Camus格式。
  • 編碼器—編碼器將解碼數據序列化為將存儲在HDFS中的格式。
  • Schema注冊表—提供有關正在編碼的Avro數據的schema信息。

正如前面提到的,Camus支持Avro數據,但確實需要Kafka生產者使用Camus KafkaAvroMessageEncoder類來編寫數據,該類為Avro序列化二進制數據添加了部分專有數據,可能是因為Camus中的解碼器可以驗證它是由該類編寫的。

在此示例中,使用 Avro serialization進行序列化,因此需要編寫自己的解碼器。幸運的是,這很簡單:

如何將kafka中的數據快速導入Hadoop? 

如何將kafka中的數據快速導入Hadoop?  

你可能已經注意到我們在Kafka中寫了一個特定的Avro記錄,但在Camus中我們將該記錄讀作通用的Avro記錄,而不是特定的Avro記錄,這是因為CamusWrapper類僅支持通用Avro記錄。否則,特定的Avro記錄可以更簡單地使用,因為可以使用生成的代碼并具有隨之而來的所有安全特征。

CamusWrapper對象是從Kafka提取的數據。此類存在的原因是允許將元數據粘貼到envelope中,例如時間戳,服務器名稱和服務詳細信息。強烈建議使用的任何數據都有一些與每條記錄相關的有意義的時間戳(通常這將是創建或生成記錄的時間)。然后,可以使用接受時間戳作為參數的CamusWrapper構造函數:

  1. public CamusWrapper(R record, long timestamp) { ... } 

如果未設置時間戳,則Camus將在創建包裝器時創建新的時間戳。在確定輸出記錄的HDFS位置時,在Camus中使用此時間戳和其他元數據。

接下來,需要編寫一個schema注冊表,以便Camus Avro編碼器知道正在寫入HDFS的Avro記錄的schema詳細信息。注冊架構時,還要指定從中拉出Avro記錄的Kafka的topic名稱:

如何將kafka中的數據快速導入Hadoop?

運行Camus

Camus在Hadoop集群上作為MapReduce作業運行,希望在該集群中導入Kafka數據。需要向Camus提供一堆屬性,可以使用命令行或者使用屬性文件來執行此操作,我們將使用此技術的屬性文件:  

如何將kafka中的數據快速導入Hadoop?

從屬性中可以看出,無需明確告訴Camus要導入哪些topic。Camus自動與Kafka通信以發現topic(和分區)以及當前的開始和結束偏移。

如果想要精確控制導入的topic,可以分別使用kafka.whitelist.topics和kafka.blacklist.topics列舉白名單(限制topic)和黑名單(排除topic),可以使用逗號作為分隔符指定多個topic,還支持正則表達式,如以下示例所示,其匹配topic的“topic1”或以“abc”開頭,后跟一個或多個數字的任何topic,可以使用與value完全相同的語法指定黑名單:

  1. kafka.whitelist.topics=topic1,abc[0-9]+ 

一旦屬性全部設置完畢,就可以運行Camus作業了:

如何將kafka中的數據快速導入Hadoop?

如何將kafka中的數據快速導入Hadoop?

這將導致Avro數據在HDFS中著陸。我們來看看HDFS中的內容:  

如何將kafka中的數據快速導入Hadoop?

第一個文件包含已導入的數據,其他供Camus管理。

可以使用AvroDump實用程序查看HDFS中的數據文件:

如何將kafka中的數據快速導入Hadoop?

那么,當Camus工作正在運行時究竟發生了什么? Camus導入過程作為MapReduce作業執行,如圖5.16所示。 

如何將kafka中的數據快速導入Hadoop?

隨著MapReduce中的Camus任務成功,Camus OutputCommitter(允許在任務完成時執行自定義工作的MapReduce構造)以原子方式將任務的數據文件移動到目標目錄。OutputCommitter還為任務正在處理的所有分區創建偏移文件,同一作業中的其他任務可能會失敗,但這不會影響成功任務的狀態——成功任務的數據和偏移輸出仍然存在,因此后續的Camus執行將從最后一個已知的成功狀態恢復處理。

接下來,讓我們看看Camus導入數據的位置以及如何控制行為。

數據分區

之前,我們看到了Camus導入位于Kafka的Avro數據,讓我們仔細看看HDFS路徑結構,如圖5.17所示,看看可以做些什么來確定位置。 

如何將kafka中的數據快速導入Hadoop?

圖5.17 在HDFS中解析導出數據的Camus輸出路徑

 

路徑的日期/時間由從CamusWrapper中提取的時間戳確定,可以從MessageDecoder中的Kafka記錄中提取時間戳,并將它們提供給CamusWrapper,這將允許按照有意義的日期對數據進行分區,而不是默認值,這只是在MapReduce中讀取Kafka記錄的時間。

Camus支持可插拔分區程序,允許控制圖5.18所示路徑的一部分。 

如何將kafka中的數據快速導入Hadoop?

圖5.18 Camus分區路徑

 

Camus Partitioner接口提供了兩種必須實現的方法:

如何將kafka中的數據快速導入Hadoop?

例如,自定義分區程序可創建用于Hive分區的路徑。

總結

Camus提供了一個完整的解決方案,可以在HDFS中從Kafka獲取數據,并在出現問題時負責維護狀態和進行錯誤處理。通過將其與Azkaban或Oozie集成,可以輕松實現自動化,并根據消息時間組織HDFS數據執行簡單的數據管理。值得一提的是,當涉及到ETL時,與Flume相比,它的功能是無懈可擊的。

Kafka捆綁了一種將數據導入HDFS的機制。它有一個KafkaETLInputFormat輸入格式類,可用于在MapReduce作業中從Kafka提取數據。要求編寫MapReduce作業以執行導入,但優點是可以直接在MapReduce流中使用數據,而不是將HDFS用作數據的中間存儲。接下來,我們將討論如何將駐留在Hadoop中的數據傳輸到其他系統,例如文件系統和其他地方。

責任編輯:未麗燕 來源: it168網站
相關推薦

2018-10-15 13:57:38

Hadoop數據庫MySQL

2017-11-21 08:36:00

MongoDB關系型數據庫數據導入

2018-10-09 14:16:21

Hadoop數據移入數據傳輸

2011-08-11 10:16:15

2023-02-17 12:07:45

ChatGPTPython

2019-10-17 09:45:02

照片SD卡Windows 10

2012-06-14 09:37:45

Google地圖

2017-06-28 08:14:57

數據庫區塊鏈比特幣

2021-09-14 14:50:05

SASTDevSecOps應用安全

2018-07-30 15:05:26

Hadoop大數據集群

2022-10-10 11:00:29

數據分析云戰略

2022-11-25 16:27:07

應用開發鴻蒙

2015-10-16 10:48:03

Gate One嵌入Web

2019-05-13 08:20:33

Hadoop集群數據庫

2023-05-30 08:00:00

PandasQuestDBPython

2020-12-14 22:42:32

Linux終端

2010-08-11 15:35:47

Flex DataGr

2010-08-11 15:51:45

Flex DataGr

2009-08-14 11:35:44

C#數據查詢

2011-07-12 13:01:11

ExcelOracleSql Server
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产ts一区 | 欧美日韩精品久久久免费观看 | 国产一区二区在线免费播放 | 一区在线播放 | 不卡一区二区在线观看 | 911精品美国片911久久久 | 亚洲欧美日韩精品久久亚洲区 | 日韩高清国产一区在线 | 国产精品欧美一区二区三区 | 老头搡老女人毛片视频在线看 | 亚洲女人的天堂 | 91精品国产91久久久久久吃药 | 国产一区二区三区在线视频 | 免费高清av | 自拍偷拍亚洲欧美 | 中文在线一区 | 国产一区二区三区久久久久久久久 | 亚洲欧美中文日韩在线v日本 | 日韩欧美国产精品一区二区 | sese视频在线观看 | 午夜视频在线免费观看 | 国产日韩中文字幕 | 99视频免费在线观看 | 欧美日韩精品综合 | 欧美a区 | 精品真实国产乱文在线 | 福利一区在线观看 | 亚洲精品专区 | 日屁网站 | 狠狠操天天操 | 成人深夜小视频 | 国产一二三区在线 | 国产三级网站 | 国产线视频精品免费观看视频 | 农夫在线精品视频免费观看 | www.中文字幕.com | 精品久久99| 国产电影一区二区在线观看 | 久久国产福利 | 亚洲va国产日韩欧美精品色婷婷 | 91亚洲一区 |