Java并发---java.util.concurrent包下的一些组件

时间:2022-11-08 17:36:40

java.util.concurrent

JDK1.5引入了java.util.concurrent包,里边很有多有用的组件,我们挑选一些来学习

  1. CountDownLatch
  2. CyclicBarrier
  3. BlockingQueue
    3.1 ArrayBlockingQueue
    3.2 DelayQueue
    3.3 LinkedBlockingQueue
    3.4 PriorityBlockingQueue
    3.5 SynchronousQueue
  4. Semaphore
  5. Exchanger

CountDownLatch

CountDownLatch就是一个线程等待其它线程完成各自工作后再执行。CountDownLatch被设计为只触发一次,计数值不能被重置。如果要重置可以使用CyclicBarrier。使用CountDownLatch的两个方法分别是await和countDown。下面来看一段实例代码

package concurrency.zxx;

import java.util.Random;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CountDownLatchTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        CountDownLatch cdOrder = new CountDownLatch(1);
        CountDownLatch cdAnswer = new CountDownLatch(3);
        
        for (int i = 0; i < 3; i++) {
            Runnable runnable = new Runnable() {
                public void run() {
                    try {
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "正在准备接收命令");
                        cdOrder.await();   // 阻塞直到cdOrder调用countDown为0
                        System.out.println("线程" + Thread.currentThread().getName() +
                                "已接收命令");
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "线程回应处理结束");
                        cdAnswer.countDown();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            executor.execute(runnable);
        }
        
        try {
            TimeUnit.SECONDS.sleep(new Random().nextInt(5));
            System.out.println("线程" + Thread.currentThread().getName() + 
                    "即将发送命令");
            cdOrder.countDown(); 
            System.out.println("线程" + Thread.currentThread().getName() + 
                    "发送命令");
            cdAnswer.await();   // 阻塞
            System.out.println("线程" + Thread.currentThread().getName() +
                    "已收到所有响应结果");
        } catch (Exception e) {
            e.printStackTrace();
        }
        
        executor.shutdown();
    }
}

CyclicBarrier

CyclicBarrier适用于这样的情况:你希望创建一组任务,它们并行地执行工作,然后在下一个步骤之前等待,直至所有任务都完成。

package concurrency.zxx;

import java.util.Random;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

public class CyclicBarrierTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        CyclicBarrier cb = new CyclicBarrier(3);
        for (int i = 0; i < 3; i++) {
            Runnable runnable = new Runnable() {
                public void run() {
                    try {
                        Random random = new Random();
                        TimeUnit.SECONDS.sleep(random.nextInt(10));
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点1,当前已有" + (cb.getNumberWaiting()+1) + 
                                "个到达," + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊" : "正在等候"));
                        cb.await();
                        
                        TimeUnit.SECONDS.sleep(random.nextInt(10));
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点2,当前已有" + (cb.getNumberWaiting()+1) + 
                                "个到达," + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊" : "正在等候"));
                        cb.await();
                        
                        TimeUnit.SECONDS.sleep(random.nextInt(10));
                        System.out.println("线程" + Thread.currentThread().getName() + 
                                "即将到达集合地点3,当前已有" + (cb.getNumberWaiting()+1) + 
                                "个到达," + (cb.getNumberWaiting() == 2 ? "都到齐了,继续走啊" : "正在等候"));
                        cb.await();
                        
                        
                        
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                }
            };
            executor.execute(runnable);
        }
        executor.shutdown();
    }
}

再来看一个代码,节选自《Java编程思想》

package concurrency.thinking;

import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

class Horse implements Runnable {
    private static int counter = 0;
    private final int id = counter++;
    private int strides = 0;
    private static Random rand = new Random(47);
    private static CyclicBarrier barrier;
    public Horse(CyclicBarrier b) {
        barrier = b;
    }
    public synchronized int getStrides() {
        return strides;
    }
    public void run() {
        try {
            while(!Thread.interrupted()) {
                synchronized(this) {
                    strides += rand.nextInt(3);
                }
                barrier.await();
            }
        } catch(InterruptedException e) {
            
        } catch(BrokenBarrierException e) {
            throw new RuntimeException(e);
        }
    }
    public String toString() {
        return "Horse " + id + " ";
    }
    public String tracks() {
        StringBuilder s = new StringBuilder();
        for(int i = 0; i < getStrides(); i++) {
            s.append("*");
        }
        s.append(id);
        return s.toString();
    }
}


