成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

MapReduce連接:復制連接

數據庫
復制連接是map端的連接。復制連接得名于它的具體實現:連接中最小的數據集將會被復制到所有的map主機節點。復制連接有一個假設前提:在被連接的數據集中,有一個數據集足夠小到可以緩存在內存中。

如圖4.5所示,MapReduce復制連接工作原理如下:

  1. 使用分布式緩存(Districubted cache)將這個小數據集復制到所有運行map任務的節點。

  2. 用各個map任務初始化方法將這個小數據集裝載到一個哈希表(hashtable)中。

  3. 逐條用大數據集中的記錄遍歷這個哈希表,逐個判斷是否符合連接條件。

  4. 輸出符合連接條件的結果。

 

 

復制連接的實現非常直接明了。更具體的內容可以參考《Hadoop in Action》。附錄D提供了一個通用的框架來實現復制連接。這個框架支持任意類型的InputFormat和OutputFormat的數據。(我們將在下一個技術中使用這個框架。)復制連接框架根據內存足跡的大小從分布式緩存的內容和輸入塊(input split)兩者中動態地決定需要緩存的對象。

如果所有的輸入數據集都不能夠小到可以放到緩存中,那有沒有辦法來優化map端連接呢?那就到了看半連接(semi-join)的時間了。

附錄D.2 復制連接框架 

復制連接是map端連接,得名于它的具體實現:連接中最小的數據集將會被復制到所有的map主機節點。復制連接的實現非常直接明了。更具體的內容可以參考Chunk Lam的《Hadoop in Action》。 

這個部分的目標是:創建一個可以支持任意類型的數據集的通用的復制連接框架。這個框架中提供了一個優化的小功能:動態監測分布式緩存內容和輸入塊的大小,并判斷哪個更大。如果輸入塊較小,那么你就需要將map的輸入塊放到內存緩沖中,然后在map的cleanup方法中執行連接操作了。 

圖D.4是這個框架的類圖,這里提供了連接類(GenericReplicatedJoin)的具體實現,而不僅僅是一個抽象類。在這個框架外,這個類將和KeyValueTextInputFormat及TextOutputFormat協作。它的一個假設前提是:每個數據文件的***個標記是連接鍵。此外,連接類也可以被繼承擴展來支持任意類型的輸入和輸出。

圖D.5是連接框架的算法。Map的setup方法判斷在map的輸入塊和分布式緩存中的內容哪個大。如果分布式緩存的內容比較小,那么它將被裝載到內存緩存中。然后在Map函數開始連接操作。如果輸入塊比較小,map函數將輸入塊的鍵\值對裝載到內存緩存中。Map的cleanup方法將從分布式緩存中讀取記錄,逐條記錄和在內存緩存中的鍵\值對進行連接操作。

 

