The figure below shows the information for all the files in the HDFS directory at "localhost:50070/explorer.html#/user/hadoop":
We will perform the following operations on all the files in that directory:
First, filter the directory so that only files whose suffix is not ".abc" remain (a minimal sketch of the matching rule follows this list).
Then, read each of the remaining files.
Finally, merge the contents of these files into the file "hdfs://localhost:9000/user/hadoop/merge.txt".
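Before the full program, here is a minimal, self-contained sketch of the suffix test the filter relies on. It uses only java.lang.String.matches and needs no HDFS; the class name FilterDemo and the sample paths are made up for illustration:

public class FilterDemo {
    public static void main(String[] args) {
        String reg = ".*\\.abc";  // same pattern passed to myPathFilter in the program below
        String[] paths = {"/user/hadoop/file1.txt", "/user/hadoop/file2.abc"};
        for (String p : paths) {
            boolean keep = !p.matches(reg);  // accept() keeps paths that do NOT match
            System.out.println(p + (keep ? "  -> merged" : "  -> skipped"));
        }
    }
}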
The full code is as follows:
package mergeFile;

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.PathFilter;

/* Filter that rejects files whose names match a given regular expression. */
class myPathFilter implements PathFilter {
    String reg = null;

    myPathFilter(String reg) {
        this.reg = reg;
    }

    public boolean accept(Path path) {
        // Accept the path only when it does NOT match the pattern.
        return !path.toString().matches(reg);
    }
}

public class merge {
    Path inputPath = null;   // directory containing the files to merge
    Path outputPath = null;  // path of the output file

    public merge(String input, String output) {
        this.inputPath = new Path(input);
        this.outputPath = new Path(output);
    }

    public void doMerge() throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        conf.set("fs.hdfs.impl", "org.apache.hadoop.hdfs.DistributedFileSystem");
        FileSystem fsSource = FileSystem.get(URI.create(inputPath.toString()), conf);
        FileSystem fsDst = FileSystem.get(URI.create(outputPath.toString()), conf);
        // List the directory, excluding files whose suffix is .abc.
        FileStatus[] sourceStatus = fsSource.listStatus(inputPath, new myPathFilter(".*\\.abc"));
        FSDataOutputStream fsdos = fsDst.create(outputPath);
        // Read each remaining file and append its contents to the output file.
        for (FileStatus sta : sourceStatus) {
            if (!sta.isFile()) continue;  // skip subdirectories; open() would fail on them
            System.out.println("Path: " + sta.getPath() + "  Size: " + sta.getLen()
                    + "  Permission: " + sta.getPermission() + "  Contents:");
            FSDataInputStream fsdis = fsSource.open(sta.getPath());
            byte[] data = new byte[1024];
            int read = -1;
            while ((read = fsdis.read(data)) > 0) {
                System.out.write(data, 0, read);  // echo to the console
                fsdos.write(data, 0, read);       // append to the merged file
            }
            fsdis.close();  // close the input stream for this file
        }
        fsdos.close();
    }

    public static void main(String[] args) throws IOException {
        merge merge = new merge("hdfs://localhost:9000/user/hadoop/",
                "hdfs://localhost:9000/user/hadoop/merge.txt");
        merge.doMerge();
    }
}
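To verify the result, the merged file can be read back with the same API. Below is a minimal sketch, assuming the same HDFS address hdfs://localhost:9000 as above; the class name CheckMerge is ours for illustration:

import java.io.IOException;
import java.net.URI;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class CheckMerge {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://localhost:9000");
        Path merged = new Path("hdfs://localhost:9000/user/hadoop/merge.txt");
        FileSystem fs = FileSystem.get(URI.create(merged.toString()), conf);
        // Stream the merged file to the console.
        try (FSDataInputStream in = fs.open(merged)) {
            byte[] buf = new byte[1024];
            int n;
            while ((n = in.read(buf)) > 0) {
                System.out.write(buf, 0, n);
            }
        }
        System.out.flush();
    }
}

The same check can also be done from the shell with hdfs dfs -cat /user/hadoop/merge.txt.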
Execution result:
When run, the program prints the path, size, permission, and contents of each merged file to the console, and writes the combined contents to merge.txt.