欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

如何利用MapReduce分析明星微博數(shù)據(jù)

這篇文章主要介紹了如何利用MapReduce分析明星微博數(shù)據(jù),具有一定借鑒價(jià)值,感興趣的朋友可以參考下,希望大家閱讀完這篇文章之后大有收獲,下面讓小編帶著大家一起了解一下。

保靖網(wǎng)站建設(shè)公司創(chuàng)新互聯(lián)公司,保靖網(wǎng)站設(shè)計(jì)制作,有大型網(wǎng)站制作公司豐富經(jīng)驗(yàn)。已為保靖千余家提供企業(yè)網(wǎng)站建設(shè)服務(wù)。企業(yè)網(wǎng)站搭建\外貿(mào)網(wǎng)站制作要多少錢(qián),請(qǐng)找那個(gè)售后服務(wù)好的保靖做網(wǎng)站的公司定做!

1、項(xiàng)目需求

自定義輸入格式,將明星微博數(shù)據(jù)排序后按粉絲數(shù)關(guān)注數(shù) 微博數(shù)分別輸出到不同文件中。

2、數(shù)據(jù)集

明星 明星微博名稱 粉絲數(shù) 關(guān)注數(shù) 微博數(shù)

俞灝明 俞灝明 10591367 206 558

李敏鎬 李敏鎬 22898071 11 268

林心如 林心如 57488649 214 5940

黃曉明 黃曉明 22616497 506 2011

張靚穎 張靚穎 27878708 238 3846

李娜 李娜 23309493 81 631

徐小平 徐小平 11659926 1929 13795

唐嫣 唐嫣 24301532 200 2391

有斐君 有斐君 8779383 577 4251

3、分析

自定義InputFormat讀取明星微博數(shù)據(jù),通過(guò)自定義getSortedHashtableByValue方法分別對(duì)明星的fan、followers、microblogs數(shù)據(jù)進(jìn)行排序,然后利用MultipleOutputs輸出不同項(xiàng)到不同的文件中

4、實(shí)現(xiàn)

1)、定義WeiBo實(shí)體類,實(shí)現(xiàn)WritableComparable接口

package com.buaa;  import java.io.DataInput; import java.io.DataOutput; import java.io.IOException;  import org.apache.hadoop.io.WritableComparable;  /**  * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiBo * @Description TODO * @Author 劉吉超 * @Date 2016-05-07 14:54:29 */ public class WeiBo implements WritableComparable<Object> {     // 粉絲     private int fan;     // 關(guān)注     private int followers;     // 微博數(shù)     private int microblogs;          public WeiBo(){};          public WeiBo(int fan,int followers,int microblogs){         this.fan = fan;         this.followers = followers;         this.microblogs = microblogs;     }          public void set(int fan,int followers,int microblogs){         this.fan = fan;         this.followers = followers;         this.microblogs = microblogs;     }          // 實(shí)現(xiàn)WritableComparable的readFields()方法,以便該數(shù)據(jù)能被序列化后完成網(wǎng)絡(luò)傳輸或文件輸入     @Override     public void readFields(DataInput in) throws IOException {         fan  = in.readInt();         followers = in.readInt();         microblogs = in.readInt();     }          // 實(shí)現(xiàn)WritableComparable的write()方法,以便該數(shù)據(jù)能被序列化后完成網(wǎng)絡(luò)傳輸或文件輸出      @Override     public void write(DataOutput out) throws IOException {         out.writeInt(fan);         out.writeInt(followers);         out.writeInt(microblogs);     }          @Override     public int compareTo(Object o) {         // TODO Auto-generated method stub         return 0;     }      public int getFan() {         return fan;     }      public void setFan(int fan) {         this.fan = fan;     }      public int getFollowers() {         return followers;     }      public void setFollowers(int followers) {         this.followers = followers;     }      public int getMicroblogs() {         return microblogs;     }      public void setMicroblogs(int microblogs) {         this.microblogs = microblogs;     } }

2)、自定義WeiboInputFormat,繼承FileInputFormat抽象類

