在一些开发场景中,经常会遇到这样的场景,一边是产生数据段,一边是消费数据端。比如在日志系统中,为了分析消费者行为,会在网站中埋点,记录登录者的信息,比如浏览了多长时间,买个什么产品,收藏了哪些产品等一系列用户信息。这些消费者的信息经过埋点调用接口,就会产生大量的数据,可以简单的看做生产者。而消费者可以看做后续对这些数据的处理。而生产消费者模式就是为了解决以上类似场景的设计模式。刚一开始了解此模式的时候,我也带有疑问,为什么不用消费者直接调用生产者,获取信息进行处理,随着深入了解,才知道若直接调用会有以下不足点:
1:消费者和生产者代码耦合度太高,比如随着业务变化,消费者端代码可以会变化,你就要重写消费者代码,但消费者代码一部分耦合在生产者中,又要更改生产者代码。
2:效率不高。比如消费者调用生产者接口获取信息进行处理,因为调用时同步的,消费者要一直等待生产者产生信息返回,若生产者产生信息很慢,但消费者就要一直等待生产者产生结果。故效率不高
为了解决以上的问题,就是把生产者和消费者的解耦,把生产者的生产的信息放在缓冲区中,消费者从缓冲区中读取数据。这样生产者和消费者就解耦开。生产者不需考虑消费者的效率,而是把信息保存在缓冲区中,留消费者消费。这个场景直接让我想到了分布式消息队列kafka,kafka就是消费者消费数据,生产者生产数据并持久化在本地。
缓冲区要使用阻塞队列,为了简单,你可以使用jdk1.5后提供的现成的阻塞队列
ArrayBlockingQueue :一个由数组结构组成的有界阻塞队列。
LinkedBlockingQueue :一个由链表结构组成的有界阻塞队列。
PriorityBlockingQueue :一个支持优先级排序的*阻塞队列
SynchronousQueue:一个不存储元素的阻塞队列。
当然你也可以自己实现,下面就是简单用多线程实现的生产消费者模式,代码案列//实现一个阻塞队列缓冲区
public class BlockQt {
private Object empty = new Object();//队列为空的对象锁
private Object full = new Object();//队列为满的对象锁
private Queue<Object> linkedList = new LinkedList<Object>();//队列
private int maxLength = 10;//队列最大长度
private int num =50;//递减生成数据
//获取队列中的数据
public Object take() throws InterruptedException{
synchronized (empty) {//获取empty对象锁
if(linkedList.size()==0){//若队列为空
empty.wait();//获取empty对象锁的线程(消费者)阻塞
}
synchronized (full) {//若队列不为空,获取full对象锁
if(linkedList.size()==maxLength){//当队列满的时候,唤醒等待full对象锁的线程生产者,因为下面有消费者消费了,队列已经不为满的了
full.notifyAll();//唤醒生产者
}
Object obj = linkedList.poll();//消费数据
System.out.println("线程名称"+Thread.currentThread().getName()+"获取数据为:"+obj);
return obj;
}
}
}
public void off(Object object) throws InterruptedException{
synchronized(empty){//获取empty对象锁,
if(linkedList.size()==0){//若linkedList为空,唤醒消费者线程,以为下面已经生产数据了
empty.notifyAll();
}
synchronized (full) {//获取full对象锁
if(linkedList.size()==maxLength){//若队列满了,生产者等待
full.wait();
}
linkedList.add(object);//生产数据
}
}
}
public int getNum() {
return num;
}
public void setNum(int num) {
this.num = num;
}
public static void main(String[] args) {
ExecutorService executor = Executors.newCachedThreadPool();
BlockQ queue = new BlockQ();
executor.execute(new Producer1(queue));
executor.execute(new Producer1(queue));
executor.execute(new Consumer1(queue));
executor.execute(new Consumer1(queue));
}
}
//消费者
class Consumer1 implements Runnable{
BlockQ queue;
Consumer1(BlockQ que){
this.queue = que;
}
@Override
public void run() {
try {
while(true){
Object o = queue.take();
System.out.println(Thread.currentThread().getName()+"获取数据"+o);
}
} catch (InterruptedException e) {
e.printStackTrace();
}
}
}
//生产者
class Producer1 implements Runnable{
BlockQ queue;
Producer1(BlockQ que){
this.queue = que;
}
@Override
public void run() {
while(true){
//System.out.println(queue.getNum());
if(queue.getNum()<0){
break;
}
queue.offer(queue.getNum());
queue.setNum(queue.getNum()-1);
System.out.println(Thread.currentThread().getName()+"插入数据"+queue.getNum());
}
}
}