大家好,我是陶朱公Boy,又跟大家见面了。
前言
今天跟大家分享一款基于“生产者消费者模式”下实现的组件。
该组件是作者偶然在翻阅公司一中间件源码的时候碰到的,觉得设计的非常精美、巧妙,花了点时间整理成文分享给大家。
生产者和消费者彼此之间不进行通信,中间通过一个容器(如阻塞队列)来解决强解耦问题。
阻塞队列起到了一定的数据缓冲作用,平衡了生产者和消费者对数据的处理能力。by—《Java并发编程的艺术》
组件介绍
该组件基于生产者消费者模式来编码实现,是一款本地化解决流量削峰、解耦、异步的利器。
此组件由以下知识点构成:线程池、阻塞队列、LockSupport、Executor框架、final、volatile。此外你还能接触到hash取模算法、接口回调等机制。
组件本身代码量并不大,但知识点比较密集,所以希望大家能花一点时间认真看完。我将从适用场景、架构设计、源码解析这三个角度给大家讲介绍这款组件。
适用场景
☆场景一:报表下载
现在很多后台下载功能,普适的做法是先筛选转换数据,然后对接云存储平台进行保存,最后生成一个可访问的文件地址,整个过程非常耗时。
其实完全可以生产者发送一个下载请求就结束响应,服务端异步的去消费这个任务请求,处理完生成地址后,再进行通知(比如更新对应数据库文件字段)这是一种异步体现,也解耦了生产者与消费者原来的同步交互方式,整体效率会更高。
☆场景二:日志埋点
有些应用它的QPS非常高,产生的数据本身并不是特别重要比如埋点的日志,如果实时调用埋点平台可能给平台侧造成非常大的访问压力。所以这个时候中间的阻塞队列就起到了一定的缓冲作用,等一段时间或队列数据量达到一定量(参赛可动态配置)再一次性拿出来转换后,最后批量传递出去。
☆场景三:Yana(阿里内部一款基于邮件分享技术文章的工具)
《Java并发编程的艺术》作者方腾飞有分享过他们基于生产者消费者模式实现的一个案例。
他们团队早期有一个习惯,大家如果在平时工作当中遇到比较好的文章,会通过邮件转发到专属邮箱进行内部分享,这样其他成员就能看到这篇文章,甚至大家会在底部评论、回复、交流。
但期间遇到一个问题:一旦时间一长,以前的文章很难被检阅。邮件列表的可视化太差,也不能进行归类,有些新入职员工也看不到以往其他成员分享过的文章。
基于这些问题,有几个小伙伴自发的趁业余时间开发了一个简易工具--yana。该工具功能就是:生产者线程会先往邮箱里将所有分享的邮件下载下来(包括附件、图片、邮件回复等内容),下载完成后,通过confluence的Web Service接口,把文章保存到confluence中去。这样不仅好维护,而且留存问题也得到了解决。
不过随着这款工具在其他部门的推广,发现系统响应时间越来越长。只要单位时间内积累邮件一多,一次处理完可能就要花费几分钟。
于是他们升级了方案,把架构演进到了V2.0版本。整体思路是使用了生产者消费者模式来处理。
思路如下:生产者线程去邮件系统下载完邮件后,不会立即调用confluence的web service接口,而是选择把下载的内容放入阻塞队列后立即返回。而消费者启动CPU*2个线程数来并行处理队列中的邮件,从之前的单线程演变成了多线程处理,生产者和消费者实现了异步、解耦。经过观察,比起V1.0同步处理,速度比之前要快好了几倍。
...
架构设计
☆对象图
该组件支持“多生产者多消费者”场景,在多核时代充分利用CPU多核机制,消费者多线程并行处理阻塞队列中的数据,加快任务处理速度。
☆逻辑架构图
该组件内部持有一个工作线程对象数组,当生产者提交数据的时候,会先经过一个route组件(采用hash取模算法),动态路由到其中一个线程对象内的阻塞队列中存储起来。等到满足一定条件,工作线程就会将自身线程对象内阻塞队列中的数据转换成指定容量的List对象(BlockQueue的drainto方法有支持),然后调用已经注册的回调函数把数据传递出去。
☆流程图
我们一起来看下这张工作线程内部运行流程图:
首选我们说此组件对象内部持有一个工作线程对象数组,每个工作线程对象内部持有一个有界阻塞队列实例对象(ArrayBlockingQueue),方法有run(),add(),timeout()方法。
生产者调用组件自身的add方法后,add方法内部通过hash取模算法动态路由到某个工作线程对象内部的blockingQueue中去。
timeout方法是这款组件设计的一个亮点(容错性设计)????。
假如实际运行过程中,工作线程内部的阻塞队列内一直只占少许几个对象,如果仅仅只判断队列中的元素个数是否超出指定阈值,再去处理队列中的数据,一旦长时间未超出,工作线程就会一直被阻塞,也将导致队列中的数据长时间堆积。
所以新增的这个timeout()这个机制能应对一旦队列中的数据长时间积压,它会根据时间差即判断当前时间距离上次任务处理时间是否超出指定阈值(可配置),如果超出了也会强制处理队列中的数据。
源码赏析
public class ProducerAndConsumerComponet {
private final static Logger log = LoggerFactory.getLogger(ProducerAndConsumerComponet.class);
//组件持有一个工作线程对象数组
private final WorkThread<T>[] workThreads;
private AtomicInteger index;
private static final Random r = new Random();
//任务定时器
private static ScheduledExecutorService scheduleThreadPool = new ScheduledThreadPoolExecutor(1);
//组件初始化完成工作线程的新建
private static ExecutorService executorService = Executors.newCachedThreadPool();
/**
* 构造器
* @param threadNum 默认新建的消费者线程个数
* @param limitSize 队列长度阈值;超过将唤醒阻塞的线程
* @param period 前后两个任务的执行周期 (for example :200ms 代表前面一次任务执行完毕后,200毫秒后下一个任务继续执行)
* @param capacity 工作线程内部的有界阻塞队列的初始容量大小
* @param processor 回调接口(初始化组价实例的时候需要传递)
*/
public ProducerAndConsumerComponet(int threadNum,int limitSize, int period, int capacity, Processor<T> processor) {
this.workThreads = new WorkThread[threadNum];
if (threadNum > 1) {
this.index = new AtomicInteger();
}
for(int i = 0; i < threadNum; ++i) {
WorkThread<T> workThread = new WorkThread("workThread"+ "_" + i, limitSize, period, capacity, processor);
this.workThreads[i] = workThread;
executorService.submit(workThread);
//调用scheduleAtFixedRate时,会向ScheduledThreadPoolExecutor的DelayQueue添加一个实现了RunableScheduleFuture接口的
//ScheduleFutureTask
scheduleThreadPool.scheduleAtFixedRate(workThread::timeout, r.nextInt(50), period, TimeUnit.MILLISECONDS);
}
}
/**
* 生产者线程将对象添加到对应消费者线程对象内部的阻塞队列中去<br>
* 内部采用HASH取模算法进行动态路由
* @param item 待添加的对象
* @return true:添加成功 false:添加失败
*/
public boolean add(T item) {
// log.info("add item={}",item);
int len = this.workThreads.length;
//log.info("add len..."+len);
if (len == 1) {
return this.workThreads[0].add(item);
} else {
int mod = this.index.incrementAndGet() % len;
// log.info("路由到this.workThreads[mod]={}",mod);
return this.workThreads[mod].add(item);
}
}
/**
* 消费者线程
*/
private static class WorkThread<T> implements Runnable {
/**
* 工作线程命名
*/
private final String threadName;
/**
* 队列中允许存放元素个数限制<br>
* 超出将从队列中取出此大小的元素转成List对象
*/
private final int queueSizeLimit;
/**
* 前后两个任务的执行周期
*/
private int period;
/**
* 用来记录任务的即时处理时间
*/
private volatile long lastFlushTime;
/**
* 当前工作线程对象
*/
private volatile Thread currentThread;
/**
* 工作线程对象内部的阻塞队列
*/
private final BlockingQueue<T> queue;
/**
* 回调接口
*/
private final Processor<T> processor;
/**
* 消费者线程构造器
* @param threadName 线程名
* @param queueSizeLimit 指定队列阈值(可配置)
* @param period 前后两个任务的执行周期(可配置)
* @param capacity 阻塞队列初始容量
* @param processor 回调接口
*/
public WorkThread(String threadName, int queueSizeLimit, int period, int capacity, Processor<T> processor) {
this.threadName = threadName;
this.queueSizeLimit = queueSizeLimit;
this.period = period;
this.lastFlushTime = System.currentTimeMillis();
this.processor = processor;
this.queue = new ArrayBlockingQueue(capacity);
}
/**
* 往阻塞队列中添加元素
* @param item 添加的对象
* @return true:添加成功 false:添加失败
*/
public boolean add(T item) {
// log.info("add result:"+item);
boolean result = this.queue.offer(item);
// log.info("resultP{}",result);
this.checkQueueSize();
return result;
}
/**
* 当前时间与上次任务处理时间差是否超过指定阈值;如果超过触发start方法
*/
public void timeout() {
// log.info("{}====check timeout",currentThread.getName());
if (System.currentTimeMillis() - this.lastFlushTime >= (long)this.period) {
log.info("当前时间距离上次任务处理时间周期={}超出指定阈值={}",System.currentTimeMillis() - this.lastFlushTime ,period);
this.start();
}
}
/**
* 唤醒被阻塞的工作线程
*/
private void start() {
log.info("执行start方法,唤醒被阻塞的线程"+currentThread.getName());
LockSupport.unpark(this.currentThread);
}
/**
* 判断队列实际长度是否超过指定阈值;如果超过触发start方法
*/
private void checkQueueSize() {
if (this.queue.size() > this.queueSizeLimit) {
log.info("{}队列大小={}超出指定阈值={}",currentThread.getName(),this.queue.size() ,queueSizeLimit);
this.start();
}
}
/**
* 将队列中的元素通过调用<code>drainTo</code>方法,转成List对象(容量受queueSizeLimit限制),最后调用回调函数传递List对象
*/
public void flush() {
if(queue.isEmpty()){
return;
}
this.lastFlushTime = System.currentTimeMillis();
List<T> temp = new ArrayList(this.queueSizeLimit);
int size = this.queue.drainTo(temp, this.queueSizeLimit);
if (size > 0) {
log.info("{}被唤醒后,开始执行任务:从队列中腾出大小为{}的数据且转成List对象",currentThread.getName(),size);
try {
//执行回调函数
this.processor.process(temp);
} catch (Throwable var4) {
System.out.println("process error");
}
}
}
/**
* 判断队列实际大小是否超过指定阈值亦或距离上次任务处理时间差是否超过指定阈值
* @return true:满足触发条件 false:不满足触发条件
*/
private boolean canFlush() {
return this.queue.size() > this.queueSizeLimit || System.currentTimeMillis() - this.lastFlushTime > (long)this.period;
}
@Override
public void run() {
this.currentThread = Thread.currentThread();
this.currentThread.setName(this.threadName);
//当前线程没有被其他线程打断
while(!this.currentThread.isInterrupted()) {
//死循环的判断是否满足触发条件(队列实际大小是否超出指定阈值或距离上次任务时间是否超出指定阈值),如果未满足将阻塞当前线程,避免死循环给系统带来性能开销
while(!this.canFlush()) {
//当前工作线程被阻塞
log.info("线程被阻塞...");
LockSupport.park(this);
}
//一旦add方法执行的时候判断存放的阻塞队列元素大小超出自定制阈值亦或距离上次任务处理时间差超出指定阈值,就会调用LockSupport.unpark方法解除阻塞的线程
//一旦线程被解除阻塞,就会触发此方法,将队列元素转成List对象且调用已经注册的回调函数
// log.info("阻塞线程被唤醒");
this.flush();
}
}
}
复制代码
测试用例
/**
* 前置条件:
* #主件初始化默认新增两个工作线程
* config.threadNum=2
* config.period=12000
* config.queueSizeLimit=3
* config.capacity=10
*/
@DisplayName("队列大小超出指定阈值")
@Test
void add() {
for(int i=0;i<10;i++){
producerAndConsumerComponet.add("1");
}
try {
TimeUnit.SECONDS.sleep(10);
}catch (Exception e){
e.printStackTrace();
}
}
结果打印:
2022-10-29 21:04:51,656 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_1队列大小=4超出指定阈值=3
2022-10-29 21:04:51,658 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_1
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被唤醒后,开始执行任务:从队列中腾出大小为3的数据且转成List对象
2022-10-29 21:04:51,659 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:165] - workThread_0队列大小=4超出指定阈值=3
2022-10-29 21:04:51,659 [main] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_0
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被唤醒后,开始执行任务:从队列中腾出大小为3的数据且转成List对象
2022-10-29 21:04:51,660 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 当前时间距离上次任务处理时间周期=1714超出指定阈值=1000
2022-10-29 21:04:53,374 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_0
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_0被唤醒后,开始执行任务:从队列中腾出大小为2的数据且转成List对象
2022-10-29 21:04:53,375 [workThread_0] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:146] - 当前时间距离上次任务处理时间周期=1720超出指定阈值=1000
2022-10-29 21:04:53,379 [pool-1-thread-1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:156] - 执行start方法,唤醒被阻塞的线程workThread_1
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:183] - workThread_1被唤醒后,开始执行任务:从队列中腾出大小为2的数据且转成List对象
2022-10-29 21:04:53,380 [workThread_1] INFO c.t.c.c.ProducerAndConsumerComponet [ProducerAndConsumerComponet.java:211] - 线程被阻塞...
复制代码
可能有部分小伙伴对在生产环境如何正确使用这款组件有疑虑,对此完整版我已经开源到GitHub上(基于springboot构建内含如何正确初始化此组件以及完整的测试用例),有兴趣的小伙伴可以自取。
github地址:https://github.com/TaoZhuGongBoy/ProducerAndConsumerComponet
总结
好了这款组件介绍已接近尾声,接下来让我们一起做下总结:
☆是什么
我们说这款工具是一款基于“生产者-消费者模式”实现的组件。以前生产者必须同步调用、等待相关业务操作的处理结果后才能返回(一旦有些业务场景生产者生产的速度过快,方法内部自身业务处理又比较耗时)这时如果同步等待调用结果返回,系统整体吞吐量会极具降低。现在换了一种思路即生产者不需要同步等待业务的处理结果,当它发送一个请求后立即返回,耗时的处理由一致多个消费者线程来异步处理,加快任务整体处理速度。(异步、解耦)
☆适用场景
它比较适合处理一些重要程度不是很高的数据(比如埋点日志、下载请求等),当生产者生产数据过快,业务本身处理又比较耗时,那用这个方案是比较合适的。
为什么在这里要强调重要程度不是很高这句话呢?因为BlockQueue毕竟是基于内存的数据结构,极端情况下是存在数据丢失风险的,像埋点日志、下载请求这种数据小部分丢失其实对业务影响不大。
看到这里可能有部分小伙伴会产生一个疑问:怎么看这个组件的功能跟MQ这么像。对的,功能是相似的,但这款组件是一个本地化的解决方案,目的就是为了降低引入消息队列的复杂度才设计(设计意图)。
☆怎么实现的
每个消费者线程对象内部持有一个有界阻塞队列,当外部生产者调用组件的add方法后,add方法内部实现路由,最终保存到指定的阻塞队列内。
消费者线程本身死循环来判断阻塞队列中的元素是否满足条件,如果不满足,线程就会被阻塞(避免死循环给系统造成性能影响;通过Locksupport.park实现)。 一旦消费者线程对象内部的add、timeout方法满足触发条件后,被阻塞的线程就会被唤醒,然后线程继续执行余下业务逻辑:从阻塞队列中取出数据,然后转换成有初始容量限制的List对象后,调用回调接口传递数据。
写到最后
这款组件几乎囊括了并发编程领域半壁*的技术点,能把这些散的点串起来,用好用对,着实不容易,也体现出组件作者深厚的基础技术功底。
如果你是《并发编程》的初学者亦或有几年经验的老兵,都建议好好揣摩与学习一下这款组件的架构设计与编码实现;如果在你的生产场景中你刚好也碰到类似问题与场景,那么这款组件也许能帮助你。
本文完!
关注我
如果这篇文章你看了对你有帮助或启发,麻烦点赞、关注一下作者。你的肯定是作者创作源源不断的动力。
公众号
里面不仅汇集了硬核的干货技术、还汇集了像左耳朵耗子、张朝阳总结的高效学习方法论、职场升迁窍门、软技能。希望能辅助你达到你想梦想之地!
公众号内回复关键字“电子书”下载pdf格式的电子书籍(JAVAEE、Spring、JVM、并发编程、Mysql、Linux、kafka、分布式等)、“开发手册”获取阿里开发手册2本、"面试"获取面试PDF资料。