概述
为了提高接口的响应速度,可以使用ThreadPoolExecutor + Runnable 或者ThreadPoolExecutor 并发调用 技术来并行执行task。但是ThreadPoolExecutor有个特点,就是当core线程不足以应付请求的时候,会将task加入到队列中。一旦使用队列,那么就可能出现队列爆掉或者队列导致的内存溢出问题。为了尽快提供接口响应速度,但是又不想使用队列特性的话。可以使用信号量来做到。
Semaphore信号量管理着一组许可,在执行操作时需要首先获得许可,并在使用后释放许可。如果已经没有许可了, acquire方法将一直阻塞,直到有许可。Semaphore可以用来实现有界阻塞容器。
信号量简单例子
这里使用JAVA并发编程一书中的例子来说明信号量的基本用法。
public class BoundedHashSet<T> {
public static void main(String[] args) throws Exception {
BoundedHashSet<String> set = new BoundedHashSet<>(2);
set.add("1");
set.add("2");
set.remove("2");
set.add("3");
System.out.println(JSON.toJSONString(set));
}
private final Set<T> tempSet;
private final Semaphore sem;
public BoundedHashSet(int size) {
this.tempSet = Collections.synchronizedSet(new HashSet<T>());
sem = new Semaphore(size);
}
public boolean add(T o) throws Exception {
sem.acquire();
boolean isAdd = false;
try{
isAdd = tempSet.add(o);
return isAdd;
}
finally {
if (!isAdd) {
sem.release();
}
}
}
public boolean remove(Object o) {
boolean isRemoved = tempSet.remove(o);
if (isRemoved) {
sem.release();
}
return isRemoved;
}
}
这里例子实现了有界阻塞的HashSet。只允许这个HashSet存放两个元素,如果想存第三个元素,必须等到有人把HashSet中的元素remove掉。每次add之前先申请一个许可,如果能申请到,则正常添加元素。申请不到,则acquire()方法会一直阻塞。remove操作里面,则有一个释放许可的操作。
ThreadPoolExecutor中使用信号量
在ThreadPoolExecutor中,我们在定义core线程参数的时候,比如定义为10个,那么使用信号量的时候,初始化参数也设置为10.
Semaphore<Integer> sem= new Semaphore<>(10);
ThreadPoolExecutor中,如果不想用到队列,就必须保证线程池中始终只有core线程在工作。那么当请求太多,core线程处理不过来的时候,用信号量进行阻塞,保证只有当core线程的某些线程执行完后,阻塞才解开。