欧美一级特黄大片做受成人-亚洲成人一区二区电影-激情熟女一区二区三区-日韩专区欧美专区国产专区

hdfs常用API和putMerge功能實現(xiàn)

所需jar包

我們提供的服務(wù)有:成都網(wǎng)站建設(shè)、成都網(wǎng)站制作、微信公眾號開發(fā)、網(wǎng)站優(yōu)化、網(wǎng)站認(rèn)證、順城ssl等。為1000多家企事業(yè)單位解決了網(wǎng)站和推廣的問題。提供周到的售前咨詢和貼心的售后服務(wù),是有科學(xué)管理、有技術(shù)的順城網(wǎng)站制作公司

hdfs常用API和putMerge功能實現(xiàn)

一、URL API操作方式

import java.io.InputStream;
import java.net.URL;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

public class HDFSUrlTest {

    /**
     * HDFS URL API操作方式
     * 不需要讀取core-site.xml和hdfs-site.xml配置文件
     */

    // 讓JAVA程序識別HDFS的URL
    static {

        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
    }

    // 查看文件內(nèi)容

    @Test
    public void testRead() throws Exception {

        InputStream in = null;
        // 文件路徑
        String fileUrl = "hdfs://hadoop-master.dragon.org:9000/opt/data/test/01.data";
        try {
            // 獲取文件輸入流
            in = new URL(fileUrl).openStream();
            // 將文件內(nèi)容讀取出來,打印控制臺
            IOUtils.copyBytes(in, System.out, 4096, false);
        } finally {

            IOUtils.closeStream(in);
        }

    }

}

二、通過FileSystem API操作HDFS


HDFS工具類


import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;


public class HDFSUtils {

/**
 *     HDFS工具類
 */
    
    public static FileSystem  getFileSystem() {
        //聲明FileSystem
        FileSystem hdfs=null;
        try {
            
            //獲取文件配置信息
            Configuration conf =new Configuration();

            //獲取文件系統(tǒng)
            hdfs=FileSystem.get(conf);
        } catch (IOException e) {
            
            e.printStackTrace();
        }
        return hdfs;
        
    }
    
}

常用操作實現(xiàn)類

import org.apache.hadoop.fs.BlockLocation;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.mapred.gethistory_jsp;
import org.junit.Test;

public class HDFSFsTest {

    /**
     * 
     * 通過FileSystem API操作HDFS
     */
    // 讀取文件內(nèi)容

    @Test
    public void testRead() throws Exception {
        // 獲取文件系統(tǒng)
        FileSystem hdfs = HDFSUtils.getFileSystem();
        // 文件名稱
        Path path = new Path("/opt/data/test/touch.data");
        // 打開文件輸入流
        FSDataInputStream inStream = hdfs.open(path);
        // 讀取文件到控制臺顯示
        IOUtils.copyBytes(inStream, System.out, 4096, false);

        // 關(guān)閉流
        IOUtils.closeStream(inStream);
    }

    // 查看目錄
    @Test
    public void testList() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();
        // 文件名稱
        Path path = new Path("/opt/data");
        FileStatus[] fileStatus = hdfs.listStatus(path);
        for (FileStatus file : fileStatus) {
            Path p = file.getPath();
            String info = file.isDir() ? "目錄" : "文件";
            System.out.println(info + ":" + p);
        }
    }

    // 創(chuàng)建目錄
    @Test
    public void testDirectory() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();
        // 要創(chuàng)建的目錄
        Path path = new Path("/opt/data/dir");
        boolean isSuccessful = hdfs.mkdirs(path);// 相當(dāng)于 linux下 mkdir -p
                                                    // /opt/data/dir
        String info = isSuccessful ? "成功" : "失敗";
        System.out.println("創(chuàng)建目錄【" + path + "】" + info);
    }

    // 上傳文件-- put copyFromLocal

    @Test
    public void testPut() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();
        // 本地文件(目錄+文件名稱)
        Path srcPath = new Path("c:/0125.log");
        // hdfs文件上傳路徑
        Path dstPath = new Path("/opt/data/dir/");
        hdfs.copyFromLocalFile(srcPath, dstPath);

    }

    // 創(chuàng)建hdfs文件并寫入內(nèi)容

    @Test
    public void testCreate() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path path = new Path("/opt/data/dir/touch.data");
        // 創(chuàng)建文件并獲取輸出流
        FSDataOutputStream fSDataOutputStream = hdfs.create(path);
        // 通過輸出流寫入數(shù)據(jù)
        fSDataOutputStream.write("你好".getBytes());
        fSDataOutputStream.writeUTF("hello hadoop!");
        IOUtils.closeStream(fSDataOutputStream);
    }

    // 文件重命名

    @Test
    public void testRename() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path oldPath = new Path("/opt/data/dir/touch.data");
        Path newPath = new Path("/opt/data/dir/rename.data");

        boolean flag = hdfs.rename(oldPath, newPath);
        System.out.println(flag);
    }

    // 刪除文件
    public void testDelete() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path path = new Path("/opt/data/dir/touch.data");

        boolean flag = hdfs.deleteOnExit(path);

        System.out.println(flag);

    }

    // 刪除目錄

    public void testDeleteDir() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path path = new Path("/opt/data/dir");

        boolean flag = hdfs.delete(path, true);// 如果是目錄第二個參數(shù)必須為true

        System.out.println(flag);

    }

    // 查找某個文件在hdfs集群的位置

    public void testLocation() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        Path path = new Path("/opt/data/test.file");

        FileStatus fileStatus = hdfs.getFileStatus(path);
        BlockLocation[] blockLocations = hdfs.getFileBlockLocations(fileStatus,
                0, fileStatus.getLen());
        for (BlockLocation blockLocation : blockLocations) {

            String[] hosts = blockLocation.getHosts();
            for (String host : hosts) {

                System.out.print(host + " ");
            }
            System.out.println();

        }

    }

    // 獲取hdfs集群上所有節(jié)點名稱信息

    public void testCluster() throws Exception {
        FileSystem hdfs = HDFSUtils.getFileSystem();

        DistributedFileSystem distributedFileSystem = (DistributedFileSystem) hdfs;
        DatanodeInfo[] datanodeInfos = distributedFileSystem.getDataNodeStats();

        for (DatanodeInfo datanodeInfo : datanodeInfos) {
            String hostName = datanodeInfo.getHostName();

            System.out.println(hostName);
        }
    }

}

