成人免费xxxxx在线视频软件_久久精品久久久_亚洲国产精品久久久_天天色天天色_亚洲人成一区_欧美一级欧美三级在线观看

利用 MapReduce分析明星微博數據實戰

大數據
互聯網的飛速發展本身也造就了一批互聯網明星,這些人借助新的手段,最大程度發揮了粉絲經濟的能量和作用,在互聯網時代賺得盆滿缽滿。正是基于這樣一個大背景,今天我們做一個分析明星微博數據的小項目

互聯網時代的到來,使得名人的形象變得更加鮮活,也拉近了明星和粉絲之間的距離。歌星、影星、體育明星、作家等名人通過互聯網能夠輕易實現和粉絲的互動,賺錢也變得***的簡單。同時,互聯網的飛速發展本身也造就了一批互聯網明星,這些人借助新的手段,***程度發揮了粉絲經濟的能量和作用,在互聯網時代賺得盆滿缽滿。

正是基于這樣一個大背景,今天我們做一個分析明星微博數據的小項目。

1、項目需求

自定義輸入格式,將明星微博數據排序后按粉絲數關注數 微博數分別輸出到不同文件中。

2、數據集

明星 明星微博名稱 粉絲數 關注數 微博數

俞灝明 俞灝明 10591367 206 558

李敏鎬 李敏鎬 22898071 11 268

林心如 林心如 57488649 214 5940

黃曉明 黃曉明 22616497 506 2011

張靚穎 張靚穎 27878708 238 3846

李娜 李娜 23309493 81 631

徐小平 徐小平 11659926 1929 13795

唐嫣 唐嫣 24301532 200 2391

有斐君 有斐君 8779383 577 4251

3、分析

自定義InputFormat讀取明星微博數據,通過自定義getSortedHashtableByValue方法分別對明星的fan、followers、microblogs數據進行排序,然后利用MultipleOutputs輸出不同項到不同的文件中

4、實現

