成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

MapReduce連接:重分區連接

數據庫
連接是關系運算,可以用于合并關系(relation)。對于數據庫中的表連接操作,可能已經廣為人知了。在MapReduce中,連接可以用于合并兩個或多個數據集。例如,用戶基本信息和用戶活動詳情信息。用戶基本信息來自于OLTP數據庫。用戶活動詳情信息來自于日志文件。

 MapReduce的連接操作可以用于以下場景:

  • 用戶的人口統計信息的聚合操作(例如:青少年和中年人的習慣差異)。

  • 當用戶超過一定時間沒有使用網站后,發郵件提醒他們。(這個一定時間的閾值是用戶自己預定義的)

  • 分析用戶的瀏覽習慣。讓系統可以基于這個分析提示用戶有哪些網站特性還沒有使用到。進而形成一個反饋循環。

所有這些場景都要求將多個數據集連接起來。

最常用的兩個連接類型是內連接(inner join)和外連接(outer join)。如下圖所示,內連接比較兩個關系中所有的元組,判斷是否滿足連接條件,然后生成一個滿足連接條件的結果集。與內連接相反的是,外連接并不需要兩個關系的元組都滿足連接條件。在連接條件不滿足的時候,外連接可以將其中一方的數據保留在結果集中。

為了實現內連接和外連接,MapReduce中有三種連接策略,如下所示。這三種連接策略有的在map階段,有的在reduce階段。它們都針對MapReduce的排序-合并(sort-merge)的架構進行了優化。

  1. 重分區連接(Repartition join)—— reduce端連接。使用場景:連接兩個或多個大型數據集。

  2. 復制連接(Replication join)—— map端連接。使用場景:待連接的數據集中有一個數據集足夠小到可以完全放在緩存中。

  3. 半連接(Semi-join)—— 另一個map端連接。使用場景:待連接的數據集中有一個數據集非常大,但同時這個數據集可以被過濾成小到可以放在緩存中。

在介紹完這些連接策略之后,還會介紹另一個策略:決策樹。可以根據實際情況選擇最優策略。

4.1.1 重分區連接(Repartition join)

重分區連接是reduce端連接。它利用MapReduce的排序-合并機制來分組數據。它只使用一個單獨的MapReduce任務,并支持多路連接(N-way join)。多路指的是多個數據集。

Map階段負責從多個數據集中讀取數據,決定每個數據的連接值,將連接值作為輸出鍵(output key)。輸出值(output value)則包含將在reduce階段被合并的值。

Reduce階段,一個reduce接收map函數傳來的每一個輸出鍵的所有輸出值,并將數據分為多個分區。在此之后,reduce對所有的分區進行笛卡爾積(Cartersian product)連接運算,并生成全部的結果集。

以上MapReduce過程如圖4.2所示:

注:過濾(filtering)和投影(projection)

在MapReduce重分區連接中,最好能夠減少map階段傳輸到reduce階段的數據量。因為通過網絡在這兩個階段中排序和傳輸數據會產生很高的成本。如果不能避免reduce端的工作,那么一個最佳實踐就是盡可能在map階段多過濾數據和投影。過濾指的是將map極端的輸入數據中不需要的部分丟棄。投影是關系代數的概念。投影用于減少發送給reduce的字段。例如:在分析用戶數據的時候,如果只關注用戶的年齡,那么在map任務中應該只投影(或輸出)年齡字段,不考慮用戶的其他的字段。

技術19:優化重分區連接

《Hadoop in Action》給出了一個例子,說明如何使用Hadoop的社區包(contrib package)org.apache.hadoop.contrib.utils.join實現重分區連接。這個貢獻包打包了所有的處理細節,僅僅需要實現一個非常簡單的方法。

然而,這個社區包對重分區的實現方法的空間效率低下。它需要將待連接的所有輸出值都讀取到內存中,然后進行多路連接(multiway join)。實際上,如果僅僅將小數據集讀取到內存中,然后用小數據集遍歷大數據集來進行連接,這樣將更加高效。

問題

需要在MapReduce中進行重分區連接,但是不希望在reduce階段將所有的數據都放到緩存中。

解決方案

這個技術運用了優化后的重分區框架。它僅僅將一個待連接的數據集放在緩存中,減少了reduce需要放在緩存中的數據。

討論

附錄D中介紹了優化后的重分區框架的實現。這個實現是根據org.apache.hadoop.contrib.utils.join社區包進行建模。這個優化后的框架僅僅緩存兩個數據集中比較小的那一個,以減少內存消耗。圖4.3是優化后的重分區連接的流程圖:

圖4.4是實現的類圖。類圖中包含兩個部分,一個通用框架和一些類的實現樣例。

使用這個連接框架需要實現抽象類OptimizedDataJoinMapperBase和OptimizedDataJoinReducerBase。

