java.util.concurrent包下的几个常用类

时间:2022-11-30 19:53:39

本文的参考地址:http://blog.csdn.net/xsl1990/article/details/18564097

1.Callable<V>

Callable<V>与Runnable类似,理解Callable<V>可以从比较其与Runnable的区别开始:

1)从使用上:实现的Callable<V>的类需要实现call()方法,此方法有返回对象V;而Runnable的子类需要实现run()方法,但没有返回值;
2)如果直接调用Callable<V>的子类的call()方法,代码是同步顺序执行的;而Runnable的子类是线程,是代码异步执行。
3)将Callable子类submit()给线程池去运行,那么在时间上几个Callable的子类的执行是异步的。
即:如果一个Callable执行需要5s,那么直接调用Callable.call(),执行3次需要15s;
而将Callable子类交个线程执行3次,在池可用的情况下,只需要5s。这就是基本的将任务拆分异步执行的做法。
4)callable与future的组合用法:
(什么是Future?Future 表示异步计算的结果。其用于获取线程池执行callable后的结果,这个结果封装为Future类。详细可以参看Future的API,有示例。)
一种就像上面所说,对一个大任务进行分制处理;
另一种就是对一个任务的多种实现方法共同执行,任何一个返回计算结果,则其他的任务就没有执行的必要。选取耗时最少的结果执行。

2.Semaphore

一个计数信号量,主要用于控制多线程对共同资源库访问的限制。
典型的实例:1)公共厕所的蹲位……,10人等待5个蹲位的测试,满员后就只能出一个进一个。
2)地下车位,要有空余才能放行
3)共享文件IO数等
与线程池的区别:线程池是控制线程的数量,信号量是控制共享资源的并发访问量。
实例:Semaphore avialable = new Semaphore(int x,boolean y);
x:可用资源数;y:公平竞争或非公平竞争(公平竞争会导致排队,等待最久的线程先获取资源)
用法:在获取工作资源前,用Semaphore.acquire()获取资源,如果资源不可用则阻塞,直到获取资源;操作完后,用Semaphore.release()归还资源
代码示例:(具体管理资源池的示例,可以参考API的示例)

[java] view plain copy
  1. public class SemaphoreTest {  
  2.     private static final int NUMBER = 5;    //限制资源访问数  
  3.     private static final Semaphore avialable = new Semaphore(NUMBER,true);  
  4.     public static void main(String[] args) {  
  5.         ExecutorService pool = Executors.newCachedThreadPool();  
  6.         Runnable r = new Runnable(){  
  7.             public void run(){  
  8.                 try {  
  9.                     avialable.acquire();    //此方法阻塞  
  10.                     Thread.sleep(10*1000);  
  11.                     System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--执行完毕");  
  12.                     avialable.release();  
  13.                 } catch (InterruptedException e) {  
  14.                     e.printStackTrace();  
  15.                 }  
  16.             }  
  17.         };  
  18.         System.out.println(avialable.availablePermits());  
  19.         for(int i=0;i<10;i++){  
  20.             pool.execute(r);  
  21.         }  
  22.         System.out.println(avialable.availablePermits());  
  23.         pool.shutdown();  
  24.     }  
  25.       
  26.     public static String getNow(){  
  27.         SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");  
  28.         return sdf.format(new Date());  
  29.     }  
  30. }  

3.ReentrantLock与Condition

1.ReentrantLock:可重入互斥锁。使用上与synchronized关键字对比理解:
1.1)synchronized示例:

[java] view plain copy
  1. synchronized(object){  
  2.         //do process to object  
  3.     }  

1.2)ReentrantLock示例:(api)
    [java] view plain copy
  1. private final ReentrantLock lock = new ReentrantLock();  
  2.    public void m() {   
  3.      lock.lock();  // block until condition holds  
  4.      try {  
  5.        // ... method body  
  6.      } finally {  
  7.        lock.unlock()  
  8.      }  
  9.    }  

由1.1)和1.2)的示例很好理解,ReetantLock也就是一个锁,线程执行某段代码时,需要争用此类实例的锁,用完后要显示的释放此锁。
至于具体区别,后面在说……
2.Condition:此类是同步的条件对象,每个Condition实例绑定到一个ReetrantLock中,以便争用同一个锁的多线程之间可以通过Condition的状态来获取通知。
注意:使用Condition前,首先要获得ReentantLock,当条件不满足线程1等待时,ReentrantLock会被释放,以能让其他线程争用,其他线程获得reentrantLock,然后满足条件,唤醒线程1继续执行。
这与wait()方法是一样的,调用wait()的代码块要被包含在synchronized块中,而当线程r1调用了objectA.wait()方法后,同步对象的锁会释放,以能让其他线程争用;其他线程获取同步对象锁,完成任务,调用objectA.notify(),让r1继续执行。代码示例如下。


