【Hadoop】HDFS的java客户端编写

时间:2022-07-28 20:06:53

项目使用了Maven  Project 快速进行HDFS 客户端程序测试

客户端操作系统:win10 64位

JDK: 1.7.0_79

开发工具 :Eclipse Luna

pom.xml

<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
  xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
  <modelVersion>4.0.0</modelVersion>

  <groupId>com.alixx</groupId>
  <artifactId>hdfsz</artifactId>
  <version>0.0.1</version>
  <packaging>jar</packaging>

  <name>hdfsz</name>
  <url>http://maven.apache.org</url>

  <properties>
    <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
  </properties>

  <dependencies>
    <dependency>
      <groupId>junit</groupId>
      <artifactId>junit</artifactId>
      <version>4.4</version>
      <scope>test</scope>
    </dependency>
    <!-- hadoop 分布式文件系统类库 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-hdfs</artifactId>
        <version>2.7.3</version>
    </dependency>
    <!-- hadoop 公共类库 -->
    <dependency>
        <groupId>org.apache.hadoop</groupId>
        <artifactId>hadoop-common</artifactId>
        <version>2.7.3</version>
    </dependency>
  </dependencies>
</project>

测试类HDFSUtil

package com.bonc.hdfsz;

import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;

import org.apache.commons.compress.utils.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Before;
import org.junit.Test;

public class HdfsUtil {
    /*
        FileSystem是文件系统的抽象,HDFS是分布式文件系统对FileSystem的实现,如此即可解耦合。
        不论底层文件系统的具体实现是什么样的,文件系统FileSystem统一提供了访问接口。
    */

    FileSystem fs = null;

    @Before
    public void init() throws IOException, InterruptedException, URISyntaxException{

        //访问HDFS文件系统两种方式
        Configuration conf = new Configuration();

        /*
            方式1:设置默认文件系统、设置run Configuration的参数 -DHADOOP_USER_NAME=dream361
            默认读取classpath下的xxx.site.xml配置文件,并解析其内容,封装到conf对象中。
            conf.set("fs.defaultFS", "hdfs://master:9000/");
        */
        fs = FileSystem.get(conf);

        /*
            方式2:在此方法的参数中设置默认文件系统、用户名
              根据配置信息,去获取一个具体文件系统的客户端操作实例对象
        */
        fs = FileSystem.get(new URI("hdfs://master:9000/"), conf, "dream361");
    }

    // 上传方式1:更底层的
    @Test
    public void upload1() throws IOException{

        Path dst = new Path("hdfs://master:9000/c.txt");

        FSDataOutputStream os = fs.create(dst);

        FileInputStream in = new FileInputStream("D:/c.txt");

        IOUtils.copy(in, os);
    }

    // 上传方式2: 封装好的
    @Test
    public void upload2() throws IllegalArgumentException, IOException{
        fs.copyFromLocalFile(new Path("D:/c2.txt"), new Path("hdfs://master:9000/c3.txt"));
    }

    // 下载文件
    @Test
    public void download() throws Exception, IOException{
        fs.copyToLocalFile(new Path(""), new Path(""));
    }

    //迭代列出文件
    @Test
    public void listFiles1() throws FileNotFoundException, IllegalArgumentException, IOException{
        //listFiles列出的是文件信息,而且提供递归遍历 ,第二个参数为false则不提供递归
        RemoteIterator<LocatedFileStatus> files = fs.listFiles(new Path("/"), true);

        while(files.hasNext()){
            LocatedFileStatus file = files.next();
            Path filePath = file.getPath();
            String fileName = filePath.getName();
            System.out.println(fileName);
        }
    }
    //迭代列出目录及文件
    @Test
    public void listFiles2() throws FileNotFoundException, IllegalArgumentException, IOException{
        //listStatus可以列出文件和目录信息,但是不提供自带的递归遍历
        FileStatus[] listStatus = fs.listStatus(new Path("/"));
        for(FileStatus status:listStatus){
            String name = status.getPath().getName();
            //判断是目录还是文件,然后打印name+和判断结果
            System.out.println(name+(status.isDirectory()?" is dir":" is file"));
        }
    }
    //创建目录
    @Test
    public void mkdir() throws IllegalArgumentException, IOException{
        fs.mkdirs(new Path("/aa/bb/cc"));
    }

    //删除文件或目录
    @Test
    public void rmFile() throws IllegalArgumentException, IOException{
        fs.delete(new Path("/aa"),true);
    }

    //移动文件
    @Test
    public void mvFile() throws IllegalArgumentException, IOException{
        fs.rename(new Path("/aa/a.txt"), new Path("/bb/b.txt"));
    }

    //在程序入口测试
    public static void main(String[] args) throws IOException {
        //配置文件信息
        Configuration conf = new Configuration();

        FileSystem fs = FileSystem.get(conf);

        Path src = new Path("hdfs://master:9000/jdk.tar.gz");

        FSDataInputStream in = fs.open(src);

        FileOutputStream os = new FileOutputStream("D:/jdk.tar.gz");

        IOUtils.copy(in, os);
    }
}