要求:
实时监控一个目录,如果目录里有新增文件就启动一个复制文件线程进行文件复制,把文件复制到另一个目录里。
实时监控程序:
package cn.ba.watchFile.downLoadFile.finalcode; /** * 监控文件夹 * 经测试基本不可用,在一个40g的很深的目录下去新建和删除文件5分钟都没结果,主要原因是需要对每一个Path进行注册监控。 */ import java.io.File; import java.io.FileInputStream; import java.io.FileOutputStream; import java.io.IOException; import java.nio.file.FileSystems; import java.nio.file.Path; import java.nio.file.Paths; import java.nio.file.WatchEvent; import java.nio.file.WatchKey; import java.nio.file.WatchService; import java.util.concurrent.LinkedBlockingQueue; import static java.nio.file.StandardWatchEventKinds.*; public class Sample { private WatchService watcher; private Path path; private String filePath,toFilePath; private File file; private long startLength,endLength; public static LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<String>();;// 先进先出队列 //创建线程池 public static TracingThreadPool tracingThreadPool = new TracingThreadPool(10); public Sample(Path path) throws IOException { this.path = path; // 获取系统监控器 watcher = FileSystems.getDefault().newWatchService(); this.path.register(watcher, OVERFLOW, ENTRY_CREATE, ENTRY_DELETE, ENTRY_MODIFY); } // 文件复制 public static void copyFile(File fromFile, File toFile) { try { FileInputStream in = new FileInputStream(fromFile); FileOutputStream os = new FileOutputStream(toFile); byte[] b = new byte[1024]; int n = 0; while ((n = in.read(b)) != -1) { os.write(b, 0, n); } in.close(); os.close(); } catch (Exception e) { // TODO Auto-generated catch block e.printStackTrace(); } } public void handleEvents() throws InterruptedException { // 开始加载数据文件 while (true) { WatchKey key = watcher.take(); for (WatchEvent<?> event : key.pollEvents()) { // 获取事件类型 final WatchEvent.Kind<?> kind = event.kind(); // 获取文件名 @SuppressWarnings("unchecked") final WatchEvent<Path> pathWatchEvent = (WatchEvent<Path>) event; final Path fileName = pathWatchEvent.context(); if (kind == ENTRY_CREATE) { filePath = path + File.separator + fileName; file = new File(filePath); startLength = file.length(); System.out.println("新增文件" + path + File.separator + fileName); Thread.sleep(20);//休息10ms endLength = file.length(); if(startLength==endLength){ lbq.add(fileName.toString()); // pool.submit(task); //从队列中取出 String poll = lbq.poll(); filePath = path + File.separator + poll; toFilePath=path + File.separator+"zipbck"+File.separator+poll; System.out.println("取出文件" + poll); if (endLength<5000000) { Thread.sleep(10);//休息10ms }else { Thread.sleep(500);//休息500ms } tracingThreadPool.addTask(new CopyThread(new FileBean(filePath, toFilePath))); //4 等待工作线程完成所有的任务 // tracingThreadPool.join(); //5 关闭线程池 // tracingThreadPool.close(); // // 把取出的文件复制到另外一个目录下 // copyFile(new File(filePath), new File(toFilePath)); System.out.println("文件"+fileName+"复制完成!!"); } } else if (kind == ENTRY_DELETE) { } else if (kind == ENTRY_MODIFY) { } else if (kind == OVERFLOW) { } } // IMPORTANT: the key must be reset after processed if (!key.reset()) { return; } } } public static void main(String[] args) throws InterruptedException, IOException { new Sample(Paths.get("F://")).handleEvents(); } }
FileBean程序:
package cn.ba.watchFile.downLoadFile.finalcode; /** * 用于传递参数的对象 * @author Administrator * */ public class FileBean { private String fromFilePath; private String toFilePath; public String getFromFilePath() { return fromFilePath; } public void setFromFilePath(String fromFilePath) { this.fromFilePath = fromFilePath; } public String getToFilePath() { return toFilePath; } public void setToFilePath(String toFilePath) { this.toFilePath = toFilePath; } public FileBean(String fromFilePath, String toFilePath) { super(); this.fromFilePath = fromFilePath; this.toFilePath = toFilePath; } @Override public String toString() { return "FileBean [fromFilePath=" + fromFilePath + ", toFilePath=" + toFilePath + "]"; } }
线程池程序:
package cn.ba.watchFile.downLoadFile.finalcode; import java.util.Date; import java.util.LinkedList; public class TracingThreadPool extends ThreadGroup { // 线程池是否关闭 private boolean isClosed = false; // 表示工作队列 private LinkedList<Runnable> workQueue; // 表示线程池ID private static int threadPoolID; // 表示工作线程ID private int threadID; public TracingThreadPool(int poolSize) { super("ThreadPool-" + (threadPoolID++)); setDaemon(true); // 创建工作队列 workQueue = new LinkedList<Runnable>(); // 打印起始时间 System.out.println("start time:" + (new Date())); for (int i = 0; i < poolSize; i++) // 创建并启动工作线程 new WorkThread().start(); } public synchronized void addTask(Runnable task) { // 线程池被关则抛出IllegalStateException异常 if (isClosed) { throw new IllegalStateException(); } if (task != null) { workQueue.add(task); // 唤醒正在getTask()方法中等待任务的工作线程 notify(); } } protected synchronized Runnable getTask() throws InterruptedException { while (workQueue.size() == 0) { if (isClosed) return null; // 如果工作队列中没有任务,就等待任务 wait(); } return workQueue.removeFirst(); } public synchronized void close() { if (!isClosed) { isClosed = true; workQueue.clear(); // 清空工作队列 interrupt(); // 中断所有的工作线程,该方法继承自ThreadGroup类 } } public void join() { synchronized (this) { isClosed = true; // 唤醒还在getTask()方法中等待任务的工作线程 notifyAll(); } Thread[] threads = new Thread[activeCount()]; //获得线程组中当前所有活着的工作线程 int count = enumerate(threads); // 等待所有工作线程运行结束 for (int i = 0; i < count; i++) { try { // 等待工作线程运行结束 threads[i].join(); } catch (InterruptedException ex) { } } //打印结束时间 System.out.println("end time:" + (new Date())); } private class WorkThread extends Thread { public WorkThread() { // 加入到当前ThreadPool线程组中 super(TracingThreadPool.this, "WorkThread-" + (threadID++)); } public void run() { while (!isInterrupted()) { // isInterrupted()方法继承自Thread类,判断线程是否被中断 Runnable task = null; try { // 得到任务 task = getTask(); } catch (InterruptedException ex) { } // 如果getTask()返回null或者线程执行getTask()时被中断,则结束此线程 if (task == null) return; try { // 运行任务,捕获异常 task.run(); } catch (Throwable t) { t.printStackTrace(); } } } } }
复制文件线程程序:
package cn.ba.watchFile.downLoadFile.finalcode; import java.io.File; /** * 拷贝文件线程 * * @author Administrator * */ public class CopyThread implements Runnable{ private FileBean fileBean; public CopyThread(FileBean fileBean) { this.fileBean = fileBean ; } @Override public void run() { Sample.copyFile(new File(fileBean.getFromFilePath()), new File(fileBean.getToFilePath())); System.out.println("当前执行的线程为:"+Thread.currentThread()+"文件名为:"+fileBean.getFromFilePath()); } }