代码示例1(调用 condition.await();会释放lock锁 ):

[java] view plain copy
  1. public class ConditionTest {  
  2.     private static final ReentrantLock lock = new ReentrantLock(true);  
  3.     //从锁中创建一个绑定条件  
  4.     private static final Condition condition = lock.newCondition();  
  5.       
  6.     private static int count = 1;  
  7.   
  8.     public static void main(String[] args) {  
  9.           
  10.         Runnable r1 = new Runnable(){  
  11.             public void run(){  
  12.                 lock.lock();  
  13.                 try{  
  14.                     while(count<=5){  
  15.                         System.out.println(Thread.currentThread().getName()+"--"+count++);  
  16.                         Thread.sleep(1000);  
  17.                     }  
  18.                     condition.signal();     //线程r1释放条件信号,以唤醒r2中处于await的代码继续执行。  
  19.                 } catch (InterruptedException e) {  
  20.                     e.printStackTrace();  
  21.                 }finally{  
  22.                     lock.unlock();  
  23.                 }  
  24.             }  
  25.         };  
  26.           
  27.         Runnable r2 = new Runnable(){  
  28.             public void run(){  
  29.                 lock.lock();  
  30.                 try{  
  31.                     if(count<=5){  
  32.                         System.out.println("----$$$---");  
  33.                         condition.await();  //但调用await()后,lock锁会被释放,让线程r1能获取到,与Object.wait()方法一样  
  34.                         System.out.println("----------");  
  35.                     }  
  36.                     while(count<=10){  
  37.                         System.out.println(Thread.currentThread().getName()+"--"+count++);  
  38.                         Thread.sleep(1000);  
  39.                     }  
  40.                 } catch (InterruptedException e) {  
  41.                     e.printStackTrace();  
  42.                 }finally{  
  43.                     lock.unlock();  
  44.                 }  
  45.             }  
  46.         };  
  47.   
  48.         new Thread(r2).start(); //让r2先执行,先获得lock锁,但条件不满足,让r2等待await。  
  49.         try {  
  50.             Thread.sleep(100);  //这里休眠主要是用于测试r2.await()会释放lock锁,被r1获取  
  51.         } catch (InterruptedException e) {  
  52.             e.printStackTrace();  
  53.         }  
  54.         new Thread(r1).start();  
  55.     }  
  56. }  
代码示例2(此例子来自于Condition的API):

[java] view plain copy
  1. public class ConditionMain {  
  2.   
  3.     public static void main(String[] args) {  
  4.         final BoundleBuffer buf = new ConditionMain().new BoundleBuffer();  
  5.         new Thread(new Runnable(){  
  6.             public void run() {  
  7.                 for(int i=0;i<1000;i++){  
  8.                     try {  
  9.                         buf.put(i);  
  10.                         System.out.println("入值:"+i);  
  11.                         Thread.sleep(200);  
  12.                     } catch (InterruptedException e) {  
  13.                         e.printStackTrace();  
  14.                     }  
  15.                 }  
  16.             }  
  17.         }).start();  
  18.         new Thread(new Runnable(){  
  19.             public void run() {  
  20.                 for(int i=0;i<1000;i++){  
  21.                     try {  
  22.                         int x = buf.take();  
  23.                         System.out.println("出值:"+x);  
  24.                         Thread.sleep(2000);  
  25.                     } catch (InterruptedException e) {  
  26.                         e.printStackTrace();  
  27.                     }  
  28.                 }  
  29.             }  
  30.         }).start();  
  31.     }  
  32.   
  33.     public class BoundleBuffer {  
  34.         final Lock lock = new ReentrantLock();  
  35.         final Condition notFull = lock.newCondition();  
  36.         final Condition notEmpty = lock.newCondition();  
  37.   
  38.         final Integer[] items = new Integer[10];  
  39.         int putptr, takeptr, count;  
  40.   
  41.         public void put(int x) throws InterruptedException {  
  42.             System .out.println("put wait lock");  
  43.             lock.lock();  
  44.             System .out.println("put get lock");  
  45.             try {  
  46.                 while (count == items.length){  
  47.                     System.out.println("buffer full, please wait");  
  48.                     notFull.await();  
  49.                 }  
  50.                 items[putptr] = x;  
  51.                 if (++putptr == items.length)  
  52.                     putptr = 0;  
  53.                 ++count;  
  54.                 notEmpty.signal();  
  55.             } finally {  
  56.                 lock.unlock();  
  57.             }  
  58.         }  
  59.         public int take() throws InterruptedException {  
  60.             System .out.println("take wait lock");  
  61.             lock.lock();  
  62.             System .out.println("take get lock");  
  63.             try {  
  64.                 while (count == 0){  
  65.                     System.out.println("no elements, please wait");  
  66.                     notEmpty.await();  
  67.                 }  
  68.                 int x = items[takeptr];  
  69.                 if (++takeptr == items.length)  
  70.                     takeptr = 0;  
  71.                 --count;  
  72.                 notFull.signal();  
  73.                 return x;  
  74.             } finally {  
  75.                 lock.unlock();  
  76.             }  
  77.         }  
  78.     }  
  79. }  

