Semaphore 实现生产者消费者模式

时间:2020-12-18 17:40:13

Semaphore ,synchronized 的加强版,主要作用控制线程并发的数量。

acquire(),

acquireUninterruptibly(),

release(),

availablePermits(),

drainPermits(),

getQueneLength(),

hasQuenedThreads(), 

公平与非公平量,

tryAcquire(),

java并发编程上看到的例子。

package mavenTest.Semaphore;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.Semaphore;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

public class RepastService {
	volatile private Semaphore setSemaphore = new Semaphore(10);// 生产者
	volatile private Semaphore getSemaphore = new Semaphore(20);// 消费者
	volatile private ReentrantLock lock = new ReentrantLock();
	volatile private Condition setCondition = lock.newCondition();
	volatile private Condition getCondition = lock.newCondition();
	int objctNum = 4;
	volatile public Object[] producePosition = new Object[objctNum];// 商品

	private boolean isEmpty() {
		return Arrays.asList(producePosition).stream().filter(x -> {
			return null != x;
		}).count() == 0;
	}

	private boolean isFull() {
		return Arrays.asList(producePosition).stream().filter(x -> {
			return null != x;
		}).count() == objctNum;
	}

	public void set(Object object) {
		try {
			setSemaphore.acquire();
			lock.lock();
			while (isFull()) {
				setCondition.await();
			}
			for (int i = 0; i < producePosition.length; i++) {
				if (null == producePosition[i]) {
					producePosition[i] = object;
					System.out.println(Thread.currentThread().getName() + "生产了" + object);
					break;
				}
			}
			getCondition.signalAll();
			lock.unlock();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			setSemaphore.release();
		}
	}

	public Object get() {
		Object obj = null;
		try {
			getSemaphore.acquire();
			lock.lock();
			while (isEmpty()) {
				getCondition.await();
			}
			for (int i = 0; i < producePosition.length; i++) {
				if (null != producePosition[i]) {
					obj = producePosition[i];
					System.out.println(Thread.currentThread().getName() + "消费了" + obj);
					producePosition[i] = null;
					break;
				}
			}
			setCondition.signalAll();
			lock.unlock();
		} catch (InterruptedException e) {
			e.printStackTrace();
		} finally {
			getSemaphore.release();
		}

		return obj;
	}

	public static void main(String[] args) {
		RepastService service = new RepastService();
		ExecutorService es = Executors.newCachedThreadPool();
		for (int i = 0; i < 60; i++) {
			es.execute(new Thread(new ThreadSet(service, "商品" + i)));
		}
		es.shutdown();
		ExecutorService es2 = Executors.newCachedThreadPool();
		ArrayList<Future<Object>> getList = new ArrayList<>();
		for (int i = 0; i < 60; i++) {
			getList.add(es2.submit(new CallAbleGet(service)));
		}
		try {
			for (Future<?> fo : getList) {
				System.out.println(fo.get());
			}
		} catch (InterruptedException e) {
			e.printStackTrace();
		} catch (ExecutionException e) {
			e.printStackTrace();
		}finally {
			es2.shutdown();
		}
	}
}
package mavenTest.Semaphore;

class ThreadSet implements Runnable{
	private RepastService service;
	private Object obj;
	public ThreadSet(RepastService service,Object obj) {
		super();
		this.obj =obj;
		this.service =service;
	}
	@Override
	public void run() {
		service.set(obj);
	}
}

package mavenTest.Semaphore;

import java.util.concurrent.Callable;

public class CallAbleGet implements Callable<Object> {
	private RepastService service;

	public CallAbleGet(RepastService service) {
		super();
		this.service = service;
	}

	@Override
	public Object call() throws Exception {
		return service.get();
	}
}