java并发包工具(java.util.Concurrent)

时间:2022-08-15 17:37:02

一、CyclicBarrier

作用:所有线程准备好才进行,只要一条线程没准备好,都不进行

用法:所有线程准备好以后调用CyclicBarrier的await方法,然后主线程执行CyclicBarrier的countDown方法

实现需求:n个运动员(n个线程),全部准备好了才一起起跑。代码如下

 1 package concurrent019;
 2 import java.io.IOException;  
 3 import java.util.Random;  
 4 import java.util.concurrent.BrokenBarrierException;  
 5 import java.util.concurrent.CyclicBarrier;  
 6 import java.util.concurrent.ExecutorService;  
 7 import java.util.concurrent.Executors; 
 8 public class UseCyclicBarrier {
 9 
10     static class Runner implements Runnable {  
11         private CyclicBarrier barrier;  
12         private String name;  
13         
14         public Runner(CyclicBarrier barrier, String name) {  
15             this.barrier = barrier;  
16             this.name = name;  
17         }  
18         @Override  
19         public void run() {  
20             try {  
21                 Thread.sleep(1000 * (new Random()).nextInt(5));  
22                 System.out.println(name + " 准备OK.");  
23                 barrier.await();  
24             } catch (InterruptedException e) {  
25                 e.printStackTrace();  
26             } catch (BrokenBarrierException e) {  
27                 e.printStackTrace();  
28             }  
29             System.out.println(name + " Go!!");  
30         }  
31     } 
32     
33     public static void main(String[] args) throws IOException, InterruptedException {  
34         CyclicBarrier barrier = new CyclicBarrier(3);  // 3 
35         ExecutorService executor = Executors.newFixedThreadPool(3);  
36         
37         executor.submit(new Thread(new Runner(barrier, "zhangsan")));  
38         executor.submit(new Thread(new Runner(barrier, "lisi")));  
39         executor.submit(new Thread(new Runner(barrier, "wangwu")));  
40   
41         executor.shutdown();  
42     }  
43   
44 }  

 

 

 

二、CountDownLacth

作用:监听某些初始化操作,等初始化执行完毕以后,通知主线程继续工作(例如zk的初始化)

用法:将需要别的线程来通知的线程调用CountDownLacth的await方法,其他线程执行了之后,允许这个线程执行的时候,执行CountDownLacth的countDown方法

实现需求:启动3个线程,1线程等待2和3线程执行完毕以后再执行,代码如下:

 1 package concurrent019;
 2 
 3 import java.util.concurrent.CountDownLatch;
 4 
 5 public class UseCountDownLatch {
 6 
 7     public static void main(String[] args) {
 8         
 9         final CountDownLatch countDown = new CountDownLatch(2);
10         
11         Thread t1 = new Thread(new Runnable() {
12             @Override
13             public void run() {
14                 try {
15                     System.out.println("进入线程t1" + "等待其他线程处理完成...");
16                     countDown.await();
17                     System.out.println("t1线程继续执行...");
18                 } catch (InterruptedException e) {
19                     e.printStackTrace();
20                 }
21             }
22         },"t1");
23         
24         Thread t2 = new Thread(new Runnable() {
25             @Override
26             public void run() {
27                 try {
28                     System.out.println("t2线程进行初始化操作...");
29                     Thread.sleep(3000);
30                     System.out.println("t2线程初始化完毕,通知t1线程继续...");
31                     countDown.countDown();
32                 } catch (InterruptedException e) {
33                     e.printStackTrace();
34                 }
35             }
36         });
37         Thread t3 = new Thread(new Runnable() {
38             @Override
39             public void run() {
40                 try {
41                     System.out.println("t3线程进行初始化操作...");
42                     Thread.sleep(4000);
43                     System.out.println("t3线程初始化完毕,通知t1线程继续...");
44                     countDown.countDown();
45                 } catch (InterruptedException e) {
46                     e.printStackTrace();
47                 }
48             }
49         });
50         
51         t1.start();
52         t2.start();
53         t3.start();
54         
55     }
56 }

 

