專(zhuān)家剖析 Hadoop源代碼中的Task類(lèi)用法
本節(jié)和大家一起學(xué)習(xí)一下Hadoop源代碼中的Task類(lèi),Task是一個(gè)虛基類(lèi),它有兩個(gè)子類(lèi):MapTask,ReduceTask,分別對(duì)應(yīng)著Map和Reduce。下面就來(lái)看一下本節(jié)的具體介紹吧。
Hadoop源代碼中的類(lèi)Task
Task是一個(gè)虛基類(lèi),它有兩個(gè)子類(lèi):MapTask,ReduceTask,分別對(duì)應(yīng)著Map和Reduce。先從成員變量開(kāi)始:
首先是和作業(yè)任務(wù)相關(guān)的信息,包括jobFile,作業(yè)的配置文件;taskId,任務(wù)ID,從中可以獲取作業(yè)ID;partition,Job內(nèi)ID;taskStatus,任務(wù)狀態(tài)。jobCleanup,jobSetup和taskCleanup是三個(gè)標(biāo)志位。
接下來(lái)是一組和錯(cuò)誤回復(fù)的變量。我們知道,如果在Task執(zhí)行過(guò)程中出錯(cuò),很有可能是因?yàn)檩斎胗袉?wèn)題,一個(gè)常用的策略是在下一次回復(fù)性執(zhí)行過(guò)程中,忽略這部分輸入,skipRanges,skipping和writeSkipRecs就是用來(lái)控制這個(gè)行為的。
currentRecStartIndex和currentRecIndexIterator配合,可以得到當(dāng)前的任務(wù)輸入。
conf保存了當(dāng)前任務(wù)的配置(JobConf形式),MapOutputFile上一部分已經(jīng)介紹了,用于管理臨時(shí)文件,跟它配合的是lDirAlloc,類(lèi)型為L(zhǎng)ocalDirAllocator,是本地文件分配器。jobContext和taskContext保持了Job和Task的上下文。committer定制了和Task生命周期相關(guān)的一些特殊處理(也可以看出是上下文)。
最后一部分應(yīng)該是輸出outputFormat。
和統(tǒng)計(jì)/狀態(tài)監(jiān)視的成員變量分散在類(lèi)的各處,如spilledRecordsCounter,taskProgress,counters等,我們就不再介紹了。
下面我們開(kāi)始來(lái)進(jìn)行分析一下Hadoop源代碼中類(lèi)Task的成員函數(shù),首先是虛方法,Task包含了下面3個(gè)虛方法:
publicabstractvoidrun(JobConfjob,TaskUmbilicalProtocolumbilical)
throwsIOException,ClassNotFoundException,InterruptedException;
執(zhí)行Task;
publicabstractTaskRunnercreateRunner(TaskTrackertracker,TaskTracker.TaskInProgresstip)throwsIOException;
創(chuàng)建一個(gè)TaskRunner;
publicabstractbooleanisMapTask();
是否是一個(gè)Map任務(wù)。上面這3個(gè)方法自然是和MapTask,ReduceTask相關(guān),也需要它們實(shí)現(xiàn)。
Hadoop源代碼中的構(gòu)造函數(shù)很簡(jiǎn)單,主要是初始化一些成員函數(shù)。initialize也用于初始化成員,它被Task的子類(lèi)調(diào)用,用于子類(lèi)傳入一些子類(lèi)中構(gòu)造的對(duì)象。構(gòu)造函數(shù)后面是一系列的setter和getter,還有實(shí)現(xiàn)Writable的write和readFields。
localizeConfiguration函數(shù)用于將一些和Task相關(guān)的信息存放到JobConf里,這也是HadoopMapReduce中重要的參數(shù)傳遞方式。
接下來(lái)分析的是一系列和Task生命周期相關(guān)的函數(shù)。
publicvoiddone(TaskUmbilicalProtocolumbilical,TaskReporterreporter)
done被多個(gè)方法調(diào)用(下圖),用于做結(jié)束任務(wù)的一些清理工作,步驟如下:
l更新計(jì)數(shù)器updateCounters();
l如果任務(wù)需要提交,設(shè)置Taks狀態(tài)為COMMIT_PENDING,并利用TaskUmbilicalProtocol,匯報(bào)Task完成,等待提交;然后調(diào)用commit提交任務(wù)(下面分析)
l設(shè)置任務(wù)結(jié)束標(biāo)志位;結(jié)束Reporter通信線(xiàn)程;
l發(fā)送最后一次統(tǒng)計(jì)報(bào)告(通過(guò)sendLastUpdate方法,很簡(jiǎn)單);
l利用TaskUmbilicalProtocol報(bào)告結(jié)束狀態(tài)(通過(guò)sendDone方法,很簡(jiǎn)單)。
commit方法被done方法調(diào)用,用于等待TaskTracker的可提交信號(hào)。通過(guò)這種機(jī)制,Task可以等待TaskTracker上需要的一些后續(xù)處理,比方說(shuō),把Task的結(jié)果取走,需要TaskTracker的協(xié)調(diào)和確認(rèn)。commit還會(huì)調(diào)用org.apache.hadoop.mapreduce.OutputCommitter的commitTask方法,執(zhí)行一些子類(lèi)需要的commit事件處理。
runJobCleanupTask,runJobSetupTask和runTaskCleanupTask應(yīng)用在Maptask和ReduceTask的run方法中,用于做一些準(zhǔn)備和可能的清除任務(wù)。
runJobSetupTask:為建立Job做準(zhǔn)備,執(zhí)行狀態(tài)設(shè)置,然后調(diào)用org.apache.hadoop.mapreduce.OutputCommitter的setupJob,最后通過(guò)done,通知TaskTracker任務(wù)完成。
runJobCleanupTask:清理Job,包括步驟狀態(tài)設(shè)置,更新?tīng)顟B(tài)到TaskTracker,調(diào)用org.apache.hadoop.mapreduce.OutputCommitter的相關(guān)方法,通過(guò)done,通知TaskTracker任務(wù)完成。
runTaskCleanupTask:清理Task任務(wù),和runJobCleanupTask類(lèi)似。
應(yīng)該說(shuō),這些方法只是提供了一個(gè)通用的框架,具體需要的執(zhí)行,在于org.apache.hadoop.mapreduce.OutputCommitter的具體實(shí)現(xiàn)。本節(jié)關(guān)于Hadoop源代碼中的類(lèi)Task相關(guān)內(nèi)容介紹完畢。
【編輯推薦】
- 學(xué)習(xí)筆記 Hadoop集群如何搭建
- Hadoop0.20.2集群配置入門(mén)指導(dǎo)手冊(cè)
- Hadoop文件系統(tǒng)如何快速安裝?
- Hadoop集群搭建過(guò)程中相關(guān)環(huán)境配置詳解
- Hadoop完全分布模式安裝實(shí)現(xiàn)詳解