1)、定義WeiBo實體類,實現WritableComparable接口

  1. package com.buaa; 
  2.  
  3. import java.io.DataInput; 
  4. import java.io.DataOutput; 
  5. import java.io.IOException; 
  6.  
  7. import org.apache.hadoop.io.WritableComparable; 
  8.  
  9. /**  
  10. * @ProjectName MicroblogStar 
  11. * @PackageName com.buaa 
  12. * @ClassName WeiBo 
  13. * @Description TODO 
  14. * @Author 劉吉超 
  15. * @Date 2016-05-07 14:54:29 
  16. */ 
  17. public class WeiBo implements WritableComparable<Object> { 
  18.     // 粉絲 
  19.     private int fan; 
  20.     // 關注 
  21.     private int followers; 
  22.     // 微博數 
  23.     private int microblogs; 
  24.      
  25.     public WeiBo(){}; 
  26.      
  27.     public WeiBo(int fan,int followers,int microblogs){ 
  28.         this.fan = fan; 
  29.         this.followers = followers; 
  30.         this.microblogs = microblogs; 
  31.     } 
  32.      
  33.     public void set(int fan,int followers,int microblogs){ 
  34.         this.fan = fan; 
  35.         this.followers = followers; 
  36.         this.microblogs = microblogs; 
  37.     } 
  38.      
  39.     // 實現WritableComparable的readFields()方法,以便該數據能被序列化后完成網絡傳輸或文件輸入 
  40.     @Override 
  41.     public void readFields(DataInput inthrows IOException { 
  42.         fan  = in.readInt(); 
  43.         followers = in.readInt(); 
  44.         microblogs = in.readInt(); 
  45.     } 
  46.      
  47.     // 實現WritableComparable的write()方法,以便該數據能被序列化后完成網絡傳輸或文件輸出  
  48.     @Override 
  49.     public void write(DataOutput out) throws IOException { 
  50.         out.writeInt(fan); 
  51.         out.writeInt(followers); 
  52.         out.writeInt(microblogs); 
  53.     } 
  54.      
  55.     @Override 
  56.     public int compareTo(Object o) { 
  57.         // TODO Auto-generated method stub 
  58.         return 0; 
  59.     } 
  60.  
  61.     public int getFan() { 
  62.         return fan; 
  63.     } 
  64.  
  65.     public void setFan(int fan) { 
  66.         this.fan = fan; 
  67.     } 
  68.  
  69.     public int getFollowers() { 
  70.         return followers; 
  71.     } 
  72.  
  73.     public void setFollowers(int followers) { 
  74.         this.followers = followers; 
  75.     } 
  76.  
  77.     public int getMicroblogs() { 
  78.         return microblogs; 
  79.     } 
  80.  
  81.     public void setMicroblogs(int microblogs) { 
  82.         this.microblogs = microblogs; 
  83.     } 

2)、自定義WeiboInputFormat,繼承FileInputFormat抽象類

  1. package com.buaa; 
  2.  
  3. import java.io.IOException; 
  4.  
  5. import org.apache.hadoop.conf.Configuration; 
  6. import org.apache.hadoop.fs.FSDataInputStream; 
  7. import org.apache.hadoop.fs.FileSystem; 
  8. import org.apache.hadoop.fs.Path; 
  9. import org.apache.hadoop.io.Text; 
  10. import org.apache.hadoop.mapreduce.InputSplit; 
  11. import org.apache.hadoop.mapreduce.RecordReader; 
  12. import org.apache.hadoop.mapreduce.TaskAttemptContext; 
  13. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
  14. import org.apache.hadoop.mapreduce.lib.input.FileSplit; 
  15. import org.apache.hadoop.util.LineReader; 
  16.  
  17. /**  
  18. * @ProjectName MicroblogStar 
  19. * @PackageName com.buaa 
  20. * @ClassName WeiboInputFormat 
  21. * @Description TODO 
  22. * @Author 劉吉超 
  23. * @Date 2016-05-07 10:23:28 
  24. */ 
  25. public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{ 
  26.  
  27.      @Override 
  28.      public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException { 
  29.           // 自定義WeiboRecordReader類,按行讀取 
  30.           return new WeiboRecordReader(); 
  31.      } 
  32.  
  33.      public class WeiboRecordReader extends RecordReader<Text, WeiBo>{ 
  34.             public LineReader in;  
  35.             // 聲明key類型 
  36.             public Text lineKey = new Text(); 
  37.             // 聲明 value類型 
  38.             public WeiBo lineValue = new WeiBo(); 
  39.              
  40.             @Override 
  41.             public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException { 
  42.                 // 獲取split 
  43.                 FileSplit split = (FileSplit)input; 
  44.                 // 獲取配置  
  45.                 Configuration job = context.getConfiguration(); 
  46.                 // 分片路徑  
  47.                 Path file = split.getPath(); 
  48.                  
  49.                 FileSystem fs = file.getFileSystem(job);  
  50.                 // 打開文件    
  51.                 FSDataInputStream filein = fs.open(file); 
  52.                  
  53.                 in = new LineReader(filein,job);  
  54.             } 
  55.  
  56.             @Override 
  57.             public boolean nextKeyValue() throws IOException, InterruptedException { 
  58.                 // 一行數據 
  59.                 Text line = new Text(); 
  60.                  
  61.                 int linesize = in.readLine(line); 
  62.                  
  63.                 if(linesize == 0)  
  64.                     return false;  
  65.                  
  66.                 // 通過分隔符'\t',將每行的數據解析成數組 
  67.                 String[] pieces = line.toString().split("\t"); 
  68.                  
  69.                 if(pieces.length != 5){   
  70.                     throw new IOException("Invalid record received");   
  71.                 }  
  72.                  
  73.                 int a,b,c; 
  74.                 try{   
  75.                     // 粉絲   
  76.                     a = Integer.parseInt(pieces[2].trim()); 
  77.                     // 關注 
  78.                     b = Integer.parseInt(pieces[3].trim()); 
  79.                     // 微博數 
  80.                     c = Integer.parseInt(pieces[4].trim()); 
  81.                 }catch(NumberFormatException nfe){   
  82.                     throw new IOException("Error parsing floating poing value in record");   
  83.                 } 
  84.                  
  85.                 //自定義key和value值 
  86.                 lineKey.set(pieces[0]);   
  87.                 lineValue.set(a, b, c); 
  88.                  
  89.                 return true
  90.             } 
  91.              
  92.             @Override 
  93.             public void close() throws IOException { 
  94.                 if(in != null){ 
  95.                     in.close(); 
  96.                 } 
  97.             } 
  98.  
  99.             @Override 
  100.             public Text getCurrentKey() throws IOException, InterruptedException { 
  101.                 return lineKey; 
  102.             } 
  103.  
  104.             @Override 
  105.             public WeiBo getCurrentValue() throws IOException, InterruptedException { 
  106.                 return lineValue; 
  107.             } 
  108.  
  109.             @Override 
  110.             public float getProgress() throws IOException, InterruptedException { 
  111.                 return 0; 
  112.             } 
  113.              
  114.         } 

3)、編寫mr程序

  1. package com.buaa; 
  2.  
  3. import java.io.IOException; 
  4. import java.util.Arrays; 
  5. import java.util.Comparator; 
  6. import java.util.HashMap; 
  7. import java.util.Map; 
  8. import java.util.Map.Entry; 
  9.  
  10. import org.apache.hadoop.conf.Configuration; 
  11. import org.apache.hadoop.conf.Configured; 
  12. import org.apache.hadoop.fs.FileSystem; 
  13. import org.apache.hadoop.fs.Path; 
  14. import org.apache.hadoop.io.IntWritable; 
  15. import org.apache.hadoop.io.Text; 
  16. import org.apache.hadoop.mapreduce.Job; 
  17. import org.apache.hadoop.mapreduce.Mapper; 
  18. import org.apache.hadoop.mapreduce.Reducer; 
  19. import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; 
  20. import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; 
  21. import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; 
  22. import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; 
  23. import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; 
  24. import org.apache.hadoop.util.Tool; 
  25. import org.apache.hadoop.util.ToolRunner; 
  26.  
  27. /**  
  28. * @ProjectName MicroblogStar 
  29. * @PackageName com.buaa 
  30. * @ClassName WeiboCount 
  31. * @Description TODO 
  32. * @Author 劉吉超 
  33. * @Date 2016-05-07 09:07:36 
  34. */ 
  35. public class WeiboCount extends Configured implements Tool { 
  36.     // tab分隔符 
  37.     private static String TAB_SEPARATOR = "\t"
  38.     // 粉絲 
  39.     private static String FAN = "fan"
  40.     // 關注 
  41.     private static String FOLLOWERS = "followers"
  42.     // 微博數 
  43.     private static String MICROBLOGS = "microblogs"
  44.      
  45.     public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> { 
  46.         @Override 
  47.         protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException { 
  48.             // 粉絲 
  49.             context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan())); 
  50.             // 關注 
  51.             context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers())); 
  52.             // 微博數 
  53.             context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs())); 
  54.         } 
  55.     } 
  56.      
  57.     public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> { 
  58.         private MultipleOutputs<Text, IntWritable> mos; 
  59.  
  60.         protected void setup(Context context) throws IOException, InterruptedException { 
  61.             mos = new MultipleOutputs<Text, IntWritable>(context); 
  62.         } 
  63.  
  64.         protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException { 
  65.             Map<String,Integer> map = new HashMap< String,Integer>(); 
  66.              
  67.             for(Text value : Values){ 
  68.                 // value = 名稱 + (粉絲數 或 關注數 或 微博數) 
  69.                 String[] records = value.toString().split(TAB_SEPARATOR); 
  70.                 map.put(records[0], Integer.parseInt(records[1].toString())); 
  71.             } 
  72.              
  73.             // 對Map內的數據進行排序 
  74.             Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map); 
  75.              
  76.             for(int i = 0; i < entries.length;i++){ 
  77.                 mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue()); 
  78.             }                
  79.         } 
  80.  
  81.         protected void cleanup(Context context) throws IOException, InterruptedException { 
  82.             mos.close(); 
  83.         } 
  84.     } 
  85.      
  86.     @SuppressWarnings("deprecation"
  87.     @Override 
  88.     public int run(String[] args) throws Exception { 
  89.         // 配置文件對象 
  90.         Configuration conf = new Configuration(); 
  91.          
  92.         // 判斷路徑是否存在,如果存在,則刪除 
  93.         Path mypath = new Path(args[1]); 
  94.         FileSystem hdfs = mypath.getFileSystem(conf); 
  95.         if (hdfs.isDirectory(mypath)) { 
  96.             hdfs.delete(mypath, true); 
  97.         } 
  98.          
  99.         // 構造任務 
  100.         Job job = new Job(conf, "weibo"); 
  101.         // 主類 
  102.         job.setJarByClass(WeiboCount.class); 
  103.  
  104.         // Mapper 
  105.         job.setMapperClass(WeiBoMapper.class); 
  106.         // Mapper key輸出類型 
  107.         job.setMapOutputKeyClass(Text.class); 
  108.         // Mapper value輸出類型 
  109.         job.setMapOutputValueClass(Text.class); 
  110.          
  111.         // Reducer 
  112.         job.setReducerClass(WeiBoReducer.class); 
  113.         // Reducer key輸出類型 
  114.         job.setOutputKeyClass(Text.class); 
  115.         // Reducer value輸出類型 
  116.         job.setOutputValueClass(IntWritable.class); 
  117.          
  118.         // 輸入路徑 
  119.         FileInputFormat.addInputPath(job, new Path(args[0])); 
  120.         // 輸出路徑 
  121.         FileOutputFormat.setOutputPath(job, new Path(args[1])); 
  122.          
  123.         // 自定義輸入格式 
  124.         job.setInputFormatClass(WeiboInputFormat.class) ; 
  125.         //自定義文件輸出類別 
  126.         MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class); 
  127.         MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class); 
  128.         MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class); 
  129.          
  130.         // 去掉job設置outputFormatClass,改為通過LazyOutputFormat設置   
  131.         LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);  
  132.          
  133.          //提交任務   
  134.         return job.waitForCompletion(true)?0:1; 
  135.     } 
  136.      
  137.     // 對Map內的數據進行排序(只適合小數據量) 
  138.     @SuppressWarnings("unchecked"
  139.     public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {   
  140.         Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);   
  141.         // 排序 
  142.         Arrays.sort(entries, new Comparator<Entry<String, Integer>>() { 
  143.             public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) { 
  144.                 return entry2.getValue().compareTo(entry1.getValue()); 
  145.             }  
  146.         }); 
  147.         return entries;   
  148.     } 
  149.      
  150.     public static void main(String[] args) throws Exception { 
  151.         String[] args0 = { 
  152.                 "hdfs://ljc:9000/buaa/microblog/weibo.txt"
  153.                 "hdfs://ljc:9000/buaa/microblog/out/"  
  154.         }; 
  155.         int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0); 
  156.         System.exit(ec); 
  157.     } 

