成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

hadoop中mapreduce的常用類

網絡 網絡管理 網絡運維 Hadoop
新舊API是同時存在于1.1.2的hadoop中的。以前還一直納悶兒為什么有時候是jobClient提交任務,有時是Job...不管API是否更新,下面這些類也還是存在于API中的,經過自己跟蹤源碼,發現原理還是這些。只不過進行了重新組織,進行了一些封裝,使得擴展性更好。

寫這個文章的時候才意識到新舊API是同時存在于1.1.2的hadoop中的。以前還一直納悶兒為什么有時候是jobClient提交任務,有時是Job...不管API是否更新,下面這些類也還是存在于API中的,經過自己跟蹤源碼,發現原理還是這些。只不過進行了重新組織,進行了一些封裝,使得擴展性更好。所以還是把這些東西從記事本貼進來吧。

關于這些類的介紹以及使用,有的是在自己debug中看到的,多數為純翻譯API的注釋,但是翻譯的過程受益良多。

GenericOptionsParser

parseGeneralOptions(Options opts, Configuration conf, String[] args)解析命令行參數

GenericOptionsParser是為hadoop框架解析命令行參數的工具類。它能夠辨認標準的命令行參數,使app能夠輕松指定namenode,jobtracker,以及額外的配置資源或信息等。它支持的功能有:

-conf 指定配置文件;

-D 指定配置信息;

-fs 指定namenode

-jt 指定jobtracker

-files 指定需要copy到MR集群的文件,以逗號分隔

-libjars指定需要copy到MR集群的classpath的jar包,以逗號分隔

-archives指定需要copy到MR集群的壓縮文件,以逗號分隔,會自動解壓縮

1. String[] otherArgs = new GenericOptionsParser(job, args)

2. .getRemainingArgs();

3. if (otherArgs.length != 2) {

4. System.err.println("Usage: wordcount ");

5. System.exit(2);

6. }

ToolRunner

用來跑實現Tool接口的工具。它與GenericOptionsParser合作來解析命令行參數,只在此次運行中更改configuration的參數。

Tool

