map->shuffle->reduce
map(k1,v1)--->(k2,v2)
reduce(k2,List<v2>)--->(k2,v3)
传输类型:org.apache.hadoop.io
访问HDFS文件系统
1.java.net.URL 的setURLStreamHandlerFactory() 方法。每个java虚拟机只能调用一次,因此通常在静态方法中调用。如果引用的第三方组件调用过,再次调用会报错。
public class App
{
static{
URL.setURLStreamHandlerFactory(new FsUrlStreamHandlerFactory());
} static InputStream inputStream=null;
public static void main( String[] args ) throws Exception
{
try{
inputStream=new URL(args[0]).openStream();
IOUtils.copyBytes(inputStream,System.out,4096,false);
}finally {
IOUtils.closeStream(inputStream);
}
}
}
2.FileSystem API 读取数据
public class App {
public static void main(String[] args) throws Exception {
String uri = args[];
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(new URI(uri), configuration);
InputStream inputStream = null;
try {
inputStream = fs.open(new Path(uri));
IOUtils.copyBytes(inputStream, System.out, , false);
} finally {
IOUtils.closeStream(inputStream);
}
}
}
//实际上,FileSystem对象中open()方法返回的是FSDataInputStream对象。其实现了Seekable接口和PositionedReadable接口
public class FSDataInputStream extends DataInputStream implements Seekable, PositionedReadable, ByteBufferReadable, HasFileDescriptor, CanSetDropBehind, CanSetReadahead, HasEnhancedByteBufferAccess, CanUnbuffer {
}
public interface Seekable {
/**
* Seek to the given offset from the start of the file.
* The next read() will be from that location. Can't
* seek past the end of the file.
*/
void seek(long pos) throws IOException; /**
* Return the current offset from the start of the file
*/
long getPos() throws IOException; /**
* Seeks a different copy of the data. Returns true if
* found a new source, false otherwise.
*/
@InterfaceAudience.Private
boolean seekToNewSource(long targetPos) throws IOException;
} public interface PositionedReadable {
/**
* Read upto the specified number of bytes, from a given
* position within a file, and return the number of bytes read. This does not
* change the current offset of a file, and is thread-safe.
*/
public int read(long position, byte[] buffer, int offset, int length)
throws IOException; /**
* Read the specified number of bytes, from a given
* position within a file. This does not
* change the current offset of a file, and is thread-safe.
*/
public void readFully(long position, byte[] buffer, int offset, int length)
throws IOException; /**
* Read number of bytes equal to the length of the buffer, from a given
* position within a file. This does not
* change the current offset of a file, and is thread-safe.
*/
public void readFully(long position, byte[] buffer) throws IOException;
}
read()和readFully()的区别是readFully()在读取到length之前会阻塞,read()如果读到的小于length,读到多少返回多少。
seek()方法的开销较高,要谨慎使用。
3.写入数据
public class App {
public static void main(String[] args) throws Exception { String localSrc=args[0];
String dstSrc=args[1]; InputStream inputStream=new BufferedInputStream(new FileInputStream(localSrc)); Configuration configuration=new Configuration();
FileSystem fs=FileSystem.get(URI.create(dstSrc),configuration); OutputStream outputStream=fs.create(new Path(dstSrc), new Progressable() {
@Override
public void progress() {
System.out.print(".");
}
}); IOUtils.copyBytes(inputStream,outputStream,4096,true);
}
}
4.目录与查询
FileSystem提供mkdir方法创建目录。通常不需要,因为create方法写入文件时会自动创建目录
public boolean mkdirs(Path f) throws IOException {
}
FileSystem提供getFileStatus方法返回文件元数据。元数据包括文件的地址,大小,权限等
public abstract FileStatus getFileStatus(Path f) throws IOException;
FIleSystem提供listStatus()方法列出目录中的文件
public class App {
public static void main(String[] args) throws Exception { String uri = args[0];
Configuration configuration = new Configuration();
FileSystem fs = FileSystem.get(URI.create(uri), configuration); Path[] paths = new Path[args.length]; for (int i = 0; i < paths.length; i++) {
paths[i] = new Path(args[i]);
} FileStatus[] status = fs.listStatus(paths);
Path[] listPaths = FileUtil.stat2Paths(status); for (Path path : listPaths) {
System.out.println(path);
}
}
}
FileSystem还提供globStatus方法返回与指定格式匹配的所有FIleStatus
public FileStatus[] globStatus(Path pathPattern, PathFilter filter) throws IOException ;
FileSystem提供delete方法永久删除文件或目录。如果f是一个空目录,recursive就会被忽略。如果f非空,只有在recursive为true时才会执行删除。
public abstract boolean delete(Path f, boolean recursive) throws IOException;