欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

利用HDFS怎么實現(xiàn)多文件Join操作

本篇文章為大家展示了利用HDFS怎么實現(xiàn)多文件Join操作,內(nèi)容簡明扼要并且容易理解,絕對能使你眼前一亮,通過這篇文章的詳細介紹希望你能有所收獲。

鄞州網(wǎng)站制作公司哪家好,找成都創(chuàng)新互聯(lián)公司!從網(wǎng)頁設(shè)計、網(wǎng)站建設(shè)、微信開發(fā)、APP開發(fā)、成都響應(yīng)式網(wǎng)站建設(shè)公司等網(wǎng)站項目制作,到程序開發(fā),運營維護。成都創(chuàng)新互聯(lián)公司于2013年成立到現(xiàn)在10年的時間,我們擁有了豐富的建站經(jīng)驗和運維經(jīng)驗,來保證我們的工作的順利進行。專注于網(wǎng)站建設(shè)就選成都創(chuàng)新互聯(lián)公司

詳解HDFS多文件Join操作的實例

最近在做HDFS文件處理之時,遇到了多文件Join操作,其中包括:All Join以及常用的Left Join操作,

下面是個簡單的例子;采用兩個表來做left join其中數(shù)據(jù)結(jié)構(gòu)如下:

A 文件:

a|1b|2|c

B文件:

a|b|1|2|c

即:A文件中的第一、二列與B文件中的第一、三列對應(yīng);類似數(shù)據(jù)庫中Table的主鍵/外鍵

代碼如下:

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;


import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.contrib.utils.join.DataJoinMapperBase;
import org.apache.hadoop.contrib.utils.join.DataJoinReducerBase;
import org.apache.hadoop.contrib.utils.join.TaggedMapOutput;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileOutputFormat;
import org.apache.hadoop.mapred.JobClient;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.TextInputFormat;
import org.apache.hadoop.mapred.TextOutputFormat;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.hadoop.util.Tool;
import org.apache.hadoop.util.ToolRunner;


import cn.eshore.traffic.hadoop.util.CommUtil;
import cn.eshore.traffic.hadoop.util.StringUtil;




/**
 * @ClassName: DataJoin
 * @Description: HDFS JOIN操作
 * @author hadoop
 * @date 2012-12-18 下午5:51:32
 */
