今天开始我们聊聊 Java 并发工具包中提供的一些工具类,本文主要从并发同步容器和并发集合工具角度入手,简单介绍下相关 API 的用法与部分实现原理,旨在帮助大家更好的使用和理解 JUC 工具类。
在开始今天的内容之前,我们还需要简单回顾下线程、 syncronized 的相关知识。
Java 线程的运行周期中的几种状态, 在 java.lang.Thread 包中有详细定义和说明:
NEW 状态是指线程刚创建, 尚未启动
RUNNABLE 状态是线程正在正常运行中
BLOCKED 阻塞状态
WAITING 等待另一个线程来执行某一特定操作的线程处于这种状态。这里要区分 BLOCKED 和 WATING 的区别, BLOCKED 是在临界点外面等待进入, WATING 是在临界点里面 wait 等待其他线程唤醒(notify)
TIMEDWAITING 这个状态就是有限的(时间限制)的 WAITING
TERMINATED 这个状态下表示 该线程的 run 方法已经执行完毕了, 基本上就等于死亡了(当时如果线程被持久持有, 可能不会被回收)
synchronized 实现同步的基础:Java 中的每一个对象都可以作为锁。
具体表现为以下 3 种形式:
对于普通同步方法,锁是当前实例对象。
对于静态同步方法,锁是当前类的 Class 对象。
对于同步方法块,锁是 synchronized 括号里配置的对象。当一个线程试图访问同步代码块时,它首先必须得到锁,退出或抛出异常时必须释放锁。
那么同步方法(syncronized ) 与 静态同步方法(static syncronized ) 的有什么区别呢? 我们来看一个简单的例子:
class Phone {
public /*static*/ synchronized void sendEmail() throws InterruptedException {
TimeUnit.SECONDS.sleep(4);
System.out.println("--------sendEmail");
}
public /*static*/ synchronized void getMessage() {
System.out.println("--------getMessage");
}
public void getHello() {
System.out.println("--------getHello");
}
main{
Phone p = new Phone();
p.sendEmail();
p.getMessage();
p.getHello();
}
}
通过以上代码回答下面问题:
标准访问的时候,请问先打印邮件还是短信?
sendEmail方法暂停4秒钟,请问先打印邮件还是短信?
新增Hello普遍方法,请问先打印邮件还是Hello?
两部手机,请问先打印邮件还是短信?
两个静态同步方法,同1部手机 ,请问先打印邮件还是短信?
两个静态同步方法,有2部手机 ,请问先打印邮件还是短信?
1个静态同步方法,1个普通同步方法,有1部手机 ,请问先打印邮件还是短信?
1个静态同步方法,1个普通同步方法,有2部手机 ,请问先打印邮件还是短信?
思考一下,我们再做分析~
一个对象里面如果有多个 synchronized 方法,某一个时刻内,只要一个线程去调用 其中的一个 synchronized 方法了,其它的线程都只能等待,换句话说,某一个时刻内,只能有唯一一个线程去访问这些 synchronized 方法;
所有的非静态同步方法用的都是同一把锁——实例对象本身,synchronized 方法锁的是当前对象 this,被锁定后,其它的线程都不能进入到当前对象的其它 synchronized 方法,也就是说如果一个实例对象的非静态同步方法获取锁后,该实例对象的其他非静态同步方法必须等待获取锁的方法释放锁后才能获取锁;
加个普通方法后发现和同步锁无关;
-
换成两个对象后,不是同一把锁了,毋须等待互不影响。
因为别的实例对象的非静态同步方法跟该实例对象的非静态同步方法用的是不同的锁,所以毋须等待;
所有的静态同步方法用的是同一把锁——类对象本身(锁的是类模板),一旦一个静态同步方法获取锁后,其他的静态同步方法都必须等待该方法释放锁后才能获取锁,而不管是同一个实例对象的静态同步方法之间,还是不同的实例对象的静态同步方法之间,只要它们是同一个类模板的实例对象就要争取同一把锁;
第1和第5中的这两把锁是两个不同的对象,所以静态同步方法与非静态同步方法之间是不会有竞态条件的。
经过分析,答案也就不然而喻了。
简单回顾之后,回到正文,JUC 中提供了比 synchronized 更加高级的同步结构,包括 CountDownLatch,CyclicBarrier,Semaphone 等可以实现更加丰富的多线程操作。
另外还提供了各种线程安全的容器 ConcurrentHashMap、有序的 ConcurrentSkipListMap,CopyOnWriteArrayList 等。
CountDownLatch (计数器)
让一些线程阻塞直到另一些线程完成一系列操作后才被唤醒。
CountDownLatch 主要有 countDown、await 两个方法,当一个或多个线程调用 await 方法时,这些线程会阻塞。其它线程调用 countDown 方法会将计数器减 1 (调用 countDown 方法的线程不会阻塞),当计数器的值变为 0 时,因 await 方法阻塞的线程会被唤醒,继续执行。
代码案例:图书馆下班 ,等读者全部离开后,图书管理员才能关闭图书馆。
main 主线程必须要等前面线程完成全部工作后,自己才能执行。
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
CountDownLatch countDownLatch = new CountDownLatch(5);//参数代表读者的数量
for (int i = 1; i <= 5 ; i++) {
new Thread(() -> {
System.out.println(Thread.currentThread().getName()+"\t 号读者离开了图书馆");
countDownLatch.countDown();
} ,CountryEnum.getKey(i).getName()).start();
}
countDownLatch.await();
System.out.println(Thread.currentThread().getName()+"\t ------图书管理员闭馆");
}
}
结果如下:
3 号读者离开了图书馆
2 号读者离开了图书馆
4 号读者离开了图书馆
1 号读者离开了图书馆
5 号读者离开了图书馆
main ------图书管理员闭馆
CyclicBarrier (循环屏障)
CyclicBarrier 的字面意思是可循环(Cyclic)使用的屏障(Barrier)。
它要做的事情是,让一组线程到达一个屏障(也可以叫同步点)时被阻塞,直到最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续干活。
线程进入屏障通过 CyclicBarrier 的 await() 方法。
代码案例:集齐10张卡牌才可以召开奖
public class CyclicBarrierDemo {
private static final int NUMBER = 10;
public static void main(String[] args){
//构造方法 CyclicBarrier(int parties,Runnable action)
CyclicBarrier cyclicBarrier = new CyclicBarrier(10, new Thread(() -> {
System.out.println("集齐卡牌 开始开奖");
}));
for (int i = 1; i <= NUMBER ; i++) {
final int tempInt = i;
new Thread(() -> {
try {
System.out.println(Thread.currentThread().getName()+"\t 收集了"+tempInt+"号卡牌");
cyclicBarrier.await();
} catch (Exception e) {
e.printStackTrace();
}
} ,String.valueOf(i)).start();
}
}
}
结果如下:
1 收集了1号卡牌
8 收集了8号卡牌
4 收集了4号卡牌
3 收集了3号卡牌
5 收集了5号卡牌
7 收集了7号卡牌
9 收集了9号卡牌
6 收集了6号卡牌
2 收集了2号卡牌
10 收集了10号卡牌
集齐卡牌 开始开奖
Semaphone (信号量)
信号量典型应用场景是多个线程抢多个资源。
在信号量上我们定义两种操作:
acquire(获取): 当一个线程调用 acquire 操作时,它要么通过成功获取信号量(信号量减 1 ),要么一直等下去,直到有线程释放信号量,或超时。
release(释放):实际上会将信号量的值加 1,然后唤醒等待的线程。
信号量主要用于两个目的,一个是用于多个共享资源的互斥使用,另一个用于并发线程数的控制。
代码案例:停车场停车 ,车抢车位
public class SemaphoreDemo {
public static void main(String[] args){
Semaphore semaphore = new Semaphore(3);// 模拟 3 个停车位
for (int i = 1; i <= 6 ; i++) {//6 辆车
new Thread(() -> {
try{
semaphore.acquire();
System.out.println(Thread.currentThread().getName()+"\t 抢到停车位");
TimeUnit.SECONDS.sleep(new Random().nextInt(5));
System.out.println(Thread.currentThread().getName()+"\t 离开停车位");
}catch(Exception e){
e.printStackTrace();
}finally{
semaphore.release();
}
} ,String.valueOf(i)).start();
}
}
}
结果如下:
2 抢到停车位
4 抢到停车位
1 抢到停车位
2 离开停车位
6 抢到停车位
6 离开停车位
5 抢到停车位
4 离开停车位
1 离开停车位
3 抢到停车位
3 离开停车位
5 离开停车位
接下来,我们来梳理下并发包里提供的线程安全的集合类,基本代码如下:
public class NotSafeDemo {
public static void main(String[] args){
//高并发 list
List<Object> list = new CopyOnWriteArrayList<>();
/高并发 set
Set<Object> objects = new CopyOnWriteArraySet<>();
/高并发 map
Map<String,String> map = new ConcurrentHashMap<String,String>();
for (int i = 0; i < 50 ; i++) {
new Thread(() -> {
list.add(UUID.randomUUID().toString().substring(0,6));
System.out.println(list);
} ,String.valueOf(i)).start();
}
}
}
CopyOnWrite 容器也被称为写时复制的容器。
往一个容器添加元素的时候,不直接往当前容器 Object[] 添加,而是先将当前容器 Object[] 进行 Copy,复制出一个新的容器 Object[] newElements,然后新的容器 Object[] newElements 里添加元素,添加完元素之后,再将原容器的引用指向新的容器 setArray(newElements)。
这样做的好处是可以对 CopyOnWrite 容器进行并发的读,而不需要加锁,因为当前容器不会添加任何元素。所以 CopyOnWrite 容器也是一种读写分离的思想,读和写不同的容器,但是由于通过对底层数组复制来实现的,一般需要很大的开销。当遍历次数大大超过修改次数的时,这种方法比其他替代方法更有效。部分源码如下:
public boolean add(E e) {
final ReentrantLock lock = this.lock;
lock.lock();
try {
Object[] elements = getArray();
int len = elements.length;
Object[] newElements = Arrays.copyOf(elements, len + 1);//先复制,再添加一个空元素
newElements[len] = e;
setArray(newElements);
return true;
} finally {
lock.unlock();
}
}
而带有 Concurrent 的一般才是真正的适用并发的工具,ConcurrentHashMap 被认为是弱一致性的,本质原因在于 ConcurrentHashMap 在读数据是并没有加锁。
关于并发集合的应用还要在实际开发中多多体会,实践才是最好的老师。
扩展知识:
今天的扩展知识简单介绍下 Java 常用的 4 种线程池:
newCachedThreadPool
创建可缓存的线程,底层是依靠 SynchronousQueue 实现的,创建线程数量几乎没有限制(最大为 Integer.MAX_VALUE)。
如果长时间没有往线程池提交任务,即如果工作线程空闲了指定时间(默认1分钟),该工作线程自动终止。终止后如果又有了新的任务,则会创建新的线程。
在使用 CachedTreadPool 时,要注意控制任务数量,否则由于大量线程同时运行,很有可能造成系统瘫痪。
newFixedThreadPool
创建指定数量的工作线程,底层是依靠 LinkedBlockingQueue 实现的,没提交一个任务就创建一个工作线程,当工作线程数量达到线程池初始的最大数,则将提交的任务存入到池队列中。
在线程空闲时,不会释放工作线程,还会占用一定的系统资源。
newSingleThreadExecutor
创建单线程,底层是 LinkedBlockingQueue 实现的,它只会用一个工作线程来执行任务,保证所有的任务按指定顺序执行。如果这个线程异常结束,会有另一个取代它,保证顺序执行。
最大的特点是可保证顺序地执行各个任务,并在任意时间是不会有过个线程活动的。
newScheduleThreadPool
创建一个定长的线程池,支持定时以及周期性的任务调度。
参考资料:
https://github.com/fanpengyi/java-util-concurrent.git ---- 文中代码git库
关注一下,我写的就更来劲儿啦