监控一个目录,如果有新文件就启动一个线程复制到另一个目录

时间:2022-03-18 20:59:04

要求:

实时监控一个目录,如果目录里有新增文件就启动一个复制文件线程进行文件复制,把文件复制到另一个目录里。

实时监控程序:

package cn.ba.watchFile.downLoadFile.finalcode;

/**
 * 监控文件夹
 * 经测试基本不可用,在一个40g的很深的目录下去新建和删除文件5分钟都没结果,主要原因是需要对每一个Path进行注册监控。
 */
import java.io.File;
import java.io.FileInputStream;
import java.io.FileOutputStream;
import java.io.IOException;
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;
import java.util.concurrent.LinkedBlockingQueue;
import static java.nio.file.StandardWatchEventKinds.*;

public class Sample {


    private WatchService watcher;
    private Path path;
    private String filePath,toFilePath;
    private File file;
    private long startLength,endLength;
    public static LinkedBlockingQueue<String> lbq = new LinkedBlockingQueue<String>();;// 先进先出队列
    //创建线程池
    public static TracingThreadPool tracingThreadPool = new TracingThreadPool(10);

    public Sample(Path path) throws IOException {
        this.path = path;
        // 获取系统监控器
        watcher = FileSystems.getDefault().newWatchService();
        this.path.register(watcher, OVERFLOW, ENTRY_CREATE, ENTRY_DELETE,
                ENTRY_MODIFY);
    }

    // 文件复制
    public static void copyFile(File fromFile, File toFile) {
        try {
            FileInputStream in = new FileInputStream(fromFile);
            FileOutputStream os = new FileOutputStream(toFile);
            byte[] b = new byte[1024];
            int n = 0;
            while ((n = in.read(b)) != -1) {
                os.write(b, 0, n);
            }
            in.close();
            os.close();
        } catch (Exception e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }
    }

    public void handleEvents() throws InterruptedException {
        // 开始加载数据文件
        while (true) {
            WatchKey key = watcher.take();
            for (WatchEvent<?> event : key.pollEvents()) {
                // 获取事件类型
                final WatchEvent.Kind<?> kind = event.kind();
                // 获取文件名
                @SuppressWarnings("unchecked")
                final WatchEvent<Path> pathWatchEvent = (WatchEvent<Path>) event;
                final Path fileName = pathWatchEvent.context();
                if (kind == ENTRY_CREATE) {
                    filePath = path + File.separator + fileName;
                    
                    file = new File(filePath);
                    startLength = file.length();
                    System.out.println("新增文件" + path + File.separator
                            + fileName);
                    Thread.sleep(20);//休息10ms
                    endLength = file.length();
                    if(startLength==endLength){
                        lbq.add(fileName.toString());
                        // pool.submit(task);
                        //从队列中取出
                        String poll = lbq.poll();
                        filePath = path + File.separator + poll;
                        toFilePath=path + File.separator+"zipbck"+File.separator+poll;
                        System.out.println("取出文件" + poll);
                        if (endLength<5000000) {
                            Thread.sleep(10);//休息10ms
                        }else {
                            Thread.sleep(500);//休息500ms
                        }
                        tracingThreadPool.addTask(new CopyThread(new FileBean(filePath, toFilePath)));
                        //4 等待工作线程完成所有的任务
//                           tracingThreadPool.join();
                         //5 关闭线程池
//                           tracingThreadPool.close();
//                        // 把取出的文件复制到另外一个目录下
//                        copyFile(new File(filePath), new File(toFilePath));
                        System.out.println("文件"+fileName+"复制完成!!");
                    }
                    

                } else if (kind == ENTRY_DELETE) {
                } else if (kind == ENTRY_MODIFY) {
                } else if (kind == OVERFLOW) {
                }
            }
            // IMPORTANT: the key must be reset after processed
            if (!key.reset()) {
                return;
            }
        }
    }

    public static void main(String[] args) throws InterruptedException,
            IOException {

        new Sample(Paths.get("F://")).handleEvents();
    }

}

FileBean程序:

package cn.ba.watchFile.downLoadFile.finalcode;
/**
 * 用于传递参数的对象
 * @author Administrator
 *
 */