public class InstallJoin extends Configured implements Tool {
private String static enSplitCode = "\\|";
private String static splitCode = "|";


// 自定義Reducer
public static class ReduceClass extends DataJoinReducerBase {


@Override
protected TaggedMapOutput combine(Object[] tags, Object[] values) {
String joinedStr = "";
//該段判斷用戶生成Left join限制【其中tags表示文件的路徑,install表示文件名稱前綴】
//去掉則為All Join
if (tags.length == 1 && tags[0].toString().contains("install")) {
return null;
}

Map<String, String> map = new HashMap<String, String>();
for (int i = 0; i < values.length; i++) {
TaggedWritable tw = (TaggedWritable) values[i];
String line = ((Text) tw.getData()).toString();


String[] tokens = line.split(enSplitCode, 8);
String groupValue = tokens[6];

String type = tokens[7];

map.put(type, groupValue);
}

joinedStr += StringUtil.getCount(map.get("7"))+"|"+StringUtil.getCount(map.get("30"));
TaggedWritable retv = new TaggedWritable(new Text(joinedStr));
retv.setTag((Text) tags[0]);
return retv;
}
}


// 自定義Mapper
public static class MapClass extends DataJoinMapperBase {


//自定義Key【類似數(shù)據(jù)庫中的主鍵/外鍵】
@Override
protected Text generateGroupKey(TaggedMapOutput aRecord) {
String line = ((Text) aRecord.getData()).toString();
String[] tokens = line.split(CommUtil.enSplitCode);


String key = "";
String type = tokens[7];
//由于不同文件中的Key所在列有可能不同,所以需要動態(tài)生成Key,其中type為不同文件中的數(shù)據(jù)標(biāo)識;如:A文件最后一列為a用于表示此數(shù)據(jù)為A文件數(shù)據(jù)
if ("7".equals(type)) {
key = tokens[0]+"|"+tokens[1];
}else if ("30".equals(type)) {
key = tokens[0]+"|"+tokens[2];
}
return new Text(key);
}


@Override
protected Text generateInputTag(String inputFile) {
return new Text(inputFile);
}


@Override
protected TaggedMapOutput generateTaggedMapOutput(Object value) {
TaggedWritable retv = new TaggedWritable((Text) value);
retv.setTag(this.inputTag);
return retv;
}


}


public static class TaggedWritable extends TaggedMapOutput {


private Writable data;


// 自定義
public TaggedWritable() {
this.tag = new Text("");
}


public TaggedWritable(Writable data) {
this.tag = new Text("");
this.data = data;
}


@Override
public Writable getData() {
return data;
}


@Override
public void write(DataOutput out) throws IOException {
this.tag.write(out);
out.writeUTF(this.data.getClass().getName());
this.data.write(out);
}


@Override
public void readFields(DataInput in) throws IOException {
this.tag.readFields(in);
String dataClz = in.readUTF();
if (this.data == null
|| !this.data.getClass().getName().equals(dataClz)) {
try {
this.data = (Writable) ReflectionUtils.newInstance(
Class.forName(dataClz), null);
} catch (ClassNotFoundException e) {
e.printStackTrace();
}
}
this.data.readFields(in);
}


}


/**
* job運行
*/
@Override
public int run(String[] paths) throws Exception {
int no = 0;
try {
Configuration conf = getConf();
JobConf job = new JobConf(conf, InstallJoin.class);
FileInputFormat.setInputPaths(job, new Path(paths[0]));
FileOutputFormat.setOutputPath(job, new Path(paths[1]));
job.setJobName("join_data_test");
job.setMapperClass(MapClass.class);
job.setReducerClass(ReduceClass.class);
job.setInputFormat(TextInputFormat.class);
job.setOutputFormat(TextOutputFormat.class);
job.setOutputKeyClass(Text.class);
job.setOutputValueClass(TaggedWritable.class);
job.set("mapred.textoutputformat.separator", CommUtil.splitCode);
JobClient.runJob(job);
no = 1;
} catch (Exception e) {
throw new Exception();
}
return no;
}


//測試
public static void main(String[] args) {
String[] paths = {
"hdfs://master...:9000/home/hadoop/traffic/join/newtype",
"hdfs://master...:9000/home/hadoop/traffic/join/newtype/output" }

int res = 0;
try {
res = ToolRunner.run(new Configuration(), new InstallJoin(), paths);
} catch (Exception e) {
e.printStackTrace();
}
System.exit(res);
}
}

上述內(nèi)容就是利用HDFS怎么實現(xiàn)多文件Join操作,你們學(xué)到知識或技能了嗎?如果還想學(xué)到更多技能或者豐富自己的知識儲備,歡迎關(guān)注創(chuàng)新互聯(lián)行業(yè)資訊頻道。

網(wǎng)站標(biāo)題:利用HDFS怎么實現(xiàn)多文件Join操作
鏈接URL:http://aaarwkj.com/article30/iihdpo.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供定制網(wǎng)站、Google靜態(tài)網(wǎng)站、品牌網(wǎng)站設(shè)計網(wǎng)站設(shè)計公司、網(wǎng)站設(shè)計

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都定制網(wǎng)站網(wǎng)頁設(shè)計
日韩人成理论午夜福利| 日本人妻成人免费大片| 日韩av天堂在线观看| 久久中文字幕人妻熟av| 日韩成人三级一区二区| 中文字幕一区二区三区不卡日日| 中文字幕伦理一区二区| 蜜臀久久精品国产综合| 久久91亚洲精品中文字幕| 午夜性色福利在线播放| 欧美丝袜熟女日韩亚洲| 蜜桃视频中文字幕二区三区| 亚洲高清无毛一区二区| 一区二区人妻乳中文字幕| 亚洲欧美半夜激情一区二区| 凹凸国产精品熟女视频| 不卡av免费在线网址| 日韩毛片中文字幕在线观看| 白虎亚洲福利精品一区| 婷婷六月开心激情五月| 日本一区二区电影在线看| 91成人精品永久在线观看| 欧美日韩黄色的三级视频| 国产三级精品电影久久| 国产黄色片子在线观看| 国产在线精品91系列| 高清日本一区二区三区不卡片 | 自拍一区日韩二区欧美三区| 日韩不卡区高清在线视频| 国内传媒视频免费观看| 成人性生交大片免费看中文 | 国产一区国产二区中文字幕| 熟女肥臀一区二区三区| 欧美电影剧情av在线| 国产乱来视频在线观看| 91久久精品国产一区| 成人av免费高清在线| 午夜精品三级一区二区三区| 青青草网站在线观看视频| 日韩天堂视频在线播放| 亚洲伦理在线一区二区|