Java代码操作HDFS测试类

时间:2021-06-05 19:23:40

1.Java代码操作HDFS需要用到Jar包和Java类

Jar包:

hadoop-common-2.6.0.jar和hadoop-hdfs-2.6.0.jar

Java类:

java.net.URL
org.apache.hadoop.fs.FsUrlStreamHandlerFactory
java.net.URI
org.apache.hadoop.conf.Configuration
org.apache.hadoop.fs.FileSystem
org.apache.hadoop.fs.Path
org.apache.hadoop.io.IOUtils

2.读文件的过程
客户端(client)用FileSystem的open()函数打开文件,DistributedFileSystem用RPC调用名称节点,得到文件的数据块信息。对于每一个数据块,名称节点返回保存数据块的数据节点的地址。
DistributedFileSystem返回FSDataInputStream给客户端,用来读取数据。
客户端调用stream的read()函数开始读取数据。DFSInputStream连接保存此文件第一个数据块的最近的数据节点。
Data从数据节点读到客户端(client),当此数据块读取完毕时,DFSInputStream关闭和此数据节点的连接,然后连接此文件下一个数据块的最近的数据节点。
当客户端读取完毕数据的时候,调用FSDataInputStream的close函数。
在读取数据的过程中,如果客户端在与数据节点通信出现错误,则尝试连接包含此数据块的下一个数据节点。失败的数据节点将被记录,以后不再连接。

3.上代码:

写文件 create
读取文件 open
删除文件delete
创建目录 mkdirs
删除文件或目录 delete
列出目录的内容 listStatus
显示文件系统的目录和文件的元数据信息 getFileStatus

ReadHdfsFile.java

 import java.io.IOException;
import java.io.InputStream;
import java.net.URISyntaxException;
import java.net.URL; import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory; import java.net.URI;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils; public class ReadHdfsFile {
//让Java程序识别HDFS的URL
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
}
static String fileSystemUri = "hdfs://192.168.211.130:9000"; public static void main(String[] args) throws Exception {
String fileHdfsPath = "hdfs://192.168.211.130:9000/user/root/metafile2.xml";
String fileHdfsPath2 = "hdfs://192.168.211.130:9000/user/root/metafile.xml";
String fileHdfsPath3 = "hdfs://192.168.211.130:9000/user/root/metafile3.xml";
String fileHdfsPath4 = "hdfs://192.168.211.130:9000/user/root/testCopy.txt";
String localFilePath = "D://test.txt";
String folderHdfsPath = "hdfs://192.168.211.130:9000/bbb";
//mkdir(folderHdfsPath);
//readFilePrint(fileHdfsPath);
//judgeFileOrFolder(fileHdfsPath2);
//rmdir(folderHdfsPath);
readFileAndCopy(localFilePath,fileHdfsPath4);
readFilePrint(fileHdfsPath4); }
/**
* 打印hdfs上指定的文本文件
* @param fileHdfsPath
* @throws URISyntaxException
* @throws IOException
*/
private static void readFilePrint(String fileHdfsPath) throws URISyntaxException, IOException {
FileSystem fileSystem = getFileSystem(fileSystemUri);
FSDataInputStream hdfsInputStream = fileSystem.open(new Path(fileHdfsPath)); byte[] ioBuffer = new byte[1024];
int readLen = hdfsInputStream.read(ioBuffer);
while(readLen != -1){
System.out.write(ioBuffer, 0, readLen);
readLen = hdfsInputStream.read(ioBuffer);
}
hdfsInputStream.close();
fileSystem.close();
}
/**
* 得到hdfs文件系统对象
* @param fileSystemUri
* @return
* @throws URISyntaxException
* @throws IOException
*/
private static FileSystem getFileSystem(String fileSystemUri) throws URISyntaxException,
IOException {
Configuration conf = new Configuration();
conf.set("fs.hdfs.impl",org.apache.hadoop.hdfs.DistributedFileSystem.class.getName());
conf.set("fs.file.impl", org.apache.hadoop.fs.LocalFileSystem.class.getName());
URI uri = new URI(fileSystemUri);
final FileSystem fileSystem = FileSystem.get(uri, conf);
return fileSystem;
} /**
* 读取文件,调用fileSystem的open(path)
* @throws Exception
*/
private static void readFileAndCopy(String sourceFileHdfsPath,String targetFileHdfsPath) throws Exception {
FileSystem fileSystem = getFileSystem(fileSystemUri);
//获得这段代码运行的时所处的系统(如果在windows上运行就是Windows本地操作系统)
Configuration configuration=new Configuration();
FileSystem locationFileSystem=FileSystem.getLocal(configuration); FSDataInputStream inputStream = null;
FSDataOutputStream outputStream = null;
//输出路径
Path outPath = new Path(targetFileHdfsPath);
if(sourceFileHdfsPath.startsWith("hdfs://")){
inputStream = fileSystem.open(new Path(sourceFileHdfsPath));
}else{
inputStream = locationFileSystem.open(new Path(sourceFileHdfsPath));
}
//打开输出流
if(!(fileSystem.exists(outPath))){
outputStream = fileSystem.create(outPath);
}else {
outputStream = fileSystem.append(outPath);
} IOUtils.copyBytes(inputStream, outputStream, 1024, false);
IOUtils.closeStream(inputStream);
}
/*
各个参数所代表的含义:
in: 是FSDataInputStream类的对象,是有关读取文件的类,也就是所谓“输入流”
out:是FSDataOutputStream类的对象,是有关文件写入的类,也就是“输出流”
4096表示用来拷贝的buffer大小(buffer是缓冲区)
false表明拷贝完成后我们并不关闭拷贝源可拷贝目的地
上面inputStream和outputStream都是通过FileSystem类
fileSystem和fs都是FileSystem类的对象,path和block都是路径Path类的对象
然后IOUtils.copyBytes(in, out, 4096, false)方法实现了文件合并及上传至hdfs上
*/ /**
* 创建目录,调用fileSystem的mkdirs(path)
* @throws Exception
*/
private static void mkdir(String folderHdfsPath) throws Exception {
FileSystem fileSystem = getFileSystem(fileSystemUri);
fileSystem.mkdirs(new Path(folderHdfsPath));
} /**
* 删除目录,调用fileSystem的deleteOnExit(path)
* @throws Exception
*/
private static void rmdir(String folderHdfsPath) throws Exception {
FileSystem fileSystem = getFileSystem(fileSystemUri);
fileSystem.delete(new Path(folderHdfsPath));
} /**
* 遍历目录,使用FileSystem的listStatus(path) 如果要查看file状态,使用FileStatus对象
* @throws Exception
*/
private static void judgeFileOrFolder(String fileHdfsPath) throws Exception {
FileSystem fileSystem = getFileSystem(fileSystemUri);
FileStatus[] listStatus = fileSystem.listStatus(new Path(fileHdfsPath));
for (FileStatus fileStatus : listStatus) {
String isDir = fileStatus.isDir() ? "目录" : "文件";
String name = fileStatus.getPath().toString();
System.out.println(isDir + " " + name);
}
} }

如果代码中报AccessControlException: Permission denied:

在conf/hdfs-site.xml增加
<property>
<name>dfs.permissions</name>
<value>false</value>
</property>
中文字符集:/etc/sysconfig/i18n