三、上傳合并小文件到hdfs

實現(xiàn)思想:循環(huán)遍歷本地文件輸入流

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

/**
 * 
 * 向hdfs上傳復(fù)制文件的過程中,進(jìn)行合并文件
 * 
 */
public class PutMerge {

    /**
     * 
     * @param localDir
     *            本地要上傳的文件目錄
     * @param hdfsFile
     *            HDFS上的文件名稱,包括路徑
     */
    public static void put(String localDir, String hdfsFile) throws Exception {

        // 獲取配置信息
        Configuration conf = new Configuration();
        Path localPath = new Path(localDir);
        Path hdfsPath = new Path(hdfsFile);

        // 獲取本地文件系統(tǒng)
        FileSystem localFs = FileSystem.getLocal(conf);
        // 獲取HDFS
        FileSystem hdfs = FileSystem.get(conf);

        // 本地文件系統(tǒng)指定目錄中的所有文件

        FileStatus[] status = localFs.listStatus(localPath);
        // 打開hdfs上文件的輸出流
        FSDataOutputStream fSDataOutputStream = hdfs.create(hdfsPath);
        // 循環(huán)遍歷本地文件

        for (FileStatus fileStatus : status) {
            // 獲取文件
            Path path = fileStatus.getPath();
            System.out.println("文件為:" + path.getName());
            // 打開文件輸入流
            FSDataInputStream fSDataInputStream = localFs.open(path);

            // 進(jìn)行流的讀寫操作
            byte[] buff = new byte[1024];

            int len = 0;
            while ((len = fSDataInputStream.read(buff)) > 0) {

                fSDataOutputStream.write(buff, 0, len);
            }
            fSDataInputStream.close();
        }
        fSDataOutputStream.close();
    }

    
    public static void main(String[] args) {
        String localDir="D:/logs";
        String hdfsFile="hdfs://hadoop-master.dragon.org:9000/opt/data/logs.data";
        try {
            put(localDir,hdfsFile);
        } catch (Exception e) {
            
            e.printStackTrace();
        }
}
    
    
}

當(dāng)前文章:hdfs常用API和putMerge功能實現(xiàn)
當(dāng)前地址:http://aaarwkj.com/article30/igehso.html

成都網(wǎng)站建設(shè)公司_創(chuàng)新互聯(lián),為您提供微信小程序、網(wǎng)站設(shè)計公司、網(wǎng)站制作外貿(mào)建站網(wǎng)站導(dǎo)航、ChatGPT

廣告

聲明:本網(wǎng)站發(fā)布的內(nèi)容(圖片、視頻和文字)以用戶投稿、用戶轉(zhuǎn)載內(nèi)容為主,如果涉及侵權(quán)請盡快告知,我們將會在第一時間刪除。文章觀點不代表本網(wǎng)站立場,如需處理請聯(lián)系客服。電話:028-86922220;郵箱:631063699@qq.com。內(nèi)容未經(jīng)允許不得轉(zhuǎn)載,或轉(zhuǎn)載時需注明來源: 創(chuàng)新互聯(lián)

成都網(wǎng)站建設(shè)公司
亚洲综合福利视频网站| av熟女乱一区二区三区| 长腿丝袜美女亚洲一区二区| 蜜桃视频中文字幕二区三区| 中文字幕av一区二区人妻| 91精品国产自产永久在线| 国产精品毛片一区内射| 中文字幕熟女av一区二区| 濑亚美莉在线观看一区二区三区| 国产精品一区二区污网站| 亚洲天堂中文字幕麻豆| 中文字幕乱码亚洲美女精品| 亚洲少妇午夜福利视频| 国产一级黄色性生活片| 亚洲精品主播一区二区三区| 国产一区二区在线不卡播放| 成人免费大片在线观看视频| 国产精品午夜视频免费观看| 色婷婷激情一区二区三区| 亚洲日本国产一区二区| 日本成人精品二区在线观看| 大龄熟妇丰满有水多毛浓| 亚洲av区一区二区三| 91亚洲婷婷国产综合精品| 亚洲av最近在线观看| 国产亚洲精品视频中文字幕| 日本精品1区国产精品| 亚洲中文字幕乱码丝袜在线精品| 精品乱码一区二区三区四区| av小说亚洲激情乱| 91大神午夜在线观看| 国产成人国产精品国产三级| 亚洲精品色播一区二区| 日韩二区三区在线视频| 99久久成人精品国产片| 成人黄色av免费在线观看| 欧美亚洲中文字幕高清| 99热只有这里才有精品| 欧美日韩精品人妻一区| 日本一区二区三级在线观看| 午夜福利网午夜福利网|