学习并发编程,自然要学习JDK提供的并发工具类,了解他们后,我们就可以更好的去控制程序的并发,为我们的开发也是有了很大帮助呢,本文主要参考了方腾飞老师的《Java并发编程的艺术》。
在JDK的并发包中已经提供了几个非常有用的并发工具类。CountDownLatch、CyclicBarrier和Semaphore工具类中提供了一种并发流程控制的手段,Exchanger工具类提供了在线程间交换数据的一种手段。
1、等待多线程完成的CountDownLatch
CountDownLatch类位于java.util.concurrent包下,利用它可以实现类似计数器的功能。CountDownLatch允许一个或多个线程等待其他线程完成操作。
假设有一个需求:需要解析一个Excel中的多个sheet的数据,此时可以考虑使用多线程,每个线程解析一个sheet里的数据,等到所有的sheet都解析完之后,程序需要提示解析完成。在这个需求中,要实现主线程等待所有线程完成sheet的解析操作,最简单的做法就是使用join()方法,如下所示:
package chapter8; /** * 使用join()方法,使得主线程在所有子线程执行完毕后结束 * @author LXH * */ public class JoinCountDownLatch { public static void main(String[] args) throws InterruptedException { Thread parser1 = new Thread(new Runnable() { @Override public void run() { } }); Thread parser2 = new Thread(new Runnable() { @Override public void run() { System.out.println("parser2 finish"); } }); parser1.start(); parser2.start(); parser1.join(); parser2.join(); System.out.println("all parser finish"); } }
join用于让当前执行线程等待join线程执行结束。其实现原理就是不停检查join线程是否存活,如果join线程存活则让当前线程永远等待。直到join线程中止后,线程的this.notifyAll()方法会被调用,调用notifyAll()方法是在JVM里面实现的,在JDK中看不到。
在JDK 1.5之后的并发包中提供的CountDownLatch也可以实现join的功能,并且比join的功能更多。
/** * 仅仅提供了一个构造方法, 参数count为计数值 * @param count */ public CountDownLatch(int count) { if (count < 0) throw new IllegalArgumentException("count < 0"); this.sync = new Sync(count); }
CountDownLatch中较为重要的三个方法,当我们调用countDown方法是,count值就会键1。由于countDown方法可以用在任何地方,所以这里的count,可以是count个线程,也可以是1个线程中的count个执行步骤。用在多个线程时,只需要把这个CountDownLatch的引用传递到线程中。
/**
* 调用await()方法的线程会被挂起,它会等待直到count值为0才继续执行
* @throws InterruptedException
*/
public void await() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
/**
* 与await()方法类似,只不过等待一定的时间后count值还没变为0的话就会继续执行
* @param timeout
* @param unit
* @return
* @throws InterruptedException
*/
public boolean await(long timeout, TimeUnit unit)
throws InterruptedException {
return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
}
/**
* count值减一
*/
public void countDown() {
sync.releaseShared(1);
}
用下面一个例子来简单介绍一下CountDownLatch的使用方法
package chapter8; import java.util.concurrent.CountDownLatch; /** * CountDownLatch的简单用法 * @author LXH * */ public class CountDownLatchTest { private static CountDownLatch c = new CountDownLatch(2); public static void main(String[] args) throws InterruptedException { new Thread(new Runnable() { @Override public void run() { System.out.println(1); c.countDown(); System.out.println(2); c.countDown(); } }).start(); c.await(); // 只有当count值为0后才继续执行 System.out.println(3); } }
注意:计数器必须大于等于0,只是等于0的时候,计数器就是0,调用await方法时不会阻塞当前线程。CountDownLatch不可能重新初始化或者修改CountDownLatch对象的内部计数器的值(只有在构造它的时候的一次赋值)。一个线程调用countDown方法happen-before,另外一个线程调用await方法。 |
2、同步屏障CyclicBarrier
CyclicBarrier意思是可循环使用(Cyclic)的屏障(Barrier),要做的事情是,让一组线程到达一个屏障(同步点)时被阻塞,知道最后一个线程到达屏障时,屏障才会开门,所有被屏障拦截的线程才会继续运行。
2.1 CyclicBarrier简介
CyclicBarrier类位于java.util.concurrent包下,CyclicBarrier提供2个构造器:
/** * 参数barrierAction为当这些线程都达到barrier状态时会执行的内容。方便执行更复杂的业务场景。 * @param parties * @param barrierAction */ public CyclicBarrier(int parties, Runnable barrierAction) { if (parties <= 0) throw new IllegalArgumentException(); this.parties = parties; this.count = parties; this.barrierCommand = barrierAction; } /** * 默认的构造方法,参数表示屏障拦截的线程数量 * @param parties */ public CyclicBarrier(int parties) { this(parties, null); }
当线程到达屏障时,每个线程调用await方法告诉CyclicBarrier我已经到达了屏障,然后当前线程被阻塞。实例代码如下所示。
package chapter8; import java.util.concurrent.CyclicBarrier; /** * 因为主线程和子线程的调度是由CPU决定的,两个线程都有可能先执行,所以会产生两种输出:1 2 或 2 1. * @author LXH * */ public class CyclicBarrierTest { private static CyclicBarrier c = new CyclicBarrier(2); public static void main(String[] args) { new Thread(new Runnable() { @Override public void run() { try { c.await(); } catch (Exception e) { } System.out.println(1); } }).start();; try { c.await(); } catch (Exception e) { } System.out.println(2); } }
若将new CyclicBarrier(2)修改为new CyclicBarrier(3),则主线程和子线程将拥有等待,不会执行输出语句。因为没有第三线程执行await方法,即没有第三个线程到达屏障,所以之前到达屏障的两个线程都不会继续执行。
使用CyclicBarrier的双参构造方法,用于在所有线程到达屏障时,优先执行barrierAction,方便处理更复杂的业务场景,示例代码如下所示。
package chapter8;
import java.util.concurrent.CyclicBarrier;
/**
* 使用CyclicBarrier的双参构造方法,指定优先执行的方法。
* @author LXH
*
*/
public class CyclicBarrierTest02 {
private static CyclicBarrier c = new CyclicBarrier(2, new A());
public static void main(String[] args) {
new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
System.out.println(1);
}
}).start();;
try {
c.await();
} catch (Exception e) {
}
System.out.println(2);
}
static class A implements Runnable {
@Override
public void run() {
System.out.println(3);
}
}
}
2.2 CyclicBarrier的应用场景
CyclicBarrier可以用于多线程计算数据,最后合并计算结果的场景。
eg:用一个Excel保存了用户所有的银行流水,每个Sheet保存一个账户近一年的每笔银行流水,现在需要统计用户的日平均银行流水,先用多线程处理每个sheet里的银行流水,都执行完之后,得到每个sheet的日均银行流水,最后再用barrierAction用这些线程的计算结果,计算出真个Excel的日均银行流水。示例代码如下。
package chapter8;
import java.util.Map.Entry;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Executor;
import java.util.concurrent.Executors;
public class BankWaterService implements Runnable {
/**
* 创建屏障,当有4个线程到达屏障,并处理完成之后,优先执行当前类的run方法
*/
private CyclicBarrier c = new CyclicBarrier(4, this);
/**
* 假设只有4个sheet,所以只启动四个线程
*/
private Executor executor = Executors.newFixedThreadPool(4);
/**
* 保存每个sheet计算出的结果
*/
private ConcurrentHashMap<String, Integer> sheetBankWaterCount = new ConcurrentHashMap<>();
private void count() {
for(int i = 0; i < 4; i++) {
executor.execute(new Runnable() {
@Override
public void run() {
// 计算后当前sheet的值,计算代码省略
sheetBankWaterCount.put(Thread.currentThread().getName(), 1);
// 通知屏障,我已到达,阻塞。
try {
c.await();
} catch (Exception e) {
e.printStackTrace();
}
}
});
}
}
@Override
public void run() {
int result = 0;
for(Entry<String, Integer> sheet : sheetBankWaterCount.entrySet()) {
result += sheet.getValue();
}
sheetBankWaterCount.put("result", result);
System.out.println(result);
}
public static void main(String[] args) {
BankWaterService bankWaterService = new BankWaterService();
bankWaterService.count();
}
}
2.3 CyclicBarrier与CountDownLatch的区别
public class Test {
public static void main(String[] args) {
int N = 4;
CyclicBarrier barrier = new CyclicBarrier(N);
for(int i=0;i<N;i++) {
new Writer(barrier).start();
}
try {
Thread.sleep(25000);
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("CyclicBarrier重用");
for(int i=0;i<N;i++) {
new Writer(barrier).start();
}
}
static class Writer extends Thread{
private CyclicBarrier cyclicBarrier;
public Writer(CyclicBarrier cyclicBarrier) {
this.cyclicBarrier = cyclicBarrier;
}
@Override
public void run() {
System.out.println("线程"+Thread.currentThread().getName()+"正在写入数据...");
try {
Thread.sleep(5000); //以睡眠来模拟写入数据操作
System.out.println("线程"+Thread.currentThread().getName()+"写入数据完毕,等待其他线程写入完毕");
cyclicBarrier.await();
} catch (InterruptedException e) {
e.printStackTrace();
}catch(BrokenBarrierException e){
e.printStackTrace();
}
System.out.println(Thread.currentThread().getName()+"所有线程写入完毕,继续处理其他任务...");
}
}
}
CyclicBarrier的功能更加强大,提供了其他有用的方法,如getNumberWaiting方法可以获得CyclicBarrier阻塞线程的数量,isBroken()方法用来了解阻塞线程是否被中断。
package chapter8;
import java.util.concurrent.CyclicBarrier;
/**
* @author LXH
*
*/
public class CyclicBarrierTest3 {
private static CyclicBarrier c = new CyclicBarrier(2);
public static void main(String[] args) {
Thread thread = new Thread(new Runnable() {
@Override
public void run() {
try {
c.await();
} catch (Exception e) {
}
}
});
thread.start();
/*
中断这个线程。
除非当前线程中断自身,这是始终允许的,所以调用此线程的checkAccess方法,这可能会导致抛出SecurityException 。
如果该线程阻塞的调用wait() , wait(long) ,或wait(long, int)的方法Object类,或者在join() , join(long) , join(long, int) , sleep(long) ,或sleep(long, int) ,这个类的方法,那么它的中断状态将被清除,并且将收到一个InterruptedException 。
*/
thread.interrupt();
try {
c.await();
} catch (Exception e) {
System.out.println(c.isBroken()); // true
}
}
}
3 控制并发线程数的Semaphore
Semaphore(信号量)是用来控制同时访问特定资源的线程数量,它通过协调各个线程,以保证合理的使用公共资源。
3.1 Semaphore简介
/** * 接收一个整形数字,表示可用的许可证的数量,默认为非公平设置 */ public Semaphore(int permits) { sync = new NonfairSync(permits); } /** * 接收一个整形数字,表示可用的许可证的数量。接收一个是否公平设置 */ public Semaphore(int permits, boolean fair) { sync = fair ? new FairSync(permits) : new NonfairSync(permits); }Semaphore还提供了一些其他方法,具体如下
/** * 获取一个Semaphore信号量 * @throws InterruptedException */ public void acquire() throws InterruptedException { sync.acquireSharedInterruptibly(1); } /** * 使用方法尝试获取许可证 * 获得许可证,如果有可用并立即返回,值为true ,将可用许可证数量减少一个。 * 如果没有许可证可用,那么该方法将立即返回值为false 。 * @return */ public boolean tryAcquire() { return sync.nonfairTryAcquireShared(1) >= 0; } /** * 归还一个信号量 */ public void release() { sync.releaseShared(1); } /** *是否有线程正在等待获取信号量 * * @return */ public final boolean hasQueuedThreads() { return sync.hasQueuedThreads(); } /** *返回次信号量中当前可以使用的信号量数 * * @return */ public int availablePermits() { return sync.getPermits(); } /** * 返回正在等待获取信号量的线程数 * @return */ public final int getQueueLength() { return sync.getQueueLength(); } /** *返回所有等待获取信号量的线程集合,protected方法 * * @return the collection of threads */ protected Collection<Thread> getQueuedThreads() { return sync.getQueuedThreads(); }
Semaphore的用法很简单,首先线程使用Semaphore的acquire()方法获取一个许可证,使用完之后调用release()方法归还许可证。还可以使用tryAcquire()方法尝试获取许可证。
3.2 应用场景
Semaphore可以用于做流量控制,特别是公共资源有限的应用场景,比如数据库连接。
eg:假如有一个需求,要读取几万个文件的数据,因为都是IO密集型任务,我们可以启动几十个线程并发地读取,但是如果读到内存后,还需要存储到数据库中,而数据库的连接数只有10个,这是我们必须控制只有10个线程同时获取数据库连接保存数据,否则会报错无法获取数据库连接。这是,就可以使用Semaphore来做流量控制。
package chapter8;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
/**
* 虽然有30个线程在执行,但是只允许10个线程并发执行
* @author LXH
*
*/
public class SemaphoreTest {
private static final int THREAD_COUNT = 30;
private static ExecutorService threadPool = Executors.newFixedThreadPool(THREAD_COUNT);
private static Semaphore s = new Semaphore(10);
public static void main(String[] args) {
for(int i = 0; i < THREAD_COUNT; i++) {
threadPool.execute(new Runnable() {
@Override
public void run() {
try {
s.acquire();
System.out.println("获取成功,save data");
s.release();
} catch (Exception e) {
}
}
});
}
threadPool.shutdown();
}
}
4 线程间交换数据的Exchanger
Exchanger(交换者)是一个用于线程间协作的工具类。Exchanger用于进行线程间的数据交换。它提供一个同步点,在这个同步点,两个线程可以交换彼此的数据。这两个线程通过exchange方法交换数据,如果第一个线程先执行exchange()方法,它会一直等待第二个线程也只需exchange()方法,当这两个线程都达到同步点时,这两个线程就可以交换数据,将本线程生产出来的数据传递给对方。
4.1 Exchanger的应用场景
用于遗传算法:遗传算法里需要选出两个人作为交配对象,这时候会交换两个人的数据,并使用交叉规则得到两个交配结果。
用于校对工作:对某一项工作,两个人同时进行,最后校对两人的数据是否相等。
package chapter8; import java.util.concurrent.Exchanger; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; public class ExchangerTest { private static final Exchanger<String> exgr = new Exchanger<>(); private static ExecutorService threadPool = Executors.newFixedThreadPool(2); public static void main(String[] args) { threadPool.execute(new Runnable() { @Override public void run() { try { String A = "银行流水A"; // A录入银行流水数据 exgr.exchange(A); } catch (Exception e) { } } }); threadPool.execute(new Runnable() { @Override public void run() { try { String B = "银行流水A"; // B录入银行流水数据 String A = exgr.exchange("B"); System.out.println("A和B数据是否一致:" + A.equals(B) + ", A录入的是: " + A + ",B录入的是:" + B); } catch (Exception e) { } } }); threadPool.shutdown(); } }
运行结果:
如果两个线程有一个没有执行exchange()方法,则会造成一致等待,如果担心有特殊情况发生,避免一直等待,可以使用exchange(V x, long timeout, TimeUnit unit)设置最大等待时常。
好啦,到这里,这几个小工具的使用就介绍完了,希望对大家有些帮助吧。有什么见解或问题,给小编留言哦。