基于阻塞队列实现消费者和生产者

时间:2022-02-03 17:41:20

不知道为什么原来写的文章内容被清空,可能是没有上传成功吧,那我就再写一遍好了。。。

关于阻塞队列的说明和实现在另一篇文章中已经做了比较消息的介绍了,这里就不再赘述。

生产者-消费者 算得上是计算机领域中经典的问题之一了,生产者生产数据,消费者消费数据,类似于小学数学中的“一个池子进水,一个池子出水”问题。废话不多说了,直接上代码吧:

PS:本来想粘代码图片上来,但是考虑到reader对象很可能是需要解决实现问题,为了大家方便,所以还是文本形式的粘贴代码吧。。。

1.consumer类:

package BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;


/** 
 * <p>创建人:刘星 创建日期:2017-2-24 上午10:53:56</p>
 * <p>功能描述:(消费者实现类)</p>
 * @version V1.0  
 */
public class Consumer extends BlockingDelayQueue implements Runnable{
private BlockingQueue<String> queue;//阻塞队列
public Consumer(ArrayBlockingQueue<String> arrayBlcokingQueue){//构造方法
this.queue = arrayBlcokingQueue;
}
boolean running = true;//运行状态,用来标识是否需要从队列中消费
public void run(){
try{
String data = null;
while(running){
System.out.println("【消费者线程正在消费数据。。。。。。】");
data = queue.poll(10, TimeUnit.SECONDS);
}
if(null != data){
System.out.println("【消费者线程消费数据,消费的数据为:】"+data);
System.out.println("【消费者线程正在消费数据。。。】");
}else{
running = false;
}
}catch(InterruptedException ie){
ie.printStackTrace();
}finally{
System.out.println("【退出消费者线程】");
}
}

public void stop(){
running = false;
}
}

2.生产者实现类:

package BlockingQueue;

import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.TimeUnit;

/** 
 * <p>创建人:刘星  创建日期:2017-2-24 上午11:09:35</p>
 * <p>功能描述:(手填)</p>
 * @version V1.0  
 */
public class Productor implements Runnable {
boolean running = true;
BlockingQueue<String> queue = null;
public Productor(ArrayBlockingQueue<String> arrayBlockingQueue){
this.queue = arrayBlockingQueue;
}
@Override
public void run() {
try{
boolean success = false;
while(running){
System.out.println("【生产者线程开始工作。。。。。。。】");
success = queue.offer("data one", 10,TimeUnit.SECONDS);
}
if(success){
System.out.println("【生产者线程生产完成。。。。。。。】");
}else{
running = false;
}
}catch(InterruptedException ie){
ie.printStackTrace();
}finally{
System.out.println("【生产者线程退出 。。。。。。。】");
}
}
public void stop(){
running = false;
}
}

3.测试类实现

package BlockingQueue;
import java.util.concurrent.*;


/** 
 * <p>创建人:admin 创建日期:2017-2-24 上午9:58:46</p>
 * <p>功能描述:(手填)</p>
 * @version V1.0  
 */
public class BlockingDelayQueue {
public static void main(String[] args) throws InterruptedException {
ArrayBlockingQueue<String> queue = new ArrayBlockingQueue<String>(10);
Productor producer1 = new Productor(queue);
Productor producer2 = new Productor(queue);
Productor producer3 = new Productor(queue);
Consumer consumer = new Consumer(queue);
// 借助Executors
ExecutorService service = Executors.newCachedThreadPool();
// 启动线程
service.execute(producer1);
service.execute(producer2);
service.execute(producer3);
service.execute(consumer);
Thread.sleep(3000);
producer1.stop();
producer2.stop();
producer3.stop();
Thread.sleep(2000);
service.shutdown();

consumer.stop();

}
}

中间的几次sleep是为了让线程多执行一段时间。

菜鸟选手,大家有什么批评意见建议神马的可以在评论区留言讨论,菜鸟接受一切指正