Java 操作 Hadoop 的 HDFS 文件系统

时间:2023-02-06 09:00:47

1.导入pom依赖

  <!-- Build properties; the Hadoop release is pinned here in one place. -->
  <properties>

    ......

    <!-- Referenced as ${hadoop.version} by every Hadoop dependency declared in this pom fragment. -->
    <hadoop.version>3.1.2</hadoop.version>

  </properties>

  <!-- Hadoop artifacts used by the Java example; all three take their version from ${hadoop.version}. -->
  <dependencies>
    <!-- org.apache.hadoop:hadoop-common -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-common</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- org.apache.hadoop:hadoop-hdfs -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-hdfs</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    <!-- org.apache.hadoop:hadoop-client -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>${hadoop.version}</version>
    </dependency>

    ......

  </dependencies>

 

 

2.使用

/**
 * Small demo of common HDFS operations through the Hadoop {@code FileSystem} API:
 * upload, download, configuration dump, mkdir, recursive delete and listing.
 *
 * <p>Usage: call {@link #init()} once, invoke any of the operations, then close the
 * shared file system. {@link #main(String[])} shows the full sequence. None of the
 * individual operations closes the shared file system, so several of them can be
 * run back to back within one session.
 */
public class TestHadoop {

    /** HDFS address; must match the port configured for fs.defaultFS in core-site.xml. */
    private static final String HDFS_URI = "hdfs://localhost:9527";

    /**
     * User the HDFS calls are issued as. On Windows, permission problems may require
     * replacing this with an administrator account.
     */
    private static final String HDFS_USER = "zwj";

    /** Buffer size, in bytes, used when copying between streams. */
    private static final int COPY_BUFFER_SIZE = 1024;

    private static Configuration conf;
    private static FileSystem fs;

    /**
     * Builds the configuration and obtains the shared {@code FileSystem} handle.
     * Must be called before any other operation of this class.
     *
     * @throws Exception if the URI is malformed or the file system cannot be obtained
     */
    public static void init() throws Exception {
        conf = new Configuration();
        conf.set("fs.defaultFS", HDFS_URI);
        fs = FileSystem.get(new URI(HDFS_URI), conf, HDFS_USER);
    }

    /**
     * Uploads the local file E:/tmp/qrcode/123.jpg to /mydir/001.jpg on HDFS,
     * overwriting the target if it already exists.
     *
     * @throws Exception if the local file cannot be read or the HDFS write fails
     */
    public static void upload() throws Exception {
        // Open the local source first: if it is missing we must not create (and thereby
        // truncate) the remote file. try-with-resources closes both streams on every path.
        try (InputStream in = new FileInputStream("E:/tmp/qrcode/123.jpg");
             FSDataOutputStream fout = fs.create(new Path("/mydir/001.jpg"), true)) {
            // The streams are owned by the try block, so copyBytes must not close them.
            IOUtils.copyBytes(in, fout, COPY_BUFFER_SIZE, false);
        }
    }

    /**
     * Reads /mydir/001.jpg from HDFS starting at a chosen offset (0 here) and writes
     * the bytes to the local file E:/tmp/qrcode/111.jpg.
     *
     * @throws Exception if the HDFS file cannot be opened or the local write fails
     */
    public static void random() throws Exception {
        try (FSDataInputStream fin = fs.open(new Path("/mydir/001.jpg"))) {
            // Position the stream before the local file is created, so that a failed
            // seek does not leave an empty output file behind.
            fin.seek(0);
            try (OutputStream out = new FileOutputStream("E:/tmp/qrcode/111.jpg")) {
                IOUtils.copyBytes(fin, out, COPY_BUFFER_SIZE, false);
            }
        }
    }

    /**
     * Prints every key/value entry of the Hadoop configuration created by {@link #init()}.
     *
     * @throws Exception declared for symmetry with the other operations
     */
    public static void conf() throws Exception {
        for (Map.Entry<String, String> entry : conf) {
            System.out.println(entry);
        }
    }

    /**
     * Creates the directory /mydir/dir1/dir2 and reports success on stdout.
     *
     * @throws Exception if the HDFS call fails
     */
    public static void mkdir() throws Exception {
        if (fs.mkdirs(new Path("/mydir/dir1/dir2"))) {
            System.out.println("创建文件夹成功");
        }
    }

    /**
     * Recursively deletes /mydir and reports success on stdout.
     *
     * @throws Exception if the HDFS call fails
     */
    public static void delete() throws Exception {
        // second argument true = recursive delete
        if (fs.delete(new Path("/mydir"), true)) {
            System.out.println("删除成功");
        }
    }

    /**
     * Recursively lists all files under the root directory, printing each file's
     * attributes followed by its block locations.
     *
     * @throws Exception if the HDFS call fails
     */
    public static void listFile() throws Exception {
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);
        while (files.hasNext()) {
            LocatedFileStatus located = files.next();
            printStatus(located);
            System.out.println();
            for (BlockLocation block : located.getBlockLocations()) {
                System.out.println("块偏移数:" + block.getOffset());
                System.out.println("块长度:" + block.getLength());
                System.out.println("块名称:" + Arrays.toString(block.getNames()));
                System.out.println("块主机:" + Arrays.toString(block.getHosts()));
            }
            System.out.println("--------------------------");
        }
    }

    /**
     * Lists the direct children of the root directory (non-recursive), printing
     * each entry's attributes.
     *
     * @throws Exception if the HDFS call fails
     */
    public static void listFile2() throws Exception {
        for (FileStatus status : fs.listStatus(new Path("/"))) {
            printStatus(status);
        }
    }

    /** Prints the attributes shared by both listing operations for one entry. */
    private static void printStatus(FileStatus status) {
        System.out.println("块大小:" + status.getBlockSize());
        System.out.println("所有者:" + status.getOwner());
        System.out.println("大小:" + status.getLen());
        System.out.println("文件名:" + status.getPath().getName());
        System.out.println("是否目录:" + status.isDirectory());
        System.out.println("是否文件:" + status.isFile());
    }

    /**
     * Runs one demo operation against HDFS and always releases the file system afterwards.
     *
     * @param args unused
     */
    public static void main(String[] args) {
        try {
            init();
            try {
                // Swap in any of the other operations as needed:
                // upload(); random(); conf(); mkdir(); delete(); listFile2();
                listFile();
            } finally {
                // Single place where the shared FileSystem is released, whatever ran above.
                fs.close();
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }

}