在API中是这样来描写叙述Semaphore
的
Semaphore 通经常使用于限制能够訪问某些资源(物理或逻辑的)的线程数目。
一个计数信号量。从概念上讲,信号量维护了一个许可集。如有必要,在许可可用前会堵塞每个 acquire()
,然后再获取该许可。每个release()
加入一个许可,从而可能释放一个正在堵塞的获取者。可是,不使用实际的许可对象,Semaphore
仅仅对可用许可的号码进行计数,并採取对应的行动。
比如,以下的类使用信号量控制线程并发的数量
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore; public class TestSemaphore { /**
* @param args
*/
public static void main(String[] args) {
ExecutorService pool = Executors.newCachedThreadPool();
final Semaphore sp = new Semaphore(3,true);
for(int i=0;i<10;i++){
Runnable runnable = new Runnable() { @Override
public void run() { try {
sp.acquire();
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println(sp.availablePermits());
System.out.println("线程 "+ Thread.currentThread().getName() +"进入,已有"+ (3-sp.availablePermits())+ "并发") ;
try {
Thread.sleep((long) (Math.random()*3000));
} catch (InterruptedException e) {
e.printStackTrace();
}
System.out.println("线程 "+Thread.currentThread().getName() +"即将离开 " );
sp.release();
System.out.println("线程 "+Thread.currentThread().getName() +"离开 ,已有"+ (3-sp.availablePermits()) + "并发");
}
};
pool.execute(runnable);
}
} }
再比如能够通过信号量来控制线程訪问资源:
import java.util.concurrent.Semaphore; public class DownloadThread {
private static int in_index = 0;
private static int out_index = 0;
private static int buffer_count = 100;
public static boolean g_downloadComplete;
private static Semaphore g_seFull = new Semaphore(0);
private static Semaphore g_seEmpty = new Semaphore(buffer_count);
public static boolean getBlockFromNet(int in_index) {
int i = 0;
while (i < 10000)
i++;
if (in_index < buffer_count - 1)
return false;
else
return true;
}
public static void writeBlockToDisk(int out_index) {
int i = 0;
while (i < 100000)
i++;
} /**
* @param args
*/
public static void main(String[] args) {
g_downloadComplete = false;
Thread threadA = new Thread() {
public void run() {
proA();
}
};
Thread threadB = new Thread() {
public void run() {
proB();
}
};
threadB.start();
threadA.start();
} public static void proA(){
while (g_seFull.availablePermits() < buffer_count) {
try {
g_seEmpty.acquire();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
g_downloadComplete = getBlockFromNet(in_index);
in_index = (in_index + 1) % buffer_count;
g_seFull.release();
System.out.println("download a block " + in_index);
if (g_downloadComplete)
break;
}
} public static void proB(){
while (g_seEmpty.availablePermits() > 0) {
try {
g_seFull.acquire();
} catch (InterruptedException e1) {
// TODO Auto-generated catch block
e1.printStackTrace();
}
writeBlockToDisk(out_index);
out_index = (out_index + 1) % buffer_count;
g_seEmpty.release();
System.out.println("write a block " + out_index);
if (g_downloadComplete && out_index == in_index)
break;
}
} }