Hadoop Study Notes 7: Accessing HDFS with the Hadoop Client API

Date: 2021-06-12 08:29:32

This follows the relevant material in Chapter 3 of Hadoop: The Definitive Guide. The code is fairly simple, so it is posted directly below with only brief inline comments.

package com.hadoop.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.net.MalformedURLException;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;
/**
 * HDFS operations through the Hadoop client API.
 * 
 * @author cskywit
 */
public class TestHDFS {
	/**
	 * Read an HDFS file through java.net.URL
	 * @throws Exception 
	 */
	@Test
	public void readFile() throws Exception{
		// Register the URL stream handler factory so java.net.URL understands the hdfs:// scheme.
		// Note: setURLStreamHandlerFactory may only be called once per JVM.
		URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
		URL url = new URL("hdfs://192.168.40.130:8020/user/ctr/hadoop/a.cmd");
		URLConnection conn = url.openConnection();
		InputStream is = conn.getInputStream();
		// available() is only an estimate; good enough for a small test file
		byte[] buf = new byte[is.available()];
		is.read(buf);
		is.close();
		String str = new String(buf);
		System.out.println(str);
		
	}
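
	/*
	 * Not from the book: a variant of readFile() that streams the URL content
	 * with IOUtils.copyBytes instead of sizing a buffer with available(),
	 * which only estimates how many bytes can be read without blocking.
	 * It assumes the hdfs:// URL handler factory has already been registered
	 * in this JVM (readFile() above does that, and it can only be set once).
	 */
	@Test
	public void readFileByURLStream() throws Exception{
		URL url = new URL("hdfs://192.168.40.130:8020/user/ctr/hadoop/a.cmd");
		InputStream is = url.openStream();
		try {
			// copy the HDFS file to stdout in 4 KB chunks, without closing System.out
			IOUtils.copyBytes(is, System.out, 4096, false);
		} finally {
			IOUtils.closeStream(is);
		}
	}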
	
	@Test
	public void readFileByAPI() throws Exception{
		/**
		 * Read a file through the Hadoop FileSystem API
		 */
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path p =new Path("/user/ctr/hadoop/a.cmd");
		FSDataInputStream fis = fs.open(p);
		byte[] buf = new byte[1024];
		int len = -1;
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		while((len = fis.read(buf)) != -1){
			baos.write(buf,0,len);
		}
		fis.close();
		baos.close();
		System.out.println(new String(baos.toByteArray()));
	}
	
	@Test
	public void readFileByAPI2() throws Exception{
		/**
		 * Second way to read a file through the Hadoop API: IOUtils.copyBytes
		 */
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path p =new Path("/user/ctr/hadoop/a.cmd");
		ByteArrayOutputStream baos = new ByteArrayOutputStream();
		FSDataInputStream fis = fs.open(p);
		IOUtils.copyBytes(fis, baos, 1024);
		IOUtils.closeStream(fis);
		System.out.println(new String(baos.toByteArray()));
	}
	
	@Test
	public void mkdir() throws Exception{
		/**
		 * Create an HDFS directory through the Hadoop API
		 */
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path p =new Path("/user/ctr/hadoop/mkFromWindows");
		fs.mkdirs(p);
	}
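
	/*
	 * Not from the book: mkdirs() also has an overload that takes an
	 * FsPermission, for cases where the default permissions are not wanted.
	 * The 0750 mode below is only an example value; the effective permissions
	 * are still filtered by the client-side umask (fs.permissions.umask-mode).
	 */
	@Test
	public void mkdirWithPermission() throws Exception{
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path p = new Path("/user/ctr/hadoop/mkFromWindows");
		// fully qualified to avoid adding an import to the listing above
		fs.mkdirs(p, new org.apache.hadoop.fs.permission.FsPermission((short) 0750));
	}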
	
	@Test
	public void putFile() throws Exception{
		/**
		 * Create a file on HDFS and write bytes to it through the Hadoop API
		 */
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path p =new Path("/user/ctr/hadoop/mkFromWindows/hello.txt");
		FSDataOutputStream fos = fs.create(p);
		fos.write("Hello,World".getBytes());
		fos.close();
	}
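
	/*
	 * Not from the book: uploading an existing local file instead of writing
	 * bytes by hand. copyFromLocalFile() behaves like "hadoop fs -put".
	 * The local source path below is just a placeholder.
	 */
	@Test
	public void putLocalFile() throws Exception{
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path src = new Path("D:/tmp/hello.txt");                // local file (placeholder)
		Path dst = new Path("/user/ctr/hadoop/mkFromWindows/"); // target HDFS directory
		fs.copyFromLocalFile(src, dst);
	}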
	
	@Test
	public void rmFile() throws Exception{
		/**
		 * Delete a file or directory through the Hadoop API
		 */
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path p =new Path("/user/ctr/hadoop/mkFromWindows");
		// true: delete recursively
		fs.delete(p,true);
	}
	

	@Test
	public void listDir() throws Exception{
		/**
		 * List the contents of a directory through the Hadoop API.
		 * Note: listStatus() only returns the direct children; it is not
		 * recursive (see the recursive variant below).
		 */
		
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path p =new Path("/user/ctr/hadoop");
		FileStatus stat[] = fs.listStatus(p);
		Path[] listedPaths = FileUtil.stat2Paths(stat);
		for (Path p1:listedPaths){
			System.out.println(p1);
		}
	}
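
	/*
	 * Not from the book: listStatus() above only lists the direct children of
	 * a directory. For a recursive listing, FileSystem.listFiles(path, true)
	 * can be used; it returns files only (no directories).
	 */
	@Test
	public void listFilesRecursively() throws Exception{
		Configuration conf = new Configuration();
		conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
		FileSystem fs = FileSystem.get(conf);
		Path p = new Path("/user/ctr/hadoop");
		// fully qualified types to avoid adding imports to the listing above
		org.apache.hadoop.fs.RemoteIterator<org.apache.hadoop.fs.LocatedFileStatus> it =
				fs.listFiles(p, true);
		while (it.hasNext()) {
			System.out.println(it.next().getPath());
		}
	}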

}