4.BlockingQueue

简单介绍。这是一个阻塞的队列超类接口,concurrent包下很多架构都基于这个队列。BlockingQueue是一个接口,此接口的实现类有:ArrayBlockingQueue, DelayQueue, LinkedBlockingDeque, LinkedBlockingQueue, PriorityBlockingQueue, SynchronousQueue 。每个类的具体使用可以参考API。 这些实现类都遵从共同的接口定义(一目了然,具体参考api):
[html] view plain copy
  1.     抛出异常    特殊值         阻塞      超时   
  2. 插入  add(e)      offer(e)    put(e)      offer(e, time, unit)   
  3. 移除  remove()    poll()      take()      poll(time, unit)   
  4. 检查  element()   peek()      不可用         不可用   

5.CompletionService

1.CompletionService是一个接口,用来保存一组异步求解的任务结果集。api的解释是:将新生产的异步任务与已完成的任务结果集分离开来。
2.CompletionService依赖于一个特定的Executor来执行任务。实际就是此接口需要多线程处理一个共同的任务,这些多线程由一个指定的线程池来管理。CompletionService的实现类ExecutorCompletionService。
3.api的官方代码示例参考ExecutorCompletionService类的api(一个通用分制概念的函数)。
4.使用示例:如有时我们需要一次插入大批量数据,那么可能我们需要将1w条数据分开插,异步执行。如果某个异步任务失败那么我们还要重插,那可以用CompletionService来实现。下面是简单代码:
(代码中1w条数据分成10份,每次插1000条,如果成功则返回true,如果失败则返回false。那么忽略数据库的东西,我们假设插1w条数据需10s,插1k条数据需1s,那么下面的代码分制后,插入10条数据需要2s。为什么是2s呢?因为我们开的线程池是8线程,10个异步任务就有两个需要等待池资源,所以是2s,如果将下面的8改为10,则只需要1s。)
[java] view plain copy
  1. public class CompletionServiceTest {  
  2.   
  3.     public static void main(String[] args) {  
  4.         ExecutorService pool = Executors.newFixedThreadPool(8);     //需要2s,如果将8改成10,则只需要1s  
  5.         CompletionService<Boolean> cs = new ExecutorCompletionService<Boolean>(pool);  
  6.         Callable<Boolean> task = new Callable<Boolean>(){  
  7.             public Boolean call(){  
  8.                 try {  
  9.                     Thread.sleep(1000);  
  10.                     System.out.println("插入1000条数据完成");  
  11.                 } catch (InterruptedException e) {  
  12.                     e.printStackTrace();  
  13.                 }  
  14.                 return true;  
  15.             };  
  16.         };  
  17.         System.out.println(getNow()+"--开始插入数据");  
  18.         for(int i=0;i<10;i++){  
  19.             cs.submit(task);              
  20.         }  
  21.         for(int i=0;i<10;i++){  
  22.             try {  
  23.                 //ExecutorCompletionService.take()方法是阻塞的,如果当前没有完成的任务则阻塞  
  24.                 System.out.println(cs.take().get());  
  25.                 //实际使用时,take()方法获取的结果可用于处理,如果插入失败,则可以进行重试或记录等操作  
  26.             } catch (InterruptedException|ExecutionException e) {  
  27.                 e.printStackTrace();  
  28.             }  
  29.         }  
  30.         System.out.println(getNow()+"--插入数据完成");  
  31.         pool.shutdown();  
  32.     }  
  33.   
  34.     public static String getNow(){  
  35.         SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");  
  36.         return sdf.format(new Date());  
  37.     }  
  38. }  

