JAVA并发框架之Semaphore实现生产者与消费者模型

时间:2020-12-16 17:39:45

分类: Java技术

     锁和信号量(Semaphore)是实现多线程同步的两种常用的手段。信号量需要初始化一个许可值,许可值可以大于0,也可以小于0,也可以等于0.
     如果大于0,表示,还有许可证可以发放,线程不会被阻塞;
     如果小于或者等于0,表示,没有许可证可以发放了,线程被阻塞住了。

     它有两个常用的操作, acquire()申请许可证,如果有,就可以获得,如果没有就等待了。
                         release(),归还许可证,保证循环使用。


     看一个例子,就会明白了,还是实现上次的那个生产者和消费者的例子。
     我们假设有一个篮子,最多可以放3个苹果,有多个人可以放苹果,也有多个人可以拿走苹果。

public class Apple {
    private String name;
    public Apple(String name){
       thisnamename;
    }
        @Override
        public String toString() {
               // TODO Auto-generated method stub
               return name ;
       }

}

public class Basket {
        private List bascket =new ArrayList(10);
       Semaphore mutex new Semaphore(1);
       Semaphore isFull new Semaphore(10);
       
       Semaphore isEmpty new Semaphore(0);
       
        public void put(Apple appthrows InterruptedException{
               //大于0,就放行
               //acquire,就是减操作,如果小于0,就阻塞
               //release,就是加操作,如果大于0,就不会被阻塞
               isFullacquire();
               try{
                      mutexacquire();
                  bascket.add( app);
              }
               finally{
                      mutex.release();
                      isEmpty.release();
              }
       }
       
        public Apple take() throws InterruptedException{
              Apple app;
               isEmptyacquire();
               try{
                      mutexacquire();
                      appbascket.remove(0);
              }
               finally{
                      mutex.release();
                      isFull.release();
              }
               return app ;
       }

}

//消费者
public class Consumer implements Runnable{
        private Basket bascket ;
        private String name ;
        public Consumer(Basket bascket ,String name ){
               this .bascket =bascket ;
               this .name =name ;
       }
        public void run(){
               while (true ){
                      try {
                           System. out .println(name +":consumer" +bascket .take());
                     } catch (InterruptedException e1) {
                            // TODO Auto-generated catch block
                            e1.printStackTrace();
                     }
                      try {
                           Thread. sleep(1000);
                     } catch (InterruptedException ) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                     }
              }
       }

}

//生产者
public class Producer implements Runnable{
        private Basket bascket ;
        private String name ;
        public Producer(Basket bascket ,String name ){
               this .bascket =bascket ;
               this .name =name ;
       }
        public void run(){
              
               while (true ){
                      try {
                           System. out .println(name +"produce.." );
                            bascketput( new Apple( "name" +new Random()));
                     } catch (InterruptedException e) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                     }
                      try {
                           Thread. sleep(1000);
                     } catch (InterruptedException ) {
                            // TODO Auto-generated catch block
                            e.printStackTrace();
                     }
                     
              }
              
       }

}

public class TestDemo {
        public static void main(String args[]){
              Basket bascketnew Basket();
              Consumer c1new Consumer(bascket ,"c1" );
              
              Producer p1new Producer(bascket ,"p1" );
              Producer p2new Producer(bascket ,"p2" );
              
               //线程池管理
              ExecutorService service = Executors. newCachedThreadPool();
               service.execute( c1);
               service.execute( p1);
               service.execute( p2);
       }
}

一定要注意,上面acquire的顺序,如果不正确,所有的线程就会被阻塞了。
信号量的实现原理会在源代码中进行分析。