ps:CyclicBarrier和CountDownLacth的区别

CyclicBarrier是所有线程等待一个线程的信号,CountDownLacth是一个线程等待所有线程的信号

 

三、Callable、Future

Future实现的就是前面讲的Future模式.

实现功能:主线程在执行的过程中,调用了一个或者多个耗时的操作,需要把这个耗时的操作提到另几个线程操作(这几个耗时的操作可以并行执行)

代码实现:

 1 package concurrent019;
 2 
 3 import java.util.concurrent.Callable;
 4 import java.util.concurrent.ExecutorService;
 5 import java.util.concurrent.Executors;
 6 import java.util.concurrent.FutureTask;
 7 
 8 public class UseFuture implements Callable<String>{
 9     private String para;
10     
11     public UseFuture(String para){
12         this.para = para;
13     }
14     
15     /**
16      * 这里是真实的业务逻辑,其执行可能很慢
17      */
18     @Override
19     public String call() throws Exception {
20         //模拟执行耗时
21         Thread.sleep(3000);
22         String result = this.para + "处理完成";
23         return result;
24     }
25     
26     //主控制函数
27     public static void main(String[] args) throws Exception {
28         String queryStr = "query";
29         //构造FutureTask,并且传入需要真正进行业务逻辑处理的类,该类一定是实现了Callable接口的类
30         FutureTask<String> future = new FutureTask<String>(new UseFuture(queryStr));
31         //创建一个固定线程的线程池且线程数为1,
32         ExecutorService executor = Executors.newFixedThreadPool(1);
33         //这里提交任务future,则开启线程执行RealData的call()方法执行
34         executor.submit(future);
35         System.out.println("请求完毕");
36         try {
37             //这里可以做额外的数据操作,也就是主程序执行其他业务逻辑
38             Thread.sleep(2000);
39         } catch (Exception e) {
40             e.printStackTrace();
41         }
42         //调用获取数据方法,如果call()方法没有执行完成,则依然会进行等待
43         System.out.println("数据:" + future.get()); // 其实这块应该开启线程处理
44         executor.shutdown();
45     }
46 
47 }

 

四、Semaphore(信号量)

Semaphore是计数信号量。Semaphore管理一系列许可证。每个acquire方法阻塞,直到有一个许可证可以获得然后拿走一个许可证;每个release方法增加一个许可证,这可能会释放一个阻塞的acquire方法。然而,其实并没有实际的许可证这个对象,Semaphore只是维持了一个可获得许可证的数量。(用于限流)

实现需求:总共有20条线程,但是最多只能放5条线程进来执行

代码实例:

 1 package concurrent019;
 2 
 3 import java.util.concurrent.ExecutorService;  
 4 import java.util.concurrent.Executors;  
 5 import java.util.concurrent.Semaphore;  
 6   
 7 public class UseSemaphore {  
 8   
 9     public static void main(String[] args) {  
10         // 线程池  
11         ExecutorService exec = Executors.newCachedThreadPool();  
12         // 只能5个线程同时访问  
13         final Semaphore semp = new Semaphore(5);  
14         // 模拟20个客户端访问  
15         for (int index = 0; index < 20; index++) {  
16             final int NO = index;  
17             Runnable run = new Runnable() {  
18                 public void run() {  
19                     try {  
20                         // 获取许可  
21                         semp.acquire();  
22                         System.out.println("Accessing: " + NO);  
23                         //模拟实际业务逻辑
24                         Thread.sleep((long) (Math.random() * 10000));  
25                         // 访问完后,释放  
26                         semp.release();  
27                     } catch (InterruptedException e) {  
28                     }  
29                 }  
30             };  
31             exec.execute(run);  
32         } 
33         
34         try {
35             Thread.sleep(10);
36         } catch (InterruptedException e) {
37             e.printStackTrace();
38         }
39         
40         //System.out.println(semp.getQueueLength());
41         
42         
43         
44         // 退出线程池  
45         exec.shutdown();  
46     }  
47   
48 }