如何將數據移入和移出Hadoop?
企業在項目中完全使用Hadoop之前,數據移動是必須解決的事情之一。如何將數千臺主機日志數據放入Hadoop?從關系型或者No/NewSQL系統以及Hadoop中獲取數據的最有效方法是什么?如何將Hadoop中生成的Lucene索引輸出到服務器?這些流程如何實現自動化?
本文是《Hadoop從入門到精通》大型專題的第5章,將完全解答上述問題,讓企業走上無憂數據移動之路。本章,我們將首先介紹如何將不同位置和不同格式的數據移動到Hadoop,之后將講解如何將數據移出Hadoop。
5.1 數據移動的關鍵要素
將大量數據移入和移出Hadoop面臨很多挑戰,包括數據一致性和資源對數據源和目標的影響。然而,在深入研究這些技術之前,我們需要討論在處理數據移動時應該注意的因素。
冪等
在編程中,一個冪等操作的特點是其任意多次執行所產生的影響均與一次執行的影響相同。換句話說,冪等操作無論執行多少次都會產生相同的結果。在關系數據庫中,插入通常不是冪等的,因為多次執行不會產生相同的結果數據庫狀態。或者,更新通常是冪等的,因為它們將產生相同的最終結果。
無論何時寫入數據,都應該考慮冪等性,Hadoop中的數據入口和出口沒有很大區別。分布式日志收集框架如何處理數據重傳?如何在多個任務并行插入數據庫的MapReduce作業中確保冪等行為?
聚合
數據聚合過程組合了多個數據元素。在數據入口的上下文中,這將大量小文件移動到HDFS。在實際操作中,我們可能會面臨NameNode內存以及MapReduce執行時間慢等問題,將文件或數據聚合在一起可以緩解此類問題,這是一個需要考慮的功能。
數據格式轉換
數據格式轉換將一種數據轉換為另一種格式的操作。通常,源數據格式不適合在MapReduce等工具中進行處理。例如,如果源數據采用多行XML或JSON格式,則可能需要考慮預處理步驟。這會將數據轉換為可以拆分的形式,例如每行一個JSON或XML元素,或者將其轉換為Avro等格式。
數據壓縮
數據壓縮不僅有助于減少靜態數據的占用空間,而且在讀取和寫入數據時也具有I/O優勢。
可用性和可恢復性
可恢復性允許入口或出口工具在操作失敗時重試。由于任何數據源,接收器或Hadoop本身都不可能100%可用,因此在發生故障時可重試非常重要。
可靠的數據傳輸和數據驗證
在數據傳輸中,檢查正確性的方法是驗證數據在傳輸過程中是否發生損壞。當使用異構系統(如Hadoop數據入口和出口)時,數據通過不同主機,網絡和協議傳輸只會增加數據傳輸過程中出現問題的可能性。檢查原始數據(如存儲設備)正確性的常用方法是循環冗余校驗(CRC),這是HDFS內部用于維護塊級完整性的常用方法。
此外,由于生成數據的軟件存在錯誤,源數據本身可能存在問題。在入口時執行數據驗證允許進行一次性檢查,而不是在發生問題時處理數據的所有下游消費者,強迫這些消費者必須更新以處理數據中的錯誤。
資源消耗和性能
資源消耗和性能分別是系統資源利用率和系統效率的度量。入口和出口工具通常不會對系統施加大量負載(資源消耗),除非有非??捎^的數據量。對于性能,要考慮的問題包括工具是否并行執行操作,如果是,提供了什么機制來調整并行度。如果數據源是生產數據庫并且正在使用MapReduce提取該數據,請不要使用大量并發map任務來導入數據。
監控
監控確保功能在自動化系統中按預期執行。對于數據入口和出口,監控分為兩部分:確保入口和出口中涉及的進程存活,并驗證源和目標數據是否按預期生成。監控還應包括驗證正在移動的數據量是否達到預期水平; 數據中意外的下降或高電流將提醒潛在的系統問題或軟件錯誤。
推測執行
MapReduce具有一個稱為推測(Speculative)執行的功能,可以在作業結束時為仍在執行的任務啟動備份,這有助于防止緩慢的硬件影響作業執行時間。但是,這種做法也可能有問題,如果使用map任務執行插入關系數據庫,你應該知道可以有兩個并行進程插入相同的數據。
補充:推測執行(Speculative Execution)是指在集群環境下運行MapReduce,可能是程序Bug,負載不均或者其他的一些問題,導致在一個JOB下的多個TASK速度不一致,比如有的任務已經完成,有的卻只跑了10%,根據木桶原理,這些任務將成為整個JOB的短板,如果集群啟動了推測執行,這時為了***限度的提高短板,Hadoop會為該task啟動備份任務,讓speculative task與原始task同時處理一份數據,哪個先運行完,則將哪個結果作為最終結果,并且在運行完成后Kill掉另外一個任務。
5.2 將數據移入Hadoop
在Hadoop中處理數據的***步是將其提供給Hadoop。有兩種主要方法可用于將數據移入Hadoop:在HDFS層(數據推送)寫入外部數據,或在MapReduce層讀取外部數據(更像是拉取)。在MapReduce中讀取數據具有以下優點:操作可以輕松并行并具有容錯能力。然而,并非所有數據都可以使用MapReduce訪問,例如在日志文件下,其他系統需要依賴傳輸,包括用于最終數據hop的HDFS。
本節,我們將介紹將源數據移動到Hadoop的方法,將使用上一節中的設計注意事項作為檢查和理解不同工具的標準。
5.2.1 HDFS命令行
Hadoop捆綁了許多方法來將數據導入HDFS。本節將介紹這些內置工具如何滿足數據移動中的各種需求,可以使用的***個也是最簡單的工具是HDFS命令行。
為作業選擇正確的數據獲取工具
本節中的低級工具適用于一次性文件移動,或者處理基于文件的舊數據源和目標。但是,以這種方式移動數據很輕易就會被Flume和Kafka(本章稍后介紹)等工具所淘汰,這些工具提供了自動數據移動管道。
注:Kafka是一個更好的平臺,用于從A到B(B可以是Hadoop集群)移動數據,而不是老式的“復制文件”。使用Kafka,只需將數據泵入其中,就擁有了實時(例如通過Storm)或離線/批量作業(例如通過Camus)消費數據。這種方法將在之后的章節中介紹。
使用CLI加載文件
如果需要手動執行,那么HDFS命令行界面(CLI)就是最合適的工具。它允許執行在常規Linux文件系統上可執行的大多數操作。本節,我們將重點介紹如何將數據從本地文件系統復制到HDFS中。
問題
使用shell將文件復制到HDFS。
解決方案
HDFS命令行界面可用于一次性移動,或者可以將其合并到腳本中以進行一系列移動。
討論
使用hadoop命令將文件從本地磁盤復制到HDFS:
- $ hadoop fs -put local-file.txt hdfs-file.txt
Hadoop -put命令的行為與Linux中的Linux cp命令不同,如果目標已存在,則會被覆蓋; 在Hadoop中,副本失敗并顯示錯誤:
- put: `hdfs-file.txt': File exists
必須添加-f選項以強制覆蓋文件:
- $ hadoop fs -put -f local-file.txt hdfs-file.txt
與Linux cp命令非常相似,可以使用相同的命令復制多個文件。在這種情況下,***一個參數必須是HDFS中復制本地文件的目錄:
- $ hadoop fs -put local-file1.txt local-file2.txt /hdfs/dest/
可以使用Linux管道將命令輸出傳遞到HDFS文件——使用相同的-put命令并在其后添加單獨的連字符,這告訴Hadoop從標準輸入讀?。?/p>
- $ echo "the cat sat on the mat" | hadoop fs -put - hdfs-file.txt
要測試文件或目錄是否存在,請使用-test命令和-e或-d選項分別測試文件或目錄是否存在。如果文件或目錄存在,則命令的代碼為0;如果不存在,則為1:
- $ hadoop fs -test -e hdfs-file.txt
- $ echo $?
- 1
- $ hadoop fs -touchz hdfs-file.txt
- $ hadoop fs -test -e hdfs-file.txt
- $ echo $?
- $ hadoop fs -test -d hdfs-file.txt
- $ echo $?
- 1
如果只想在HDFS中“touch”文件(創建一個新的空文件),那么touchz選項可以完成該工作:


CLI專為交互式HDFS活動而設計,它也可以合并到腳本中,以用于自動執行某些任務。CLI的缺點是級別較低,并且沒有內置任何自動化機制。它需要為每個命令分配一個fork,如果在bash腳本中使用可能沒問題,但如果試圖將HDFS功能集成到Python或Java應用程序中,可能就會出現問題。在這種情況下,為每個命令啟動外部進程的開銷可能也是想要避免的。
使用REST加載文件
CLI便于快速運行命令和編寫腳本。但是,它會產生為每個命令分配一個單獨進程的開銷,這可能是想要避免的,特別是編程語言與HDFS連接時。
問題
沒有HDFS本機接口的編程語言如何與HDFS交互。
解決方案
使用Hadoop的WebHDFS接口,該接口為HDFS操作提供全功能的REST API。
討論
在開始之前,需要確保在集群上啟用WebHDFS(默認不啟動),這由dfs.webhdfs.enabled屬性控制。如果未啟用,則需要更新hdfs-site.xml并添加以下內容:

在這種技術中,我們將介紹在不安全的Hadoop集群上運行WebHDFS.3的情況。如果正在使用安全的Hadoop集群,則不會提供user.name參數。
相反,我們將在與WebHDFS交互之前使用kinit對Kerberos進行身份驗證,然后在curl命令行中提供-negotiate -u:youruser。
警告:如果為已經關閉了安全性的集群啟用WebHDFS,則可以輕松地將其用作集群中任意用戶命令(只需將URL中的用戶名更改為簇)。建議僅在打開安全性的情況下運行WebHDFS。
要想在此技術中使用HTTP與NameNode進行通信,需要知道運行NameNode RPC服務的主機和端口,這是使用dfs.namenode.http-address屬性配置的。在偽分布式設置中,這很可能設置為0.0.0.0:50070。我們假設其余技術的偽分布式——替換適當的主機和端口進行設置。
首先使用CLI在HDFS中創建文件:
- $ echo "the cat sat on the mat" | hadoop fs -put - /tmp/hdfs-file.txt
使用WebHDFS獲取有關該文件的各種有趣的元數據(用戶名替換為以下URL中的aholmes):

命令語法由兩部分組成:一是路徑;二是正在執行的操作。但是,需要提供執行操作的用戶名,否則HDFS將假定你是一個訪問受限的匿名用戶。

圖5.1 解析WebHDFS URL路徑
從HDFS讀取文件只需將OPEN指定為operation:

使用WebHDFS編寫文件分為兩步:***步通知NameNode創建新文件的意圖,可以使用HTTP PUT命令執行此操作:

此時,文件尚未寫入。只是讓NameNode有機會確定要寫入哪個DataNode,這是在“Location”標頭中指定的。需要獲取該URL,然后發出第二個HTTP PUT執行實際寫入:

可以通過讀取文件來驗證寫入是否成功:

WebHDFS支持可以使用常規命令行執行所有HDFS操作,它更有用,因為它可以訪問結構化JSON表單中的元數據,從而更容易解析數據。
值得一提的是WebHDFS提供的一些附加功能。首先,文件的***個塊存放數據位置。NameNode將客戶端重定向到承載***個塊的DataNode,提供強大的數據位置。對于文件中的后續塊,DataNode充當代理,并將數據流入保存塊數據的節點或從中保存數據。
WebHDFS還與Hadoop的安全身份驗證集成,這意味著可以啟用Kerberos并在HTTP請求中使用委派令牌。此外,API將保持跨Hadoop版本的兼容性,這意味著目前發布的命令將適用于未來版本的Hadoop(反之亦然)。 這是一個有用的工具,用于訪問運行不同Hadoop版本的多個集群。

表5.1 WebHDFS庫
當客戶端可以訪問所有NameNode和DataNode時,WebHDFS非常有用。在鎖定環境中,情況可能并非如此,可能需要查看HttpFS。
從防火墻后面訪問HDFS
生產Hadoop環境通常被鎖定以保護這些集群中的數據。部分安全程序可能包括將集群置于防火墻之后,如果嘗試從防火墻外部讀取或寫入HDFS,這將是一件麻煩事。 這種技術著眼于HttpFS網關,它可以使用HTTP(通常在防火墻上打開)提供HDFS訪問。
問題
想要寫入HDFS,但有一個防火墻限制對NameNode或DataNode的訪問。
解決方案
使用HttpFS網關,它是一個獨立的服務器,可通過HTTP提供對HDFS的訪問。因為它是一個單獨的服務而且是HTTP,所以可以配置為在任何可以訪問Hadoop節點的主機上運行,并且可以打開防火墻規則以允許流量到服務。
討論
HttpFS非常有用,因為它不僅允許使用REST訪問HDFS,而且具有完整的Hadoop文件系統實現,這意味著可以使用CLI和本機HDFS Java客戶端與HDFS進行通信,如圖5.2所示。

圖5.2 HttpFS網關架構
要啟動并運行HttpFS,必須指定代理用戶。這是將運行HttpFS進程的用戶,此用戶也將在Hadoop中配置為代理用戶。假設有一個名為foxyproxy的用戶,你將其指定為代理用戶。你用以下代碼更新core-site.xml:

基本上,這表明Hadoop應該只接受來自主機localhost的代理請求,并且foxyproxy可以冒充任何用戶(你可以通過提供以逗號分隔的組列表來鎖定可以模擬的用戶集名)。更改用戶名,主機和組值,以便它們在環境中有意義。
在對core-site.xml進行更改后,我們需要啟動HttpFS進程:
- $ sbin/httpfs.sh start
現在,可以使用WebHDFS發出與之前技術中相同的curl命令。這是關于HttpFS網關的好處之一 :語法完全相同。要在根目錄上執行目錄列表,需要執行以下操作:

此curl命令與先前技術中使用的curl命令的唯一區別是端口號。默認情況下,HttpFS在端口14000上運行,但可以通過編輯httpfs-env.sh來更改。表5.2中顯示了可以在文件中更改的一些有趣屬性。

表5.2 HttpFS屬性
可以在httpfs-site.xml中配置其他Kerberos以及用戶和組級別設置。
WebHDFS和HttpFS之間的差異
WebHDFS和HttpFS之間的主要區別在于客戶端對所有數據節點的可訪問性。如果客戶端可以訪問所有數據節點,那么WebHDFS將正常工作,因為讀取和寫入文件涉及客戶端直接與數據節點通信以進行數據傳輸。另一方面,如果位于防火墻之后,客戶端可能無法訪問所有數據節點,在這種情況下,HttpFS選項最適合。使用HttpFS,服務器將與數據節點通信,客戶端只需要與單個HttpFS服務器通信。
如果可以,請選擇WebHDFS,因為客戶端直接與數據節點通信具有固有的優勢:這允許輕松擴展多個主機并發客戶端數量,而不會遇到通過HttpFS流式傳輸數據的網絡瓶頸。如果客戶端本身在數據節點上運行,則更是如此,因為將通過直接從本地文件系統而不是網絡流式傳輸本地托管的HDFS數據塊來使用WebHDFS的優勢。
使用NFS掛載Hadoop
通常,如果Hadoop數據可以作為文件系統的常規安裝來訪問,那么使用Hadoop數據要容易得多。這允許使用現有腳本,工具和編程語言,并與HDFS中的數據進行交互。本節介紹如何使用NFS掛載輕松地將數據復制到HDFS中和從HDFS復制數據。
問題
將HDFS視為常規Linux文件系統,并使用標準Linux工具與HDFS進行交互。
解決方案
使用Hadoop的NFS實現來訪問HDFS中的數據。
討論
在Hadoop 2.1之前,NFS安裝HDFS的唯一方法是使用FUSE。由于各種性能和可靠性問題,不建議將其用于一般用途。它還引入了額外的負擔,要求在任何客戶端計算機上安裝驅動程序(換句話說,它沒有提供NFS網關)。
Hadoop中的新NFS實現解決了舊的基于FUSE系統的所有缺點。這是一個合適的NFSv3實現,允許運行一個或多個NFS網關以提高可用性和吞吐量。
要啟動并運行NFS服務,首先需要停止在主機上運行的NFS服務。在Linux系統上,可以使用以下命令實現:


圖5.3 Hadoop NFS
接下來,需要啟動Hadoop NFS服務。啟動的***個服務是portmap,它為協議及其關聯的傳輸和端口提供注冊服務。在受限的端口上運行,因此需要以root用戶身份啟動:
- $ sudo hadoop-daemon.sh start portmap
接下來,你需要啟動實際的NFS服務,運行此服務的用戶一定要與運行HDFS的用戶相同,這一點非常重要:
- $ hadoop-daemon.sh start nfs3
通過運行rpcinfo和showmount來驗證服務是否正在運行,應該看到類似于以下的輸出:

現在,需要在主機目錄上安裝HDFS。以下示例選擇/hdfs作為安裝目錄。第二個mount命令驗證是否已創建安裝:

現在,可以使用掛載的文件系統直接操作HDFS。
使用NFS網關時需要考慮以下幾點:
- HDFS是僅附加文件系統??梢愿郊拥轿募?,但不能執行隨機寫入。如果需要使用支持隨機寫入的文件系統來使用Hadoop,那么應該看看MapR的Hadoop distribution。
- Hadoop 2.2版不支持Hadoop安全驗證(Kerberos),并且有一個添加該支持的開放票證。
- 在Hadoop 2.4(或3.0)之前,不支持代理用戶。這實質上意味著以前版本的Hadoop將以超級用戶身份執行所有命令,因為要求NFS網關作為與HDFS本身相同的用戶運行。
由于這些限制,建議將NFS網關保留用于實驗用途,或者用于不考慮用戶級安全性的單租戶集群。
使用DistCp在集群內和集群間復制數據
如果移入或移出Hadoop的數據量很大,通過單個主機匯集數據,一定要盡可能優化數據移動。DistCp可以在Hadoop集群之間以及進出NFS安裝的數據之間高效復制數據。
問題
在Hadoop集群之間高效復制大量數據,并且進行增量復制。
解決方案
使用DistCp,一種內置于Hadoop中的并行文件復制工具。
討論
本節,我們將首先介紹DistCp的重要配置。之后,我們將繼續查看使用DistCp的特定方案,以及配置和運行DistCp的***方法。
此技術涵蓋了Hadoop 2中可用的DistCp新版本,名為DistCp 2。此代碼被反向移植到Hadoop 1.2.0中,可通過使用distcp2作為命令啟用Hadoop 2來替換現有的DistCp,然后就可以正常使用distcp命令。
DistCp 2支持與DistCp的舊版本相同的命令行參數集,并帶來了許多有用的優勢:
- 使用大量文件時減少了設置和執行時間,因為驅動程序不再需要預處理所有輸入(現在這已推遲到mapper)。
- 具有功能齊全的Java接口,無需Java客戶端將參數序列化為字符串。
- 原子提交允許全部復制語義。
- 使用-update跳過目標中已存在的文件,如果文件屬性與源文件不同,將導致文件屬性發生更改。
- 作為副本的一部分,不再跳過空目錄。
DistCp使用僅map的MapReduce作業來執行復制。以下是一個非常簡單的示例,在單個Hadoop集群中用于將源目錄,/ hello,復制到目標目錄,/world:
- $ hadoop distcp /hello /world
如果/ world目錄尚不存在,則此命令將創建/ world目錄,然后將/ hello(其所有文件和目錄遞歸)的內容復制到/ world。
處理已存在的目標文件
目標中已存在的文件和目錄保持不變(即使文件不同)。
可以通過查看作業完成時轉儲到標準輸出的SKIP計數器來查看跳過的文件數:

-update和-overwrite參數巧妙地改變了復制內容的行為。如果沒有這些選項,如果源是目錄,則在目標目錄下創建該目錄。使用-update或-overwrite參數,僅復制文件和子目錄,而不復制源目錄。通過一個例子證明這一點:

忽略錯誤
當使用DistCp復制大量文件時,使用-i標志執行命令以忽略錯誤是明智的。這樣,單個錯誤不會導致整個復制過程失敗,可以通過使用-update選項重新發出相同的DistCp命令來再次嘗試復制失敗文件。
動態復制策略
DistCp的默認行為是通過均勻地傳播所有文件以使所有mapper復制大致相同的字節數來為每個mapper預分配工作。從理論上講,這聽起來像是一種公平分配工作的好方法,但實際上,諸如硬件,硬件錯誤和配置不良等因素往往導致長尾工作執行,少數落后的mapper占用時間比其他要長。
使用DistCp 2,可以使用替代策略,其中mapper直接接收工作而不是預先分配,這被稱為動態復制策略,使用-strategy動態參數激活,添加此參數的效果是改進復制時間。
原子提交
DistCp 2的另一個有用功能是原子提交。DistCp默認將每個文件寫入臨時文件,然后移動到最終目標。這意味著無法撤消在作業中遇到錯誤之前復制的文件。
因此,原子提交允許在復制所有文件時將實際的“提交”推遲到作業結束,這樣如果遇到錯誤,你將看不到任何部分寫入,可以使用-atomic參數啟用此功能。
并行性和mapper數量
目前,DistCp最細的工作單元是文件級別。因此,無論文件多大,都只使用一個mapper來復制,提高作業的mapper數量對提高復制速度沒有任何影響。
默認情況下,DistCp使用20個mapper運行,每個mapper副本對應的文件由選擇的復制策略確定。Hadoop開發人員考慮了mapper數量的默認設置,選擇正確的值是想要使用多少網絡帶寬以及希望在復制期間占用多少任務的函數,可以通過指定-m后跟的值來更改mapper的數量。
帶寬
***一個考慮因素是復制期間使用的網絡帶寬。大型副本可能會使集群之間的網絡飽和。企業中網絡運營人員保持運行良好的一種方法是使用-bandwidth參數來指定每個map任務在復制期間消耗的帶寬量上限。此參數的值以兆字節/秒(MBps)為單位。
其他
到目前為止,我們已經看到了DistCp中一些更有趣的選項。要查看完整的選項列表,可以運行distcp命令,或者查看Hadoop文檔。