Semaphore 是 synchronized 的加强版，主要作用是控制并发线程的数量。
acquire(),
acquireUninterruptibly(),
release(),
availablePermits(),
drainPermits(),
getQueueLength(),
hasQueuedThreads(),
公平与非公平信号量,
tryAcquire(),
java并发编程上看到的例子。
package mavenTest.Semaphore; import java.util.ArrayList; import java.util.Arrays; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Executors; import java.util.concurrent.Future; import java.util.concurrent.Semaphore; import java.util.concurrent.locks.Condition; import java.util.concurrent.locks.ReentrantLock; public class RepastService { volatile private Semaphore setSemaphore = new Semaphore(10);// 生产者 volatile private Semaphore getSemaphore = new Semaphore(20);// 消费者 volatile private ReentrantLock lock = new ReentrantLock(); volatile private Condition setCondition = lock.newCondition(); volatile private Condition getCondition = lock.newCondition(); int objctNum = 4; volatile public Object[] producePosition = new Object[objctNum];// 商品 private boolean isEmpty() { return Arrays.asList(producePosition).stream().filter(x -> { return null != x; }).count() == 0; } private boolean isFull() { return Arrays.asList(producePosition).stream().filter(x -> { return null != x; }).count() == objctNum; } public void set(Object object) { try { setSemaphore.acquire(); lock.lock(); while (isFull()) { setCondition.await(); } for (int i = 0; i < producePosition.length; i++) { if (null == producePosition[i]) { producePosition[i] = object; System.out.println(Thread.currentThread().getName() + "生产了" + object); break; } } getCondition.signalAll(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { setSemaphore.release(); } } public Object get() { Object obj = null; try { getSemaphore.acquire(); lock.lock(); while (isEmpty()) { getCondition.await(); } for (int i = 0; i < producePosition.length; i++) { if (null != producePosition[i]) { obj = producePosition[i]; System.out.println(Thread.currentThread().getName() + "消费了" + obj); producePosition[i] = null; break; } } setCondition.signalAll(); lock.unlock(); } catch (InterruptedException e) { e.printStackTrace(); } finally { 
getSemaphore.release(); } return obj; } public static void main(String[] args) { RepastService service = new RepastService(); ExecutorService es = Executors.newCachedThreadPool(); for (int i = 0; i < 60; i++) { es.execute(new Thread(new ThreadSet(service, "商品" + i))); } es.shutdown(); ExecutorService es2 = Executors.newCachedThreadPool(); ArrayList<Future<Object>> getList = new ArrayList<>(); for (int i = 0; i < 60; i++) { getList.add(es2.submit(new CallAbleGet(service))); } try { for (Future<?> fo : getList) { System.out.println(fo.get()); } } catch (InterruptedException e) { e.printStackTrace(); } catch (ExecutionException e) { e.printStackTrace(); }finally { es2.shutdown(); } } }
package mavenTest.Semaphore;

/**
 * Producer task: hands one object to a {@link RepastService} when run.
 */
class ThreadSet implements Runnable {

    /** Service that receives the object. */
    private final RepastService service;

    /** Object passed to {@code service.set} when the task runs. */
    private final Object obj;

    /**
     * @param service the service to store into
     * @param obj     the object to store
     */
    public ThreadSet(RepastService service, Object obj) {
        this.service = service;
        this.obj = obj;
    }

    /** Stores the configured object by calling {@code service.set(obj)}. */
    @Override
    public void run() {
        service.set(obj);
    }
}
package mavenTest.Semaphore;

import java.util.concurrent.Callable;

/**
 * Consumer task: returns whatever {@link RepastService#get()} yields when
 * called.
 */
public class CallAbleGet implements Callable<Object> {

    /** Service the object is taken from. */
    private final RepastService service;

    /**
     * @param service the service to take from
     */
    public CallAbleGet(RepastService service) {
        this.service = service;
    }

    /**
     * Delegates to {@code service.get()}.
     *
     * @return the value returned by {@code service.get()}
     */
    @Override
    public Object call() throws Exception {
        Object taken = service.get();
        return taken;
    }
}