從HDFS和MapReduce兩方面了解Hadoop
簡(jiǎn)介
Hadoop 是一個(gè)能夠?qū)Υ罅繑?shù)據(jù)進(jìn)行分布式處理的軟件框架,框架最核心的設(shè)計(jì)就是:HDFS 和 MapReduce。HDFS 為海量的數(shù)據(jù)提供了存儲(chǔ),而 MapReduce 則為海量的數(shù)據(jù)提供了計(jì)算。這篇文章就主要從 HDFS 和 MapReuce 兩個(gè)大的方面展開對(duì) Hadoop 講解,當(dāng)然為了直觀的測(cè)試 HDFS 提供的豐富的 API 以及我們編寫的 MapReduce 程序,在閱讀下面的內(nèi)容之前,你需要準(zhǔn)備一臺(tái)安裝了 Hadoop 的機(jī)器(也可以是虛擬機(jī)),如果你還沒(méi)有安裝的話,可以參考《在 Ubuntu 上安裝 Hadoop》。
HDFS
HDFS 概念
在說(shuō) HDFS 之前我們先來(lái)解釋一下什么是 DFS,DFS 的全稱是 Distributed File System,翻譯過(guò)來(lái)就是分布式文件系統(tǒng),而 HDFS 就是 Hadoop 自帶的分布式文件系統(tǒng)。
相關(guān)名詞
為了后面大家更容易理解文章,這里使用一定的篇幅來(lái)簡(jiǎn)單的介紹一下與 HDFS 相關(guān)的一些組件或者名詞的概念。
- NameNode,管理節(jié)點(diǎn),管理系統(tǒng)的命名空間,維護(hù)著整個(gè)文件系統(tǒng)的結(jié)構(gòu)和目錄信息,通常情況下一個(gè) Hadoop 集群只會(huì)有一個(gè)工作的 NameNode。
- DataNode,工作節(jié)點(diǎn),文件系統(tǒng)的工作節(jié)點(diǎn),主要是根據(jù)需要進(jìn)行存儲(chǔ)或者檢索數(shù)據(jù)塊,并且定期向 NameNode 報(bào)告它們所存儲(chǔ)的數(shù)據(jù)塊列表。
- 數(shù)據(jù)塊,同我們常使用的磁盤上的文件系統(tǒng)一樣,HDFS 也有數(shù)據(jù)塊的概念,默認(rèn)的大小為 128M。
- 塊緩存,一般情況下,我們通過(guò) HDFS 從 DataNode 中檢索數(shù)據(jù)時(shí),DataNode 都是從磁盤中讀取,但是對(duì)于訪問(wèn)很頻繁的文件,它所對(duì)于的數(shù)據(jù)塊可能會(huì)被緩存到 DataNode 的內(nèi)存中,以加快讀取速度,這就是所謂的塊緩存。
- 聯(lián)邦 HDFS,其實(shí)這個(gè)就是為了解決 Hadoop 不適合存儲(chǔ)數(shù)量龐大的文件的問(wèn)題,同時(shí)由多個(gè) NameNode 來(lái)維護(hù)整個(gè)文件系統(tǒng)的系統(tǒng)樹以及文件和目錄,每個(gè) NameNode 負(fù)責(zé)管理文件系統(tǒng)命名空間中的一部分。
特性
下面我們就一起來(lái)看下 HDFS 有哪些特性:
- 存儲(chǔ)超大文件,由于 HDFS 是分布式的文件系統(tǒng),所以不受單臺(tái)機(jī)器的存儲(chǔ)大小的限制,可以存儲(chǔ)超大文件,目前已經(jīng)達(dá)到了 PB 級(jí)了。
- 流式訪問(wèn)數(shù)據(jù)。
- HDFS 對(duì)硬件的要求并不是很高,可以運(yùn)行在廉價(jià)的商用硬件上。
- 不適合低延遲的數(shù)據(jù)訪問(wèn),由于 Hadoop 的流式數(shù)據(jù)訪問(wèn),訪問(wèn)數(shù)據(jù)會(huì)有寫延遲,所以不太適合低時(shí)間延遲的數(shù)據(jù)訪問(wèn),一般情況下這種需求我們會(huì)使用關(guān)系型數(shù)據(jù)庫(kù)來(lái)實(shí)現(xiàn)。
- 不適合大量的小文件存儲(chǔ),原因是 NameNode 將文件系統(tǒng)的元數(shù)據(jù)存儲(chǔ)在內(nèi)存中,每存儲(chǔ)一個(gè)文件都需要在 NameNode 中存儲(chǔ)該文件的目錄、存儲(chǔ)在哪個(gè) DataNode 中等等的數(shù)據(jù)。所以如果文件的數(shù)量達(dá)到數(shù)十億的話 NameNode 的內(nèi)存很可能不夠用了。
- 不支持多用戶寫入,以及任意的修改文件,只可以在文件末尾添加內(nèi)容。
HDFS 的命令行操作
命令行接口是 HDFS 所有類型的接口中最簡(jiǎn)單的,也是每個(gè)開發(fā)者都必須要掌握的。下面我們就列舉幾個(gè)簡(jiǎn)單的命令行操作,但是在操作前你必須按照***章的內(nèi)容安裝好了 Hadoop,并且啟動(dòng)了 HDFS。
創(chuàng)建目錄。
- 清單 1. 創(chuàng)建目錄命令
- hadoop fs -mkdir /test
查看目錄。
- 清單 2. 創(chuàng)建目錄命令
- hadoop fs -ls /
上傳文件,緊跟-put 后面的 test.txt 是要推送到 HDFS 中的文件,/test 是指定要推送到 HDFS 上哪個(gè)目錄下面。
- 清單 3. 上傳文件
- hadoop fs -put test.txt /test
刪除文件。
- 清單 4. 上傳文件
- hadoop fs -rm /test/test.txt
其實(shí)通過(guò)上面例舉的幾個(gè)命令我們可以看出 HDFS 的文件操作命令幾乎和 Linux 上的命令一致,這樣我們使用起來(lái)會(huì)很容易上手。
HDFS 的 JavaAPI
在 Java 項(xiàng)目中使用 HDFS 提供的 API 我們需要依賴 hadoop-common 和 hadoop-hdfs 兩個(gè)包,為了方便測(cè)試,我們這里還引入了 junit,篇幅原因這里就不對(duì)項(xiàng)目本身做太多的講解,這里附上項(xiàng)目源碼地址供大家參考。
讀取 HDFS 中文件的內(nèi)容。
- 清單 5. JavaApi 讀取文件內(nèi)容
- @Test
- public void read() throws IOException {
- // 文件地址。
- URI uri = URI.create("/test/test.txt");
- // 用于接收讀取的數(shù)據(jù)流。
- FSDataInputStream in = null;
- try {
- in = fs.open(new Path(uri));
- // ***的一個(gè) boolean 類型的參數(shù)是指是否在調(diào)用結(jié)束后關(guān)閉流,我們這里選擇在 finally 里面手動(dòng)關(guān)閉。
- IOUtils.copyBytes(in, System.out, 4096, false);
- } finally {
- IOUtils.closeStream(in);
- }
- }
- }
不出意外的話,你可以在控制臺(tái)看到你指定文件的內(nèi)容。在這一步我遇到一個(gè)問(wèn)題,就是無(wú)法直接在 windows 下操作 HDFS,具體的解決方法可以參照這篇文章。FSDataInputStream.seek()方法還可以實(shí)現(xiàn)從文件輸入流的任意一個(gè)絕對(duì)位置讀取文件內(nèi)容,比如我們可以在上面代碼中添加如下的內(nèi)容來(lái)實(shí)現(xiàn)在控制臺(tái)重復(fù)打印文件內(nèi)容。
- 清單 6. JavaApi 任意位置讀取文件內(nèi)容
- in.seek(0);
- tils.copyBytes(in, System.out, 4096, false);
創(chuàng)建目錄。
- 清單 7. JavaApi 創(chuàng)建目錄
- @Test
- public void mkdir() throws IOException {
- fs.mkdirs(new Path("/test/api"));
- }
查詢文件目錄。
- 清單 8. JavaApi 查詢文件目錄
- @Test
- public void ls() throws IOException {
- FileStatus[] fileStatuses = fs.listStatus(new Path("/"));
- if (null == fileStatuses || fileStatuses.length == 0) {
- return;
- }
- for (FileStatus fileStatus : fileStatuses) {
- System.out.println(fileStatus.getPath() + " " + fileStatus.getPermission());
- }
- }
這里引入一個(gè)類 FileStatus,這個(gè)類封裝了 HDFS 中文件和目錄的元數(shù)據(jù),包括文件長(zhǎng)度、塊大小、復(fù)本、修改時(shí)間、所有者以及權(quán)限信息。FileSystem 里面提供的 listStatus 方法可以獲取一個(gè)目錄下的所有目錄或者文件的 FileStatus,但是它不會(huì)遞歸獲取下級(jí)目錄的內(nèi)容,這里可以開發(fā)你的想象自己實(shí)現(xiàn)一下(Tips:fileStatus.isDirectory()可以判斷這個(gè) fileStatus 是否是一個(gè)文件夾)。
刪除文件或目錄。
- 清單 9. JavaApi 刪除文件或目錄
- @Test
- public void delete() throws IOException {
- fs.delete(new Path("/test/api"), false);
- }
- @Test
- public void deleteNonEmptyDir() throws IOException {
- fs.delete(new Path("/test"), true);
- }
我們可以看到 fs.delete()這個(gè)方法有兩個(gè)參數(shù),***個(gè)參數(shù)很好理解,就是我們要?jiǎng)h除的目錄或者文件的地址。那么第二個(gè) Boolean 類型的參數(shù)呢,如果刪除的是文件或者空目錄這個(gè)參數(shù)實(shí)際上是會(huì)被忽略的,如果刪除的是非空目錄,只有在這個(gè)參數(shù)值為 true 的時(shí)候才會(huì)成功刪除。
創(chuàng)建文件和文件寫入。
我們通過(guò) FileSystem.create()方法來(lái)創(chuàng)建一個(gè)文件,這個(gè)方法會(huì)順帶著創(chuàng)建不存在的父級(jí)目錄,如果不需要這個(gè)的話,***是在創(chuàng)建之前調(diào)用 exists()方法來(lái)判斷一下,如果父級(jí)目錄不存在直接報(bào)錯(cuò)即可。
- 清單 10. JavaApi 創(chuàng)建文件和文件寫入
- @Test
- public void create() throws IOException {
- FSDataOutputStream out = null;
- try {
- out = fs.create(new Path("/test/api/test.txt"));
- out.writeChars("hello hdfs.");
- } finally {
- IOUtils.closeStream(out);
- }
- }
文件創(chuàng)建好后,可以通過(guò) append()方法在文件末尾添加內(nèi)容。
- 清單 11. JavaApi 追加文件內(nèi)容
- @Test
- public void append() throws IOException {
- FSDataOutputStream out = null;
- try {
- out = fs.append(new Path("/test/api/test.txt"));
- out.writeChars("hello hdfs.");
- } finally {
- out.close();
- }
- }
從本地上傳文件到 HDFS。
- 清單 12. JavaApi 上傳文件至 HDFS
- @Test
- public void copyFromLocal() throws IOException {
- fs.copyFromLocalFile(new Path("d:/local.txt"), new Path("/test/api"));
- }
從 HDFS 上下載文件。
- 清單 13. JavaApi 從 HDFS 下載文件
- @Test
- public void copyToLocal() throws IOException {
- fs.copyToLocalFile(new Path("/test/api/local.txt"), new Path("E:/"));
- }
MapReduce 實(shí)戰(zhàn)
什么是 MapReduce
MapReduce 是一種編程模型,"Map(映射)"和"Reduce(歸約)",是它們的主要思想,我們通過(guò) Map 函數(shù)來(lái)分布式處理輸入數(shù)據(jù),然后通過(guò) Reduce 匯總結(jié)果并輸出。我們編寫一個(gè) MapReduce 程序的一般步驟是:
- 編寫 map 程序。
- 編寫 reduce 程序。
- 編寫程序驅(qū)動(dòng)。
本章節(jié)的目標(biāo)
本節(jié)中我們將使用 MapReduce 框架來(lái)編寫一個(gè)簡(jiǎn)單的例子,這個(gè)例子是用來(lái)統(tǒng)計(jì) HDFS 指定目錄下的文件中每個(gè)字符出現(xiàn)的次數(shù)并將統(tǒng)計(jì)結(jié)果輸出到 HDFS 的指定目錄中。點(diǎn)擊此處獲取本章節(jié)源代碼。
Map 程序
我們繼承 Mapper 類并重寫了其 map 方法。Map 階段輸入的數(shù)據(jù)是從 hdfs 中拿到的原數(shù)據(jù),輸入的 key 為某一行起始位置相對(duì)于文件起始位置的偏移量,value 為該行的文本。輸出的內(nèi)容同樣也為鍵-值對(duì),這個(gè)時(shí)候輸出數(shù)據(jù)的鍵值對(duì)的類型可以自己指定,在本例中 key 是 Text 類型的,value 是 LongWritable 類型的。輸出的結(jié)果將會(huì)被發(fā)送到 reduce 函數(shù)進(jìn)一步處理。
- 清單 14. Map 程序
- public class CharCountMapper extends Mapper< LongWritable, Text, Text, LongWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context)
- throws IOException, InterruptedException {
- // 將這一行文本轉(zhuǎn)為字符數(shù)組
- char[] chars = value.toString().toCharArray();
- for (char c : chars) {
- // 某個(gè)字符出現(xiàn)一次,便輸出其出現(xiàn) 1 次。
- context.write(new Text(c + ""), new LongWritable(1));
- }
- }
- }
Reduce 程序
我們繼承 Reducer 類并重寫了其 reduce 方法。在本例中 Reduce 階段的輸入是 Map 階段的輸出,輸出的結(jié)果可以作為最終的輸出結(jié)果。相信你也注意到了,reduce 方法的第二個(gè)參數(shù)是一個(gè) Iterable,MapReduce 會(huì)將 map 階段中相同字符的輸出匯總到一起作為 reduce 的輸入。
- 清單 15. Reduce 程序
- public class CharCountReducer extends Reducer< Text, LongWritable, Text, LongWritable> {
- @Override
- protected void reduce(Text key, Iterable< LongWritable> values, Context context)
- throws IOException, InterruptedException {
- long count = 0;
- for (LongWritable value : values) {
- count += value.get();
- }
- context.write(key, new LongWritable(count));
- }
- }
驅(qū)動(dòng)程序
到目前為止,我們已經(jīng)有了 map 程序和 reduce 程序,我們還需要一個(gè)驅(qū)動(dòng)程序來(lái)運(yùn)行整個(gè)作業(yè)。可以看到我們?cè)谶@里初始化了一個(gè) Job 對(duì)象。Job 對(duì)象指定整個(gè) MapReduce 作業(yè)的執(zhí)行規(guī)范。我們用它來(lái)控制整個(gè)作業(yè)的運(yùn)作,在這里我們指定了 jar 包位置還有我們的 Map 程序、Reduce 程序、Map 程序的輸出類型、整個(gè)作業(yè)的輸出類型還有輸入輸出文件的地址。
- 清單 16. 驅(qū)動(dòng)程序
- public class CharCountDriver {
- public static void main(String[] args) throws Exception {
- Configuration configuration = new Configuration();
- Job job = Job.getInstance(configuration);
- // Hadoop 會(huì)自動(dòng)根據(jù)驅(qū)動(dòng)程序的類路徑來(lái)掃描該作業(yè)的 Jar 包。
- job.setJarByClass(cn.itweknow.mr.CharCountDriver.class);
- // 指定 mapper
- job.setMapperClass(CharCountMapper.class);
- // 指定 reducer
- job.setReducerClass(CharCountReducer.class);
- // map 程序的輸出鍵-值對(duì)類型
- job.setMapOutputKeyClass(Text.class);
- job.setMapOutputValueClass(LongWritable.class);
- // 輸出鍵-值對(duì)類型
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(LongWritable.class);
- // 輸入文件的路徑
- FileInputFormat.setInputPaths(job, new Path(args[0]));
- // 輸入文件路徑
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- boolean res = job.waitForCompletion(true);
- System.exit(res?0:1);
- }
- }
執(zhí)行 MapReduce 作業(yè)
打包作業(yè),我們需要將我們的 MapReduce 程序打成 jar 包。
- 清單 17. 打包作業(yè)
- mvn package -Dmaven.test.skip=true
將 jar 包復(fù)制到 hadoop 機(jī)器上。
在 HDFS 上準(zhǔn)備好要統(tǒng)計(jì)的文件,我準(zhǔn)備的文件在 HDFS 上的/mr/input/目錄下,內(nèi)容為"hello hadoop hdfs.I am coming."。
執(zhí)行 jar。
- 清單 18. 執(zhí)行作業(yè)
- hadoop jar mr-test-1.0-SNAPSHOT.jar cn.itweknow.mr.CharCountDriver /mr/input/ /mr/output/out.txt
查看結(jié)果。
我們先看看輸出目錄,結(jié)果如下,最終輸出的結(jié)果就存放在/mr/output/part-r-00000 文件中。
圖 1. MapReduce 作業(yè)輸出目錄
然后我們?cè)倏纯摧敵鑫募械木唧w內(nèi)容,如下所示:
圖 2. MapReduce 作業(yè)輸出結(jié)果
MapReduce 運(yùn)行原理
我們可以將一個(gè) MapReduce 作業(yè)的運(yùn)行過(guò)程簡(jiǎn)單的拆分成 6 個(gè)過(guò)程,分別是作業(yè)的提交、作業(yè)初始化、任務(wù)分配、任務(wù)執(zhí)行、進(jìn)度和狀態(tài)的更新、作業(yè)完成。下面我就一起來(lái)具體了解下這么幾個(gè)步驟。
作業(yè)的提交
當(dāng)我們調(diào)用 job.submit()或者 job.waitForCompletion()方法(其內(nèi)部也會(huì)調(diào)用 submit()方法)的時(shí)候,會(huì)創(chuàng)建一個(gè) JobSubmitter 對(duì)象,在 JobSubmitter 內(nèi)部所實(shí)現(xiàn)的作業(yè)提交過(guò)程如下:
- 向資源管理器請(qǐng)求新的應(yīng)用 ID 作為 MapReduce 作業(yè)的作業(yè) ID。
- 檢查作業(yè)的輸出目錄,如果沒(méi)有指定輸出目錄或者輸出目錄已經(jīng)存在就會(huì)拋出錯(cuò)誤,這也就是為啥我們?cè)趫?zhí)行 MapReduce 作業(yè)時(shí)為啥需要保證指定的輸出目錄不存在。
- 將作業(yè)運(yùn)行所需要的資源文件(作業(yè) JAR 包,配置文件,輸入分片)復(fù)制到一起(一個(gè)以作業(yè) ID 命名的目錄下)。
- 調(diào)用 submitApplication()方法提交作業(yè)。
作業(yè)的初始化
- 首先資源管理器會(huì)將作業(yè)請(qǐng)求傳遞給 YARN 調(diào)度器。
- 調(diào)度器會(huì)為作業(yè)分配一個(gè)容器。
- 資源管理器在節(jié)點(diǎn)管理器的管理下在容器中啟動(dòng) application master。
- application master 的主類 MRAppMaster 會(huì)創(chuàng)建多個(gè)簿記對(duì)象來(lái)跟蹤作業(yè)的進(jìn)度。
- 接收輸入分片。
- application master 為每個(gè)分片創(chuàng)建 map 任務(wù)以及確定 reduce 任務(wù),并且分配任務(wù) ID。
任務(wù)的分配
application master 會(huì)為創(chuàng)建的任務(wù)向資源管理器請(qǐng)求容器,先是為 map 任務(wù)請(qǐng)求資源,后為 reduce 任務(wù)請(qǐng)求資源。為 map 任務(wù)分配資源的時(shí)候需要考慮到數(shù)據(jù)本地化的局限,會(huì)盡量保證運(yùn)行的 map 任務(wù)所需要的數(shù)據(jù)塊存儲(chǔ)在當(dāng)前機(jī)器或者當(dāng)前機(jī)架中,這樣會(huì)極大的節(jié)省帶寬資源。而 reduce 任務(wù)則不存在這個(gè)限制。
任務(wù)的執(zhí)行
- 資源管理器為任務(wù)分配好容器后,application master 就通過(guò)與節(jié)點(diǎn)管理器通信啟動(dòng)容器。
- 在運(yùn)行任務(wù)之前,會(huì)將任務(wù)所需要的資源本地化。
- 運(yùn)行任務(wù)。
進(jìn)度和狀態(tài)的更新
任務(wù)在運(yùn)行的過(guò)程中,會(huì)對(duì)其精度保持追蹤,對(duì)與 map 任務(wù),其任務(wù)進(jìn)度就是已經(jīng)處理的輸入所占總輸入的比例。對(duì)與 reduce 任務(wù)來(lái)講就比較復(fù)雜了,因?yàn)檫@個(gè)部分包含資源復(fù)制階段、排序階段和 reduce 階段三個(gè)階段,每個(gè)階段都占整個(gè)完成比例的 1/3,也就是說(shuō)當(dāng)我們完成 reduce 的一半的時(shí)候進(jìn)度應(yīng)該為 5/6。對(duì)與狀態(tài)的更新,客戶端會(huì)每秒輪詢一次 application master 以接收***的任務(wù)狀態(tài)。
作業(yè)的完成
當(dāng) application master 收到作業(yè)***一個(gè)任務(wù)已經(jīng)完成的通知后,便把作業(yè)的狀態(tài)設(shè)置為"成功"。
為了方便大家理解,我這里將整個(gè)過(guò)程總結(jié)為一張圖,貼在這里僅供大家參考。
圖 3. MapReduce 程序運(yùn)行圖解
Shuffle
簡(jiǎn)介,什么是 Shuffle
MapReduce 程序會(huì)確保每個(gè) reduce 函數(shù)的輸入都是按鍵排序的。系統(tǒng)執(zhí)行排序以及將 map 函數(shù)的輸出傳給 reduce 函數(shù)的過(guò)程稱之為 shuffle。整個(gè) Shuffle 分為 Map 端和 Reduce 端,下圖是 MapReduce 的 Shuffle 的一個(gè)整體概覽圖,大家先看一下整個(gè)圖,我們后面再做進(jìn)一步的解釋說(shuō)明。
圖 4. Shuffle 概覽圖
Map 端 Shuffle
其實(shí) Map 函數(shù)產(chǎn)生的輸出會(huì)寫到磁盤上而不是 HDFS。但是它也不是簡(jiǎn)簡(jiǎn)單單的直接寫到磁盤,這中間有一個(gè)復(fù)雜的過(guò)程,下面我們就來(lái)拆解一下。
從上面的圖可以看到每個(gè) Map 任務(wù)都會(huì)有一個(gè)緩沖區(qū),這個(gè)緩沖區(qū)會(huì)臨時(shí)存儲(chǔ) map 函數(shù)輸出的內(nèi)容,緩沖區(qū)的個(gè)大小默認(rèn)是 100M,我們可以通過(guò) mapreduce.task.io.sort.mb 這個(gè)配置項(xiàng)配置,當(dāng)緩沖區(qū)中的內(nèi)容達(dá)到其設(shè)定的閾值(閾值的設(shè)置值是占整個(gè)緩沖區(qū)的大小,默認(rèn)為 0.8,我們可以通過(guò) mapreduce.map.sort.spill.percent 來(lái)配置)時(shí)就會(huì)產(chǎn)生溢出,這個(gè)時(shí)候會(huì)有一個(gè)后臺(tái)線程將緩沖區(qū)中的內(nèi)容分區(qū)(根據(jù)最終要傳給的 Reduce 任務(wù)分成不同的區(qū),分區(qū)的目的是將輸出劃分到不同的 Reducer 上去,后面的 Reducer 就會(huì)根據(jù)分區(qū)來(lái)讀取自己對(duì)應(yīng)的數(shù)據(jù))
然后區(qū)內(nèi)按照 key 排序,如果我們?cè)O(shè)置了 Combiner(Combiner 的本質(zhì)也是一個(gè) Reducer,其目的是對(duì)將要寫入到磁盤上的文件先進(jìn)行一次處理,這樣,寫入到磁盤的數(shù)據(jù)量就會(huì)減少。)
的話,這個(gè)時(shí)候會(huì)運(yùn)行 Combiner 函數(shù),***再寫入磁盤。而在這個(gè)過(guò)程中 Map 任務(wù)還會(huì)繼續(xù)往緩沖區(qū)中輸出內(nèi)容,如果出現(xiàn)緩沖區(qū)空間被占滿的情況,Map 任務(wù)就會(huì)阻塞直到緩沖區(qū)中的內(nèi)容被全部寫到磁盤中為止。
每次緩沖區(qū)溢出時(shí)都會(huì)新建一個(gè)新的溢出文件,這樣***其實(shí)是會(huì)出現(xiàn)多個(gè)溢出文件的,在 Map 任務(wù)結(jié)束前這些溢出文件會(huì)被合并到一個(gè)整的輸出文件。
Reduce 端 Shuffle
Reduce 端的 Shuffle 分為三個(gè)階段,復(fù)制階段、合并階段和 Reduce。
首先是復(fù)制階段,Reduce 任務(wù)需要集群上若干個(gè) map 輸出作為其輸入內(nèi)容,在每個(gè) Map 任務(wù)完成的時(shí)候 Reduce 任務(wù)就開復(fù)制其輸出,上面也提到過(guò) Map 任務(wù)在寫入磁盤前會(huì)將輸出進(jìn)行根據(jù) Reduce 任務(wù)進(jìn)行分區(qū),所以這里 Reduce 任務(wù)在復(fù)制的時(shí)候只會(huì)復(fù)制自己的那個(gè)分區(qū)里的內(nèi)容。如果 Map 的輸出非常小,那么 Reduce 會(huì)直接將其復(fù)制到內(nèi)存中,否則會(huì)被復(fù)制到磁盤。
合并階段,因?yàn)橛泻芏嗟?Map 任務(wù),所以 Reduce 復(fù)制過(guò)來(lái)的 map 輸出會(huì)有很多個(gè),在這個(gè)階段主要就是將這些 Map 輸出合并成為一個(gè)文件。
Reduce 階段,這個(gè)階段主要就是執(zhí)行我們的 Reduce 函數(shù)的代碼了,并產(chǎn)生最終的結(jié)果,然后寫入到 HDFS 中。
在文章的***,提供我在撰寫本文的過(guò)程中所編寫的一些源代碼,供大家參考。也希望大家能夠從本文中收獲一些幫助。
結(jié)束語(yǔ)
本文主要從 HDFS 和 MapReduce 兩個(gè)大的方面講解了 Hadoop 的相關(guān)知識(shí),并且編寫了一個(gè)簡(jiǎn)單的 MapReduce 程序,***還深入了解了一下 MapReduce 程序的運(yùn)行原理以及 Shuffle 相關(guān)的內(nèi)容。