public class HorseRace {
    static final int FINISH_LINE = 75;
    private List<Horse> horses = new ArrayList<Horse>();
    private ExecutorService exec = Executors.newCachedThreadPool();
    private CyclicBarrier barrier;
    public HorseRace(int nHorses, final int pause) {
        barrier = new CyclicBarrier(nHorses, new Runnable() { // 可以向CyclicBarrier提供一个Runnable, 当技术之到达0时自动执行
            public void run() {
                StringBuilder s = new StringBuilder();
                for(int i = 0; i < FINISH_LINE; i++) {
                    s.append("=");
                }
                System.out.println(s);
                for (Horse horse : horses) {
                    System.out.println(horse.tracks());
                }
                for (Horse horse : horses) {
                    if (horse.getStrides() >= FINISH_LINE) {
                        System.out.println(horse + "won!");
                        exec.shutdownNow();
                        return;
                    }
                }
                try {
                    TimeUnit.MILLISECONDS.sleep(pause);
                } catch (InterruptedException e) {
                    System.out.println("barrier-action sleep interrupted");
                }
            }
        });
        for (int i = 0; i < nHorses; i++) {
            Horse horse = new Horse(barrier);
            horses.add(horse);
            exec.execute(horse);
        }
    }
    
    public static void main(String[] args) {
        int nHorses = 7;
        int pause = 200;
        
        new HorseRace(nHorses, pause);
    }
}

BlockingQueue

BlockingQueue当队列中没有数据的情况下,消费者端的所有线程都会被自动阻塞(挂起),直到有数据放入队列。当队列中填满数据的情况下,生产者端的所有线程都会被自动阻塞(挂起),直到队列中有空的位置,线程被自动唤醒。
BlockingQueue只是java.util.concurrent包中的一个接口,而在具体使用时,我们用到的是它的实现类,当然这些实现类也位于java.util.concurrent包中。BlockingQueue的实现类主要有以下几种:

  1. ArrayBlockingQueue
  2. LinkedBlockingQueue
  3. DelayQueue
  4. PriorityBlockingQueue
  5. SynchronousQueue

ArrayBlockingQueue

基于数组的阻塞队列实现,在ArrayBlockingQueue内部,维护了一个定长数组,以便缓存队列中的数据对象,这是一个常用的阻塞队列,除了一个定长数组外,ArrayBlockingQueue内部还保存着两个整形变量,分别标识着队列的头部和尾部在数组中的位置。
ArrayBlockingQueue在生产者放入数据和消费者获取数据,都是共用同一个锁对象,由此也意味着两者无法真正并行运行,这点尤其不同于LinkedBlockingQueue;按照实现原理来分析,ArrayBlockingQueue完全可以采用分离锁,从而实现生产者和消费者操作的完全并行运行。Doug Lea之所以没这样去做,也许是因为ArrayBlockingQueue的数据写入和获取操作已经足够轻巧,以至于引入独立的锁机制,除了给代码带来额外的复杂性外,其在性能上完全占不到任何便宜。 ArrayBlockingQueue和LinkedBlockingQueue间还有一个明显的不同之处在于,前者在插入或删除元素时不会产生或销毁任何额外的对象实例,而后者则会生成一个额外的Node对象。这在长时间内需要高效并发地处理大批量数据的系统中,其对于GC的影响还是存在一定的区别。而在创建ArrayBlockingQueue时,我们还可以控制对象的内部锁是否采用公平锁,默认采用非公平锁。

使用方法可以参见上一篇博文, 使用ArrayBlockingQueue实现生产者和消费者

LinkedBlockingQueue

基于链表的阻塞队列,同ArrayListBlockingQueue类似,其内部也维持着一个数据缓冲队列(该队列由一个链表构成),当生产者往队列中放入一个数据时,队列会从生产者手中获取数据,并缓存在队列内部,而生产者立即返回;只有当队列缓冲区达到最大值缓存容量时(LinkedBlockingQueue可以通过构造函数指定该值),才会阻塞生产者队列,直到消费者从队列中消费掉一份数据,生产者线程会被唤醒,反之对于消费者这端的处理也基于同样的原理。而LinkedBlockingQueue之所以能够高效的处理并发数据,还因为其对于生产者端和消费者端分别采用了独立的锁来控制数据同步,这也意味着在高并发的情况下生产者和消费者可以并行地操作队列中的数据,以此来提高整个队列的并发性能。
作为开发者,我们需要注意的是,如果构造一个LinkedBlockingQueue对象,而没有指定其容量大小,LinkedBlockingQueue会默认一个类似无限大小的容量(Integer.MAX_VALUE),这样的话,如果生产者的速度一旦大于消费者的速度,也许还没有等到队列满阻塞产生,系统内存就有可能已被消耗殆尽了。

DelayQueue

这是一个*的BlockingQueue,用于放置实现了Delayed接口的对象,其中的对象只能在其到期时才能从队列中取走。这种队列是有序的,即队头对象的延迟到期时间最长。常见的例子是用它来管理一个超时未响应的连接队列

