在多线程环境下,通过 BlockingQueue,实现生产者-消费者场景。
Toast 被生产和消费的对象。
ToastQueue 继承了 LinkedblockingQueue ,用于中间存储 Toast 。
Producer 生产 Toast ,并将生产出来的 Toast 放进队列 initialToastQ 中。
Processor 加工 Toast,从 initialToastQ 中获得生产出来的 Toast,将其加工并放进队列 finishedToast 中。
Consumer 消费 Toast,从 finishedToastQ 中获得加工完成的 Toast。
ThreadHelper 工具类,用于输出线程相关信息。
ProducerConsumerDemo 演示这个场景
代码实现:
Toast 实现
public class Toast { private int id; public Toast(int id){
this.id = id;
} public String toString(){
return " toast#" + id;
}
}
ToastQueue 实现
import java.util.concurrent.LinkedBlockingQueue; public class ToastQueue extends LinkedBlockingQueue<Toast> {
private static final long serialVersionUID = 1L;
}
Producer 循环生产 Toast
import java.util.concurrent.TimeUnit; public class Producer implements Runnable { private ToastQueue toastQueue;
private int count; public Producer(ToastQueue toastQueue){
this.toastQueue = toastQueue;
this.count = 0;
} @Override
public void run() {
try {
while (true){
TimeUnit.MILLISECONDS.sleep(100); Toast toast = new Toast(count);
count++;
toastQueue.put(toast);
ThreadHelper.print(" produced " + toast);
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Processor 从 intialToastQ 获得 Toast ,对其加工,并放进 finishedToastQ 中。
import java.util.concurrent.TimeUnit; public class Processor implements Runnable { private ToastQueue initialToastQ;
private ToastQueue finishedToastQ; public Processor(ToastQueue initialToastQ, ToastQueue finishedToastQ){
this.initialToastQ = initialToastQ;
this.finishedToastQ = finishedToastQ;
} @Override
public void run() {
try {
while (true){
Toast toast = initialToastQ.take(); ThreadHelper.print(" processing " + toast); TimeUnit.MILLISECONDS.sleep(180); finishedToastQ.put(toast);
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
Consumer 消耗 Toast
public class Consumer implements Runnable { private ToastQueue finishedToastQ; public Consumer(ToastQueue finishedToastQ){
this.finishedToastQ = finishedToastQ;
} @Override
public void run() {
try {
while (true){
Toast toast = finishedToastQ.take();
ThreadHelper.print(" consumed " + toast);
}
}catch (InterruptedException e) {
e.printStackTrace();
}
}
}
ThreadHelper 线程帮助类
public class ThreadHelper {
public static void print(String msg){
System.out.println("[" + Thread.currentThread().getName() + " ] " + msg);
}
}
演示烤面包的生产、加工、消费的场景
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit; public class ProducerConsumerDemo { public static void main() throws InterruptedException{ ToastQueue initialToastQ = new ToastQueue();
ToastQueue finishedToastQ = new ToastQueue(); ExecutorService exec = Executors.newCachedThreadPool();
exec.execute(new Producer(initialToastQ));
exec.execute(new Processor(initialToastQ, finishedToastQ));
exec.execute(new Consumer(finishedToastQ)); TimeUnit.SECONDS.sleep(2);
exec.shutdownNow();
}
}
输出结果:
[pool--thread- ] processing toast#
[pool--thread- ] produced toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
[pool--thread- ] processing toast#
[pool--thread- ] consumed toast#
[pool--thread- ] produced toast#
java.lang.InterruptedException
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.reportInterruptAfterWait(AbstractQueuedSynchronizer.java:)
at java.util.concurrent.locks.AbstractQueuedSynchronizer$ConditionObject.await(AbstractQueuedSynchronizer.java:)
at java.util.concurrent.LinkedBlockingQueue.take(LinkedBlockingQueue.java:)
at concurrencyProducerConsumer.Consumer.run(Consumer.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:)
at concurrencyProducerConsumer.Producer.run(Producer.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
java.lang.InterruptedException: sleep interrupted
at java.lang.Thread.sleep(Native Method)
at java.lang.Thread.sleep(Thread.java:)
at java.util.concurrent.TimeUnit.sleep(TimeUnit.java:)
at concurrencyProducerConsumer.Processor.run(Processor.java:)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:)
at java.lang.Thread.run(Thread.java:)
参考资料
Page 868, Produer-consumers and queue, Thinking in Java