The Hadoop API provides methods for listing files, which can be used to traverse a directory tree:
import java.io.FileNotFoundException;
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

public class BatchSubmitMain {
    public static void main(String[] args) throws Exception {
        String mrTableName = args[0];
        String fglibTableName = args[1];

        Configuration conf = new Configuration();
        /*
         * <property>
         *   <name>fs.defaultFS</name>
         *   <value>hdfs://hcluster</value>
         * </property>
         */
        conf.set("fs.defaultFS", "hdfs://hcluster");
        FileSystem fileSystem = FileSystem.get(conf);

        String mrFilePath = "/myuser/hivedb/" + mrTableName;
        String fglibFilePath = "/myuser/hivedb/" + fglibTableName;

        System.out.println(mrFilePath);
        List<String> mrObjectIdItems = getObjectIdItems(fileSystem, mrFilePath);
        System.out.println(fglibFilePath);
        List<String> fglibObjectIdItems = getObjectIdItems(fileSystem, fglibFilePath);

        // Keep only the objectIds present in both tables
        // (use equals(), not ==, for string comparison).
        List<String> objectIdItems = new ArrayList<>();
        for (String mrObjectId : mrObjectIdItems) {
            for (String fglibObjectId : fglibObjectIdItems) {
                if (mrObjectId.equals(fglibObjectId)) {
                    objectIdItems.add(mrObjectId);
                }
            }
        }

        String submitShPath = "/app/myaccount/service/submitsparkjob.sh";
        CountDownLatch threadSignal = new CountDownLatch(objectIdItems.size());
        for (int i = 0; i < objectIdItems.size(); i++) {
            String objectId = objectIdItems.get(i);
            Thread thread = new ImportThread(objectId, submitShPath, threadSignal);
            thread.start();
        }
        // Wait until every worker thread has counted down the latch.
        threadSignal.await();
        System.out.println(Thread.currentThread().getName() + " complete");
    }

    private static List<String> getObjectIdItems(FileSystem fileSystem, String filePath)
            throws FileNotFoundException, IOException {
        List<String> objectItems = new ArrayList<>();
        Path path = new Path(filePath);
        // Get the file list
        FileStatus[] files = fileSystem.listStatus(path);
        // Each partition directory is named "objectid=<id>"; extract the id
        for (int i = 0; i < files.length; i++) {
            try {
                if (files[i].isDirectory()) {
                    String[] fileItems = files[i].getPath().getName().split("/");
                    String objectId = fileItems[fileItems.length - 1].replace("objectid=", "");
                    objectItems.add(objectId);
                    System.out.println(objectId);
                }
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
        return objectItems;
    }

    /**
     * @param hdfs FileSystem instance
     * @param path file path
     */
    public static void iteratorShowFiles(FileSystem hdfs, Path path) {
        try {
            if (hdfs == null || path == null) {
                return;
            }
            // Get the file list
            FileStatus[] files = hdfs.listStatus(path);
            // Print file information
            for (int i = 0; i < files.length; i++) {
                try {
                    if (files[i].isDirectory()) {
                        System.out.print(">>>" + files[i].getPath()
                                + ", dir owner:" + files[i].getOwner());
                        // Recurse into subdirectories
                        iteratorShowFiles(hdfs, files[i].getPath());
                    } else if (files[i].isFile()) {
                        System.out.print("   " + files[i].getPath()
                                + ", length:" + files[i].getLen()
                                + ", owner:" + files[i].getOwner());
                    }
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
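As an aside, on Hadoop 2.x and later the manual recursion in iteratorShowFiles can also be replaced by FileSystem.listFiles, which returns a RemoteIterator over the files under a path and walks subdirectories for you. A minimal sketch, reusing the hcluster address and /myuser/hivedb path from above (both are just the example values from this post):

import java.io.IOException;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocatedFileStatus;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.RemoteIterator;

public class ListFilesDemo {
    public static void main(String[] args) throws IOException {
        Configuration conf = new Configuration();
        conf.set("fs.defaultFS", "hdfs://hcluster");
        FileSystem fs = FileSystem.get(conf);

        // listFiles(path, true) traverses the tree recursively and
        // returns files only; directories are descended into, not returned.
        RemoteIterator<LocatedFileStatus> it =
                fs.listFiles(new Path("/myuser/hivedb"), true);
        while (it.hasNext()) {
            LocatedFileStatus status = it.next();
            System.out.println(status.getPath()
                    + ", length:" + status.getLen()
                    + ", owner:" + status.getOwner());
        }
    }
}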
The thread class that runs the shell script in parallel: