java并发之(4):Semaphore信号量、CounDownLatch计数锁存器和CyclicBarrier循环栅栏

时间:2022-10-27 18:00:54

简介

    java.util.concurrent包是Java 5的一个重大改进,java.util.concurrent包提供了多种线程间同步和通信的机制,比如Executors, Queues, Timing, Synchronizers和Concurrent Collections等。与synchronized关键字和Object.notify()等方法相比,这些类和方法的抽象层次都较高。Effective Java中提到,其中比较重要的同步和通信机制有Executor框架、Concurrent Collections和Synchronizers三种。  

    其中Synchronizers包含了五种: Semaphore信号量,CounDownLatch倒计时锁存器,CyclicBarrier循环栅栏,Phaser和Exchanger。 JCIP中提到,Exchanger可以看做一种特殊的Barrier。Effective Java 提到用的比较多的主要是Semaphore信号量和CounDownLatch倒计时锁存器。本文主要讲解我认为比较重要的Semaphore信号量、CounDownLatch计数锁存器和CyclibBarrier。每一种都按照它们的概念、jdk实现、所提供的方法和使用(traveler或者jdk, or sample code)来进行介绍。


1 Semaphore

semaphore,信号量,是众多synchronizer中的一个。在操作系统中就存在互斥量和信号量这样的概念。 semaphore跟锁机制存在一定的相似性,semaphore也是一种锁机制,所不同的是,reentrantLock是只允许一个线程获得锁,而信号量持有多个许可(permits),允许多个线程获得许可并执行。从这个意义上看,重入锁是许可只有1的信号量。它们所提供的方法也非常接近。

 

1.1 实现

跟ReentrantLock一样,Semaphore也是以AQS为基础来实现的。

1.1.1 构造函数:

非公平版本:

1     public Semaphore(int permits) { 2 sync = new NonfairSync(permits); 3 }

 

可以选择是否公平的版本:

1     public Semaphore(int permits, boolean fair) { 2 sync = fair ? new FairSync(permits) : new NonfairSync(permits); 3 }

 

1.1.2 其他方法

跟ReentrantLock不同的是,每种acquire方法都分为有参数的和不带参数的两个版本:
acquire() :

1     public void acquire() throws InterruptedException { 2 sync.acquireSharedInterruptibly(1); 3 }

 

acquire(int permits)

1     public void acquire(int permits) throws InterruptedException { 2 if (permits < 0) throw new IllegalArgumentException(); 3  sync.acquireSharedInterruptibly(permits); 4 }

 

与此类似的还有:

acquireUninterruptibly()&acquireUninterruptibly(int)

tryAcquire()& tryAcquire(int) 

tryAcquire(long,TimeUnit)& tryAcquire(int, long,TimeUnit)

release()& release(int)

java并发之(4):Semaphore信号量、CounDownLatch计数锁存器和CyclicBarrier循环栅栏