處理命令行參數的接口。Tool是MR的任何tool/app的標準。這些實現應該代理對標準命令行參數的處理。下面是典型實現:

  1. public class MyApp extends Configured implements Tool {   
  2.            
  3. public int run(String[] args) throws Exception {   
  4. // 即將被ToolRunner執行的Configuration   
  5. Configuration conf = getConf();   
  6.             
  7. // 使用conf建立JobConf   
  8. JobConf job = new JobConf(conf, MyApp.class);   
  9.         
  10. // 執行客戶端參數   
  11. Path in = new Path(args[1]);   
  12. Path out = new Path(args[2]);   
  13.             
  14. // 指定job相關的參數        
  15. job.setJobName("my-app");   
  16. job.setInputPath(in);   
  17. job.setOutputPath(out);   
  18. job.setMapperClass(MyApp.MyMapper.class);   
  19. job.setReducerClass(MyApp.MyReducer.class);   
  20. *   
  21. // 提交job,然后監視進度直到job完成   
  22. JobClient.runJob(job);   
  23. }   
  24.           
  25.  public static void main(String[] args) throws Exception {   
  26. // 讓ToolRunner 處理命令行參數    
  27. int res = ToolRunner.run(new Configuration(), new Sort(), //這里封裝了GenericOptionsParser解析args   
  28.             
  29. System.exit(res);   
  30. }   
  31. }   

MultipleOutputFormat

自定義輸出文件名稱或者說名稱格式。在jobconf中setOutputFormat(MultipleOutputFormat的子類)就行了。而不是那種part-r-00000啥的了。。。并且可以分配結果到多個文件中。

MultipleOutputFormat繼承了FileOutputFormat, 允許將輸出數據寫進不同的輸出文件中。有三種應用場景:

a. 最少有一個reducer的mapreduce任務。這個reducer想要根據實際的key將輸出寫進不同的文件中。假設一個key編碼了實際的key和為實際的key指定的位置

b. 只有map的任務。這個任務想要把輸入文件或者輸入內容的部分名稱設為輸出文件名。

c. 只有map的任務。這個任務為輸出命名時,需要依賴keys和輸入文件名。 

  1. //這里是根據key生成多個文件的地方,可以看到還有value,name等參數   
  2. @Override   
  3. protected String generateFileNameForKeyValue(Text key,   
  4. IntWritable value, String name) {   
  5. char c = key.toString().toLowerCase().charAt(0);   
  6. if (c >= 'a' && c <= 'z') {   
  7. return c + ".txt";   
  8. }   
  9. return "result.txt";   
  10. }   

DistributedCache

在集群中快速分發大的只讀文件。DistributedCache是MR用來緩存app需要的諸如text,archive,jar等的文件的。app通過jobconf中的url來指定需要緩存的文件。它會假定指定的這個文件已經在url指定的對應位置上了。在job在node上執行之前,DistributedCache會copy必要的文件到這個slave node。它的功效就是為每個job只copy一次,而且copy到指定位置,能夠自動解壓縮。

DistributedCache可以用來分發簡單的只讀文件,或者一些復雜的例如archive,jar文件等。archive文件會自動解壓縮,而jar文件會被自動放置到任務的classpath中(lib)。分發壓縮archive時,可以指定解壓名稱如:dict.zip#dict。這樣就會解壓到dict中,否則默認是dict.zip中。

文件是有執行權限的。用戶可以選擇在任務的工作目錄下建立指向DistributedCache的軟鏈接。

  1. DistributedCache.createSymlink(conf);     
  2. DistributedCache.addCacheFile(new Path("hdfs://host:port/absolute-path#link-name").toUri(), conf);      

DistributedCache.createSymlink(Configuration)方法讓DistributedCache 在當前工作目錄下創建到緩存文件的符號鏈接。則在task的當前工作目錄會有link-name的鏈接,相當于快捷方法,鏈接到expr.txt文件,在setup方法使用的情況則要簡單許多。或者通過設置配置文件屬性mapred.create.symlink為yes。 分布式緩存會截取URI的片段作為鏈接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 則在task當前工作目錄會有名為lib.so的鏈接, 它會鏈接分布式緩存中的lib.so.1#p#

DistributedCache會跟蹤修改緩存文件的timestamp。

下面是使用的例子, 為應用app設置緩存

  1. $ bin/hadoop fs -copyFromLocal lookup.dat /myapp/lookup.dat     
  2. $ bin/hadoop fs -copyFromLocal map.zip /myapp/map.zip     
  3. $ bin/hadoop fs -copyFromLocal mylib.jar /myapp/mylib.jar   
  4. $ bin/hadoop fs -copyFromLocal mytar.tar /myapp/mytar.tar   
  5. $ bin/hadoop fs -copyFromLocal mytgz.tgz /myapp/mytgz.tgz   
  6. $ bin/hadoop fs -copyFromLocal mytargz.tar.gz /myapp/mytargz.tar.gz   

2. 設置app的jobConf:

  1. JobConf job = new JobConf();   
  2. DistributedCache.addCacheFile(new URI("/myapp/lookup.dat#lookup.dat"),    
  3.  job);   
  4. DistributedCache.addCacheArchive(new URI("/myapp/map.zip", job);  
  5. DistributedCache.addFileToClassPath(new Path("/myapp/mylib.jar"), job);   
  6. DistributedCache.addCacheArchive(new URI("/myapp/mytar.tar", job);   
  7. DistributedCache.addCacheArchive(new URI("/myapp/mytgz.tgz", job);   
  8. DistributedCache.addCacheArchive(new URI("/myapp/mytargz.tar.gz", job);   

3. 在mapper或者reducer中使用緩存文件:

  1. public static class MapClass extends MapReduceBase     
  2. implements Mapper<K, V, K, V> {   
  3.      
  4. private Path[] localArchives;   
  5. private Path[] localFiles;   
  6.            
  7. public void configure(JobConf job) {   
  8. // 得到剛剛緩存的文件   
  9. localArchives = DistributedCache.getLocalCacheArchives(job);   
  10. localFiles = DistributedCache.getLocalCacheFiles(job);   
  11. }   
  12.            
  13. public void map(K key, V value,    
  14.  OutputCollector<K, V>; output, Reporter reporter)    
  15. throws IOException {   
  16. // 使用緩存文件   
  17. // ...   
  18. // ...   
  19. output.collect(k, v);   
  20. }   
  21. }   

它跟GenericOptionsParser的部分功能有異曲同工之妙。

PathFilter + 通配符。accept(Path path)篩選path是否通過。

NullWritable

不想輸出的時候,把它當做key。NullWritable是Writable的一個特殊類,序列化的長度為0,實現方法為空實現,不從數據流中讀數據,也不寫入數據,只充當占位符,如在MapReduce中,如果你不需要使用鍵或值,你就可以將鍵或值聲明為NullWritable,NullWritable是一個不可變的單實例類型。

FileInputFormat繼承于InputFormat

InputFormat的作用:

驗證輸入規范;

切分輸入文件為InputSpilts;

提供RecordReader來收集InputSplit中的輸入記錄,給Mapper進行執行。

RecordReader

將面向字節的InputSplit轉換為面向記錄的視圖,供Mapper或者Reducer使用運行。因此假定處理記錄的責任界限,為任務呈現key-value。

SequenceFile:

SequenceFile是包含二進制kv的扁平文件(序列化)。它提供Writer、Reader、Sorter來進行寫、讀、排序功能。基于CompressionType,SequenceFile有三種對于kv的壓縮方式:

●Writer:不壓縮records;

RecordCompressWriter: 只壓縮values;

BlockCompressWriter: 壓縮records,keys和values都被分開壓縮在block中,block的大小可以配置;

壓縮方式由合適的CompressionCodec指定。推薦使用此類的靜態方法createWriter來選擇格式。Reader作為橋接可以讀取以上任何一種壓縮格式。

CompressionCodec:

封裝了關于流式壓縮/解壓縮的相關方法。

Mapper

Mapper 將輸入的kv對映射成中間數據kv對集合。Maps 將輸入記錄轉變為中間記錄,其中被轉化后的記錄不必和輸入記錄類型相同。一個給定的輸入對可以映射為0或者多個輸出對。

在MRJob執行過程中,MapReduce框架根據提前指定的InputFormat(輸入格式對象)產生InputSplit(輸入分片),而每個InputSplit將會由一個map任務處理。

總起來講,Mapper實現類通過JobConfigurable.configure(JobConf)方法傳入JobConf對象來初始化,然后在每個map任務中調用map(WritableComparable,Writable,OutputCollector,Reporter)方法處理InputSplit的每個kv對。MR應用可以覆蓋Closeable.close方法去處理一些必須的清理工作。

輸出對不一定和輸入對類型相同。一個給定的輸入對可能映射成0或者很多的輸出對。輸出對是框架通過調用OutputCollector.colect(WritableComparable,Writable)得到。

MR應用可以使用Reporter匯報進度,設置應用層級的狀態信息,更新計數器或者只是顯示應用處于運行狀態等。

所有和給定的輸出key關聯的中間數據都會隨后被框架分組處理,并傳給Reducer處理以產生最終的輸出。用戶可以通過JobConf.setOutputKeyComparatorClass(Class)指定一個Comparator控制分組處理過程。

Mapper輸出都被排序后根據Reducer數量進行分區,分區數量等于reduce任務數量。用戶可以通過實現自定義的Partitioner來控制哪些keys(記錄)到哪個Reducer中去。

此外,用戶還可以指定一個Combiner,調用JobConf.setCombinerClass(Class)來實現。這個可以來對map輸出做本地的聚合,有助于減少從mapper到reducer的數據量。

經過排序的中間輸出數據通常以一種簡單的格式(key-len,key,value-len,value)存儲在SequenceFile中。應用可以決定是否或者怎樣被壓縮以及壓縮格式,可以通過JobConf來指定CompressionCodec.

如果job沒有reducer,那么mapper的輸出結果會不經過分組排序,直接寫進FileSystem.#p#

Map數

通常map數由輸入數據總大小決定,也就是所有輸入文件的blocks數目決定。

每個節點并行的運行的map數正常在10到100個。由于Map任務初始化本身需要一段時間所以map運行時間至少在1分鐘為好。

如此,如果有10T的數據文件,每個block大小128M,***使用為82000map數,除非使用setNumMapTasks(int)(這個方法僅僅對MR框架提供一個建議值)將map數值設置到更高。

Reducer

Reducer根據key將中間數據集合處理合并為更小的數據結果集。

用戶可以通過JobConf.setNumReduceTasks(int)設置作業的reducer數目。

整體而言,Reducer實現類通過JobConfigurable.configure(JobConf)方法將JobConf對象傳入,并為Job設置和初始化Reducer。MR框架調用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 來處理以key被分組的輸入數據。應用可以覆蓋Closeable.close()處理必要的清理操作。

Reducer由三個主要階段組成:shuffle,sort,reduce。

❈shuffle

 

輸入到Reducer的輸入數據是Mapper已經排過序的數據.在shuffle階段,根據partition算法獲取相關的mapper地址,并通過Http協議將mapper的相應輸出數據由reducer拉取到reducer機器上處理。

sort

 

框架在這個階段會根據key對reducer的輸入進行分組(因為不同的mapper輸出的數據中可能含有相同的key)。

shuffle和sort是同時進行的,同時reducer仍然在拉取map的輸出。

Secondary Sort

 

如果對中間數據key進行分組的規則和在處理化簡階段前對key分組規則不一致時,可以通過 JobConf.setOutputValueGroupingComparator(Class)設置一個Comparator。因為中間數據的分組策略是通過 JobConf.setOutputKeyComparatorClass(Class) 設置的,可以控制中間數據根據哪些key進行分組。而JobConf.setOutputValueGroupingComparator(Class)則可用于在數據連接情況下對value進行二次排序。

Reduce(化簡)

這個階段框架循環調用 reduce(WritableComparable, Iterator, OutputCollector, Reporter) 方法處理被分組的每個kv對。

reduce 任務一般通過 OutputCollector.collect(WritableComparable, Writable)將輸出數據寫入文件系統FileSystem。應用可以使用Reporter匯報作業執行進度、設置應用層級的狀態信息并更新計數器(Counter),或者只是提示作業在運行。

注意,Reducer的輸出不會再進行排序。

Reducer數目

合適的reducer數目可以這樣估算:(節點數目mapred.tasktracker.reduce.tasks.maximum)乘以0.95 或 乘以1.75。因子為0.95時,當所有map任務完成時所有reducer可以立即啟動,并開始從map機器上拉取數據。因子為1.75時,最快的一些節點將完成***輪reduce處理,此時框架開始啟動第二輪reduce任務,這樣可以達到比較好的作業負載均衡。提高reduce數目會增加框架的運行負擔,但有利于提升作業的負載均衡并降低失敗的成本。上述的因子使用***在作業執行時框架仍然有reduce槽為前提,畢竟框架還需要對作業進行可能的推測執行和失敗任務的處理。

不使用Reducer

如果不需要進行化簡處理,可以將reduce數目設為0。這種情況下,map的輸出會直接寫入到文件系統。輸出路徑通過setOutputPath(Path)指定。框架在寫入數據到文件系統之前不再對map結果進行排序。

Partitioner

Partitioner對數據按照key進行分區,從而控制map的輸出傳輸到哪個reducer上。默認的Partitioner算法是hash(哈希。分區數目由作業的reducer數目決定。HashPartitioner 是默認的Partitioner。

Reporter

Reporter為MR應用提供了進度報告、應用狀態信息設置,和計數器(Counter)更新等功能.

Mapper和Reducer實現可以使用Reporter匯報進度或者提示作業在正常運行。在一些場景下,應用在處理一些特殊的kv對時耗費了過多時間,這個可能會因為框架假定任務超時而強制停止了這些作業。為避免該情況,可以設置mapred.task.timeout 為一個比較高的值或者將其設置為0以避免超時發生。

應用也可以使用Reporter來更新計數(Counter)。

OutputCollector

OutputCollector是MR框架提供的通用工具來收集Mapper或者Reducer輸出數據(中間數據或者最終結果數據)。

Hadoop MapReduce提供了一些經常使用的mapper、reducer和partioner的實現類供我們進行學習。

 

以上有關configuration和job的部分在新的API中有所改變,簡單說就是在Mapper和Reducer中引入了MapContext和ReduceContext,它們封裝了configuration和outputcollector,以及reporter。

責任編輯:守望幸福 來源: 51CTO.com
相關推薦

2017-04-19 11:17:48

SparkHadoopMapReduce

2010-06-07 13:35:16

Hadoop簡介

2010-06-03 16:32:09

Hadoop MapR

2014-12-29 09:59:03

Spark 1.2MapReduce

2010-06-03 16:18:07

Hadoop MapR

2013-11-27 09:21:18

YARNMapReduceHadoop

2013-04-24 10:47:48

Hadoop集群

2014-11-10 15:02:21

大數據云計算Hadoop

2013-01-21 13:22:56

IBMdW

2019-10-31 09:52:02

HadoopJava大數據

2014-10-15 16:32:43

MapReducehadoop

2012-04-23 10:30:38

Hadoop

2010-06-07 11:12:52

Hadoop-0.20

2014-03-18 10:19:55

Hadoop部署hadoop集群腳本

2014-01-07 14:04:13

HadoopMapReduce

2013-12-17 09:52:13

pythonhadoopmapreduce

2010-06-07 11:30:24

Hadoop源代碼

2024-06-03 10:07:22

Vector類元素向量

2012-05-09 09:13:29

IDCHadoopMapReduce

2015-07-01 13:51:12

HadoopMapReduce數據分析
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 本地毛片 | 国产精品视频导航 | 色网在线观看 | 国产成人a亚洲精品 | 97人人草 | 精品国产乱码久久久久久影片 | 精久久久 | 久久久久久久久久久久91 | 在线播放国产一区二区三区 | 国产精品久久久久久久久久久久久 | 亚洲永久精品国产 | 久久精品无码一区二区三区 | 亚洲视频二区 | 91视频官网 | 成人欧美一区二区 | 久久精品国产亚洲 | 在线视频国产一区 | 黄色一级免费看 | 依人成人| 欧美精品在线一区二区三区 | 国产一在线观看 | 中文字幕在线不卡播放 | 国产在线视频一区二区董小宛性色 | a级在线免费视频 | 高清国产午夜精品久久久久久 | 亚洲成人高清 | 综合色影院 | 91免费在线 | 午夜影院视频在线观看 | 在线观看视频中文字幕 | 日韩在线欧美 | 午夜国产羞羞视频免费网站 | 阿v视频在线观看 | 91精品国产综合久久婷婷香蕉 | 午夜免费网 | 99久久免费精品国产免费高清 | 亚洲成人国产综合 | 91高清在线视频 | 亚洲美女在线一区 | 亚洲欧美一区二区三区国产精品 | 日韩视频一区 |