The code below follows the related material in Chapter 3 of Hadoop: The Definitive Guide. It is fairly simple, so only brief comments are included.
package com.hadoop.hdfs;

import java.io.ByteArrayOutputStream;
import java.io.InputStream;
import java.net.URL;
import java.net.URLConnection;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.FsUrlStreamHandlerFactory;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.junit.Test;

/**
 * Basic HDFS operations.
 *
 * @author cskywit
 */
public class TestHDFS {

    /**
     * Read an HDFS file through java.net.URL.
     */
    @Test
    public void readFile() throws Exception {
        // Register the URL stream handler factory so java.net.URL understands hdfs:// URLs.
        // Note: this factory can be set at most once per JVM.
        URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
        URL url = new URL("hdfs://192.168.40.130:8020/user/ctr/hadoop/a.cmd");
        URLConnection conn = url.openConnection();
        InputStream is = conn.getInputStream();
        // available() is only a hint; a single read like this is fine for a small test file.
        byte[] buf = new byte[is.available()];
        is.read(buf);
        is.close();
        System.out.println(new String(buf));
    }

    /**
     * Read an HDFS file through the FileSystem API.
     */
    @Test
    public void readFileByAPI() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/user/ctr/hadoop/a.cmd");
        FSDataInputStream fis = fs.open(p);
        byte[] buf = new byte[1024];
        int len = -1;
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        while ((len = fis.read(buf)) != -1) {
            baos.write(buf, 0, len);
        }
        fis.close();
        baos.close();
        System.out.println(new String(baos.toByteArray()));
    }

    /**
     * Read an HDFS file through the FileSystem API, using IOUtils to copy the stream.
     */
    @Test
    public void readFileByAPI2() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/user/ctr/hadoop/a.cmd");
        ByteArrayOutputStream baos = new ByteArrayOutputStream();
        FSDataInputStream fis = fs.open(p);
        IOUtils.copyBytes(fis, baos, 1024);
        System.out.println(new String(baos.toByteArray()));
    }

    /**
     * Create an HDFS directory.
     */
    @Test
    public void mkdir() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/user/ctr/hadoop/mkFromWindows");
        fs.mkdirs(p);
    }

    /**
     * Create an HDFS file and write to it.
     */
    @Test
    public void putFile() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/user/ctr/hadoop/mkFromWindows/hello.txt");
        FSDataOutputStream fos = fs.create(p);
        fos.write("Hello,World".getBytes());
        fos.close();
    }

    /**
     * Delete an HDFS path.
     */
    @Test
    public void rmFile() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/user/ctr/hadoop/mkFromWindows");
        // true: delete recursively
        fs.delete(p, true);
    }

    /**
     * List the entries directly under an HDFS directory (listStatus is not recursive).
     */
    @Test
    public void listDir() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
        FileSystem fs = FileSystem.get(conf);
        Path p = new Path("/user/ctr/hadoop");
        FileStatus[] stat = fs.listStatus(p);
        Path[] listedPaths = FileUtil.stat2Paths(stat);
        for (Path p1 : listedPaths) {
            System.out.println(p1);
        }
    }
}
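Note that listStatus only returns the direct children of a directory. If a truly recursive listing is wanted, FileSystem.listFiles(path, true) can be used instead. The sketch below is a minimal example, assuming the same NameNode address as above and a Hadoop 2.x client on the classpath; the class and method names are illustrative.

package com.hadoop.hdfs;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;
import org.junit.Test;

// Minimal sketch (assumption: same cluster address as in the examples above).
public class TestHDFSRecursiveList {

    @Test
    public void listFilesRecursively() throws Exception {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://192.168.40.130:8020");
        FileSystem fs = FileSystem.get(conf);
        // listFiles(path, true) walks the directory tree and returns every file under the path.
        RemoteIterator<LocatedFileStatus> it = fs.listFiles(new Path("/user/ctr/hadoop"), true);
        while (it.hasNext()) {
            System.out.println(it.next().getPath());
        }
    }
}

Unlike listStatus, listFiles skips the directories themselves and streams results through an iterator, so it does not materialize a large listing in memory all at once.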