ftp+线程池批量上传文件

时间:2025-04-04 08:03:10
public class FileBatchUploader implements Closeable { private final String ftpServer; private final String userName; private final String password; private final String targetRemoteDir; private final FTPClient ftp = new FTPClient(); private final CompletionService<File> completionService; private final ExecutorService es; private final ExecutorService dispatcher; public FileBatchUploader(String ftpServer, String userName, String password, String targetRemoteDir) { this.ftpServer = ftpServer; this.userName = userName; this.password = password; this.targetRemoteDir = targetRemoteDir; // 使用单工作者线程的线程池 this.es = Executors.newSingleThreadExecutor(); this.dispatcher = Executors.newSingleThreadExecutor(); this.completionService = new ExecutorCompletionService<File>(es); } public void uploadFiles(final Set<File> files) { dispatcher.submit(new Runnable() { @Override public void run() { try { doUploadFiles(files); } catch (InterruptedException ignored) { } } }); } private void doUploadFiles(Set<File> files) throws InterruptedException { // 批量提交文件上传任务 for (final File file : files) { completionService.submit(new UploadTask(file)); } Future<File> future; File md5File; File uploadedFile; Set<File> md5Files = new HashSet<File>(); for (File file : files) { try { future = completionService.take(); uploadedFile = future.get(); // 将上传成功的文件移动到备份目录,并为其生成相应的MD5文件 md5File = generateMD5(moveToSuccessDir(uploadedFile)); md5Files.add(md5File); } catch (ExecutionException | IOException | NoSuchAlgorithmException e) { e.printStackTrace(); moveToDeadDir(file); } } for (File file : md5Files) { // 上传相应的MD5文件 completionService.submit(new UploadTask(file)); } // 检查md5文件的上传结果 int successUploaded = md5Files.size(); for (int i = 0; i < successUploaded; i++) { future = completionService.take(); try { uploadedFile = future.get(); md5Files.remove(uploadedFile); } catch (ExecutionException e) { e.printStackTrace(); } } // 将剩余(即未上传成功)的md5文件移动到相应备份目录 for (File file : md5Files) { moveToDeadDir(file); } } private File generateMD5(File file) throws IOException, NoSuchAlgorithmException { String md5 = Tools.md5sum(file); File md5File = new File(file.getAbsolutePath() + ".md5"); Files.write(Paths.get(md5File.getAbsolutePath()), md5.getBytes("UTF-8")); return md5File; } private static File moveToSuccessDir(File file) { File targetFile = null; try { targetFile = moveFile(file, Paths.get(file.getParent(), "..", "backup", "success")); } catch (IOException e) { e.printStackTrace(); } return targetFile; } private static File moveToDeadDir(File file) { File targetFile = null; try { targetFile = moveFile(file, Paths.get(file.getParent(), "..", "backup", "dead")); } catch (IOException e) { e.printStackTrace(); } return targetFile; } private static File moveFile(File srcFile, Path destPath) throws IOException { Path sourcePath = Paths.get(srcFile.getAbsolutePath()); if (!Files.exists(destPath)) { Files.createDirectories(destPath); } Path destFile = destPath.resolve(srcFile.getName()); Files.move(sourcePath, destFile, StandardCopyOption.REPLACE_EXISTING); return destFile.toFile(); } class UploadTask implements Callable<File> { private final File file; public UploadTask(File file) { this.file = file; } @Override public File call() throws Exception { Debug.info("uploading %s", file.getCanonicalPath()); // 上传指定的文件 upload(file); return file; } } // 初始化FTP客户端 public void init() throws Exception { FTPClientConfig config = new FTPClientConfig(); ftp.configure(config); int reply; ftp.connect(ftpServer); Debug.info("FTP Reply:%s", ftp.getReplyString()); reply = ftp.getReplyCode(); if (!FTPReply.isPositiveCompletion(reply)) { ftp.disconnect(); throw new Exception("FTP server refused connection."); } boolean isOK = ftp.login(userName, password); if (isOK) { Debug.info("FTP Reply:%s", ftp.getReplyString()); } else { throw new Exception("Failed to login." + ftp.getReplyString()); } reply = ftp.cwd(targetRemoteDir); if (!FTPReply.isPositiveCompletion(reply)) { ftp.disconnect(); throw new Exception("Failed to change working :" + reply); } else { Debug.info("FTP Reply:%s", ftp.getReplyString()); } ftp.setFileType(FTP.ASCII_FILE_TYPE); } // 将指定的文件上传至FTP服务器 protected void upload(File file) throws Exception { boolean isOK; try (InputStream dataIn = new BufferedInputStream(new FileInputStream(file))) { isOK = ftp.storeFile(file.getName(), dataIn); } if (!isOK) { throw new IOException("Failed to upload " + file + ",reply:" + "," + ftp.getReplyString()); } } @Override public void close() throws IOException { dispatcher.shutdown(); try { es.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException ignored) { } es.shutdown(); try { es.awaitTermination(60, TimeUnit.SECONDS); } catch (InterruptedException ignored) { } Tools.silentClose(new Closeable() { @Override public void close() throws IOException { if (ftp.isConnected()) { ftp.disconnect(); } } }); } }