以下代碼是GenericReplicatedJoin類中setup方法。它在map的初始化階段被調用的。這個方法判斷分布式緩存中的文件和輸入塊哪個大。如果文件比較小,則將文件裝載到HashMap中。

  1. @Override 
  2. protected void setup(Context context) 
  3.     throws IOException, InterruptedException { 
  4.      
  5.     distributedCacheFiles = DistributedCache.getLocalCacheFiles(context.getConfiguration()); 
  6.     int distCacheSizes = 0; 
  7.      
  8.     for (Path distFile : distributedCacheFiles) { 
  9.         File distributedCacheFile = new File(distFile.toString()); 
  10.         distCacheSizes += distributedCacheFile.length(); 
  11.     } 
  12.      
  13.     if(context.getInputSplit() instanceof FileSplit) { 
  14.         FileSplit split = (FileSplit) context.getInputSplit(); 
  15.         long inputSplitSize = split.getLength(); 
  16.         distributedCacheIsSmaller = (distCacheSizes < inputSplitSize); 
  17.     } else { 
  18.         distributedCacheIsSmaller = true
  19.     } 
  20.      
  21.     if (distributedCacheIsSmaller) { 
  22.         for (Path distFile : distributedCacheFiles) { 
  23.             File distributedCacheFile = new File(distFile.toString()); 
  24.             DistributedCacheFileReader reader = getDistributedCacheReader(); 
  25.             reader.init(distributedCacheFile); 
  26.              
  27.             for (Pair p : (Iterable<Pair>) reader) { 
  28.                 addToCache(p); 
  29.             } 
  30.              
  31.             reader.close(); 
  32.         } 
  33.     } 

根據setup方法是否將分布式緩存的內容裝載到內存的緩存中,Map方法將會有不同的行為。如果分布式緩存中的內容被裝載到內存中,那么map方法就將輸入塊的記錄和內存中的緩存做連接操作。如果分布式緩存中的內容沒有被裝載到內存中,那么map方法就將輸入塊的記錄裝載到內存中,然后在cleanup方法中使用。

  1. @Override 
  2. protected void map(Object key, Object value, Context context) 
  3.     throws IOException, InterruptedException { 
  4.     Pair pair = readFromInputFormat(key, value); 
  5.      
  6.     if (distributedCacheIsSmaller) { 
  7.         joinAndCollect(pair, context); 
  8.     } else { 
  9.         addToCache(pair); 
  10.     } 
  11.  
  12. public void joinAndCollect(Pair p, Context context) 
  13.     throws IOException, InterruptedException { 
  14.     List<Pair> cached = cachedRecords.get(p.getKey()); 
  15.      
  16.     if (cached != null) { 
  17.         for (Pair cp : cached) { 
  18.             Pair result; 
  19.              
  20.             if (distributedCacheIsSmaller) { 
  21.                 result = join(p, cp); 
  22.             } else { 
  23.                 result = join(cp, p); 
  24.             } 
  25.              
  26.             if (result != null) { 
  27.                 context.write(result.getKey(), result.getData()); 
  28.             } 
  29.         } 
  30.     } 
  31.  
  32. public Pair join(Pair inputSplitPair, Pair distCachePair) { 
  33.     StringBuilder sb = new StringBuilder(); 
  34.      
  35.     if (inputSplitPair.getData() != null) { 
  36.         sb.append(inputSplitPair.getData()); 
  37.     } 
  38.      
  39.     sb.append("\t"); 
  40.      
  41.     if (distCachePair.getData() != null) { 
  42.         sb.append(distCachePair.getData()); 
  43.     } 
  44.      
  45.     return new Pair<Text, Text>( 
  46.                 new Text(inputSplitPair.getKey().toString()), 
  47.                 new Text(sb.toString())); 

當所有的記錄都被傳輸給map方法后,MapReduce將會調用cleanup方法。如果分布式緩存中的內容比輸入塊大,連接將會在cleanup中進行。連接的對象是map函數的緩存中的輸入塊的記錄和分布式緩存中的記錄。

  1. @Override 
  2. protected void cleanup(Context context) 
  3.     throws IOException, InterruptedException { 
  4.      
  5.     if (!distributedCacheIsSmaller) { 
  6.      
  7.         for (Path distFile : distributedCacheFiles) { 
  8.             File distributedCacheFile = new File(distFile.toString()); 
  9.             DistributedCacheFileReader reader = getDistributedCacheReader(); 
  10.             reader.init(distributedCacheFile); 
  11.              
  12.             for (Pair p : (Iterable<Pair>) reader) { 
  13.                 joinAndCollect(p, context); 
  14.             } 
  15.          
  16.             reader.close(); 
  17.         } 
  18.     } 

***,作業的驅動代碼必須指定需要裝載到分布式緩存中的文件。以下的代碼可以處理一個文件,也可以處理MapReduce輸入結果的一個目錄。

  1. Configuration conf = new Configuration(); 
  2.  
  3. FileSystem fs = smallFilePath.getFileSystem(conf); 
  4. FileStatus smallFilePathStatus = fs.getFileStatus(smallFilePath); 
  5.  
  6. if(smallFilePathStatus.isDir()) { 
  7.     for(FileStatus f: fs.listStatus(smallFilePath)) { 
  8.         if(f.getPath().getName().startsWith("part")) { 
  9.             DistributedCache.addCacheFile(f.getPath().toUri(), conf); 
  10.         } 
  11.     } 
  12. else { 
  13.     DistributedCache.addCacheFile(smallFilePath.toUri(), conf); 

這個框架假設分布式緩存中的內容和輸入塊的內容都可以被裝載到內存中。它的優點在于兩個數據集之中較小的才會裝載到內存中。

在論文《A Comparison of Join Algorithms for Log Processing in MapReduce》中,針對對于分布式緩存中的內容較大時的場景對這個方法進行了更多的優化。在他們的優化中,他們將分布式緩存分成N個分區,并將輸入塊放入N個哈希表。然后在cleanup方法中的優化就更加高效。

在map端的復制連接的問題在于,map任務必須在啟動時讀取分布式緩存。上述論文提到的另一個優化方案是重載FileInputFormat的splitting。將存在于同一個主機上的輸入塊合并成一個塊。然后就可以減少需要裝載分布式緩存的map任務的個數了。

***一個說明,Hadoop在org.apache.hadoop.mapred.join包中自帶了map端的連接。但是它需要有序的待連接的數據集的輸入文件,并要求將其分發到相同的分區中。這樣就造成了繁重的預處理工作。

原文鏈接:http://www.cnblogs.com/datacloud/p/3579333.html

責任編輯:彭凡 來源: 博客園
相關推薦

2014-03-18 10:23:11

MapReduce

2017-06-23 22:00:13

MySqlsslcentos

2015-08-21 13:50:49

Oracle連接

2009-07-22 10:53:42

MySQL左連接

2010-05-10 15:48:37

Unix連接

2021-03-24 09:06:01

MySQL長連接短連接

2018-06-06 11:01:25

HTTP長連接短連接

2011-03-28 14:04:10

SQL左連接右連接

2011-06-01 13:54:10

MySQL

2015-04-23 18:46:38

TCPTCP協議

2010-01-04 09:51:52

ADO連接對象

2010-11-08 15:47:01

SQL Server外

2010-11-11 13:51:36

SQL Server內

2014-01-02 14:04:39

PostgreSQLPerl

2014-01-02 15:41:24

PostgreSQLPHP

2014-01-02 13:22:01

PythonPostgreSQL

2019-09-16 09:29:01

TCP全連接隊列半連接隊列

2010-06-07 15:24:34

Java連接MYSQL

2023-01-31 18:09:12

物聯網移動物聯網

2010-08-24 09:29:37

內連接全連接
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 国产精品视频专区 | 亚洲精品国产电影 | 日本二区 | 精品欧美视频 | 特黄色一级毛片 | 国产精品日日做人人爱 | 久草视频在 | 欧美一二区 | 奇米超碰 | 精品国产视频 | 免费一看一级毛片 | 亚洲欧美一区二区三区视频 | 丝袜 亚洲 另类 欧美 综合 | 乱码av午夜噜噜噜噜动漫 | 在线观看国产视频 | 亚洲国产一区视频 | 欧美一区二区三区视频在线播放 | 国产高清免费在线 | 日韩亚洲视频在线 | 久久久久国产一区二区三区四区 | 日韩免费在线视频 | 中文字幕一区在线观看视频 | 亚洲国产精品精华素 | 国产一区二区不卡 | 五十女人一级毛片 | 国产精品久久久久久久久久久免费看 | 在线免费观看成人 | 99色播 | 国产一级久久久久 | 色综合天天综合网国产成人网 | 国产色婷婷精品综合在线手机播放 | 久久精品成人 | 日本中文字幕日韩精品免费 | 艹逼网 | 欧美精品综合在线 | 国产精品久久久久久福利一牛影视 | 毛片在线免费 | 日韩男人天堂 | 亚洲视频 欧美视频 | 操操日 | 欧美成人精品欧美一级 |