并发工具类
本系列文章主要讲解Java
并发相关的内容,包括同步、锁、信号量、阻塞队列、线程池等,整体思维导图如下:
系列文章列表:
本文主要以实例讲解CountDownLatch
、Semaphore
、阻塞队列和线程池等内容。
CountDownLatch
基本概念和用途
CountDownLatch
主要是在其他线程执行操作前,允许一个或者多个线程一直等待。
其源码实现主要采用AQS
,具体可参考Java
并发基础-同步和锁。
构造方法CountDownLatch
(int
count
)用来初始化计数器,实际就是设置count
(最终是设置state
值)的初始值。该值设置后,不能重置,所有当线程必须用这种方法反复倒计数时,可改为使用 CyclicBarrier
。countDown()
方法用来减少计数器的值,每次减1
。getCount()
方法用来返回当前计数器的值。
流程图简化如下
运行示例
在SynchronizedDemo
代码中,我们使用Thread
的join
方法等待一个Spender
和Earner
线程运行完成后,再去获取账户余额balance
的值,这里我们利用CountDownLatch
计数器,先阻塞主线程,待一组Spender
和一组Earner
线程完成后,再让主线程获取账户余额的值,代码如下:
package com.molyeo.java.concurrent;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by zhangkh on 2018/7/17.
*/
public class CountDownLatchDemo {
public static void main(String[] args) throws InterruptedException {
System.out.println("main thread start");
Account account = new Account();
account.setBalance(100000);
CountDownLatch latch = new CountDownLatch(20);
System.out.println("main latch init="+latch.getCount());
ExecutorService executorService = Executors.newFixedThreadPool(10);
for (int i = 0; i < 10; i++) {
SpenderWithCountDownLatch spender = new SpenderWithCountDownLatch(account, latch);
executorService.submit(spender);
}
for (int i = 0; i < 10; i++) {
EarnerWithCountDownLatch earner = new EarnerWithCountDownLatch(account, latch);
executorService.submit(earner);
}
System.out.println("main thread block");
latch.await();
System.out.println("main latch="+latch.getCount());
System.out.println("main thread continue to do");
System.out.println("balance="+account.getBalance());
executorService.shutdown();
}
}
先初始化CountDownLatch
的值为20
,然后分布创建一组Spender
线程(每组10
个)和一组Earner
线程(每组10
个),并将account
和latch
传递给这些线程。
SpenderWithCountDownLatch
代码如下,运行时主要是30
次减少账户余额,每次减少1000
。运行完成后调用latch
.countDown()
,减少计数器的值。
public class SpenderWithCountDownLatch implements Runnable {
private final Account account;
private final CountDownLatch latch;
public SpenderWithCountDownLatch(Account account, CountDownLatch latch) {
this.account = account;
this.latch = latch;
}
@Override
public void run() {
for (int i = 0; i < 30; i++) {
account.subtractAmount(1000);
}
latch.countDown();
System.out.println("Spender run ......");
}
}
EarnerWithCountDownLatch
代码如下,运行时主要是30
次增加账户余额,每次增加1000
。运行完成后调用latch
.countDown()
,减少计数器的值。
public class EarnerWithCountDownLatch implements Runnable {
private final Account account;
private final CountDownLatch latch;
public EarnerWithCountDownLatch(Account account, CountDownLatch latch) {
this.account = account;
this.latch = latch;
}
@Override
public void run() {
for (int i = 0; i < 30; i++) {
account.addAmount(1000);
}
latch.countDown();
System.out.println("Earner run ...." );
}
}
Account
代码如下,需要利用同步块或者Lock
保证addAmount
和subtractAmount
方法线程安全
public class Account {
private double balance;
public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
public void addAmount(double amount) {
synchronized (Account.class) {
balance = balance + amount;
}
}
public void subtractAmount(double amount) {
synchronized (Account.class) {
balance = balance - amount;
}
}
}
整个程序输出如下:
main thread start
main latch init=20
main thread block
Spender run ......
Spender run ......
Spender run ......
Earner run ....
Earner run ....
Earner run ....
Earner run ....
Earner run ....
Earner run ....
Spender run ......
Earner run ....
Earner run ....
Spender run ......
Earner run ....
Earner run ....
Spender run ......
Spender run ......
Spender run ......
Spender run ......
Spender run ......
main latch=0
main thread continue to do
balance=100000.0
满足预期结果为100000
,同时主线程一直阻塞直到latch
的值为0
。
CyclicBarrier
基本概念和主要方法
CyclicBarrier
允许一组线程互相等待,直到所有的线程到到达公共屏障点(common
barrier
point
)。和CountDownLatch
不同的是,CyclicBarrier
可以在释放等待线程后重置然后重用。
构造方法
public CyclicBarrier(int parties, Runnable barrierAction)
其中parties
,表示线程数量,即参与者数量barrierAction
表示启动barrier
时执行指定的操作,该操作由最后一个进入barrier
的线程执行
await()
参与者阻塞等待,直到所有的参与者都到达barrier
流程图简化如下
使用示例
我们看下面一个示例,其功能是主线程、Spender
线程和Earner
线程共用一个barrier
,其中barrier
初始值为3
,三个线程都到达barrier
后执行BarrierAction
定义的动作。
package com.molyeo.java.concurrent;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
/**
* Created by zhangkh on 2018/9/5.
*/
public class CyclicBarrierDemo {
public static void main(String[] args) {
Account account=new Account();
account.setBalance(100000);
CyclicBarrier barrier = new CyclicBarrier(3, new BarrierAction());
ExecutorService executorService = Executors.newFixedThreadPool(10);
SpenderWithCyclicBarrier spender = new SpenderWithCyclicBarrier(account, barrier);
executorService.submit(spender);
EarnerWithCyclicBarrier earner = new EarnerWithCyclicBarrier(account, barrier);
executorService.submit(earner);
try{
System.out.println(String.format("%20s waiting at barrier",Thread.currentThread().getName()));
barrier.await();
}catch (InterruptedException e){
e.printStackTrace();
}catch (BrokenBarrierException e){
e.printStackTrace();
}
System.out.println("balance="+account.getBalance());
System.out.println(String.format("%20s done",Thread.currentThread().getName()));
}
}
class BarrierAction implements Runnable{
public void run() {
System.out.println(String.format("%20s executed",Thread.currentThread().getName()));
}
}
class SpenderWithCyclicBarrier implements Runnable {
private final Account account;
private final CyclicBarrier barrier;
public SpenderWithCyclicBarrier(Account account, CyclicBarrier barrier) {
this.account = account;
this.barrier = barrier;
}
@Override
public void run() {
for (int i = 0; i < 30; i++) {
account.subtractAmount(1000);
}
try{
System.out.println(String.format("%20s waiting at barrier",Thread.currentThread().getName()));
barrier.await();
}catch (InterruptedException e){
e.printStackTrace();
}catch (BrokenBarrierException e){
e.printStackTrace();
}
System.out.println(String.format("%20s done",Thread.currentThread().getName()));
}
}
class EarnerWithCyclicBarrier implements Runnable {
private final Account account;
private final CyclicBarrier barrier;
public EarnerWithCyclicBarrier(Account account, CyclicBarrier barrier) {
this.account = account;
this.barrier = barrier;
}
@Override
public void run(){
for (int i = 0; i < 30; i++) {
account.addAmount(1000);
}
try{
System.out.println(String.format("%20s waiting at barrier",Thread.currentThread().getName()));
barrier.await();
}catch (InterruptedException e){
e.printStackTrace();
}catch (BrokenBarrierException e){
e.printStackTrace();
}
System.out.println(String.format("%20s done",Thread.currentThread().getName()));
}
}
class Account {
private double balance;
public double getBalance() {
return balance;
}
public void setBalance(double balance) {
this.balance = balance;
}
public void addAmount(double amount) {
synchronized (Account.class) {
balance = balance + amount;
}
}
public void subtractAmount(double amount) {
synchronized (Account.class) {
balance = balance - amount;
}
}
}
程序输出如下:
pool-1-thread-2 waiting at barrier
main waiting at barrier
pool-1-thread-1 waiting at barrier
pool-1-thread-1 executed
pool-1-thread-1 done
pool-1-thread-2 done
balance=100000.0
main done
我们可以看到pool
-1
-thread
-2
和main
线程执行完对账户余额的操作后,先到达barrier
阻塞等待,pool
-1
-thread
-1
线程最后到达,然后由pool
-1
-thread
-1
线程执行预定义的动作,即输出executed
后,这三个线程再继续执行其他信息的输出。
这里要注意到时,各位输出的内容可能是上面的不太一致,不过第三行和第四行的线程名要么是main
、要么是pool
-1
-thread
-1
,或者是pool
-1
-thread
-2
。不会存在两个线程名不一样的情况。这里说明了到达barrier
后预定义的动作是由最后到达的线程去执行的。
本文参考
关于作者
爱编程、爱钻研、爱分享、爱生活
关注分布式、高并发、数据挖掘
如需捐赠,请扫码