public class BlockingQueueTest {
private static AtomicInteger count = new AtomicInteger(0);
private static AtomicInteger countCreate = new AtomicInteger(0);
public static void main(String args[]) {
//定义一个阻塞队列,存放文件信息
BlockingQueue fileQueue = new ArrayBlockingQueue(5);
String path = "F:\\Song";
File root = new File(path);
//生产者线程去遍历文件,放入队列
FileCrawler fileCrawler = new FileCrawler(fileQueue, root);
//消费者线程去遍历队列,取出文件
Indexer indexer = new Indexer(fileQueue);
//开启几个生产者线程开始遍历文件
for (File file : ()) {
new Thread(new FileCrawler(fileQueue, file)).start();
}
//开启7个消费者者线程开始取出文件
for (int i = 0; i < 7; i++) {
new Thread(new Indexer(fileQueue)).start();
}
try {
(2000);
} catch (InterruptedException e) {
(); //To change body of catch statement use File | Settings | File Templates.
}
("生产者生产:" + ());
("消费者取到:" + ());
}
static class FileCrawler implements Runnable {
private final BlockingQueue fileQueue;
private final File root;
FileCrawler(BlockingQueue fileQueue, File root) {
= fileQueue;
= root;
}
public void run() {
try {
("生产者开始生产:" + ());
crawl(root);
} catch (InterruptedException e) {
();
}
}
private void crawl(File root) throws InterruptedException {
File[] files = ();
if (files != null) {
for (File file : files) {
if (()) {
crawl(file);
} else {
(file); //put
();
}
}
}
}
}
static class Indexer implements Runnable {
private final BlockingQueue fileQueue;
Indexer(BlockingQueue fileQueue) {
= fileQueue;
}
public void run() {
while (true) {
try {
("消费者开始消费:" + ());
File file = (File) (); //take
();
(());
} catch (InterruptedException e) {
(); //To change body of catch statement use File | Settings | File Templates.
}
}
}
}
}