使用CountDownLatch,这其实是最优雅的写法了,每个线程完成后都去将计数器减一,最后完成时再来唤醒
@Test
public void testThreadSync3() {
final Vector<Integer> list = new Vector<Integer>();
Thread[] threads = new Thread[TEST_THREAD_COUNT];
final CountDownLatch latch = new CountDownLatch(TEST_THREAD_COUNT);
for (int i = 0; i < TEST_THREAD_COUNT; i++) {
final int num = i;
threads[i] = new Thread(new Runnable() {
public void run() {
try {
Thread.sleep(random.nextInt(100));
} catch (InterruptedException e) {
e.printStackTrace();
}
list.add(num);
System.out.print(num + " add.\t");
latch.countDown();
}
});
threads[i].start();
}
try {
latch.await();
} catch (InterruptedException e) {
e.printStackTrace();
}
printSortedResult(list);
}
例2
CountDownLatch 初始化设置count,即等待(await)count个线程或一个线程count次计数,通过工作线程来countDown计数减一,直到计数为0,await阻塞结束。
设置的count不可更改,如需要动态设置计数的线程数,可以使用CyclicBarrier.
下面的例子,所有的工作线程中准备就绪以后,并不是直接运行,而是等待主线程的信号后再执行具体的操作。
package com.example.multithread;
import java.util.concurrent.CountDownLatch;
class Driver
{
private static final int TOTAL_THREADS = 10;
private final CountDownLatch mStartSignal = new CountDownLatch(1);
private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);
void main()
{
for (int i = 0; i < TOTAL_THREADS; i++)
{
new Thread(new Worker(mStartSignal, mDoneSignal, i)).start();
}
System.out.println("Main Thread Now:" + System.currentTimeMillis());
doPrepareWork();// 准备工作
mStartSignal.countDown();// 计数减一为0,工作线程真正启动具体操作
doSomethingElse();//做点自己的事情
try
{
mDoneSignal.await();// 等待所有工作线程结束
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("All workers have finished now.");
System.out.println("Main Thread Now:" + System.currentTimeMillis());
}
void doPrepareWork()
{
System.out.println("Ready,GO!");
}
void doSomethingElse()
{
for (int i = 0; i < 100000; i++)
{
;// delay
}
System.out.println("Main Thread Do something else.");
}
}
class Worker implements Runnable
{
private final CountDownLatch mStartSignal;
private final CountDownLatch mDoneSignal;
private final int mThreadIndex;
Worker(final CountDownLatch startSignal, final CountDownLatch doneSignal,
final int threadIndex)
{
this.mDoneSignal = doneSignal;
this.mStartSignal = startSignal;
this.mThreadIndex = threadIndex;
}
@Override
public void run()
{
// TODO Auto-generated method stub
try
{
mStartSignal.await();// 阻塞,等待mStartSignal计数为0运行后面的代码
// 所有的工作线程都在等待同一个启动的命令
doWork();// 具体操作
System.out.println("Thread " + mThreadIndex + " Done Now:"
+ System.currentTimeMillis());
mDoneSignal.countDown();// 完成以后计数减一
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void doWork()
{
for (int i = 0; i < 1000000; i++)
{
;// 耗时操作
}
System.out.println("Thread " + mThreadIndex + ":do work");
}
}
public class CountDownLatchTest
{
public static void main(String[] args)
{
// TODO Auto-generated method stub
new Driver().main();
}
}
通过Executor启动线程:
class CountDownLatchDriver2
{
private static final int TOTAL_THREADS = 10;
private final CountDownLatch mDoneSignal = new CountDownLatch(TOTAL_THREADS);
void main()
{
System.out.println("Main Thread Now:" + System.currentTimeMillis());
doPrepareWork();// 准备工作
Executor executor = Executors.newFixedThreadPool(TOTAL_THREADS);
for (int i = 0; i < TOTAL_THREADS; i++)
{
// 通过内建的线程池维护创建的线程
executor.execute(new RunnableWorker(mDoneSignal, i));
}
doSomethingElse();// 做点自己的事情
try
{
mDoneSignal.await();// 等待所有工作线程结束
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
System.out.println("All workers have finished now.");
System.out.println("Main Thread Now:" + System.currentTimeMillis());
}
void doPrepareWork()
{
System.out.println("Ready,GO!");
}
void doSomethingElse()
{
for (int i = 0; i < 100000; i++)
{
;// delay
}
System.out.println("Main Thread Do something else.");
}
}
class RunnableWorker implements Runnable
{
private final CountDownLatch mDoneSignal;
private final int mThreadIndex;
RunnableWorker(final CountDownLatch doneSignal, final int threadIndex)
{
this.mDoneSignal = doneSignal;
this.mThreadIndex = threadIndex;
}
@Override
public void run()
{
// TODO Auto-generated method stub
doWork();// 具体操作
System.out.println("Thread " + mThreadIndex + " Done Now:"
+ System.currentTimeMillis());
mDoneSignal.countDown();// 完成以后计数减一
// 计数为0时,主线程接触阻塞,继续执行其他任务
try
{
// 可以继续做点其他的事情,与主线程无关了
Thread.sleep(5000);
System.out.println("Thread " + mThreadIndex
+ " Do something else after notifing main thread");
}
catch (InterruptedException e)
{
// TODO Auto-generated catch block
e.printStackTrace();
}
}
public void doWork()
{
for (int i = 0; i < 1000000; i++)
{
;// 耗时操作
}
System.out.println("Thread " + mThreadIndex + ":do work");
}
}
CountDownLatch和CyclicBarrier简单比较:
|
CountDownLatch |
CyclicBarrier |
---|---|---|
软件包 |
java.util.concurrent |
java.util.concurrent |
适用情景 |
主线程等待多个工作线程结束 |
多个线程之间互相等待,直到所有线程达到一个障碍点(Barrier point) |
主要方法 |
CountDownLatch(int count) (主线程调用) 初始化计数 CountDownLatch.await (主线程调用) 阻塞,直到等待计数为0解除阻塞 CountDownLatch.countDown 计数减一(工作线程调用) |
CyclicBarrier(int parties, Runnable barrierAction) //初始化参与者数量和障碍点执行Action,Action可选。由主线程初始化 CyclicBarrier.await() //由参与者调用 阻塞,直到所有线程达到屏障点 |
等待结束 |
各线程之间不再互相影响,可以继续做自己的事情。不再执行下一个目标工作。 |
在屏障点达到后,允许所有线程继续执行,达到下一个目标。可以重复使用CyclicBarrier |
异常 |
如果其中一个线程由于中断,错误,或超时导致永久离开屏障点,其他线程也将抛出异常。 |
|
其他 |
如果BarrierAction不依赖于任何Party中的所有线程,那么在任何party中的一个线程被释放的时候,可以直接运行这个Action。 If(barrier.await()==2) { //do action } |
联系方式:zhang_liang1991@126.com