所需jar包
我們提供的服務(wù)有:成都網(wǎng)站建設(shè)、成都網(wǎng)站制作、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、順城ssl等。為1000多家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的順城網(wǎng)站制作公司
一、URL API操作方式
import java.io.InputStream; import java.net.URL; import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import org.apache.hadoop.io.IOUtils; import org.junit.Test; public class HDFSUrlTest { /** * HDFS URL API操作方式 * 不需要讀取core-site.xml和hdfs-site.xml配置文件 */ // 讓JAVA程序識別HDFS的URL static { URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory()); } // 查看文件內(nèi)容 @Test public void testRead() throws Exception { InputStream in = null; // 文件路徑 String fileUrl = "hdfs://hadoop-master.dragon.org:9000/opt/data/test/01.data"; try { // 獲取文件輸入流 in = new URL(fileUrl).openStream(); // 將文件內(nèi)容讀取出來,打印控制臺 IOUtils.copyBytes(in, System.out, 4096, false); } finally { IOUtils.closeStream(in); } } }
二、通過FileSystem API操作HDFS
HDFS工具類
import java.io.IOException; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; public class HDFSUtils { /** * HDFS工具類 */ public static FileSystem getFileSystem() { //聲明FileSystem FileSystem hdfs=null; try { //獲取文件配置信息 Configuration conf =new Configuration(); //獲取文件系統(tǒng) hdfs=FileSystem.get(conf); } catch (IOException e) { e.printStackTrace(); } return hdfs; } }
常用操作實現(xiàn)類
import org.apache.hadoop.fs.BlockLocation; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hdfs.DistributedFileSystem; import org.apache.hadoop.hdfs.protocol.DatanodeInfo; import org.apache.hadoop.io.IOUtils; import org.apache.hadoop.mapred.gethistory_jsp; import org.junit.Test; public class HDFSFsTest { /** * * 通過FileSystem API操作HDFS */ // 讀取文件內(nèi)容 @Test public void testRead() throws Exception { // 獲取文件系統(tǒng) FileSystem hdfs = HDFSUtils.getFileSystem(); // 文件名稱 Path path = new Path("/opt/data/test/touch.data"); // 打開文件輸入流 FSDataInputStream inStream = hdfs.open(path); // 讀取文件到控制臺顯示 IOUtils.copyBytes(inStream, System.out, 4096, false); // 關(guān)閉流 IOUtils.closeStream(inStream); } // 查看目錄 @Test public void testList() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 文件名稱 Path path = new Path("/opt/data"); FileStatus[] fileStatus = hdfs.listStatus(path); for (FileStatus file : fileStatus) { Path p = file.getPath(); String info = file.isDir() ? "目錄" : "文件"; System.out.println(info + ":" + p); } } // 創(chuàng)建目錄 @Test public void testDirectory() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 要創(chuàng)建的目錄 Path path = new Path("/opt/data/dir"); boolean isSuccessful = hdfs.mkdirs(path);// 相當(dāng)于 linux下 mkdir -p // /opt/data/dir String info = isSuccessful ? "成功" : "失敗"; System.out.println("創(chuàng)建目錄【" + path + "】" + info); } // 上傳文件-- put copyFromLocal @Test public void testPut() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); // 本地文件(目錄+文件名稱) Path srcPath = new Path("c:/0125.log"); // hdfs文件上傳路徑 Path dstPath = new Path("/opt/data/dir/"); hdfs.copyFromLocalFile(srcPath, dstPath); } // 創(chuàng)建hdfs文件并寫入內(nèi)容 @Test public void testCreate() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir/touch.data"); // 創(chuàng)建文件并獲取輸出流 FSDataOutputStream fSDataOutputStream = hdfs.create(path); // 通過輸出流寫入數(shù)據(jù) fSDataOutputStream.write("你好".getBytes()); fSDataOutputStream.writeUTF("hello hadoop!"); IOUtils.closeStream(fSDataOutputStream); } // 文件重命名 @Test public void testRename() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path oldPath = new Path("/opt/data/dir/touch.data"); Path newPath = new Path("/opt/data/dir/rename.data"); boolean flag = hdfs.rename(oldPath, newPath); System.out.println(flag); } // 刪除文件 public void testDelete() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir/touch.data"); boolean flag = hdfs.deleteOnExit(path); System.out.println(flag); } // 刪除目錄 public void testDeleteDir() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/dir"); boolean flag = hdfs.delete(path, true);// 如果是目錄第二個參數(shù)必須為true System.out.println(flag); } // 查找某個文件在hdfs集群的位置 public void testLocation() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); Path path = new Path("/opt/data/test.file"); FileStatus fileStatus = hdfs.getFileStatus(path); BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus, 0, fileStatus.getLen()); for (BlockLocation blockLocation : blockLocations) { String[] hosts = blockLocation.getHosts(); for (String host : hosts) { System.out.print(host + " "); } System.out.println(); } } // 獲取hdfs集群上所有節(jié)點名稱信息 public void testCluster() throws Exception { FileSystem hdfs = HDFSUtils.getFileSystem(); DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs; DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats(); for (DatanodeInfo datanodeInfo : datanodeInfos) { String hostName = datanodeInfo.getHostName(); System.out.println(hostName); } } }
三、上傳合并小文件到hdfs
實現(xiàn)思想:循環(huán)遍歷本地文件輸入流
import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FSDataInputStream; import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; /** * * 向hdfs上傳復(fù)制文件的過程中,進(jìn)行合并文件 * */ public class PutMerge { /** * * @param localDir * 本地要上傳的文件目錄 * @param hdfsFile * HDFS上的文件名稱,包括路徑 */ public static void put(String localDir, String hdfsFile) throws Exception { // 獲取配置信息 Configuration conf = new Configuration(); Path localPath = new Path(localDir); Path hdfsPath = new Path(hdfsFile); // 獲取本地文件系統(tǒng) FileSystem localFs = FileSystem.getLocal(conf); // 獲取HDFS FileSystem hdfs = FileSystem.get(conf); // 本地文件系統(tǒng)指定目錄中的所有文件 FileStatus[] status = localFs.listStatus(localPath); // 打開hdfs上文件的輸出流 FSDataOutputStream fSDataOutputStream = hdfs.create(hdfsPath); // 循環(huán)遍歷本地文件 for (FileStatus fileStatus : status) { // 獲取文件 Path path = fileStatus.getPath(); System.out.println("文件為:" + path.getName()); // 打開文件輸入流 FSDataInputStream fSDataInputStream = localFs.open(path); // 進(jìn)行流的讀寫操作 byte[] buff = new byte[1024]; int len = 0; while ((len = fSDataInputStream.read(buff)) > 0) { fSDataOutputStream.write(buff, 0, len); } fSDataInputStream.close(); } fSDataOutputStream.close(); } public static void main(String[] args) { String localDir="D:/logs"; String hdfsFile="hdfs://hadoop-master.dragon.org:9000/opt/data/logs.data"; try { put(localDir,hdfsFile); } catch (Exception e) { e.printStackTrace(); } } }
當(dāng)前文章:hdfs常用API和putMerge功能實現(xiàn)
當(dāng)前地址:http://aaarwkj.com/article30/igehso.html
成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信小程序、網(wǎng)站設(shè)計公司、網(wǎng)站制作、外貿(mào)建站、網(wǎng)站導(dǎo)航、ChatGPT
聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)