生产者消费者模式

时间:2021-05-12 18:46:46

这个模式非常的简单,就是生产者生产出东西,而消费者消费生产者的产物。

1、首先,我们在主方法里面创建一个队列来存放生产者生产的东西,使用LinkedBlockingQueue,因为这个队列是实现了读写分离的,生产者和消费者并发执行。然后创建一个线程池来执行生产者和消费者,所以生产者和消费者都要实现Runnable接口。
public class Main {
	public static void main(String[] args) {
		
		BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>();  //创建队列,存放生产者生产的东西
		
		ExecutorService pool = Executors.newCachedThreadPool();  //创建线程池
		Producer p1 = new Producer(queue);  //将队列传给生产者,生产者的产物会放进队列里面
		Producer p2 = new Producer(queue);
		Producer p3 = new Producer(queue);
		
		Customer c1 = new Customer(queue);  //将队列传给消费者,消费者会消费队列里面的产物
		Customer c2 = new Customer(queue);
		Customer c3 = new Customer(queue);
		
		pool.execute(p1);  //执行生产者和消费者
		pool.execute(p2);
		pool.execute(p3);
		pool.execute(c1);
		pool.execute(c2);
		pool.execute(c3);
		
		try {
			Thread.sleep(3000);
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
		
		p1.stop();  //将生产者们都停掉
		p2.stop();
		p3.stop();
		
		try {
			Thread.sleep(3000);  //给点时间给消费者消费
		} catch (InterruptedException e) {
			e.printStackTrace();
		}
	}
}
2、装入队列的数据类,简单地给id和name属性。
public class Data {
	private int id;
	private  String name;
	
	public Data(int id, String name) {
		super();
		this.id = id;
		this.name = name;
	}
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
}
3、生产者,需要继承Thread或者实现Runnable接口
public class Producer implements Runnable{
	private BlockingQueue<Data> queue; //要引用主程序的队列,往里面放生产出的产物
	private boolean isRunning = true;  //判断生产者还生产不
	private static AtomicInteger count = new AtomicInteger();  //原子的,作为id生成器
	public Producer(){}
	public Producer(BlockingQueue<Data> queue){
		this.queue = queue;
	}
	@Override
	public void run() {
		while(isRunning){
			try {
				Thread.sleep(1000);  //假设生产时间为300毫秒
				int id = count.incrementAndGet();
				Data data = new Data(id,"数据"+id);
				System.out.println("当前生产线程:"+Thread.currentThread().getName()+",将id为"+id+"的数据放入缓冲区..");
				if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){
					//如果添加不成功。。。
				}
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
			
		}
		
	}
	
	public void stop(){ //让生产者停止生产
		this.isRunning = false;
	}
}
4、消费者,也是要继承Thread或者实现Runnable接口
public class Customer implements Runnable{
	private BlockingQueue<Data> queue;
	public Customer(BlockingQueue<Data> queue) {
		super();
		this.queue = queue;
	}

	@Override
	public void run() {
		while(true){
			try {
				Data data = queue.take();
				System.out.println("当前消费线程:"+Thread.currentThread().getName()+",将id为"+data.getId()+"的数据从缓冲区中取出来");
			} catch (InterruptedException e) {
				e.printStackTrace();
			}
		}
		
	}

}