用Hadoop和MapReduce進行大數據分析
如此大規模的數據一度僅限于大企業、學校和政府機構 — 這些機構有能力購買昂貴的超級計算機、能夠雇用員工保障其運行。今天,由于存儲成本的降低和處理能力的商品化,一些小公司,甚至個人都可以存儲和挖掘同樣的數據,推動新一輪的應用程序創新。
大數據革命技術之一是MapReduce,一個編程模式,是Google針對大規模、分布式數據而開發的。在本文中,我將介紹Apache的開源MapReduce實現、Hadoop,也有人將其稱之為云計算的殺手應用程序。
關于Hadoop
Apache的Hadoop框架本質上是一個用于分析大數據集的機制,不一定位于數據存儲中。Hadoop提取出了MapReduce的大規模數據分析引擎,更易于開發人員理解。Hadoop可以擴展到無數個節點,可以處理所有活動和相關數據存儲的協調。
Hadoop的眾多特性和配置使其成為一個十分有用且功能強大的框架,其用途和功能令人驚訝。Yahoo!以及其他許多組織已經找到了一個高效機制來分析成堆的字節數。在單個節點上運行Hadoop也很容易;您所需要的只是一些需要分析的數據,以及熟悉一般的Java代碼。Hadoop也可和 uby、Python以及C++一起使用。
作為處理大數據集的概念框架,MapReduce對于使用許多計算機來解決分布式問題而言是高度優化的。顧名思義,這個框架由兩個函數構成。map 函數專用于獲取大數據輸入,并將其分成小片段,然后交由其他進程進行操作。reduce函數整理map收集的各個回應,然后顯示最后的輸出。
在Hadoop中,您可以通過擴展Hadoop自身的基類來定義map和reduce實現。實現和輸入輸出格式被一個指定它們的配置聯系在一起。 Hadoop非常適合處理包含結構數據的大型文件。Hadoop可以對輸入文件進行原始解析,這一點特別有用,這樣您就可以每次處理一行。定義一個map 函數實際上只是一個關于確定您從即將輸入的文本行中捕獲什么內容的問題。
數據,無處不在的數據!
美國政府產生大量數據,只有一部分是普通民眾所感興趣的。各種政府機構免費發布關于US經濟健康狀況和更改社會人口統計資料的數據。U.S. Geological Survey (USGS)發布國內外地震數據。
世界各地每天都有很多個小型地震發生。其中大多數發生在地殼深處,沒有人能感覺到,盡管如此,但是監聽站仍然會進行記錄。USGS以CSV(或逗號分隔值)文件的格式發布每周地震數據。
每周文件平均不是很大 — 只有大約100KB左右。但是,它可以作為學習Hadoop的基礎。記住,Hadoop有能力處理更 大的數據集。
跟蹤震動
我近期從USGS網站下載的CSV文件有大約920多行。如 清單 1 所示:
清單 1.一個USGS地震數據文件的行數統計
- $> wc -l eqs7day-M1.txt
- 920 eqs7day-M1.txt
CVS文件內容如清單2所示(這是前兩行):
清單 2. CVS文件的前兩行
- $> head -n 2 eqs7day-M1.txt
- Src,Eqid,Version,Datetime,Lat,Lon,Magnitude,Depth,NST,Region
- ci,14896484,2,"Sunday, December 12, 2010 23:23:20 UTC",33.3040,-116.4130,1.0,11.70,22,
- "Southern California"
這就是我稱之為信息豐富 的文件,尤其是當您想到它總共有920行記錄時。然而我只想知道在該文件報告的這一周內每一天有多少次地震發生。我想知道在這7天內哪個區域是地震頻發區。
我第一個想到的就是使用簡單的grep命令來搜索每天的地震數。看看這個文件,我發現數據記錄是從12月12開始的。因此我對該字符串執行了一次grep-c,其結果如清單3所示:
清單 3.12月12有多少次地震發生?
- $> grep -c 'December 12' eqs7day-M1.txt
- 98
安裝Hadoop如果您之前沒有安裝Hadoop,那么現在就裝。第一步,下載最新版二進制文件,解壓,然后在您的路徑上設置Hadoop的bin目錄。完成這些您就可以直接執行hadoop命令了。使用Hadoop要求您執行它的hadoop命令,而不是像您所見到的那樣調用java命令。您可以向 hadoop命令傳選項,諸如在哪里可以找到您的Java二進制文件(例如,表示您的map和reduce實現)。在我的示例中,我創建了一個jar文件,告訴Hadoop我想在我的jar文件內運行哪個任務。我也向Hadoop類路徑添加了一些運行我的應用程序所需的附加二進制文件。
現在,我知道在12月12日有98條記錄,也就是說有98次地震。我只能沿著這條記錄向下,對12月10日的記錄執行一次grep,接著是 11 號,等等。這聽起來有點乏味。更糟糕的是,我還需要知道在該文件中的是哪幾天。我確實不關心這些,甚至有時候我可能無法獲取該信息。事實上,我只想知道在七天這樣一個時間段內任何一天的地震次數,使用Hadoop我就可以很容易的獲取這一信息。
Hadoop只需要幾條信息就可以回答我的第一個和第二個問題:即,要處理哪條輸入以及如何處理map和reduce。我也必須提供了一個可以將每件事都聯系起來的作業。在我開始處理這些代碼之前,我需要花點時間確定我的CSV數據整齊有序。
使用opencsv進行數據解析
除了地震CSV文件的第一行之外,第一行是文件頭,每一行都是一系列逗號分隔數據值。我只對數據的3個部分感興趣:日期、地點和震級。為了獲取這些資料,我將使用一個很棒的開源庫opencsv,它將會幫助我分析CSV文件。
作為一個測試優先的工具,我首先編寫一個快捷JUnit測試,確認我可以從CSV文件的一個樣例行獲取的我所需要的信息,如清單 4 所示:
清單 4. 解析一個CSV行
- public class CSVProcessingTest {
- private final String LINE = "ci,14897012,2,\"Monday, December 13, 2010 " +
- "14:10:32 UTC\",33.0290,-115." +
- "5388,1.9,15.70,41,\"Southern California\"";
- @Test
- public void testReadingOneLine() throws Exception {
- String[] lines = new CSVParser().parseLine(LINE);
- assertEquals("should be Monday, December 13, 2010 14:10:32 UTC",
- "Monday, December 13, 2010 14:10:32 UTC", lines[3]);
- assertEquals("should be Southern California",
- "Southern California", lines[9]);
- assertEquals("should be 1.9", "1.9", lines[6]);
- }
- }
正如您在清單4中所看到的,opencsv處理逗號分隔值非常容易。該解析器僅返回一組String,所以有可能獲取位置信息(別忘了,在 Java語言中數組和集合的訪問是從零開始的)。
轉換日期格式
當使用MapReduce進行處理時,map函數的任務是選擇一些要處理的值,以及一些鍵。這就是說,map主要處理和返回兩個元素:一個鍵和一個值。回到我之前的需求,我首先想知道每天會發生多少次地震。因此,當我在分析地震文件時,我將發布兩個值:鍵是日期,值是一個計數器。reduce函數將對計數器(只是一些值為1的整數)進行總計。因此,提供給我的是在目標地震文件中某一個日期出現的次數。
由于我只對24小時時段內的信息感興趣,我得剔除每個文件中的日期的時間部分。在 清單5中,我編寫了一個快速測試,驗證如何將一個傳入文件中的特定日期信息轉換成一個更一般的24小時日期:
清單 5.日期格式轉換
- @Test
- public void testParsingDate() throws Exception {
- String datest = "Monday, December 13, 2010 14:10:32 UTC";
- SimpleDateFormat formatter = new SimpleDateFormat("EEEEE, MMMMM dd, yyyy HH:mm:ss Z");
- Date dt = formatter.parse(datest);
- formatter.applyPattern("dd-MM-yyyy");
- String dtstr = formatter.format(dt);
- assertEquals("should be 13-12-2010", "13-12-2010", dtstr);
- }
在清單5中,我使用了SimpleDateFormat Java對象,將CSV文件中格式為Monday, December 13, 2010 14:10:32 UTC的日期String轉換成了更一般的13-12-2010。
Hadoop的map和reduce
現在我已經找到了處理CSV文件以及其日期格式的解決方法。我要開始在Hadoop中實施我的map和reduce函數了。這個過程需要理解 Java 泛型,因為 Hadoop 選擇使用顯式類型,為了安全起見。
當我使用 Hadoop 定義一個映射實現時,我只擴展Hadoop的Mapper類。然后我可以使用泛型來為傳出鍵和值指定顯式類。類型子句也指定了傳入鍵和值,這對于讀取文件分別是字節數和文本行數。
EarthQuakesPerDateMapper 類擴展了Hadoop的Mapper對象。它顯式地將其輸出鍵指定為一個Text對象,將其值指定為一個IntWritable,這是一個Hadoop特定類,實質上是一個整數。還要注意,class子句的前兩個類型是LongWritable和Text,分別是字節數和文本行數。
由于類定義中的類型子句,我將傳入map方法的參數類型設置為在context.write子句內帶有該方法的輸出。如果我想指定其他內容,將會出現一個編譯器問題,或Hadoop將輸出一個錯誤消息,描述類型不匹配的消息。
清單 6.一個映射實現清單 6 中的map實現比較簡單:本質上是,Hadoop為在輸入文件中找到的每一行文本調用這個類。為了避免除了CSV頭部,首先檢查是否字節數(key 對象)為零。然后執行清單4和5中的步驟:捕獲傳入日期,進行轉換,然后設置為傳出鍵。我也提供了一個數:1。就是說,我為每個日期編寫一個計數器,當 reduce實現被調用時,獲取一個鍵和一系列值。在本例中,鍵是日期及其值,如 清單7所示:
清單 7.一個 map 輸出和 reduce 輸入的邏輯視圖
以下是引用片段: |
注意,context.write(new Text(dtstr), new IntWritable(1))(在清單6中)構建了如 清單7所示的邏輯集合。正如您所了解的,context是一個保存各種信息的Hadoop數據結構。context被傳遞到reduce實現,reduce獲取這些值為1的值然后總和起來。因此,一個 reduce 實現邏輯上創建如 清單8所示的數據結構:
清單 8.一個reduce輸出視圖
以下是引用片段: |
我的reduce實現如 清單9所示。與Hadoop的Mapper一樣,Reducer被參數化了:前兩個參數是傳入的鍵類型(Text)和值類型(IntWritable),后兩個參數是輸出類型:鍵和值,這在本例中是相同的。
清單 9.reduce實現
- public class EarthQuakesPerDateReducer extends Reducer<Text, IntWritable, Text,
- IntWritable> {
- @Override
- protected void reduce(Text key, Iterable<IntWritable> values, Context context)
- throws IOException, InterruptedException {
- int count = 0;
- for (IntWritable value : values) {
- count++;
- }
- context.write(key, new IntWritable(count));
- }
- }
我的reduce實現非常簡單。正如我在清單7中所指出的,傳入的是實際上是一個值的集合,在本例中是1的集合,我所做的就是將它們加起來,然后寫出一個新鍵值對表示日期和次數。我的 reduce 代碼可以挑出您在清單8中所見到的這幾行。邏輯流程看起來像這樣:
以下是引用片段: |
當然,這個清單的抽象形式是map -> reduce。
定義一個Hadoop Job
現在我已經對我的map和reduce實現進行了編碼,接下來所要做的是將所有這一切鏈接到一個Hadoop Job。定義一個Job比較簡單:您需要提供輸入和輸出、map和reduce實現(如清單6和清單9所示)以及輸出類型。在本例中我的輸出類型和 reduce 實現所用的是同一個類型。
清單 10. 一個將map和redece綁在一起的Job
- public class EarthQuakesPerDayJob {
- public static void main(String[] args) throws Throwable {
- Job job = new Job();
- job.setJarByClass(EarthQuakesPerDayJob.class);
- FileInputFormat.addInputPath(job, new Path(args[0]));
- FileOutputFormat.setOutputPath(job, new Path(args[1]));
- job.setMapperClass(EarthQuakesPerDateMapper.class);
- job.setReducerClass(EarthQuakesPerDateReducer.class);
- job.setOutputKeyClass(Text.class);
- job.setOutputValueClass(IntWritable.class);
- System.exit(job.waitForCompletion(true) ? 0 : 1);
- }
- }
在清單10中,我使用一個main方法將所有這一切綁在一起,該方法有兩個參數:地震CSV文件的目錄,以及生成報告的輸出目錄(Hadoop 更喜歡創建該目錄)。
為了執行這個小框架,我需要將這些類打包。我還需要告知 Hadoop 在哪里可以找到opencsv二進制文件。然后可以通過命令行執行Hadoop,如 清單11所示:
清單 11.執行 Hadoop
- $> export HADOOP_CLASSPATH=lib/opencsv-2.2.jar
- $> hadoop jar target/quake.jar com.b50.hadoop.quake.EarthQuakesPerDayJob
- ~/temp/mreduce/in/ ~/temp/mreduce/out
運行這些代碼,Hadoop開始運行時您將可以看到一堆文本在屏幕上一閃而過。我所用的CSV文件相比專門用于處理這種情況的Hadoop,那真是小巫見大巫!hadoop應該可以在幾秒鐘內完成,具體取決于您的處理功能。
完成這些后,您可以使用任何編輯器查看輸出文件內容。還可以選擇直接使用hadoop命令。正如 清單12所示:
清單 12.讀取Hadoop輸出
以下是引用片段: |
如果您像我一樣,在清單12中首先會注意到的就是每天地震數—12月9日就有178次地震。希望您也會注意到Hadoop實現了我所想要的:整齊地列出我的研究范圍內每天的地震次數。
編寫另一個Mapper
接下來,我想找到地震發生在哪里,以及如何快速計算出在我的研究范圍內記錄地震次數最多的是哪個區域。當然,您已經猜到了,Hadoop可以輕松地做到。在這個案例中,鍵不再是日期而是區域。因此,我編寫了一個新的Mapper類。
清單 13.一個新的map實現
- public class EarthQuakeLocationMapper extends Mapper<LongWritable, Text, Text,
- IntWritable> {
- @Override
- protected void map(LongWritable key, Text value, Context context) throws IOException,
- InterruptedException {
- if (key.get() > 0) {
- String[] lines = new CSVParser().parseLine(value.toString());
- context.write(new Text(lines[9]), new IntWritable(1));
- }
- }
- }
和之前獲取日期然后進行轉換相比,在清單13中我所作的是獲取位置,這是CSV陣列中的最后一個條目。
相比一個龐大的位置和數字列表,我將結果限制在那些7天內出現10次的區域。
清單 14.哪里的地震較多?
- public class EarthQuakeLocationReducer extends
- Reducer<Text, IntWritable, Text, IntWritable> { @Override protected void
- reduce(Text key, Iterable<IntWritable> values, Context context) throws
- IOException, InterruptedException { int count = 0; for (IntWritable value :
- values) { count++; } if (count >= 10) { context.write(key, new
- IntWritable(count)); } } }
清單14中的代碼和清單9中的代碼非常類似;然而,在本例中,我限制了輸出大于或等于10。接下來,我將map和reduce,以及其他 Job 實現綁在一起,進行打包,然后和平常一樣執行Hadoop獲取我的新答案。
使用hadoop dfs目錄顯示我所請求的新值:
清單 15.地震區域分布
以下是引用片段: |
從清單15還可以得到什么?首先,北美洲西海岸,從墨西哥到阿拉斯加是地震高發區。其次,阿肯色州明顯位于斷帶層上,這是我沒有意識到的。最后,如果您居住在北部或者是南加州(很多軟件開發人員都居住于此),您周圍的地方每隔 13 分鐘會震動一次。
結束語
使用Hadoop分析數據輕松且高效,對于它對數據分析所提供的支持,我只是了解皮毛而已。Hadoop的設計旨在以一種分布式方式運行,處理運行 map和reduce的各個節點之間的協調性。作為示例,本文中我只在一個JVM上運行Hadoop,該JVM僅有一個無足輕重的文件。
Hadoop本身是一個功能強大的工具,圍繞它還有一個完整的、不斷擴展的生態系統,可以提供子項目至基于云計算的Hadoop服務。Hadoop生態系統演示了項目背后豐富的社區活動。來自社區的許多工具證實了大數據分析作為一個全球業務活動的可行性。有了Hadoop,分布式數據挖掘和分析對所有軟件創新者和企業家都是可用的,包括但不限于Google和Yahoo!這類大企業。
【編輯推薦】