public class FileBean {
private String fromFilePath;
private String  toFilePath;
public String getFromFilePath() {
    return fromFilePath;
}
public void setFromFilePath(String fromFilePath) {
    this.fromFilePath = fromFilePath;
}
public String getToFilePath() {
    return toFilePath;
}
public void setToFilePath(String toFilePath) {
    this.toFilePath = toFilePath;
}
public FileBean(String fromFilePath, String toFilePath) {
    super();
    this.fromFilePath = fromFilePath;
    this.toFilePath = toFilePath;
}
@Override
public String toString() {
    return "FileBean [fromFilePath=" + fromFilePath + ", toFilePath="
            + toFilePath + "]";
}

}

线程池程序:

package cn.ba.watchFile.downLoadFile.finalcode;

import java.util.Date;
import java.util.LinkedList;

public class TracingThreadPool extends ThreadGroup {
    // 线程池是否关闭
     private boolean isClosed = false;
     // 表示工作队列
     private LinkedList<Runnable> workQueue;
     // 表示线程池ID
     private static int threadPoolID;
     // 表示工作线程ID
     private int threadID;
    public TracingThreadPool(int poolSize) {
        super("ThreadPool-" + (threadPoolID++));
        setDaemon(true);
          // 创建工作队列
          workQueue = new LinkedList<Runnable>();
        // 打印起始时间
          System.out.println("start time:" + (new Date()));
          for (int i = 0; i < poolSize; i++)
               // 创建并启动工作线程
               new WorkThread().start();
             }
    public synchronized void addTask(Runnable task) {
          // 线程池被关则抛出IllegalStateException异常
          if (isClosed) {
           throw new IllegalStateException();
          }
          if (task != null) {
           workQueue.add(task);
           // 唤醒正在getTask()方法中等待任务的工作线程
           notify();
          }
         }
    protected synchronized Runnable getTask() throws InterruptedException {
          while (workQueue.size() == 0) {
           if (isClosed)
            return null;
           // 如果工作队列中没有任务,就等待任务
           wait();
          }
          return workQueue.removeFirst();
         }
    
     public synchronized void close() {
          if (!isClosed) {
           isClosed = true;
           workQueue.clear(); // 清空工作队列
           interrupt(); // 中断所有的工作线程,该方法继承自ThreadGroup类
          }
         }
     
     public void join() {
          synchronized (this) {
           isClosed = true;
           // 唤醒还在getTask()方法中等待任务的工作线程
           notifyAll();
          }

          Thread[] threads = new Thread[activeCount()];
          //获得线程组中当前所有活着的工作线程
          int count = enumerate(threads);
          // 等待所有工作线程运行结束
          for (int i = 0; i < count; i++) {
           try {
            // 等待工作线程运行结束
            threads[i].join();
           } catch (InterruptedException ex) {
           }
          }
          //打印结束时间
          System.out.println("end time:" + (new Date()));
         }
     private class WorkThread extends Thread {
          public WorkThread() {
           // 加入到当前ThreadPool线程组中
           super(TracingThreadPool.this, "WorkThread-" + (threadID++));
          }

          public void run() {
           while (!isInterrupted()) { // isInterrupted()方法继承自Thread类,判断线程是否被中断
            Runnable task = null;
            try {
             // 得到任务
             task = getTask();
            } catch (InterruptedException ex) {
            }

            // 如果getTask()返回null或者线程执行getTask()时被中断,则结束此线程
            if (task == null)
             return;

            try {
             // 运行任务,捕获异常
             task.run();
            } catch (Throwable t) {
             t.printStackTrace();
            }
           }
          }
    }
}

复制文件线程程序:

package cn.ba.watchFile.downLoadFile.finalcode;

import java.io.File;

/**
 * 拷贝文件线程
 * 
 * @author Administrator
 *
 */
public class CopyThread implements Runnable{
    private FileBean fileBean;
    public CopyThread(FileBean fileBean) {
          this.fileBean  = fileBean ;
         }
    @Override
    public void run() {
        Sample.copyFile(new File(fileBean.getFromFilePath()), new File(fileBean.getToFilePath()));
        System.out.println("当前执行的线程为:"+Thread.currentThread()+"文件名为:"+fileBean.getFromFilePath());
    }

}