MapReduce連接:復制連接
如圖4.5所示,MapReduce復制連接工作原理如下:
-
使用分布式緩存(Districubted cache)將這個小數據集復制到所有運行map任務的節點。
-
用各個map任務初始化方法將這個小數據集裝載到一個哈希表(hashtable)中。
-
逐條用大數據集中的記錄遍歷這個哈希表,逐個判斷是否符合連接條件。
-
輸出符合連接條件的結果。
復制連接的實現非常直接明了。更具體的內容可以參考《Hadoop in Action》。附錄D提供了一個通用的框架來實現復制連接。這個框架支持任意類型的InputFormat和OutputFormat的數據。(我們將在下一個技術中使用這個框架。)復制連接框架根據內存足跡的大小從分布式緩存的內容和輸入塊(input split)兩者中動態地決定需要緩存的對象。
如果所有的輸入數據集都不能夠小到可以放到緩存中,那有沒有辦法來優化map端連接呢?那就到了看半連接(semi-join)的時間了。
附錄D.2 復制連接框架
復制連接是map端連接,得名于它的具體實現:連接中最小的數據集將會被復制到所有的map主機節點。復制連接的實現非常直接明了。更具體的內容可以參考Chunk Lam的《Hadoop in Action》。
這個部分的目標是:創建一個可以支持任意類型的數據集的通用的復制連接框架。這個框架中提供了一個優化的小功能:動態監測分布式緩存內容和輸入塊的大小,并判斷哪個更大。如果輸入塊較小,那么你就需要將map的輸入塊放到內存緩沖中,然后在map的cleanup方法中執行連接操作了。
圖D.4是這個框架的類圖,這里提供了連接類(GenericReplicatedJoin)的具體實現,而不僅僅是一個抽象類。在這個框架外,這個類將和KeyValueTextInputFormat及TextOutputFormat協作。它的一個假設前提是:每個數據文件的***個標記是連接鍵。此外,連接類也可以被繼承擴展來支持任意類型的輸入和輸出。
圖D.5是連接框架的算法。Map的setup方法判斷在map的輸入塊和分布式緩存中的內容哪個大。如果分布式緩存的內容比較小,那么它將被裝載到內存緩存中。然后在Map函數開始連接操作。如果輸入塊比較小,map函數將輸入塊的鍵\值對裝載到內存緩存中。Map的cleanup方法將從分布式緩存中讀取記錄,逐條記錄和在內存緩存中的鍵\值對進行連接操作。
以下代碼是GenericReplicatedJoin類中setup方法。它在map的初始化階段被調用的。這個方法判斷分布式緩存中的文件和輸入塊哪個大。如果文件比較小,則將文件裝載到HashMap中。
- @Override
- protected void setup(Context context)
- throws IOException, InterruptedException {
- distributedCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration());
- int distCacheSizes = 0;
- for (Path distFile : distributedCacheFiles) {
- File distributedCacheFile = new File(distFile.toString());
- distCacheSizes += distributedCacheFile.length();
- }
- if(context.getInputSplit() instanceof FileSplit) {
- FileSplit split = (FileSplit) context.getInputSplit();
- long inputSplitSize = split.getLength();
- distributedCacheIsSmaller = (distCacheSizes < inputSplitSize);
- } else {
- distributedCacheIsSmaller = true;
- }
- if (distributedCacheIsSmaller) {
- for (Path distFile : distributedCacheFiles) {
- File distributedCacheFile = new File(distFile.toString());
- DistributedCacheFileReader reader = getDistributedCacheReader();
- reader.init(distributedCacheFile);
- for (Pair p : (Iterable<Pair>) reader) {
- addToCache(p);
- }
- reader.close();
- }
- }
- }
根據setup方法是否將分布式緩存的內容裝載到內存的緩存中,Map方法將會有不同的行為。如果分布式緩存中的內容被裝載到內存中,那么map方法就將輸入塊的記錄和內存中的緩存做連接操作。如果分布式緩存中的內容沒有被裝載到內存中,那么map方法就將輸入塊的記錄裝載到內存中,然后在cleanup方法中使用。
- @Override
- protected void map(Object key, Object value, Context context)
- throws IOException, InterruptedException {
- Pair pair = readFromInputFormat(key, value);
- if (distributedCacheIsSmaller) {
- joinAndCollect(pair, context);
- } else {
- addToCache(pair);
- }
- }
- public void joinAndCollect(Pair p, Context context)
- throws IOException, InterruptedException {
- List<Pair> cached = cachedRecords.get(p.getKey());
- if (cached != null) {
- for (Pair cp : cached) {
- Pair result;
- if (distributedCacheIsSmaller) {
- result = join(p, cp);
- } else {
- result = join(cp, p);
- }
- if (result != null) {
- context.write(result.getKey(), result.getData());
- }
- }
- }
- }
- public Pair join(Pair inputSplitPair, Pair distCachePair) {
- StringBuilder sb = new StringBuilder();
- if (inputSplitPair.getData() != null) {
- sb.append(inputSplitPair.getData());
- }
- sb.append("\t");
- if (distCachePair.getData() != null) {
- sb.append(distCachePair.getData());
- }
- return new Pair<Text, Text>(
- new Text(inputSplitPair.getKey().toString()),
- new Text(sb.toString()));
- }
當所有的記錄都被傳輸給map方法后,MapReduce將會調用cleanup方法。如果分布式緩存中的內容比輸入塊大,連接將會在cleanup中進行。連接的對象是map函數的緩存中的輸入塊的記錄和分布式緩存中的記錄。
- @Override
- protected void cleanup(Context context)
- throws IOException, InterruptedException {
- if (!distributedCacheIsSmaller) {
- for (Path distFile : distributedCacheFiles) {
- File distributedCacheFile = new File(distFile.toString());
- DistributedCacheFileReader reader = getDistributedCacheReader();
- reader.init(distributedCacheFile);
- for (Pair p : (Iterable<Pair>) reader) {
- joinAndCollect(p, context);
- }
- reader.close();
- }
- }
- }
***,作業的驅動代碼必須指定需要裝載到分布式緩存中的文件。以下的代碼可以處理一個文件,也可以處理MapReduce輸入結果的一個目錄。
- Configuration conf = new Configuration();
- FileSystem fs = smallFilePath.getFileSystem(conf);
- FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath);
- if(smallFilePathStatus.isDir()) {
- for(FileStatus f: fs.listStatus(smallFilePath)) {
- if(f.getPath().getName().startsWith("part")) {
- DistributedCache.addCacheFile(f.getPath().toUri(), conf);
- }
- }
- } else {
- DistributedCache.addCacheFile(smallFilePath.toUri(), conf);
- }
這個框架假設分布式緩存中的內容和輸入塊的內容都可以被裝載到內存中。它的優點在于兩個數據集之中較小的才會裝載到內存中。
在論文《A Comparison of Join Algorithms for Log Processing in MapReduce》中,針對對于分布式緩存中的內容較大時的場景對這個方法進行了更多的優化。在他們的優化中,他們將分布式緩存分成N個分區,并將輸入塊放入N個哈希表。然后在cleanup方法中的優化就更加高效。
在map端的復制連接的問題在于,map任務必須在啟動時讀取分布式緩存。上述論文提到的另一個優化方案是重載FileInputFormat的splitting。將存在于同一個主機上的輸入塊合并成一個塊。然后就可以減少需要裝載分布式緩存的map任務的個數了。
***一個說明,Hadoop在org.apache.hadoop.mapred.join包中自帶了map端的連接。但是它需要有序的待連接的數據集的輸入文件,并要求將其分發到相同的分區中。這樣就造成了繁重的預處理工作。
原文鏈接:http://www.cnblogs.com/datacloud/p/3579333.html