Hadoop Java HDFS Client Operations

Date: 2021-09-16 15:13:57


3.2 Operating HDFS through the API

Configure the Maven pom file with the following dependencies:

<dependency>
    <groupId>org.apache.logging.log4j</groupId>
    <artifactId>log4j-core</artifactId>
    <version>2.8.2</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-hdfs</artifactId>
    <version>${hadoop.version}</version>
</dependency>
<dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-client</artifactId>
    <version>${hadoop.version}</version>
</dependency>
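These dependencies reference a ${hadoop.version} property, so the pom also needs a matching <properties> entry; 2.7.2 below is an assumption based on the hadoop-2.7.2.tar.gz used later in this article. The @Test methods shown below also need JUnit on the classpath, which the original pom snippet does not list; junit 4.12 is a typical choice:

<properties>
    <!-- Assumed version; adjust to match your cluster -->
    <hadoop.version>2.7.2</hadoop.version>
</properties>

<dependency>
    <!-- Hypothetical addition: required by the @Test examples below -->
    <groupId>junit</groupId>
    <artifactId>junit</artifactId>
    <version>4.12</version>
</dependency>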

Create the first Java project:

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class HdfsClientDemo1 {

    public static void main(String[] args) throws Exception {
        // 1. Get the file system
        Configuration configuration = new Configuration();

        // Point the client at the cluster
        configuration.set("fs.defaultFS", "hdfs://hadoop-001:9000");

        FileSystem fileSystem = FileSystem.get(configuration);

        // Alternatively, pass the cluster URI and the user name directly:
        // FileSystem fileSystem = FileSystem.get(new URI("hdfs://hadoop-001:9000"), configuration, "hadoop");

        // 2. Upload a local file to the file system
        fileSystem.copyFromLocalFile(new Path("f:/hello.txt"), new Path("/hello1.copy.txt"));

        // 3. Release resources
        fileSystem.close();

        System.out.println("over");
    }
}

Running this throws a permission-denied exception (the original post shows a screenshot of the stack trace here): the client connects as the local OS user, who has no write permission on HDFS.

There are two ways to resolve the access-permission problem:

1. Pass the HDFS user name as a JVM option in the run configuration (screenshot omitted):

-DHADOOP_USER_NAME=hadoop

2. Specify the user name directly in the FileSystem.get call (screenshot omitted); a minimal sketch follows.
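The sketch below uses the same connection parameters as the demo above; the three-argument overload of FileSystem.get takes the cluster URI, the configuration, and the name of the HDFS user to act as:

Configuration configuration = new Configuration();
// Act as the "hadoop" user instead of the local OS user
FileSystem fileSystem = FileSystem.get(
        new URI("hdfs://hadoop-001:9000"), configuration, "hadoop");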

3.2.1 Getting the HDFS file system

1) Full code

@Test
public void initHDFS() throws Exception {
    // 1. Create the configuration object.
    // new Configuration() first loads hdfs-default.xml from the jar,
    // then hdfs-site.xml from the classpath.
    Configuration configuration = new Configuration();

    // 2. Set parameters.
    // Precedence: (1) values set in client code, (2) user-defined config
    // files on the classpath, (3) the server-side defaults.
    // configuration.set("fs.defaultFS", "hdfs://hadoop102:9000");
    configuration.set("dfs.replication", "3");

    // 3. Get the file system
    FileSystem fs = FileSystem.get(configuration);

    // 4. Print the file system
    System.out.println(fs.toString());
}

2) Copy core-site.xml to the project root:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Address of the HDFS NameNode -->
    <property>
        <name>fs.defaultFS</name>
        <value>hdfs://hadoop102:9000</value>
    </property>

    <!-- Directory where Hadoop stores its runtime data -->
    <property>
        <name>hadoop.tmp.dir</name>
        <value>/opt/module/hadoop-2.7.2/data/tmp</value>
    </property>
</configuration>

3.2.2 Uploading a file to HDFS

@Test
public void putFileToHDFS() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Local path of the file to upload
    Path src = new Path("e:/hello.txt");

    // 3. Target path on HDFS
    Path dst = new Path("hdfs://hadoop102:9000/user/hadoop/hello.txt");

    // 4. Copy the file
    fs.copyFromLocalFile(src, dst);

    fs.close();
}

How do you change the number of replicas?

1. Create an hdfs-site.xml file on the classpath and set dfs.replication there.

2. Set the key/value pair directly on the Configuration object.

The original screenshots are omitted; both options are sketched below.
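Minimal sketches of both options; the replication factor of 2 is an example value, matching the putFileToHDFS3 test later in this article. Option 1, an hdfs-site.xml on the classpath:

<?xml version="1.0" encoding="UTF-8"?>
<?xml-stylesheet type="text/xsl" href="configuration.xsl"?>
<configuration>
    <!-- Replication factor for files created by this client -->
    <property>
        <name>dfs.replication</name>
        <value>2</value>
    </property>
</configuration>

Option 2, directly in client code (this takes precedence over classpath files):

Configuration configuration = new Configuration();
configuration.set("dfs.replication", "2");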

3.2.3 Downloading a file from HDFS

@Test
public void getFileFromHDFS() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // fs.copyToLocalFile(new Path("hdfs://hadoop102:9000/user/hadoop/hello.txt"), new Path("d:/hello.txt"));

    // boolean delSrc                 whether to delete the source file
    // Path src                       the HDFS file to download
    // Path dst                       local path to download to
    // boolean useRawLocalFileSystem  when true, skip the client-side checksum
    //                                (no .crc file is written locally)

    // 2. Download the file
    fs.copyToLocalFile(false, new Path("hdfs://hadoop102:9000/user/hadoop/hello.txt"), new Path("e:/hellocopy.txt"), true);

    fs.close();
}

3.2.4 Creating a directory on HDFS

@Test
public void mkdirAtHDFS() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Create the directory
    fs.mkdirs(new Path("hdfs://hadoop102:9000/user/hadoop/output"));
}

3.2.5 Deleting a directory on HDFS

@Test
public void deleteAtHDFS() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Delete the directory; for a non-empty directory the second
    //    argument (recursive) must be true
    fs.delete(new Path("hdfs://hadoop102:9000/user/hadoop/output"), true);
}

3.2.6 Renaming a file on HDFS

@Test
public void renameAtHDFS() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Rename the file or directory
    fs.rename(new Path("hdfs://hadoop102:9000/user/hadoop/hello.txt"), new Path("hdfs://hadoop102:9000/user/hadoop/hellonihao.txt"));
}

3.2.7 Viewing file details on HDFS

@Test
public void readListFiles() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // Question: why does this return an iterator rather than a container
    // such as a List? A directory tree can hold millions of entries; an
    // iterator lets the client fetch them from the NameNode in batches
    // instead of materializing the whole listing in memory at once.
    RemoteIterator<LocatedFileStatus> listFiles = fs.listFiles(new Path("/"), true);

    while (listFiles.hasNext()) {
        LocatedFileStatus fileStatus = listFiles.next();

        System.out.println(fileStatus.getPath().getName());
        System.out.println(fileStatus.getBlockSize());
        System.out.println(fileStatus.getPermission());
        System.out.println(fileStatus.getLen());

        BlockLocation[] blockLocations = fileStatus.getBlockLocations();
        for (BlockLocation bl : blockLocations) {
            System.out.println("block-offset:" + bl.getOffset());
            String[] hosts = bl.getHosts();
            for (String host : hosts) {
                System.out.println(host);
            }
        }

        System.out.println("-------------- Li Bingbing's divider --------------");
    }
}

3.2.8 Listing files and directories on HDFS

@Test
public void findAtHDFS() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Get the status of everything under the query path
    FileStatus[] listStatus = fs.listStatus(new Path("/"));

    // 3. Iterate over all file statuses
    for (FileStatus status : listStatus) {
        if (status.isFile()) {
            System.out.println("f--" + status.getPath().getName());
        } else {
            System.out.println("d--" + status.getPath().getName());
        }
    }
}

3.3 Operating HDFS through I/O streams

3.3.1 Uploading a file to HDFS

@Test
public void putFileToHDFS() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Create the input stream for the local file
    FileInputStream inStream = new FileInputStream(new File("e:/hello.txt"));

    // 3. Build the target path
    String putFileName = "hdfs://hadoop102:9000/user/hadoop/hello1.txt";
    Path writePath = new Path(putFileName);

    // 4. Create the output stream on HDFS
    FSDataOutputStream outStream = fs.create(writePath);

    // 5. Pipe the streams together
    try {
        IOUtils.copyBytes(inStream, outStream, 4096, false);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(inStream);
        IOUtils.closeStream(outStream);
    }
}

3.3.2 Downloading a file from HDFS

@Test
public void getFileToHDFS() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Path of the file to read
    String filename = "hdfs://hadoop102:9000/user/hadoop/hello1.txt";

    // 3. Create the read path
    Path readPath = new Path(filename);

    // 4. Open the input stream
    FSDataInputStream inStream = fs.open(readPath);

    // 5. Pipe the stream to the console
    try {
        IOUtils.copyBytes(inStream, System.out, 4096, false);
    } catch (Exception e) {
        e.printStackTrace();
    } finally {
        IOUtils.closeStream(inStream);
    }
}

3.3.3 Seek-based (positioned) file reads

1) Download the first block

@Test
// Download the first block of the file
public void readFileSeek1() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Path of the file to read
    Path path = new Path("hdfs://hadoop102:9000/user/hadoop/tmp/hadoop-2.7.2.tar.gz");

    // 3. Open the input stream
    FSDataInputStream fis = fs.open(path);

    // 4. Create the output stream
    FileOutputStream fos = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part1");

    // 5. Copy one block: 1024 bytes x (128 * 1024) iterations = 128 MB.
    //    read() may return fewer than 1024 bytes, so only write what was read.
    byte[] buf = new byte[1024];
    for (int i = 0; i < 128 * 1024; i++) {
        int len = fis.read(buf);
        fos.write(buf, 0, len);
    }

    // 6. Close the streams
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
}

2) Download the second block

@Test
// Download the second block of the file
public void readFileSeek2() throws Exception {
    // 1. Create the configuration object
    Configuration configuration = new Configuration();
    FileSystem fs = FileSystem.get(new URI("hdfs://hadoop102:9000"), configuration, "hadoop");

    // 2. Path of the file to read
    Path path = new Path("hdfs://hadoop102:9000/user/hadoop/tmp/hadoop-2.7.2.tar.gz");

    // 3. Open the input stream
    FSDataInputStream fis = fs.open(path);

    // 4. Create the output stream
    FileOutputStream fos = new FileOutputStream("e:/hadoop-2.7.2.tar.gz.part2");

    // 5. Seek to the start of the second block (128 MB)
    fis.seek(1024 * 1024 * 128);

    // 6. Copy the rest of the file
    IOUtils.copyBytes(fis, fos, 1024);

    // 7. Close the streams
    IOUtils.closeStream(fis);
    IOUtils.closeStream(fos);
}

3) Reading block information

Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI("hdfs://hadoop-001:9000"), configuration, "hadoop");

// 2. Path of the file to read
String filename = "hdfs://hadoop-001:9000/0306_668/hadoop-2.7.2.tar.gz";

// 3. Create the read path
Path readPath = new Path(filename);

// 4. Open the input stream; HdfsDataInputStream exposes block metadata
HdfsDataInputStream hdis = (HdfsDataInputStream) fs.open(readPath);

List<LocatedBlock> allBlocks = hdis.getAllBlocks();
for (LocatedBlock block : allBlocks) {
    ExtendedBlock eBlock = block.getBlock();

    System.out.println("------------------------");
    System.out.println(eBlock.getBlockId());
    System.out.println(eBlock.getBlockName());
    System.out.println(block.getBlockSize());
    System.out.println(block.getStartOffset());

    // DataNodes holding replicas of this block
    DatanodeInfo[] locations = block.getLocations();
    for (DatanodeInfo info : locations) {
        System.out.println(info.getIpAddr());
        System.out.println(info.getHostName());
    }
}

hdis.close();
fs.close();

Command to merge the two parts back into one file (Windows cmd); after it runs, part1 contains the complete archive:

type hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1
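On Linux or macOS, the equivalent append (assuming the same part file names) is:

cat hadoop-2.7.2.tar.gz.part2 >> hadoop-2.7.2.tar.gz.part1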

The full code:

package com.gec.demo;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.*;
import org.apache.hadoop.hdfs.client.HdfsDataInputStream;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.ExtendedBlock;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.List;

public class HdfsClientAppTest {

    @Test
    public void getHdfsClient() throws IOException {
        // Operating HDFS from Java:
        // 1. Create the Configuration object and point it at the cluster
        Configuration configuration = new Configuration();
        configuration.set("fs.defaultFS", "hdfs://hadoop-001:9000");

        // 2. Get the FileSystem object
        FileSystem fileSystem = FileSystem.get(configuration);
        fileSystem.mkdirs(new Path("/100_3"));

        // Close and release resources
        fileSystem.close();
    }

    @Test
    public void getHdfsClient2() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");
        fileSystem.mkdirs(new Path("/100_2"));
        fileSystem.close();
    }

    /*
     * Upload a file
     */
    @Test
    public void putFileToHDFS() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        // Argument 1: Path src - the source file
        // Argument 2: Path dst - the target file
        Path srcPath = new Path("D:\\src\\hello.txt");
        Path destPath = new Path("/100_1/hello.txt");
        fileSystem.copyFromLocalFile(srcPath, destPath);
        fileSystem.close();
    }

    @Test
    public void putFileToHDFS2() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        Path srcPath = new Path("D:\\src\\hello.txt");
        Path destPath = new Path("/100_2/hello.txt");
        fileSystem.copyFromLocalFile(srcPath, destPath);
        fileSystem.close();
    }

    @Test
    public void putFileToHDFS3() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        // Override the replication factor for files created by this client
        configuration.set("dfs.replication", "2");
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        Path srcPath = new Path("D:\\src\\hello.txt");
        Path destPath = new Path("/100_3/hello.txt");
        fileSystem.copyFromLocalFile(srcPath, destPath);
        fileSystem.close();
    }

    // Delete an HDFS directory
    @Test
    public void deleteAtHDFS() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        configuration.set("dfs.replication", "2");
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");
        fileSystem.delete(new Path("/100_3"), true);
        fileSystem.close();
    }

    // Rename a file
    @Test
    public void renameAtHDFS() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");
        fileSystem.rename(new Path("/100_2/hello.txt"), new Path("/100_2/hello2.txt"));
        fileSystem.close();
    }

    // List file details
    @Test
    public void readListFiles() throws URISyntaxException, IOException, InterruptedException {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        RemoteIterator<LocatedFileStatus> iterator = fileSystem.listFiles(new Path("/"), true);
        while (iterator.hasNext()) {
            LocatedFileStatus filestatus = iterator.next();
            System.out.println("permission=" + filestatus.getPermission());
            System.out.println("file name=" + filestatus.getPath().getName());
            System.out.println("file size=" + filestatus.getLen());
            System.out.println("replicas=" + filestatus.getReplication());

            // Block location information
            BlockLocation[] blockLocations = filestatus.getBlockLocations();
            for (BlockLocation blockLocation : blockLocations) {
                System.out.println("block offset=" + blockLocation.getOffset());
                System.out.println("block length=" + blockLocation.getLength());
                String[] hosts = blockLocation.getHosts();
                for (String host : hosts) {
                    System.out.println("replica host=" + host);
                }
                System.out.println("--------------- end of block");
            }
            System.out.println("----------------------------- end of file");
        }
        fileSystem.close();
    }

    /*
     * Upload a file to HDFS via I/O streams
     */
    @Test
    public void putFileToHDFSByIOStream() throws Exception {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        FileInputStream fileInputStream = new FileInputStream("D:\\src\\Ahost.java");
        FSDataOutputStream fsDataOutputStream = fileSystem.create(new Path("/100_1/Ahost.java"));

        // The final "true" closes both streams when the copy finishes
        IOUtils.copyBytes(fileInputStream, fsDataOutputStream, 1024, true);
        fileSystem.close();
    }

    /*
     * Download a file from HDFS to the console via I/O streams
     */
    @Test
    public void downloadFileByIOStream() throws Exception {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        FSDataInputStream fsDataInputStream = fileSystem.open(new Path("hdfs://hadoop-001:9000/100_1/Ahost.java"));
        IOUtils.copyBytes(fsDataInputStream, System.out, 1024, true);
        fileSystem.close();
    }

    @Test
    public void getBlockInfo() throws Exception {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        HdfsDataInputStream hdis = (HdfsDataInputStream) fileSystem.open(new Path("hdfs://hadoop-001:9000/100_1/hadoop-2.7.2.tar.gz"));
        List<LocatedBlock> allBlocks = hdis.getAllBlocks();
        for (LocatedBlock allBlock : allBlocks) {
            ExtendedBlock block = allBlock.getBlock();
            System.out.println("block id=" + block.getBlockId());
            System.out.println("block name=" + block.getBlockName());
            System.out.println("generation stamp=" + block.getGenerationStamp());

            DatanodeInfo[] locations = allBlock.getLocations();
            for (DatanodeInfo location : locations) {
                System.out.println("datanode host name=" + location.getHostName());
            }
            System.out.println("---------------------");
        }
        fileSystem.close();
    }

    // Download the first block
    @Test
    public void downFirstBlock() throws Exception {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        FSDataInputStream fsinput = fileSystem.open(new Path("hdfs://hadoop-001:9000/100_1/hadoop-2.7.2.tar.gz"));
        FileOutputStream fileOutputStream = new FileOutputStream("D:\\src\\hadoop-2.7.2.tar.gz.part1");

        // Copy 128 MB: 1024 bytes x (128 * 1024) iterations; only write
        // as many bytes as read() actually returned
        byte[] buf = new byte[1024];
        for (int i = 0; i < 128 * 1024; i++) {
            int len = fsinput.read(buf);
            fileOutputStream.write(buf, 0, len);
        }
        fileOutputStream.close();
        fsinput.close();
        fileSystem.close();
    }

    // Download the second block
    @Test
    public void downFirstBlock2() throws Exception {
        Configuration configuration = new Configuration();
        FileSystem fileSystem = FileSystem.get(
                new URI("hdfs://hadoop-001:9000"),
                configuration,
                "hadoop");

        FSDataInputStream fsinput = fileSystem.open(new Path("hdfs://hadoop-001:9000/100_1/hadoop-2.7.2.tar.gz"));
        FileOutputStream fileOutputStream = new FileOutputStream("D:\\src\\hadoop-2.7.2.tar.gz.part2");

        // Seek to the 128 MB offset (start of the second block)
        fsinput.seek(1024 * 1024 * 128);
        IOUtils.copyBytes(fsinput, fileOutputStream, 1024, true);
        fileSystem.close();
    }
}