- 信号量Semaphore:是一种计数器,用来保护一个或多个共享资源的访问。
- CountDownLatch:在完成一组正在其他线程中执行的操作之前,它允许线程一直等待
- CyclicBarrier:它允许多个线程在某个集合点处进行相互等待
- Phaser:可以把并发任务分成多个阶段运行,在进行下一阶段之前,当前阶段的所有线程都必须执行完成
- Exchanger:提供两个线程之间的数据交换点
1、如果线程要访问一个共享资源,必须先获得信号量。如果信号量的计数器大于0,信号量减一,然后允许访问这个资源。
2、如果信号量的计数器等于0信号量就会把线程置为休眠直到信号量大于0.
3、当线程使用完某个共享资源时,信号量必须被释放,释放操作使信号量内部计数器加一
4、二进制信号量只能保护单一资源
三、注意
实现临界区的三个步骤:
1、acquire()获得信号量
2、对共享资源进行必要操作
3、通过release()释放信号量
四、其他方法
acquireUninterruptibly():就是acquire()方法,但是不抛出异常,当线程被阻塞这段时间,可能会中断。acquire()会抛出InterruptedExecption
public void acquireUninterruptibly() {
sync.acquireShared(1);
}public void acquire() throws InterruptedException {
sync.acquireSharedInterruptibly(1);
}
tryAcquire():试图获取信号量
五、测试二进制信号量
package com.concurrent.threadAvgClass;
/**
* 打印工作类
*
* @author Nicholas
*
*/
public class Job implements Runnable {
private PrintQueue printQueue;
public Job(PrintQueue printQueue) {
this.printQueue = printQueue;
}
@Override
public void run() {
System.out.println("Going to print a document "
+ Thread.currentThread().getName());
printQueue.PrintJob(new Object());
System.out.println("The document has been printed "
+ Thread.currentThread().getName());
}
}package com.concurrent.threadAvgClass;
import java.util.concurrent.Semaphore;
public class PrintQueue {
// 声明一个信号量对象
private final Semaphore semaphore;
public PrintQueue() {
semaphore = new Semaphore(1);
}
public void PrintJob(Object documentObject) {
try {
semaphore.acquire();
Long duration = (long) (Math.random() * 10);
System.out.println(Thread.currentThread().getName()
+ " : PrintQueue : Printing a Job during "
+ (duration / 1000) + " seconds");
Thread.sleep(duration);
} catch (InterruptedException e) {
e.printStackTrace();
}finally {
semaphore.release();
}
}
}
六、信号量如何实现保护资源的多个副本或者多个线程同时执行的临界区?
package com.concurrent.threadAvgClass;
/**
* 打印工作类
*
* @author Nicholas
*
*/
public class Job2 implements Runnable {
private PrintQueue2 printQueue;
public Job2(PrintQueue2 printQueue) {
this.printQueue = printQueue;
}
@Override
public void run() {
System.out.println("Going to print a document "
+ Thread.currentThread().getName());
printQueue.PrintJob(new Object());
System.out.println("The document has been printed "
+ Thread.currentThread().getName());
}
}package com.concurrent.threadAvgClass;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
public class PrintQueue2 {
// 声明一个信号量对象
private final Semaphore semaphore;
private boolean[] freePrinters; // 存放打印机的状态,两种状态:空闲或者正在打印
private Lock lockFreePrinters;// 锁对象,保护对freePrinters的访问
public PrintQueue2() {
semaphore = new Semaphore(3);
freePrinters = new boolean[3];
for (int i = 0; i < freePrinters.length; i++) {
freePrinters[i] = true;
}
lockFreePrinters = new ReentrantLock();
}
public void PrintJob(Object documentObject) {
try {
semaphore.acquire();
//获取当前空闲的打印机
int assignedPrinter = getPrinter();
Long duration = (long) (Math.random() * 10);
System.out
.printf("%s : PrintQueue : Printing a job at Printer %d during %d seconds",
Thread.currentThread().getName(), assignedPrinter,
duration);
TimeUnit.SECONDS.sleep(duration);
//打印完毕,将当前打印机置为空闲
freePrinters[assignedPrinter] = true;
} catch (InterruptedException e) {
e.printStackTrace();
} finally {
semaphore.release();
}
}
/**
* 返回一个当前空闲的打印机编号
* @return
*/
private int getPrinter() {
int ret = -1;//表示打印机的编号
try {
lockFreePrinters.lock();
for (int i = 0; i < freePrinters.length; i++) {
//找到空闲的打印机
if (freePrinters[i]) {
ret = i;
freePrinters[i] = false;//表示当前打印机已被使用
break;
}
}
} catch (Exception e) {
e.printStackTrace();
} finally {
lockFreePrinters.unlock();
}
return ret;
}
}package com.concurrent.threadAvgClass;
public class Main {
public static void main(String[] args) {
/*
PrintQueue printQueue = new PrintQueue();
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(new Job(printQueue), "Thread-" + i);
}
for (int i = 0; i < 10; i++) {
threads[i].start();
}*/
PrintQueue2 printQueue2 = new PrintQueue2();
Thread[] threads = new Thread[10];
for (int i = 0; i < 10; i++) {
threads[i] = new Thread(new Job2(printQueue2), "Thread-" + i);
}
for (int i = 0; i < 10; i++) {
threads[i].start();
}
}
}