5.CompletionService与Callable<V>+Future的对比:
在上面的Callable中说过,Callable+Future能实现任务的分治,但是有个问题就是:不知道call()什么时候完成,需要人为控制等待。
而jdk通过CompetionService已经将此麻烦简化,通过CompletionService将异步任务完成的与未完成的区分开来(正如api的描述),我们只用去取即可。
CompletionService有什么好处呢?
如上所说:1)将已完成的任务和未完成的任务分开了,无需开发者操心;2)隐藏了Future类,简化了代码的使用。真想点个赞!

6.CountDownLatch

1.CountDownLatch:api解释:一个同步辅助类,在完成一组正在其他线程中执行的操作之前,它允许一个或多个线程一直等待。个人理解是CountDownLatch让可以让一组线程同时执行,然后在这组线程全部执行完前,可以让另一个线程等待。
就好像跑步比赛,10个选手依次就位,哨声响才同时出发;所有选手都通过终点,才能公布成绩。那么CountDownLatch就可以控制10个选手同时出发,和公布成绩的时间。
CountDownLatch 是一个通用同步工具,它有很多用途。将计数 1 初始化的 CountDownLatch 用作一个简单的开/关锁存器,或入口:在通过调用 countDown() 的线程打开入口前,所有调用 await 的线程都一直在入口处等待。用 N 初始化的 CountDownLatch 可以使一个线程在 N 个线程完成某项操作之前一直等待,或者使其在某项操作完成 N 次之前一直等待。 
CountDownLatch startSignal = new CountDownLatch(1);
CountDownLatch doneSignal = new CountDownLatch(N);


代码示例可参考api的示例。(重要)
2.代码示例:

参考链接中的示例:http://blog.csdn.net/xsl1990/article/details/18564097

个人示例:

[java] view plain copy
  1. public class CountDownLatchTest {  
  2.     private static SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");  
  3.     public static void main(String[] args) {  
  4.         final CountDownLatch start = new CountDownLatch(1); //用一个信号控制一组线程的开始,初始化为1  
  5.         final CountDownLatch end = new CountDownLatch(10);  //要等待N个线程的结束,初始化为N,这里是10  
  6.         Runnable r = new Runnable(){  
  7.             public void run(){  
  8.                 try {  
  9.                     start.await();  //阻塞,这样start.countDown()到0,所有阻塞在start.await()处的线程一起执行  
  10.                     Thread.sleep((long) (Math.random()*10000));  
  11.                     System.out.println(getNow()+"--"+Thread.currentThread().getName()+"--执行完成");  
  12.                     end.countDown();//非阻塞,每个线程执行完,让end--,这样10个线程执行完end倒数到0,主线程的end.await()就可以继续执行  
  13.                 } catch (InterruptedException e) {  
  14.                     e.printStackTrace();  
  15.                 }  
  16.             }  
  17.         };  
  18.         for(int i=0;i<10;i++){  
  19.             new Thread(r).start();  //虽然开始了10个线程,但所有线程都阻塞在start.await()处  
  20.         }  
  21.         System.out.println(getNow()+"--线程全部启动完毕,休眠3s再让10个线程一起执行");  
  22.         try {  
  23.             Thread.sleep(3*1000);  
  24.         } catch (InterruptedException e) {  
  25.             e.printStackTrace();  
  26.         }  
  27.         System.out.println(getNow()+"--开始");  
  28.         start.countDown();  //start初始值为1,countDown()变成0,触发10个线程一起执行  
  29.         try {  
  30.             end.await();        //阻塞,等10个线程都执行完了才继续往下。  
  31.         } catch (InterruptedException e) {  
  32.             e.printStackTrace();  
  33.         }  
  34.         System.out.println(getNow()+"--10个线程都执行完了,主线程继续往下执行!");  
  35.     }  
  36.     private static String getNow(){  
  37.         return sdf.format(new Date());  
  38.     }  
  39. }  

7.CyclicBarrier