5、運行結果

 

責任編輯:Ophira 來源: 博客園
相關推薦

2011-12-20 09:54:43

微博

2012-02-07 16:20:55

訊飛語音

2024-10-14 14:19:02

2013-04-27 13:55:34

大數據全球技術峰會

2017-03-13 09:48:26

pysparkhive數據

2013-04-23 14:36:54

2016-04-06 10:02:23

手機微博運維監控

2011-08-30 14:48:02

2013-09-13 13:35:41

微淘微信微博

2012-06-14 10:22:21

網易微博HTML5開發

2021-11-09 09:46:09

ScrapyPython爬蟲

2021-11-08 14:38:50

框架Scrapy 爬蟲

2012-06-18 14:22:09

HTML5

2020-02-20 10:45:51

Python數據疾病

2014-10-15 16:32:43

MapReducehadoop

2009-04-17 10:07:42

2015-11-24 09:43:37

微博Docker混合云

2011-04-21 15:25:48

微博管理方案發帖審計

2011-04-21 14:47:52

微博管理方案用戶識別

2011-10-21 09:43:28

Python
點贊
收藏

51CTO技術棧公眾號

主站蜘蛛池模板: 日韩h | 欧美极品在线播放 | 欧美一区二区二区 | 91精品国产综合久久久久 | 性国产xxxx乳高跟 | 99精品国产一区二区青青牛奶 | 日本精品一区二区三区视频 | 国产精品久久久久久久久婷婷 | 国产 日韩 欧美 中文 在线播放 | 欧美视频网 | 欧美一区二区免费在线 | 精品毛片| 日本精品一区二区三区视频 | 精品一二区 | 国产99久久久国产精品 | 国产精品国色综合久久 | 国产精品影视 | 久久尤物免费一区二区三区 | 亚洲欧美日韩久久久 | 国产精品96久久久久久 | 午夜精品一区二区三区在线观看 | 久草青青 | 亚洲一区中文 | 精品国产乱码久久久久久丨区2区 | 精品国产一区二区三区日日嗨 | 欧美日韩精品一区 | 国产成人精品福利 | 91视频大全 | 中文字幕在线一区 | h视频在线免费 | 久久精品一级 | 青青草在线视频免费观看 | 国产精品精品久久久 | 国产一区二区免费在线 | 欧美一级淫片免费视频黄 | 午夜免费观看体验区 | www.天堂av.com | 欧美黄页| 亚洲欧美日韩精品久久亚洲区 | 综合色播 | 古装三级在线播放 |