例如,需要連接用戶詳情數據和用戶活動日志。第一步,判斷兩個數據集中那一個比較小。對于一般的網站來說,用戶詳情數據會比較小,用戶活動日志會比較大。

在如下示例中,用戶數據中有用戶姓名,年齡和所在州

  1. $ cat test-data/ch4/users.txt 
  2. anne 22 NY 
  3. joe 39 CO 
  4. alison 35 NY 
  5. mike 69 VA 
  6. marie 27 OR 
  7. jim 21 OR 
  8. bob 71 CA 
  9. mary 53 NY 
  10. dave 36 VA 
  11. dude 50 CA 

用戶活動日志中有用戶姓名,進行的動作,來源IP。這個文件一般都要比用戶數據要大得多。

  1. $ cat test-data/ch4/user-logs.txt 
  2. jim logout 93.24.237.12 
  3. mike new_tweet 87.124.79.252 
  4. bob new_tweet 58.133.120.100 
  5. mike logout 55.237.104.36 
  6. jim new_tweet 93.24.237.12 
  7. marie view_user 122.158.130.90 

首先,必須實現抽象類OptimizedDataJoinMapperBase。這個將在map端被調用。這個類將創建map的輸出鍵和輸出值。同時,它還將提示整個框架,當前處理的文件是不是比較小的那個。

  1. public class SampleMap extends OptimizedDataJoinMapperBase { 
  2.  
  3.   private boolean smaller; 
  4.  
  5.   @Override 
  6.   protected Text generateInputTag(String inputFile) { 
  7.     // tag the row with input file name (data source) 
  8.     smaller = inputFile.contains("users.txt"); 
  9.     return new Text(inputFile); 
  10.   } 
  11.  
  12.   @Override 
  13.   protected String genGroupKey(Object key, OutputValue output) { 
  14.     return key.toString(); 
  15.   } 
  16.  
  17.   @Override 
  18.   protected boolean isInputSmaller(String inputFile) { 
  19.     return smaller; 
  20.   } 
  21.  
  22.   @Override 
  23.   protected OutputValue genMapOutputValue(Object o) { 
  24.     return new TextTaggedOutputValue((Text) o); 
  25.   } 

下一步,你需要實現抽象類 OptimizedDataJoinReducerBase。它將在reduce端被調用。在這個類中,將從map端傳入不同數據集的輸出鍵和輸出值,然后返回reduce端的輸出數組。

  1. public class SampleReduce extends OptimizedDataJoinReducerBase { 
  2.  
  3.   private TextTaggedOutputValue output = new TextTaggedOutputValue(); 
  4.   private Text textOutput = new Text(); 
  5.  
  6.   @Override 
  7.   protected OutputValue combine(String key, 
  8.                                 OutputValue smallValue, 
  9.                                 OutputValue largeValue) { 
  10.     if(smallValue == null || largeValue == null) { 
  11.       return null; 
  12.     } 
  13.     Object[] values = { 
  14.         smallValue.getData(), largeValue.getData() 
  15.     }; 
  16.     textOutput.set(StringUtils.join(values, "\t")); 
  17.     output.setData(textOutput); 
  18.     return output; 
  19.   } 

最后,任務的主代碼(driver code)需要指明InputFormat類,并設置次排序(Secondary sort)。

  1. job.setInputFormat(KeyValueTextInputFormat.class); 
  2.  
  3.     job.setMapOutputKeyClass(CompositeKey.class); 
  4.     job.setMapOutputValueClass(TextTaggedOutputValue.class); 
  5.     job.setOutputKeyClass(Text.class); 
  6.     job.setOutputValueClass(Text.class); 
  7.  
  8.     job.setPartitionerClass(CompositeKeyPartitioner.class); 
  9.     job.setOutputKeyComparatorClass(CompositeKeyComparator.class); 
  10.     job.setOutputValueGroupingComparator(CompositeKeyOnlyComparator.class); 

現在連接的準備工作就做完了,可以開始運行連接:

  1. $ hadoop fs -put test-data/ch4/users.txt users.txt 
  2. $ hadoop fs -put test-data/ch4/user-logs.txt user-logs.txt 
  3. $ bin/run.sh com.manning.hip.ch4.joins.improved.SampleMain users.txt,user-logs.txt output 
  4. $ hadoop fs -cat output/part* 
  5. bob 71 CA new_tweet 58.133.120.100 
  6. jim 21 OR logout 93.24.237.12 
  7. jim 21 OR new_tweet 93.24.237.12 
  8. jim 21 OR login 198.184.237.49 
  9. marie 27 OR login 58.133.120.100 
  10. marie 27 OR view_user 122.158.130.90 
  11. mike 69 VA new_tweet 87.124.79.252 
  12. mike 69 VA logout 55.237.104.36 

如果和連接的源文件相對比,可以看到因為實現了一個內連接,輸出中不包括用戶anne,alison等不存在于日志文件中的記錄。

小結:

這個連接的實現通過只緩存比較小的數據集來提高來Hadoop社區包的效率。但是,當數據從map階段傳輸到reduce階段的時候,仍然產生了很高的網絡成本。

此外,Hadoop社區包支持多路連接,這里的實現只支持二路連接。

如果要更多地減少reduce端連接的內存足跡(memory footprint),一個簡單的機制是在map函數中更多地進行投影操作。投影減少了map階段的輸出中的字段。例如:在分析用戶數據的時候,如果只關注用戶的年齡,那么在map任務中應該只投影(或輸出)年齡字段,不考慮用戶的其他的字段。這樣就減少了map和reduce之間的網絡負擔,也減少了reduce在連接時的內存消耗。

和原始的社區包一樣,這里的重分區的實現也支持過濾和投影。通過允許genMapOutputValue方法返回空值,就可以支持過濾。通過在genMapOutputValue方法中定義輸出值的內容,就可以支持投影。

如果你既想輸出所有的數據到reduce,又想避免排序的損耗,就需要考慮另外兩種連接策略,復制連接和半連接。

附錄D 優化后的MapReduce連接框架

在這個附錄,我們將討論在第4張中使用的兩個連接框架。第一個是重連接框架。它減少了org.apache.hadoop.contrib.utils.join包的實現的Hadoop連接的內存足跡。第二個是復制連接框架。它可以將較小的數據集放在緩存中。

D.1 優化后的重分區框架

Hadoop社區連接包需要將每個鍵的所有值都讀取到內存中。如何才能在reduce端的連接減少內存開銷呢?本文提供的優化中,只需要緩存較小的數據集,然后在連接中遍歷較大數據集中的數據。這個方法中還包括針對map的輸出數據的次排序,那么reducer先接收到較小的數據集,然后接收到較大的數據集。圖D.1是這個過程的流程圖。

圖D.2是實現的類圖。類圖中包含兩個部分,一個通用框架和一些類的實現樣例。

連接框架

我們以和Hadoop社區連接包的近似的風格編寫連接的代碼。目標是創建可以處理任意數據集的通用重分區機制。為簡潔起見,我們重點說明主要部分。

首先是OptimizedDataJoinMapperBase類。這個類的作用是辨認出較小的數據集,并生成輸出鍵和輸出值。Configure方法在mapper創建時被調用。Configure方法的作用之一是標識每一個數據集,讓reducer可以區分數據的源數據集。另一個作用是辨認當前的輸入數據是否是較小的數據集。

  1. protected abstract Text generateInputTag(String inputFile); 
  2.  
  3. protected abstract boolean isInputSmaller(String inputFile); 
  4.  
  5. public void configure(JobConf job) { 
  6.  
  7.     this.inputFile = job.get("map.input.file"); 
  8.     this.inputTag = generateInputTag(this.inputFile); 
  9.      
  10.     if(isInputSmaller(this.inputFile)) { 
  11.         smaller = new BooleanWritable(true); 
  12.         outputKey.setOrder(0); 
  13.     } else { 
  14.         smaller = new BooleanWritable(false); 
  15.         outputKey.setOrder(1); 
  16.     } 

Map方法首先調用自定義的方法 (generateTaggedMapOutput) 來生成OutputValue對象。這個對象包含了在連接中需要使用的值(也可能包含了最終輸出的值),和一個標識較大或較小數據集的布爾值。如果map方法可以調用自定義的方法 (generateGroupKey) 來得到可以在連接中使用的鍵,那么這個鍵就作為map的輸出鍵。

  1. protected abstract OptimizedTaggedMapOutput generateTaggedMapOutput(Object value); 
  2.  
  3. protected abstract String generateGroupKey(Object key, OptimizedTaggedMapOutput aRecord); 
  4.  
  5. public void map(Object key, Object value, OutputCollector output, Reporter reporter) 
  6.     throws IOException { 
  7.      
  8.     OptimizedTaggedMapOutput aRecord = generateTaggedMapOutput(value); 
  9.      
  10.     if (aRecord == null) { 
  11.         return
  12.     } 
  13.      
  14.     aRecord.setSmaller(smaller); 
  15.     String groupKey = generateGroupKey(aRecord); 
  16.      
  17.     if (groupKey == null) { 
  18.         return
  19.     } 
  20.      
  21.     outputKey.setKey(groupKey); 
  22.     output.collect(outputKey, aRecord); 

圖D.3 說明了map輸出的組合鍵(composite 可以)和組合值。次排序將會根據連接鍵(join key)進行分區,并用整個組合鍵來進行排序。組合鍵包括一個標識源數據集(較大或較小)的整形值,因此可以根據這個整形值來保證較小源數據集的值先于較大源數據的值被reduce接收。

下一步是深入reduce。此前已經可以保證較小源數據集的值將會先于較大源數據集的值被接收。這里就可以將所有的較小源數據集的值放到緩存中。在開始接收較大源數據集的值的時候,就開始和緩存中的值做連接操作。

  1. public void reduce(Object key, Iterator values, OutputCollector output, Reporter reporter) 
  2.     throws IOException {  
  3.  
  4.     CompositeKey k = (CompositeKey) key; 
  5.     List<OptimizedTaggedMapOutput> smaller = new ArrayList<OptimizedTaggedMapOutput>(); 
  6.      
  7.     while (values.hasNext()) { 
  8.         Object value = values.next(); 
  9.         OptimizedTaggedMapOutput cloned =((OptimizedTaggedMapOutput) value).clone(job); 
  10.          
  11.         if (cloned.isSmaller().get()) { 
  12.             smaller.add(cloned); 
  13.         } else { 
  14.             joinAndCollect(k, smaller, cloned, output, reporter); 
  15.         } 
  16.     } 

方法joinAndCollect包含了兩個數據集的值,并輸出它們。

 

  1. protected abstract OptimizedTaggedMapOutput combine( 
  2.                         String key, 
  3.                         OptimizedTaggedMapOutput value1, 
  4.                         OptimizedTaggedMapOutput value2); 
  5.                          
  6. private void joinAndCollect(CompositeKey key, 
  7.                             List<OptimizedTaggedMapOutput> smaller, 
  8.                             OptimizedTaggedMapOutput value, 
  9.                             OutputCollector output, 
  10.                             Reporter reporter) 
  11.     throws IOException { 
  12.      
  13.     if (smaller.size() < 1) { 
  14.         OptimizedTaggedMapOutput combined = combine(key.getKey(), null, value); 
  15.         collect(key, combined, output, reporter); 
  16.     } else { 
  17.         for (OptimizedTaggedMapOutput small : smaller) { 
  18.             OptimizedTaggedMapOutput combined = combine(key.getKey(), small, value); 
  19.             collect(key, combined, output, reporter); 
  20.         } 
  21.     } 

這些就是這個框架的主要內容。

原文鏈接:http://www.cnblogs.com/datacloud/p/3578509.html

責任編輯:彭凡 來源: 博客園
相關推薦

2014-03-20 09:49:51

MapReduce

2015-08-21 13:50:49

Oracle連接

2009-07-22 10:53:42

MySQL左連接

2010-05-10 15:48:37

Unix連接

2021-03-24 09:06:01

MySQL長連接短連接

2011-03-28 14:04:10

SQL左連接右連接

2018-06-06 11:01:25

HTTP長連接短連接

2011-06-01 13:54:10

MySQL

2022-01-11 08:46:56

Oracle 在線重定義數據庫

2015-04-23 18:46:38

TCPTCP協議

2010-01-04 09:51:52

ADO連接對象

2010-11-08 15:47:01

SQL Server外

2010-11-11 13:51:36

SQL Server內

2014-01-02 13:22:01

PythonPostgreSQL

2014-01-02 14:04:39

PostgreSQLPerl

2014-01-02 15:41:24

PostgreSQLPHP

2019-09-16 09:29:01

TCP全連接隊列半連接隊列

2010-06-07 15:24:34

Java連接MYSQL

2023-01-31 18:09:12

物聯網移動物聯網

2010-08-24 09:29:37

內連接全連接
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 免费观看一级毛片 | 精品福利一区二区三区 | 欧美日本韩国一区二区三区 | 久久久91精品国产一区二区精品 | 亚洲一区二区三区四区五区午夜 | 亚洲三区在线观看 | 欧美日韩视频一区二区 | 91久久综合 | 污片在线免费观看 | 亚洲成av片人久久久 | 久久精品久久综合 | 午夜影院在线观看免费 | www.婷婷 | 夜夜久久 | 91视频18 | 色综合久久久 | 日韩精品视频在线播放 | 国产免费让你躁在线视频 | 欧美日韩国产中文字幕 | 国产一区二区三区视频 | 欧美日韩久久精品 | 国产成人福利 | 九九亚洲精品 | 久久国产香蕉 | 欧美一区免费 | 亚洲国产午夜 | 国产精品成人一区二区三区夜夜夜 | 欧美日韩国产一区二区三区不卡 | 午夜视频在线观看视频 | 国产精品视频中文字幕 | 国产69久久精品成人看动漫 | 亚洲成色777777在线观看影院 | 午夜天堂| 999久久久| 久久精品国产亚洲夜色av网站 | 91网视频| 一起操网站| 成人不卡| 日韩一区二区在线视频 | 欧美一区二区三区在线播放 | 欧美在线视频a |