1.一个同步辅助类,它允许一组线程互相等待,直到到达某个公共屏障点。也就是说,这一组线程的执行分几个节点,每个节点往下执行,都需等待其他线程,这就需要这种等待具有循环性。CyclicBarrier在这样的情况下就很有用。
2.CyclicBarrier与CountDownLacth的区别:
1)CountDownLacth用于一个线程与一组线程之间的相互等待。常用的就是一个主线程与一组分治线程之间的等待:主线程发号令,一组线程同时执行;一组线程依次执行完,再唤醒主线程继续执行;
CyclicBarrier用于一组线程执行时,每个线程执行有多个节点,每个节点的处理需要相互等待。如:对5个文件进行处理,按行将各个文件数字挑出来合并成一行,排序,并输出到另一个文件,那每次处理都需要等待5个线程读入下一行。(api示例可供参考)
2)CountDownLacth的处理机制是:初始化一个值N(相当于一组线程有N个),每个线程调用一次countDown(),那么cdLatch减1,等所有线程都调用过countDown(),那么cdLatch值达到0,那么线程从await()处接着玩下执行。
CyclicBarrier的处理机制是:初始化一个值N(相当于一组线程有N个),每个线程调用一次await(),那么barrier加1,等所有线程都调用过await(),那么barrier值达到初始值N,所有线程接着往下执行,并将barrier值重置为0,再次循环下一个屏障;
3)由2)可以知道,CountDownLatch只可以使用一次,而CyclicBarrier是可以循环使用的。
3.个人用于理解的示例:

[java] view plain copy
  1. public class CyclicBarrierTest {  
  2.     private static final CyclicBarrier barrier = new CyclicBarrier(5,  
  3.             new Runnable(){  
  4.                 public void run(){  //每次线程到达屏障点,此方法都会执行  
  5.                     System.out.println("\n--------barrier action--------\n");  
  6.                 }  
  7.             });  
  8.     public static void main(String[] args) {  
  9.         for(int i=0;i<5;i++){  
  10.             new Thread(new CyclicBarrierTest().new Worker()).start();  
  11.         }  
  12.     }  
  13.     class Worker implements Runnable{  
  14.         public void run(){  
  15.             try {  
  16.                 System.out.println(Thread.currentThread().getName()+"--第一阶段");  
  17.                 Thread.sleep(getRl());  
  18.                 barrier.await();    //每一次await()都会阻塞,等5个线程都执行到这一步(相当于barrier++操作,加到初始化值5),才继续往下执行  
  19.                 System.out.println(Thread.currentThread().getName()+"--第二阶段");  
  20.                 Thread.sleep(getRl());  
  21.                 barrier.await();    //每一次5个线程都到达共同的屏障节点,会执行barrier初始化参数中定义的Runnable.run()  
  22.                 System.out.println(Thread.currentThread().getName()+"--第三阶段");  
  23.                 Thread.sleep(getRl());  
  24.                 barrier.await();  
  25.                 System.out.println(Thread.currentThread().getName()+"--第四阶段");  
  26.                 Thread.sleep(getRl());  
  27.                 barrier.await();  
  28.                 System.out.println(Thread.currentThread().getName()+"--第五阶段");  
  29.                 Thread.sleep(getRl());  
  30.                 barrier.await();  
  31.                 System.out.println(Thread.currentThread().getName()+"--结束");  
  32.             } catch (InterruptedException | BrokenBarrierException e) {  
  33.                 e.printStackTrace();  
  34.             }  
  35.         }  
  36.     }  
  37.     public static long getRl(){  
  38.         return Math.round(10000);  
  39.     }  
  40. }  

4.参考api的示例。
api的示例自己看,就是加深印象。
但是api中有一点描述:如果屏障操作在执行时不依赖于正挂起的线程,则线程组中的任何线程在获得释放时都能执行该操作。为方便此操作,每次调用 await() 都将返回能到达屏障处的线程的索引。然后,您可以选择哪个线程应该执行屏障操作,例如: 
[java] view plain copy
  1. if (barrier.await() == 0) {  
  2. <span style="white-space:pre">    </span> // log the completion of this iteration  
  3. }  
就是说,barrier.await()还会返回一个int值。这个返回值到底是什么呢?不是返回的线程的索引,返回的是:N-进入等待线程数,如5个线程,5线程都进入等待,那返回值就是0(具体可以参看源码)。那么barrier.await()==0也可以看做是一个N线程都达到公共屏障的信号,然后在此条件下处理原本需要放在Runnable参数中的逻辑。不用担心多线程会多次执行此逻辑,N个线程只有一个线程barrier.await()==0。

