这个模式非常的简单,就是生产者生产出东西,而消费者消费生产者的产物。
1、首先,我们在主方法里面创建一个队列来存放生产者生产的东西,使用LinkedBlockingQueue,因为这个队列是实现了读写分离的,生产者和消费者并发执行。然后创建一个线程池来执行生产者和消费者,所以生产者和消费者都要实现Runnable接口。
public class Main { public static void main(String[] args) { BlockingQueue<Data> queue = new LinkedBlockingQueue<Data>(); //创建队列,存放生产者生产的东西 ExecutorService pool = Executors.newCachedThreadPool(); //创建线程池 Producer p1 = new Producer(queue); //将队列传给生产者,生产者的产物会放进队列里面 Producer p2 = new Producer(queue); Producer p3 = new Producer(queue); Customer c1 = new Customer(queue); //将队列传给消费者,消费者会消费队列里面的产物 Customer c2 = new Customer(queue); Customer c3 = new Customer(queue); pool.execute(p1); //执行生产者和消费者 pool.execute(p2); pool.execute(p3); pool.execute(c1); pool.execute(c2); pool.execute(c3); try { Thread.sleep(3000); } catch (InterruptedException e) { e.printStackTrace(); } p1.stop(); //将生产者们都停掉 p2.stop(); p3.stop(); try { Thread.sleep(3000); //给点时间给消费者消费 } catch (InterruptedException e) { e.printStackTrace(); } } }2、装入队列的数据类,简单地给id和name属性。
public class Data { private int id; private String name; public Data(int id, String name) { super(); this.id = id; this.name = name; } public int getId() { return id; } public void setId(int id) { this.id = id; } public String getName() { return name; } public void setName(String name) { this.name = name; } }3、生产者,需要继承Thread或者实现Runnable接口
public class Producer implements Runnable{ private BlockingQueue<Data> queue; //要引用主程序的队列,往里面放生产出的产物 private boolean isRunning = true; //判断生产者还生产不 private static AtomicInteger count = new AtomicInteger(); //原子的,作为id生成器 public Producer(){} public Producer(BlockingQueue<Data> queue){ this.queue = queue; } @Override public void run() { while(isRunning){ try { Thread.sleep(1000); //假设生产时间为300毫秒 int id = count.incrementAndGet(); Data data = new Data(id,"数据"+id); System.out.println("当前生产线程:"+Thread.currentThread().getName()+",将id为"+id+"的数据放入缓冲区.."); if(!this.queue.offer(data, 2, TimeUnit.SECONDS)){ //如果添加不成功。。。 } } catch (InterruptedException e) { e.printStackTrace(); } } } public void stop(){ //让生产者停止生产 this.isRunning = false; } }4、消费者,也是要继承Thread或者实现Runnable接口
public class Customer implements Runnable{ private BlockingQueue<Data> queue; public Customer(BlockingQueue<Data> queue) { super(); this.queue = queue; } @Override public void run() { while(true){ try { Data data = queue.take(); System.out.println("当前消费线程:"+Thread.currentThread().getName()+",将id为"+data.getId()+"的数据从缓冲区中取出来"); } catch (InterruptedException e) { e.printStackTrace(); } } } }