PriorityBlockingQueue

基于优先级的阻塞队列(优先级的判断通过构造函数传入的Compator对象来决定),但需要注意的是PriorityBlockingQueue并不会阻塞数据生产者,而只会在没有可消费的数据时,阻塞数据的消费者。因此使用的时候要特别注意,生产者生产数据的速度绝对不能快于消费者消费数据的速度,否则时间一长,会最终耗尽所有的可用堆内存空间。在实现PriorityBlockingQueue时,内部控制线程同步的锁采用的是公平锁。

SynchronousQueue

一种无缓冲的等待队列,类似于无中介的直接交易,有点像原始社会中的生产者和消费者,生产者拿着产品去集市销售给产品的最终消费者,而消费者必须亲自去集市找到所要商品的直接生产者,如果一方没有找到合适的目标,那么对不起,大家都在集市等待。相对于有缓冲的BlockingQueue来说,少了一个中间经销商的环节(缓冲区),如果有经销商,生产者直接把产品批发给经销商,而无需在意经销商最终会将这些产品卖给那些消费者,由于经销商可以库存一部分商品,因此相对于直接交易模式,总体来说采用中间经销商的模式会吞吐量高一些(可以批量买卖);但另一方面,又因为经销商的引入,使得产品从生产者到消费者中间增加了额外的交易环节,单个产品的及时响应性能可能会降低。
声明一个SynchronousQueue有两种不同的方式,它们之间有着不太一样的行为。公平模式和非公平模式的区别:

  • 如果采用公平模式:SynchronousQueue会采用公平锁,并配合一个FIFO队列来阻塞多余的生产者和消费者,从而体系整体的公平策略;
  • 但如果是非公平模式(SynchronousQueue默认):SynchronousQueue采用非公平锁,同时配合一个LIFO队列来管理多余的生产者和消费者,如果生产者和消费者的处理速度有差距,则很容易出现饥渴的情况,即可能有某些生产者或者是消费者的数据永远都得不到处理。

Semaphore

正常的锁在任何时刻都只允许一个任务访问一项资源,而Semaphore信号量允许n个任务同时访问这个资源。感觉就是操作系统里的信号量的pv操作

package concurrency.zxx;

import java.util.Random;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

public class SemaphoreTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Semaphore semaphore = new Semaphore(3);
        for (int i = 0; i < 10; i++) {
            Runnable runnable = new Runnable() {
                @Override
                public void run() {
                    try {
                        semaphore.acquire();
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    System.out.println("线程" + Thread.currentThread().getName() +
                            "进入,当前已有" + (3 - semaphore.availablePermits()));
                    
                    try {
                        TimeUnit.SECONDS.sleep(new Random().nextInt(5));
                    } catch (InterruptedException e) {
                        e.printStackTrace();
                    }
                    
                    System.out.println("线程" + Thread.currentThread().getName() +
                            "即将离开");
                    
                    semaphore.release();
                    
                    System.out.println("线程" + Thread.currentThread().getName() +
                            "离开,当前已有" + (3 - semaphore.availablePermits()));
                    
                }
            };
            executor.execute(runnable);
        }
        executor.shutdown();
    }
}

Exchanger

Exchanger是在两个任务之间交换对象的栅栏。当这些任务进入栅栏时,它们各自拥有一个对象,当它们离开时,它们都拥有之前对象持有的对象。它的典型应用场景是:一个任务在创建对象,这些对象的生产代价很高昂,而另一个对象在消费这些对象。

package concurrency.zxx;

import java.util.concurrent.Exchanger;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class ExchangerTest {
    public static void main(String[] args) {
        ExecutorService executor = Executors.newCachedThreadPool();
        Exchanger<String> exchanger = new Exchanger<>();

        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String data1 = "hello";
                    System.out.println("线程" + Thread.currentThread().getName() + 
                            "要交换的数据为" + data1);
                    String data2 = exchanger.exchange(data1);
                    System.out.println("线程" + Thread.currentThread().getName() + 
                            "得到的数据为" + data2);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        
        executor.execute(new Runnable() {
            @Override
            public void run() {
                try {
                    String data1 = "word";
                    System.out.println("线程" + Thread.currentThread().getName() + 
                            "要交换的数据为" + data1);
                    String data2 = exchanger.exchange(data1);
                    System.out.println("线程" + Thread.currentThread().getName() + 
                            "得到的数据为" + data2);
                } catch (Exception e) {
                    e.printStackTrace();
                }
            }
        });
        
        executor.shutdown();
    }
}

参考资料

http://blog.csdn.net/suifeng3051/article/details/48807423
《张孝祥Java并发》
《Java编程思想》