package com.buaa;  import java.io.IOException;  import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.InputSplit; import org.apache.hadoop.mapreduce.RecordReader; import org.apache.hadoop.mapreduce.TaskAttemptContext; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.input.FileSplit; import org.apache.hadoop.util.LineReader;  /**  * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiboInputFormat * @Description TODO * @Author 劉吉超 * @Date 2016-05-07 10:23:28 */ public class WeiboInputFormat extends FileInputFormat<Text,WeiBo>{       @Override      public RecordReader<Text, WeiBo> createRecordReader(InputSplit arg0, TaskAttemptContext arg1) throws IOException, InterruptedException {           // 自定義WeiboRecordReader類,按行讀取           return new WeiboRecordReader();      }       public class WeiboRecordReader extends RecordReader<Text, WeiBo>{             public LineReader in;              // 聲明key類型             public Text lineKey = new Text();             // 聲明 value類型             public WeiBo lineValue = new WeiBo();                          @Override             public void initialize(InputSplit input, TaskAttemptContext context) throws IOException, InterruptedException {                 // 獲取split                 FileSplit split = (FileSplit)input;                 // 獲取配置                  Configuration job = context.getConfiguration();                 // 分片路徑                  Path file = split.getPath();                                  FileSystem fs = file.getFileSystem(job);                  // 打開(kāi)文件                    FSDataInputStream filein = fs.open(file);                                  in = new LineReader(filein,job);              }              @Override             public boolean nextKeyValue() throws IOException, InterruptedException {                 // 一行數(shù)據(jù)                 Text line = new Text();                                  int linesize = in.readLine(line);                                  if(linesize == 0)                      return false;                                   // 通過(guò)分隔符'\t',將每行的數(shù)據(jù)解析成數(shù)組                 String[] pieces = line.toString().split("\t");                                  if(pieces.length != 5){                       throw new IOException("Invalid record received");                   }                                   int a,b,c;                 try{                       // 粉絲                       a = Integer.parseInt(pieces[2].trim());                     // 關(guān)注                     b = Integer.parseInt(pieces[3].trim());                     // 微博數(shù)                     c = Integer.parseInt(pieces[4].trim());                 }catch(NumberFormatException nfe){                       throw new IOException("Error parsing floating poing value in record");                   }                                  //自定義key和value值                 lineKey.set(pieces[0]);                   lineValue.set(a, b, c);                                  return true;             }                          @Override             public void close() throws IOException {                 if(in != null){                     in.close();                 }             }              @Override             public Text getCurrentKey() throws IOException, InterruptedException {                 return lineKey;             }              @Override             public WeiBo getCurrentValue() throws IOException, InterruptedException {                 return lineValue;             }              @Override             public float getProgress() throws IOException, InterruptedException {                 return 0;             }                      } }

3)、編寫(xiě)mr程序

package com.buaa;  import java.io.IOException; import java.util.Arrays; import java.util.Comparator; import java.util.HashMap; import java.util.Map; import java.util.Map.Entry;  import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.conf.Configured; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.io.IntWritable; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; import org.apache.hadoop.mapreduce.Mapper; import org.apache.hadoop.mapreduce.Reducer; import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat; import org.apache.hadoop.mapreduce.lib.output.LazyOutputFormat; import org.apache.hadoop.mapreduce.lib.output.MultipleOutputs; import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; import org.apache.hadoop.util.Tool; import org.apache.hadoop.util.ToolRunner;  /**  * @ProjectName MicroblogStar * @PackageName com.buaa * @ClassName WeiboCount * @Description TODO * @Author 劉吉超 * @Date 2016-05-07 09:07:36 */ public class WeiboCount extends Configured implements Tool {     // tab分隔符     private static String TAB_SEPARATOR = "\t";     // 粉絲     private static String FAN = "fan";     // 關(guān)注     private static String FOLLOWERS = "followers";     // 微博數(shù)     private static String MICROBLOGS = "microblogs";          public static class WeiBoMapper extends Mapper<Text, WeiBo, Text, Text> {         @Override         protected void map(Text key, WeiBo value, Context context) throws IOException, InterruptedException {             // 粉絲             context.write(new Text(FAN), new Text(key.toString() + TAB_SEPARATOR + value.getFan()));             // 關(guān)注             context.write(new Text(FOLLOWERS), new Text(key.toString() + TAB_SEPARATOR + value.getFollowers()));             // 微博數(shù)             context.write(new Text(MICROBLOGS), new Text(key.toString() + TAB_SEPARATOR + value.getMicroblogs()));         }     }          public static class WeiBoReducer extends Reducer<Text, Text, Text, IntWritable> {         private MultipleOutputs<Text, IntWritable> mos;          protected void setup(Context context) throws IOException, InterruptedException {             mos = new MultipleOutputs<Text, IntWritable>(context);         }          protected void reduce(Text Key, Iterable<Text> Values,Context context) throws IOException, InterruptedException {             Map<String,Integer> map = new HashMap< String,Integer>();                          for(Text value : Values){                 // value = 名稱 + (粉絲數(shù) 或 關(guān)注數(shù) 或 微博數(shù))                 String[] records = value.toString().split(TAB_SEPARATOR);                 map.put(records[0], Integer.parseInt(records[1].toString()));             }                          // 對(duì)Map內(nèi)的數(shù)據(jù)進(jìn)行排序             Map.Entry<String, Integer>[] entries = getSortedHashtableByValue(map);                          for(int i = 0; i < entries.length;i++){                 mos.write(Key.toString(),entries[i].getKey(), entries[i].getValue());             }                        }          protected void cleanup(Context context) throws IOException, InterruptedException {             mos.close();         }     }          @SuppressWarnings("deprecation")     @Override     public int run(String[] args) throws Exception {         // 配置文件對(duì)象         Configuration conf = new Configuration();                  // 判斷路徑是否存在,如果存在,則刪除         Path mypath = new Path(args[1]);         FileSystem hdfs = mypath.getFileSystem(conf);         if (hdfs.isDirectory(mypath)) {             hdfs.delete(mypath, true);         }                  // 構(gòu)造任務(wù)         Job job = new Job(conf, "weibo");         // 主類         job.setJarByClass(WeiboCount.class);          // Mapper         job.setMapperClass(WeiBoMapper.class);         // Mapper key輸出類型         job.setMapOutputKeyClass(Text.class);         // Mapper value輸出類型         job.setMapOutputValueClass(Text.class);                  // Reducer         job.setReducerClass(WeiBoReducer.class);         // Reducer key輸出類型         job.setOutputKeyClass(Text.class);         // Reducer value輸出類型         job.setOutputValueClass(IntWritable.class);                  // 輸入路徑         FileInputFormat.addInputPath(job, new Path(args[0]));         // 輸出路徑         FileOutputFormat.setOutputPath(job, new Path(args[1]));                  // 自定義輸入格式         job.setInputFormatClass(WeiboInputFormat.class) ;         //自定義文件輸出類別         MultipleOutputs.addNamedOutput(job, FAN, TextOutputFormat.class, Text.class, IntWritable.class);         MultipleOutputs.addNamedOutput(job, FOLLOWERS, TextOutputFormat.class, Text.class, IntWritable.class);         MultipleOutputs.addNamedOutput(job, MICROBLOGS, TextOutputFormat.class, Text.class, IntWritable.class);                  // 去掉job設(shè)置outputFormatClass,改為通過(guò)LazyOutputFormat設(shè)置           LazyOutputFormat.setOutputFormatClass(job, TextOutputFormat.class);                    //提交任務(wù)           return job.waitForCompletion(true)?0:1;     }          // 對(duì)Map內(nèi)的數(shù)據(jù)進(jìn)行排序(只適合小數(shù)據(jù)量)     @SuppressWarnings("unchecked")     public static Entry<String, Integer>[] getSortedHashtableByValue(Map<String, Integer> h) {           Entry<String, Integer>[] entries = (Entry<String, Integer>[]) h.entrySet().toArray(new Entry[0]);           // 排序         Arrays.sort(entries, new Comparator<Entry<String, Integer>>() {             public int compare(Entry<String, Integer> entry1, Entry<String, Integer> entry2) {                 return entry2.getValue().compareTo(entry1.getValue());             }          });         return entries;       }          public static void main(String[] args) throws Exception {         String[] args0 = {                 "hdfs://ljc:9000/buaa/microblog/weibo.txt",                 "hdfs://ljc:9000/buaa/microblog/out/"          };         int ec = ToolRunner.run(new Configuration(), new WeiboCount(), args0);         System.exit(ec);     } }

5、運(yùn)行結(jié)果

如何利用MapReduce分析明星微博數(shù)據(jù)

感謝你能夠認(rèn)真閱讀完這篇文章,希望小編分享的“如何利用MapReduce分析明星微博數(shù)據(jù)”這篇文章對(duì)大家有幫助,同時(shí)也希望大家多多支持創(chuàng)新互聯(lián),關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道,更多相關(guān)知識(shí)等著你來(lái)學(xué)習(xí)!

本文標(biāo)題:如何利用MapReduce分析明星微博數(shù)據(jù)
標(biāo)題鏈接:http://aaarwkj.com/article44/ihhjhe.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供標(biāo)簽優(yōu)化網(wǎng)站設(shè)計(jì)公司、外貿(mào)網(wǎng)站建設(shè)、定制開(kāi)發(fā)虛擬主機(jī)、企業(yè)網(wǎng)站制作

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請(qǐng)盡快告知,我們將會(huì)在第一時(shí)間刪除。文章觀點(diǎn)不代表本網(wǎng)站立場(chǎng),如需處理請(qǐng)聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時(shí)需注明來(lái)源: 創(chuàng)新互聯(lián)

營(yíng)銷(xiāo)型網(wǎng)站建設(shè)
亚洲熟妇人妻一区二区三区| 欧美日韩精品视频专区| 国产精品欧美色区福利在线| 国产欧美一区二区另类精品| 伊人性伊人情亚洲综合| 久久精品国产一区二区三区不卡| 日韩精品一区二区毛片| 亚洲av色男人天堂网| 日韩精品视频性色首页| 未满十八禁止观看免费观看| 国产av日韩精品一区二区三区| 欧美中文字幕在线精品| 国产剧情av一区在线观看| 国产精品不卡一不卡二| 国产午夜草莓视频在线观看| 亚洲伦理国产一国产二| 年轻的母亲韩国三级| 精品国产91久久粉嫩懂色| 四虎在线观看精品一区| 国产日韩欧在线视频| 日韩免费中文视频不卡| 日韩亚洲欧美国产另类| 日本在线视频精品一区| 在线免费观看视频97| 精品久久人妻中文字幕免费| 青青草日韩视频在线观看 | 亚洲三级黄片在线观看| 日韩精品福利片午夜免费| 精品亚洲韩国一区二区三区| 亚洲日本精品免费在线观看| 少妇太爽高潮在线播放| 日韩av熟女中文字幕| 国产夫妻性生活国产视频| 天天操天天干夜夜骑| 亚洲综合精品久久久一区| 成人福利午夜一区二区| 久久91超碰青草在哪里看| 亚洲午夜天堂精品福利天堂| 免费国产成人在线视频| 国产精品亚洲欧美日韩综合| 青青草免费公开视频久久|