8.Exchanger

1.Exchanger可以在对中对元素进行配对和交换的线程的同步点。api上不是太好理解,个人理解说白了就是两个线程交换各自使用的指定内存数据。
2.场景:
api中有示例,两个线程A、B,各自有一个数据类型相同的变量a、b,A线程往a中填数据(生产),B线程从b中取数据(消费)。具体如何让a、b在内存发生关联,就由Exchanger完成。
api中说:Exchanger 可能被视为 SynchronousQueue 的双向形式。怎么理解呢?传统的SynchronousQueue存取需要同步,就是A放入需要等待B取出,B取出需要等待A放入,在时间上要同步进行。而Exchanger在B取出的时候,A是同步在放入的。即:1)A放入a,a满,然后与B交换内存,那A就可以操作b(b空),而B可以操作a;2)等b被A存满,a被B用完,再交换;3)那A又填充a,B又消费b,依次循环。两个内存在一定程度上是同时被操作的,在时间上不需要同步。
再理解就是:如果生产需要5s,消费需要5s。SynchronousQueue一次存取需要10s,而Exchanger只需要5s。4.注意事项:
目前只知道Exchanger只能发生在两个线程之间。但实际上Exchanger的源码是有多个插槽(Slot),交换是通过线程ID的hash值来定位的。目前还没搞懂?待后续。
如果一组线程aGroup操作a内存,一组线程bGroup操作b内存,如何交换?能不能交换?
3.代码示例:

[java] view plain copy
  1. public class ExchangerTest {  
  2.     private SimpleDateFormat sdf = new SimpleDateFormat("mm:ss");  
  3.     private static Exchanger<Queue<Integer>> changer = new Exchanger<Queue<Integer>>();  
  4.   
  5.     public static void main(String[] args) {  
  6.         new Thread(new ExchangerTest().new ProducerLoop()).start();  
  7.         new Thread(new ExchangerTest().new ConsumerLoop()).start();  
  8.     }  
  9.     class ProducerLoop implements Runnable{  
  10.         private Queue<Integer> pBuffer = new LinkedBlockingQueue<Integer>();  
  11.         private final int maxnum = 10;  
  12.   
  13.         @Override  
  14.         public void run() {  
  15.             try{  
  16.                 for(;;){  
  17.                     Thread.sleep(500);  
  18.                     pBuffer.offer((int) Math.round(Math.random()*100));  
  19.                     if(pBuffer.size() == maxnum){  
  20.                         System.out.println(getNow()+"--producer交换前");  
  21.                         pBuffer = changer.exchange(pBuffer);  
  22.                         System.out.println(getNow()+"--producer交换后");  
  23.                     }  
  24.                 }  
  25.             }catch(Exception e){  
  26.                 e.printStackTrace();  
  27.             }  
  28.         }  
  29.     }     
  30.     class ConsumerLoop implements Runnable{  
  31.         private Queue<Integer> cBuffer = new LinkedBlockingQueue<Integer>();  
  32.           
  33.         @Override  
  34.         public void run() {  
  35.             try{  
  36.                 for(;;){  
  37.                     if(cBuffer.size() == 0){  
  38.                         System.out.println("\n"+getNow()+"--consumer交换前");  
  39.                         cBuffer = changer.exchange(cBuffer);  
  40.                         System.out.println(getNow()+"--consumer交换后");  
  41.                     }  
  42.                     System.out.print(cBuffer.poll()+" ");  
  43.                     Thread.sleep(500);  
  44.                 }  
  45.             }catch(Exception e){  
  46.                 e.printStackTrace();  
  47.             }  
  48.         }  
  49.     }     
  50.     private String getNow(){  
  51.         return sdf.format(new Date());  
  52.     }  
  53. }  
4.注意事项:
目前只知道Exchanger只能发生在两个线程之间。但实际上Exchanger的源码是有多个插槽(Slot),交换是通过线程ID的hash值来定位的。目前还没搞懂?待后续。
如果一组线程aGroup操作a内存,一组线程bGroup操作b内存,如何交换?能不能交换?

9.Phaser

Phaser是jdk1.7的新特性。其功能类似CyclicBarrier和CountDownLatch,但其功能更灵活,更强大,支持动态调整需要控制的线程数。不重复了。参考链接:

http://whitesock.iteye.com/blog/1135457