hadoop 学习笔记(第三章 Hadoop分布式文件系统 )

时间:2023-03-08 17:37:27
hadoop 学习笔记(第三章 Hadoop分布式文件系统 )

map->shuffle->reduce

map(k1,v1)--->(k2,v2)

reduce(k2,List<v2>)--->(k2,v3)

传输类型:org.apache.hadoop.io

访问HDFS文件系统

1.java.net.URL 的setURLStreamHandlerFactory() 方法。每个java虚拟机只能调用一次,因此通常在静态方法中调用。如果引用的第三方组件调用过,再次调用会报错。

public class App 
{
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
} static InputStream inputStream=null;
public static void main( String[] args ) throws Exception
{
try{
inputStream=new URL(args[0]).openStream();
IOUtils.copyBytes(inputStream,System.out,4096,false);
}finally {
IOUtils.closeStream(inputStream);
}
}
}

2.FileSystem API 读取数据

public class App {
public static void main(String[] args) throws Exception {
String uri = args[];
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI(uri), configuration);
InputStream inputStream = null;
try {
inputStream = fs.open(new Path(uri));
IOUtils.copyBytes(inputStream, System.out, , false);
} finally {
IOUtils.closeStream(inputStream);
}
}
}
//实际上,FileSystem对象中open()方法返回的是FSDataInputStream对象。其实现了Seekable接口和PositionedReadable接口
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer {
}
public interface Seekable {
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
* seek past the end of the file.
*/
void seek(long pos) throws IOException; /**
* Return the current offset from the start of the file
*/
long getPos() throws IOException; /**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*/
@InterfaceAudience.Private
boolean seekToNewSource(long targetPos) throws IOException;
} public interface PositionedReadable {
/**
* Read upto the specified number of bytes, from a given
* position within a file, and return the number of bytes read. This does not
* change the current offset of a file, and is thread-safe.
*/
public int read(long position, byte[] buffer, int offset, int length)
throws IOException; /**
* Read the specified number of bytes, from a given
* position within a file. This does not
* change the current offset of a file, and is thread-safe.
*/
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException; /**
* Read number of bytes equal to the length of the buffer, from a given
* position within a file. This does not
* change the current offset of a file, and is thread-safe.
*/
public void readFully(long position, byte[] buffer) throws IOException;
}

read()和readFully()的区别是readFully()在读取到length之前会阻塞,read()如果读到的小于length,读到多少返回多少。

seek()方法的开销较高,要谨慎使用。

3.写入数据

public class App {
public static void main(String[] args) throws Exception { String localSrc=args[0];
String dstSrc=args[1]; InputStream inputStream=new BufferedInputStream(new FileInputStream(localSrc)); Configuration configuration=new Configuration();
FileSystem fs=FileSystem.get(URI.create(dstSrc),configuration); OutputStream outputStream=fs.create(new Path(dstSrc), new Progressable() {
@Override
public void progress() {
System.out.print(".");
}
}); IOUtils.copyBytes(inputStream,outputStream,4096,true);
}
}

4.目录与查询

FileSystem提供mkdir方法创建目录。通常不需要,因为create方法写入文件时会自动创建目录

public boolean mkdirs(Path f) throws IOException {
}

FileSystem提供getFileStatus方法返回文件元数据。元数据包括文件的地址,大小,权限等

public abstract FileStatus getFileStatus(Path f) throws IOException;

FIleSystem提供listStatus()方法列出目录中的文件

public class App {
public static void main(String[] args) throws Exception { String uri = args[0];
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), configuration); Path[] paths = new Path[args.length]; for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
} FileStatus[] status = fs.listStatus(paths);
Path[] listPaths = FileUtil.stat2Paths(status); for (Path path : listPaths) {
System.out.println(path);
}
}
}

FileSystem还提供globStatus方法返回与指定格式匹配的所有FIleStatus

 public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException ;

FileSystem提供delete方法永久删除文件或目录。如果f是一个空目录,recursive就会被忽略。如果f非空,只有在recursive为true时才会执行删除。

public abstract boolean delete(Path f, boolean recursive) throws IOException;