目錄
創(chuàng)新互聯(lián)公司科技有限公司專業(yè)互聯(lián)網(wǎng)基礎(chǔ)服務商,為您提供綿陽服務器托管,高防主機,成都IDC機房托管,成都主機托管等互聯(lián)網(wǎng)服務。1、啟動Hadoop服務
2、創(chuàng)建文本文件
3、上傳文本文件
4、顯示文件內(nèi)容
5、完成排序任務
6、計算大利潤和平均利潤
7、統(tǒng)計學生總成績和平均成績
8、總結(jié)
1、啟動Hadoop服務在master
虛擬機上執(zhí)行命令:
start-all.sh
啟動hadoop服務進程
?
?
?
2、創(chuàng)建文本文件在master虛擬機上創(chuàng)建本地文件students.txt
李曉文 女 20
張曉航 男 19
鄭小剛 男 21
吳文華 女 18
肖云宇 男 22
陳燕文 女 19
李連杰 男 23
艾曉麗 女 21
童安格 男 18
?
3、上傳文本文件將students.txt
上傳到HDFS的/BigDtat
目錄
執(zhí)行命令將該文件復制到HDFS的HelloHadoop文件夾中
hdfs dfs -put /home/student.txt /BigData
?
webUI界面中查看上傳成功?
?
4、顯示文件內(nèi)容創(chuàng)建maven工程
?
創(chuàng)建maven工程并添加依賴
org.apache.hadoop
hadoop-client3.3.4 junit
junit 4.13.2
?
在?resources
目錄里創(chuàng)建?log4j.properties
文件
log4j.rootLogger=INFO, stdout, logfile
log4j.appender.stdout=org.apache.log4j.ConsoleAppender
log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
log4j.appender.stdout.layout.ConversionPattern=%d %p [%c] - %m%n
log4j.appender.logfile=org.apache.log4j.FileAppender
log4j.appender.logfile.File=target/wordcount.log
log4j.appender.logfile.layout=org.apache.log4j.PatternLayout
log4j.appender.logfile.layout.ConversionPattern=%d %p [%c] - %m%n
?
創(chuàng)建displayFile類用于顯示文件內(nèi)容
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.junit.Test;
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.net.URI;
public class displayFile {
@Test
public void read1() throws Exception {
// 創(chuàng)建配置對象
Configuration conf = new Configuration();
// 設(shè)置數(shù)據(jù)節(jié)點主機名屬性
conf.set("dfs.client.use.datanode.hostname", "true");
// 定義統(tǒng)一資源標識符(uri: uniform resource identifier)
String uri = "hdfs://master:9000";
// 創(chuàng)建文件系統(tǒng)對象(基于HDFS的文件系統(tǒng))
FileSystem fs = FileSystem.get(new URI(uri), conf, "root");
// 創(chuàng)建路徑對象(指向文件)
Path path = new Path(uri + "/BigData/student.txt");
System.out.println(path);
// 創(chuàng)建文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流(進水管:數(shù)據(jù)從文件到程序)
FSDataInputStream in = fs.open(path);
// 創(chuàng)建緩沖字符輸入流,提高讀取效率(字節(jié)流-->字符流-->緩沖流)
BufferedReader br = new BufferedReader(new InputStreamReader(in));
// 定義行字符串變量
String nextLine = "";
// 通過循環(huán)遍歷緩沖字符輸入流
while ((nextLine = br.readLine()) != null) {
// 在控制臺輸出讀取的行
System.out.println(nextLine);
}
// 關(guān)閉緩沖字符輸入流
br.close();
// 關(guān)閉文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流
in.close();
// 關(guān)閉文件系統(tǒng)
fs.close();
}
}
?
5、完成排序任務創(chuàng)建Maven項目SortByAge
,利用MapReduce計算框架,處理/BigData/student.txt
文件,輸出結(jié)果按照年齡降序排列
李曉文 女 20
張曉航 男 19
鄭小剛 男 21
吳文華 女 18
肖云宇 男 22
陳燕文 女 19
李連杰 男 23
艾曉麗 女 21
童安格 男 18
創(chuàng)建Student類
import org.apache.hadoop.io.WritableComparable;
import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
public class Student implements WritableComparable{
private String name;
private String gender;
private int age;
public String getName() {
return name;
}
public void setName(String name) {
this.name = name;
}
public String getGender() {
return gender;
}
public void setGender(String gender) {
this.gender = gender;
}
public int getAge() {
return age;
}
public void setAge(int age) {
this.age = age;
}
@Override
public String toString() {
return "Student{" +
"name='" + name + '\'' +
", gender='" + gender + '\'' +
", age=" + age + '\''+
'}';
}
public int compareTo(Student o) {
return o.getAge() - this.getAge(); // 降序
}
public void write(DataOutput out) throws IOException {
out.writeUTF(name);
out.writeUTF(gender);
out.writeInt(age);
}
public void readFields(DataInput in) throws IOException {
name = in.readUTF();
gender = in.readUTF();
age = in.readInt();
}
}
創(chuàng)建WordCountMapper類
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 獲取行內(nèi)容
String line = value.toString();
// 按空格拆分得到字段數(shù)組
String[] fields = line.split(" ");
// 獲取學生信息
String name = fields[0];
String gender = fields[1];
int age = Integer.parseInt(fields[2]);
// 創(chuàng)建學生對象
Student student = new Student();
// 設(shè)置學生對象屬性
student.setName(name);
student.setGender(gender);
student.setAge(age);
context.write(student, NullWritable.get());
}
}
創(chuàng)建WordCountReducer類
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
public class WordCountReducer extends Reducer{
@Override
protected void reduce(Student key, Iterablevalues, Context context)
throws IOException, InterruptedException {
for (NullWritable value : values) {
// 獲取學生對象
Student student = key;
// 拼接學生信息
String studentInfo = student.getName() + "\t"
+ student.getGender() + "\t"
+ student.getAge();
context.write(new Text(studentInfo), NullWritable.get());
}
}
}
創(chuàng)建WordCountDriver類
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// 創(chuàng)建配置對象
Configuration conf = new Configuration();
// 設(shè)置數(shù)據(jù)節(jié)點主機名屬性
conf.set("dfs.client.use.datanode.hostname", "true");
// 獲取作業(yè)實例
Job job = Job.getInstance(conf);
// 設(shè)置作業(yè)啟動類
job.setJarByClass(WordCountDriver.class);
// 設(shè)置Mapper類
job.setMapperClass(WordCountMapper.class);
// 設(shè)置map任務輸出鍵類型
job.setMapOutputKeyClass(Student.class);
// 設(shè)置map任務輸出值類型
job.setMapOutputValueClass(NullWritable.class);
// 設(shè)置Reducer類
job.setReducerClass(WordCountReducer.class);
// 設(shè)置reduce任務輸出鍵類型
job.setOutputKeyClass(Student.class);
// 設(shè)置reduce任務輸出值類型
job.setOutputValueClass(NullWritable.class);
// 定義uri字符串
String uri = "hdfs://master:9000";
// 創(chuàng)建輸入目錄
Path inputPath = new Path(uri + "/BigData");
// 創(chuàng)建輸出目錄
Path outputPath = new Path(uri + "/output");
// 獲取文件系統(tǒng)
FileSystem fs = FileSystem.get(new URI(uri), conf);
// 刪除輸出目錄(第二個參數(shù)設(shè)置是否遞歸)
fs.delete(outputPath, true);
// 給作業(yè)添加輸入目錄(允許多個)
FileInputFormat.addInputPath(job, inputPath);
// 給作業(yè)設(shè)置輸出目錄(只能一個)
FileOutputFormat.setOutputPath(job, outputPath);
// 等待作業(yè)完成
job.waitForCompletion(true);
// 輸出統(tǒng)計結(jié)果
System.out.println("======統(tǒng)計結(jié)果======");
FileStatus[] fileStatuses = fs.listStatus(outputPath);
for (int i = 1; i< fileStatuses.length; i++) {
// 輸出結(jié)果文件路徑
System.out.println(fileStatuses[i].getPath());
// 獲取文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流
FSDataInputStream in = fs.open(fileStatuses[i].getPath());
// 將結(jié)果文件顯示在控制臺
IOUtils.copyBytes(in, System.out, 4096, false);
}
}
}
運行查看結(jié)果
6、計算大利潤和平均利潤利用利用MapReduce計算框架 處理profit.txt文件,輸出每月大利潤和平均利潤
創(chuàng)建利潤信息profit.txt文件并上傳HDFS
1 10000
1 15000
1 20000
2 2340
2 5640
2 6140
3 15000
3 2380
3 8900
創(chuàng)建ScoreMapper類
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ScoreMapper extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 獲取行內(nèi)容
String line = value.toString();
// 按空格拆分得到字段數(shù)組
String[] fields = line.split(" ");
// 獲取月份
String name = fields[0].trim();
// 遍歷各利潤信息
for (int i = 1; i< fields.length; i++) {
// 獲取利潤信息
int score = Integer.parseInt(fields[i].trim());
// 寫入<月份,值>鍵值對
context.write(new Text(name), new IntWritable(score));
}
}
}
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class ScoreMapper extends Mapper{
@Override
protected void map(LongWritable key, Text value, Context context)
throws IOException, InterruptedException {
// 獲取行內(nèi)容
String line = value.toString();
// 按空格拆分得到字段數(shù)組
String[] fields = line.split(" ");
// 獲取月份
String name = fields[0].trim();
// 遍歷各利潤信息
for (int i = 1; i< fields.length; i++) {
// 獲取利潤信息
int score = Integer.parseInt(fields[i].trim());
// 寫入<月份,值>鍵值對
context.write(new Text(name), new IntWritable(score));
}
}
}
創(chuàng)建ScoreReducer類
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.text.DecimalFormat;
public class ScoreReducer extends Reducer{
@Override
protected void reduce(Text key, Iterablevalues, Context context)
throws IOException, InterruptedException {
// 聲明變量
int count = 0; // 科目數(shù)
int sum = 0; // 總分
int avg = 0; // 平均分
int max = 20000;
// 遍歷迭代器計算總分
for (IntWritable value : values) {
count++; // 科目數(shù)累加
sum += value.get(); // 總分累加
}
// 計算平均值
avg = sum * 1 / count;
// 創(chuàng)建小數(shù)格式對象
DecimalFormat df = new DecimalFormat("#.#");
// 拼接每個大利潤與平均利潤信息
String scoreInfo = key + " maxProfit=" + max + ", avgProfit=" + df.format(avg);
// 寫入鍵值對
context.write(new Text(scoreInfo), NullWritable.get());
}
}
創(chuàng)建ScoreDriver類
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
public class ScoreDriver {
public static void main(String[] args) throws Exception {
// 創(chuàng)建配置對象
Configuration conf = new Configuration();
// 設(shè)置數(shù)據(jù)節(jié)點主機名屬性
conf.set("dfs.client.use.datanode.hostname", "true");
// 獲取作業(yè)實例
Job job = Job.getInstance(conf);
// 設(shè)置作業(yè)啟動類
job.setJarByClass(ScoreDriver.class);
// 設(shè)置Mapper類
job.setMapperClass(ScoreMapper.class);
// 設(shè)置map任務輸出鍵類型
job.setMapOutputKeyClass(Text.class);
// 設(shè)置map任務輸出值類型
job.setMapOutputValueClass(IntWritable.class);
// 設(shè)置Reducer類
job.setReducerClass(ScoreReducer.class);
// 設(shè)置reduce任務輸出鍵類型
job.setOutputKeyClass(Text.class);
// 設(shè)置reduce任務輸出值類型
job.setOutputValueClass(NullWritable.class);
// 定義uri字符串
String uri = "hdfs://master:9000";
// 創(chuàng)建輸入目錄
Path inputPath = new Path(uri + "/BigData");
// 創(chuàng)建輸出目錄
Path outputPath = new Path(uri + "/maxavgprofit/output");
// 獲取文件系統(tǒng)
FileSystem fs = FileSystem.get(new URI(uri), conf);
// 刪除輸出目錄(第二個參數(shù)設(shè)置是否遞歸)
fs.delete(outputPath, true);
// 給作業(yè)添加輸入目錄(允許多個)
FileInputFormat.addInputPath(job, inputPath);
// 給作業(yè)設(shè)置輸出目錄(只能一個)
FileOutputFormat.setOutputPath(job, outputPath);
// 等待作業(yè)完成
job.waitForCompletion(true);
// 輸出統(tǒng)計結(jié)果
System.out.println("======統(tǒng)計結(jié)果======");
FileStatus[] fileStatuses = fs.listStatus(outputPath);
for (int i = 1; i< fileStatuses.length; i++) {
// 輸出結(jié)果文件路徑
System.out.println(fileStatuses[i].getPath());
// 獲取文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流
FSDataInputStream in = fs.open(fileStatuses[i].getPath());
// 將結(jié)果文件顯示在控制臺
IOUtils.copyBytes(in, System.out, 4096, false);
}
}
}
運行查看結(jié)果
7、統(tǒng)計學生總成績和平均成績創(chuàng)建利潤信息score.txt文件并上傳HDFS
姓名 語文 數(shù)學 英語 物理 化學
李小雙 89 78 94 96 87
王麗霞 94 80 86 78
吳雨涵 90 67 95 92 60
張曉紅 87 76 90 79 59
陳燕文 97 95 92 88 86
創(chuàng)建WordCountMapper類
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Mapper;
import java.io.IOException;
public class WordCountMapper extends Mapper{
@Override
protected void map(LongWritable key, Text value, Mapper.Context context) throws IOException, InterruptedException {
//獲取行內(nèi)容
String line = value.toString();
//按空格拆分得到字段數(shù)組
String[] fields = line.split(" ");
//獲取字段信息
String name = fields[0];
for (int i = 1; i< fields.length; i++){
int score = Integer.parseInt(fields[i]);
context.write(new Text(name),new IntWritable(score));
}
}
}
創(chuàng)建WordCountReducer類
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.NullWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Reducer;
import java.io.IOException;
import java.text.DecimalFormat;
public class WordCountReducer extends Reducer{
@Override
protected void reduce(Text key, Iterablevalues, Context context)
throws IOException, InterruptedException {
int count = 0;
int sum = 0;
double avg = 0;
for (IntWritable value : values){
count++;
sum += value.get();
}
avg = sum * 1.0 /count;
DecimalFormat df = new DecimalFormat("#.#");
String scoreInfo = "("+key+","+sum+","+df.format(avg)+")";
context.write(new Text(scoreInfo),NullWritable.get());
}
}
創(chuàng)建WordCountDriver類
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.*;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
import java.net.URI;
public class WordCountDriver {
public static void main(String[] args) throws Exception {
// 創(chuàng)建配置對象
Configuration conf = new Configuration();
// 設(shè)置數(shù)據(jù)節(jié)點主機名屬性
conf.set("dfs.client.use.datanode.hostname", "true");
// 獲取作業(yè)實例
Job job = Job.getInstance(conf);
// 設(shè)置作業(yè)啟動類
job.setJarByClass(mpr.WordCountDriver.class);
// 設(shè)置Mapper類
job.setMapperClass(WordCountMapper.class);
// 設(shè)置map任務輸出鍵類型
job.setMapOutputKeyClass(Text.class);
// 設(shè)置map任務輸出值類型
job.setMapOutputValueClass(IntWritable.class);
//設(shè)置Reducer類
job.setReducerClass(WordCountReducer.class);
// 設(shè)置Reducer任務輸出鍵類型
job.setOutputKeyClass(Text.class);
// 設(shè)置Reducer任務輸出值類型
job.setOutputValueClass(NullWritable.class);
//設(shè)置分區(qū)數(shù)量
job.setNumReduceTasks(1);
// 定義uri字符串
String uri = "hdfs://master:9000";
// 創(chuàng)建輸入目錄
Path inputPath = new Path(uri + "/BigData");
// 創(chuàng)建輸出目錄
Path outputPath = new Path(uri + "/outputs");
// 獲取文件系統(tǒng)
FileSystem fs = FileSystem.get(new URI(uri), conf);
// 刪除輸出目錄(第二個參數(shù)設(shè)置是否遞歸)
fs.delete(outputPath, true);
// 給作業(yè)添加輸入目錄(允許多個)
FileInputFormat.addInputPath(job, inputPath);
// 給作業(yè)設(shè)置輸出目錄(只能一個)
FileOutputFormat.setOutputPath(job, outputPath);
// 等待作業(yè)完成
job.waitForCompletion(true);
// 輸出統(tǒng)計結(jié)果
System.out.println("======統(tǒng)計結(jié)果======");
FileStatus[] fileStatuses = fs.listStatus(outputPath);
for (int i = 1; i< fileStatuses.length; i++) {
// 輸出結(jié)果文件路徑
System.out.println(fileStatuses[i].getPath());
// 獲取文件系統(tǒng)數(shù)據(jù)字節(jié)輸入流
FSDataInputStream in = fs.open(fileStatuses[i].getPath());
// 將結(jié)果文件顯示在控制臺
IOUtils.copyBytes(in, System.out, 4096, false);
}
}
}
運行查看結(jié)果
8、總結(jié)通過實訓,使得更加熟練掌握HDFS操作和MapReduce編程
你是否還在尋找穩(wěn)定的海外服務器提供商?創(chuàng)新互聯(lián)www.cdcxhl.cn海外機房具備T級流量清洗系統(tǒng)配攻擊溯源,準確流量調(diào)度確保服務器高可用性,企業(yè)級服務器適合批量采購,新人活動首月15元起,快前往官網(wǎng)查看詳情吧
網(wǎng)頁題目:Hadoop實訓任務3:HDFS和MapReduce綜合操作-創(chuàng)新互聯(lián)
文章地址:http://aaarwkj.com/article6/dpgdog.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供品牌網(wǎng)站制作、網(wǎng)站設(shè)計公司、企業(yè)網(wǎng)站制作、品牌網(wǎng)站設(shè)計、品牌網(wǎng)站建設(shè)、虛擬主機
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)
猜你還喜歡下面的內(nèi)容