1.2 使用示例:

 1 import java.util.concurrent.ExecutorService;
 2 import java.util.concurrent.Executors;
 3 import java.util.concurrent.Semaphore;
 4 
 5 public class TIJ_semaphore {
 6     public static void main(String[] args) {
 7         ExecutorService exec = Executors.newCachedThreadPool();
 8         final Semaphore semp = new Semaphore(5); // 5 permits
 9 
10         for (int index = 0; index < 20; index++) {
11             final int NO = index;
12             Runnable run = new Runnable() {
13                 public void run() {
14                     try {
// if 1 permit avaliable, thread will get a permits and go; if no permit avaliable, thread will block until 1 avaliable
15 semp.acquire();
16 System.out.println("Accessing: " + NO); 17 Thread.sleep((long) (10000); 18 semp.release(); 19 } catch (InterruptedException e) { 20 } 21 } 22 }; 23 exec.execute(run); 24 } 25 exec.shutdown(); 26 }

 程序输出结果为:

java并发之(4):Semaphore信号量、CounDownLatch计数锁存器和CyclicBarrier循环栅栏java并发之(4):Semaphore信号量、CounDownLatch计数锁存器和CyclicBarrier循环栅栏
 1 Accessing: 0
 2 Accessing: 2
 3 Accessing: 3
 4 Accessing: 4
 5 Accessing: 1
 6 (等待10s)
 7 Accessing: 5
 8 Accessing: 6
 9 Accessing: 14
10 Accessing: 8
11 Accessing: 7
12 (等待10s)
13 Accessing: 10
14 Accessing: 9
15 Accessing: 11
16 Accessing: 15
17 Accessing: 12
18 (等待10s)
19 Accessing: 13
20 Accessing: 16
21 Accessing: 17
22 Accessing: 19
23 Accessing: 18
View Code

 

 

2 CountDownLatch

2.1 实现

内部使用AQS实现


2.2 方法
await()

等待,无超时,可以被中断

1 public void await() throws InterruptedException {
2         sync.acquireSharedInterruptibly(1);
3     }

 

boolean await(long,timeUnit):

如果等待超时,则返回false; 如果时间为0或者为负,则立刻返回。

1  public boolean await(long timeout, TimeUnit unit)
2         throws InterruptedException {
3         return sync.tryAcquireSharedNanos(1, unit.toNanos(timeout));
4     }

 

countDown():

把Latch的计数减1,如果计数到达0,则释放所有正在等待的线程。

1 public void countDown() {
2         sync.releaseShared(1);
3     }

 

2.3 使用:

绝大多数synchronizer在jdk中没有使用,原因很简单:这些synchronizer是抽象层次较高的,所以一般只有应用程序才会直接使用。

而在nts生产环境中,只有一处admin.rest.api.RestRequestSender使用了CountDownLatch:

1    public Map<String, List<JSONObject>> doDelete(final String reqUri, final HttpHeaders _headers)
2          throws RestAPIException
3    {
4       setMethod(Method.DELETE);
5       setUriTemplate(reqUri);
6       headers = _headers;
7       return processBroadcast(); 8    }

 doDelete调用了processBraodcast:

   private Map<String, List<JSONObject>> processBroadcast() throws RestAPIException
   {
      ...
      final Map<String, SaaSServerContext> dServers = RestRequestSender.deployedServers;
      if (!dServers.isEmpty())
      {
         final CountDownLatch doneSignal = new CountDownLatch(dServers.size());
         ...
         for (final String key : dServers.keySet())
         {
            ...
            executor.submit(new RRSRunnable(doneSignal, context, results, key, totalRecordsCounter));
         }

         try
         {
            if (!doneSignal.await(Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt(), TimeUnit.MINUTES)) // if timeout will retrun false
            {
               XLog.warning("MDM api broadcast timed out after " + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getInt()
                     + " minutes. Timeout is set via notes.ini key "
                     + Configuration.NTS_MDM_API_BROADCAST_TIMEOUT.getNotesIniName());
            }
         }
         catch (final InterruptedException ie)
         {
            throw new RestAPIException("Interrupted", ie);
         }
        
      }

      return results; // if doneSingnal.await has been intrerrupted,  will this line still execute? 
   }

 RRSRunnable代码如下:

class RRSRunnable implements Runnable
   {
      private final CountDownLatch    doneSignal;
      ...

      RRSRunnable(final CountDownLatch doneSignal, final SaaSServerContext context,
                  final Map<String, List<JSONObject>> results, final String serverKey,
                  final MaxRecordCounter recordCounter)
      {
         this.doneSignal = doneSignal;
         ...
      }

      @Override
      public void run()
      {
         ...
         try
         {
            processRequest(responses, context.getHostName(), client, context, recordCounter);
            final long elapsedTime = System.currentTimeMillis() - startTime;
            XLog.fine("Traveler API request completed. orgid=" + orgId + ";request=" + reqUri + ";pool=" + serverKey
                  + ";time=" + elapsedTime + "ms");
         }
         catch (final RestAPIException rae)
         {
            exceptionServerKey = serverKey;
            exception = rae;
         }
         finally
         {
            doneSignal.countDown();
         }
      }
   }

 

2.4 更加一般的Latch:

CountDownLatch是一种特殊的Latch,jcip第八章用countdownLatch实现了一种valueLatch。

2.4.1 nts Latch简介:

在nts生产代码中也实现了一种Latch,Latch允许多个线程间的协作:在这种Latch中,有working thread和latching thread之分:

workingThread在做一些工作,latchingThread希望当这些工作完成的时候,锁存这些工作,然后得到workingThread的工作结果。workingThread和latchingThread共享一个Latch对象,workingThread会调用start方法,通知它正在开始针对特定Object的工作已经开始了。同时,latchingThread将调用latch方法,并传进它希望等待的Object。 当workingThread完成对某一Object(start方法传入的)的工作后,它将调用finish方法,传入该对象,以及工作的结果对象。当finish方法被调用后,调用latch方法的线程被唤醒,返回工作结果给latch方法的调用者。多个线程可以锁存同一个将要完成某些工作的object。一旦任意一个线程调用了finish方法,他们都将被唤醒并返回结果对象。如果调用latch方法时,针对latch对象的工作还没有开始,线程立刻返回,并不会block. 所以start(Object)应该首先被调用。

workingThread调用start(Object)方法,表明它开始工作。 同时,latchingThread调用latch(Object,long)方法,等待workingThread的执行完成。 workingThread执行finish(Object,Object)方法,表示工作完成,此时,latchingThread醒来。start(Object) finish(Object,Object) --> working thread 第二个参数为结果。 ?
latch(Object,long) --> latching thread

2.4.2 nts Latch 实现:

start:

 1    public boolean start(final Object obj)  2    {
 3       final long timeStart = System.currentTimeMillis();
 4       boolean rv = false;
 5       Barrier b = null;  6       synchronized (this)
 7       {
 8          if (!latched.containsKey(obj))  9          {
10             b = new Barrier("Latch:" + name + "_Obj:" + obj, 1);
11  latched.put(obj, b); // latched is a synchronizedHashMap 12             rv = true;
13          }
14       }
15       XLog.exiting("name=" + name, "obj=" + obj, "barrier=" + b, "rv=" + rv, ("Elapsed time="
16             + (System.currentTimeMillis() - timeStart) + "ms"));
17       return rv;
18    }

 

finish:

 1    public void finish(final Object obj, final Object result)
 2    {
 3       final long timeStart = System.currentTimeMillis();
 4       final Barrier b;
 5       synchronized (this)
 6       {
 7          b = latched.remove(obj);
 8          if (null != b)
 9          {
10             // there are waiters that need the result
11             b.result = result;
12             try
13             {
14                b.enter(0); 15             }
16             catch (final InterruptedException e)
17             {
18                // ignored
19             }
20          }
21       }
22       XLog.exiting("name=" + name, "obj=" + obj, "result=" + result, "barrier=" + b, ("Elapsed time="
23             + (System.currentTimeMillis() - timeStart) + "ms"));
24    }

 


3 CyclicBarrier

CountDownLatch一般用于某个线程A等待若干个其他线程执行完任务之后,它(一个线程)才执行; 而CyclicBarrier一般用于一组线程互相等待至某个状态,然后这一组线程再同时执行。

 

3.1 实现

使用Lock和Condition实现。不同于AQS。(Condition是基于AQS实现的)

CyclicBarrier包含下面的域:

1     /** The lock for guarding barrier entry */
2     private final ReentrantLock lock = new ReentrantLock();
3     /** Condition to wait on until tripped */
4     private final Condition trip = lock.newCondition();

   

3.1.1 构造函数

当在等待栅栏的线程个数到达预定义的个数时,barrier 发生trip, 但是因为没有预定义的动作,所以不执行任何动作。

1     public CyclicBarrier(int parties) { 2 this(parties, null); 3 }

当barrier发生trip时,会由最后一个进入该barrier的线程执行特定的动作:

1    public CyclicBarrier(int parties, Runnable barrierAction) { 2 if (parties <= 0) throw new IllegalArgumentException(); 3 this.parties = parties; 4 this.count = parties; 5 this.barrierCommand = barrierAction; 6 }

 

CyclicBarrier方法: 

await():

 public int await() throws InterruptedException, BrokenBarrierException {
        try {
            return dowait(false, 0L);
        } catch (TimeoutException toe) {
            throw new Error(toe); // cannot happen
        }
    }

 await(Long time, TimeUtil unit):

1     public int await(long timeout, TimeUnit unit) 2 throws InterruptedException, 3  BrokenBarrierException, 4  TimeoutException { 5 return dowait(true, unit.toNanos(timeout)); 6 }

上述两种方法都调用了dowait方法:

 1  private int dowait(boolean timed, long nanos)
 2         throws InterruptedException, BrokenBarrierException,
 3                TimeoutException {
 4         final ReentrantLock lock = this.lock;
 5  lock.lock();  6         try {
final Generation g = generation;
7 ... 8 if (Thread.interrupted()) { 9 breakBarrier(); 10 throw new InterruptedException(); 11 } 12 13 int index = --count; 14 if (index == 0) { // tripped 15 boolean ranAction = false; 16 try { 17 final Runnable command = barrierCommand; 18 if (command != null) 19 command.run(); 20 ranAction = true; 21 nextGeneration(); 22 return 0; 23 } finally { 24 if (!ranAction) 25 breakBarrier(); 26 } 27 } 28 29 // loop until tripped, broken, interrupted, or timed out 30 for (;;) { 31 try { 32 if (!timed) 33 /* The lock associated with this Condition is atomically
     released and the current thread becomes disabled for thread scheduling
     purposes */

trip.await();
34 else if (nanos > 0L) 35 nanos = trip.awaitNanos(nanos); 36 } catch (InterruptedException ie) { 37 if (g == generation && ! g.broken) { 38 breakBarrier(); 39 throw ie; 40 } else { 41 // We're about to finish waiting even if we had not 42 // been interrupted, so this interrupt is deemed to 43 // "belong" to subsequent execution. 44 Thread.currentThread().interrupt(); 45 } 46 } 47 48 if (g.broken) 49 throw new BrokenBarrierException(); 50 51 if (g != generation) 52 return index; 53 54 if (timed && nanos <= 0L) { 55 breakBarrier(); 56 throw new TimeoutException(); 57 } 58 } 59 } finally { 60 lock.unlock(); 61 } 62 }

breakBarrier:

1     private void breakBarrier() {
2         generation.broken = true;
3         count = parties;
4  trip.signalAll(); 5     }

 

3.2 使用:

在jdk和traveler code中没有使用。这符合Effective Java 69中的描述。在实际使用中较少见到。

 

4 Synchronizere与wait()/wait(long)/wait(long,int)/notify()/notifyAll()的比较:

    在effective java 69中提到,wait()/wait(long)/wait(long,int)/notify()/notifyAll()不易使用且容易出错。一般来讲应该优选更高级的concurrent container 或者 synchronizer。 其中synchronizer比较常用的是Semaphore和CountDownLatch,而CyclicBarrier和Exchanger 则使用的比较少。说从易用性上来讲,wait()/notify()/notifyAll()更象是汇编语言,并发容器和synchronizer更像是高级语言。 但我在nts代码中看到了很多wait/notify,而Semaphore和CountDownLatch则用的很少。

    Lock/Condition()是使用AQS实现的,Lock/Condition() 组合可以用来替代Object.wait()/notify()。 而高级的synchronizer: CountDownLatch& Semaphore也是基于AQS实现的。所以理论上,可以替代wait/notify。 Semaphore和CountDownLatch也都包含了跟wait(long timeout)相对应的方法。

    考虑以下在下面的例子中,是否可以用Semaphore& CountDownLatch来代替wait/notify ?

  1  public Connection getConnection(final boolean highpriority) throws SQLException
  2    {
  3       final long sTime = System.currentTimeMillis();
  4       Connection conn = null; 
  6       boolean createConnection = false;
  7       boolean waitConnection = false;
  8 
  9       try
 10       {
 11          while (true)  12          {
 13             createConnection = false;
 14             waitConnection = false;
 15         
 17             synchronized (dbConnections)
 18             {
 19                
 20                final List<Connection> dbConnectionsToBeFreedTemp = new ArrayList<Connection>();
 21                synchronized (dbConnectionsToBeFreed)  22                {
 23                   if (!dbConnectionsToBeFreed.isEmpty())
 24                   {
 25                      dbConnectionsToBeFreedTemp.addAll(dbConnectionsToBeFreed);
 26                      dbConnectionsToBeFreed.clear();
 27                      if (1 == dbConnectionsToBeFreedTemp.size())
 28                      {
 29                         // only one, so only notify one 
 30  dbConnectionsToBeFreed.notify(); // Semaphore.release()  31                      }
 32                      else
 33                      {
 34  dbConnectionsToBeFreed.notifyAll(); // CountDownLatch.await()  35                      }
 36                   }
 37                   if (isThrottDown && connectionCount <= Tier.ONE.value * .20F)
 38                   {
 39                      isThrottDown = false;
 40                      dbConnectionsToBeFreed.notifyAll();
 41                   }
 42                }
 43 
 44                if (!dbConnectionsToBeFreedTemp.isEmpty())
 45                {
 46                   for (final Connection connToFree : dbConnectionsToBeFreedTemp) 
 47                   {
 48                      if (!closeConnectionIfAgedOut(connToFree))
 49                      {
 50                         dbConnections.add(connToFree);
 51                      }
 52                   }
 53                   dbConnectionsToBeFreedTemp.clear();
 54                }
 55 
 56              
 57                if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean())
 58                {
 59                   getCurrentTier();
 60                 
 61                   if ((!dbConnections.isEmpty() && !isThrottDown)
 62                         && (highpriority || ((connectionCount - dbConnections.size()) < currentTier.value))) //Prevent non-high priority requests from grabbing freed high priority connections if connection is capped
 63                   {
 64                      conn = dbConnections.remove(0);
 65                   }
 66 
 67                   if (null == conn)
 68                   {
 69                      // See if we should create a new connection or have to wait
 70                      // if none are in the stack, then see if we should make another
 71                      if ((connectionCount < currentTier.value || (highpriority && (connectionCount < maxConnectionCount)))
 72                            && !isThrottDown)
 73                      {
 74                         
 75                         createConnection = true;
 76                         connectionCount++;
 77                      }
 78                      else if (!isScheduled && conn == null && !highpriority && !createConnection
 79                            && currentTier != Tier.THREE && !isThrottDown)
 80                      {
 81                         WallClock.getInstance().addAlarm(ALARM_NAME,
 82                               Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmStepUpTier);
 83                         isScheduled = true;
 84                         waitConnection = true;
 85                      }
 86                      else if (conn == null && !createConnection && currentTier == Tier.THREE
 87                            && connectionCount - dbConnections.size() >= Tier.THREE.value && !isScheduled)
 88                      {
 89                         WallClock.getInstance().addAlarm(ALARM_THROTTLE_DOWN,
 90                               Configuration.NTS_DB_POOL_STEP_INTERVAL_TIMER.getInt(), alarmThrottleDown);
 91                         isScheduled = true;
 92                         waitConnection = true;
 93                      }
 94                      else
 95                      {
 96                         // Connections are maxed out, so we have to wait
 97                         waitConnection = true;
 98                      }
 99                   }
100                }
101                else
102                {
103                   if (!dbConnections.isEmpty()
104                         && (highpriority || ((connectionCount - dbConnections.size()) <= maxConnectionCount)))
105                   {
106                      conn = dbConnections.remove(0);
107                   }
108 
109                   if (null == conn) 110                   {
111                      // See if we should create a new connection or have to wait
112                      // if none are in the stack, then see if we should make another
113                      if ((connectionCount < maxConnectionCount) || highpriority) 114  { 115                         
116                         createConnection = true; 117                         connectionCount++; 118  } 119                      else
120  { 121                         // Connections are maxed out, so we have to wait
122                         waitConnection = true; 123  } 124                   }
125                }
126             }
127 
128             if (null != conn) 129             {
130                // we have a Connection, so we are done
131                break;
132             }
133             else if (createConnection) 134             {
135                if (!isDerby)
136                {
137                   // update user and password from Configuration
138                   connProps.put("user", Configuration.NTS_DBUSER.getString());
139                   connProps.put("password", Configuration.NTS_DBPASSWORD.getString());
140                }
141 
142                try
143                {
144                   conn = DriverManager.getConnection(url, connProps); 
145                }
146                catch (final SQLException sqle)
147                {
148                   connectionCount--; // count must be decremented since a connection was never created
149                   DatabaseStatus.reportException(sqle);
150                   throw sqle;
151                }
152 
153                if (conn != null)
154                {
155                   DatabaseStatus.clearException();
156                   if (peakConnections < connectionCount)
157                   {
158                      peakConnections = connectionCount;
159                      Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_COUNT, peakConnections);
160                      Stats.setStat(Stats.DB_POOL_PEAK_CONNECTIONS_TIME, new Date().toString());
161                   }
162                   allConnections.put(Integer.valueOf(conn.hashCode()), PersistentStore.createDeathTimeStamp());
163                   
164                   break;
165                }
166                else if (Configuration.NTS_DB_CONNECTION_THROTTLING.getBoolean())
167                {
168                   // Unexpected Database exceptions will result in connection == null.
169                   // Instead of retrying the connection, throw a checked exception.
170                   throw new DBConnectionsAllUsedException("There are no DB Connections available");
171                }
172                else
173                {
174                   // don't break which will do the retry
175                }
176             }
177             else if (waitConnection) 178             {
179                try
180                {
181                   Stats.inc(Stats.DB_THREADS_WAITING_FOR_CONNECTION);
182                   synchronized (dbConnectionsToBeFreed)
183                   {
184  dbConnectionsToBeFreed.wait(); // Semaphore.acquire() CountDownLatch.countdown() 185                   }
186                }
187                finally
188                {
189                   Stats.dec(Stats.DB_THREADS_WAITING_FOR_CONNECTION);
190                }
191                // don't break which will do the retry
192             }
193             else
194             {
195                // should never hit this case
196                break;
197             }
198          }
199       }
200       catch (final InterruptedException ie)
201       {
202          // expected
203          throw new SQLException("Exception waiting for DB connection.", ie);
204       }
205       finally
206       {
207         
208       }
209 
210       return conn;
211    }

 


5 进一步问题:

5.1 CyclicBarrier方法,await(): 为什么要提供这两种方法来包裹wrapper dowait方法?

 

5.2 Semaphore和CountDownLatch互换

 

 1 final Semaphore sem = new Semaphore(0);  2 for (int i = 0; i < num_threads; ++ i)
 3 {
 4   Thread t = new Thread() {
 5     public void run()
 6     {
 7       try
 8       {
 9         doStuff();
10       }
11       finally
12       {
13  sem.release(); 14       }
15     }
16   };
17   t.start();
18 }
19 
20 sem.acquire(num_threads);

 

 

 

 1 final CountDownLatch latch = new CountDownLatch(num_threads);
 2 for (int i = 0; i < num_threads; ++ i)
 3 {
 4   Thread t = new Thread() {
 5     public void run()
 6     {
 7       try
 8       {
 9         doStuff();
10       }
11       finally
12       {
13         latch.countDown();
14       }
15     }
16   };
17   t.start();
18 }
19 
20 latch.await();

 

 


6 参考文献
 http://www.cnblogs.com/dolphin0520/p/3920397.html

 

 

 ----- 单例 -----

单例与static方法的区别: 一个是实例,可以传递,另外一个不可以。(好像也没什么用,传递单例)。

http://*.com/questions/519520/difference-between-static-class-and-singleton-pattern#